From f3821a8f4f81b61531902aeb850296917f92eb12 Mon Sep 17 00:00:00 2001 From: David Brazda Date: Wed, 28 Jun 2023 20:14:30 +0200 Subject: [PATCH] insert via queue a single writer --- testy/migrace/migracesql.py | 2 +- v2realbot/common/db.py | 30 +++++++- v2realbot/controller/services.py | 46 +++++++++---- .../__pycache__/aggregator.cpython-310.pyc | Bin 7085 -> 6994 bytes v2realbot/loader/aggregator.py | 21 +++--- v2realbot/main.py | 31 ++++++++- v2realbot/static/js/archivetables.js | 18 ++--- .../strategy/__pycache__/base.cpython-310.pyc | Bin 14325 -> 14331 bytes v2realbot/strategy/base.py | 6 +- v2realbot/utils/ilog.py | 65 ++++++++++++------ 10 files changed, 160 insertions(+), 59 deletions(-) diff --git a/testy/migrace/migracesql.py b/testy/migrace/migracesql.py index 3bdb177..7965878 100644 --- a/testy/migrace/migracesql.py +++ b/testy/migrace/migracesql.py @@ -132,7 +132,7 @@ def migrate(): res, set = get_all_archived_runners_detail() print(f"fetched {len(set)}") for row in set: - insert_archive_detail(row) + #insert_archive_detail(row) print(f"inserted {row['id']}") bars = {'high': [], diff --git a/v2realbot/common/db.py b/v2realbot/common/db.py index 4e2d43a..91ffa32 100644 --- a/v2realbot/common/db.py +++ b/v2realbot/common/db.py @@ -1,5 +1,33 @@ from v2realbot.config import DATA_DIR import sqlite3 +import queue +import threading sqlite_db_file = DATA_DIR + "/v2trading.db" -conn = sqlite3.connect(sqlite_db_file, check_same_thread=False, isolation_level=None) \ No newline at end of file +# Define the connection pool +class ConnectionPool: + def __init__(self, max_connections): + self.max_connections = max_connections + self.connections = queue.Queue(max_connections) + self.lock = threading.Lock() + + def get_connection(self): + with self.lock: + if self.connections.empty(): + return self.create_connection() + else: + return self.connections.get() + + def release_connection(self, connection): + with self.lock: + self.connections.put(connection) + + def create_connection(self): + connection = sqlite3.connect(sqlite_db_file, check_same_thread=False) + return connection + +#for pool of connections if necessary +pool = ConnectionPool(10) +#for one shared connection (used for writes only in WAL mode) +insert_conn = sqlite3.connect(sqlite_db_file, check_same_thread=False) +insert_queue = queue.Queue() \ No newline at end of file diff --git a/v2realbot/controller/services.py b/v2realbot/controller/services.py index 4886dc9..2243c86 100644 --- a/v2realbot/controller/services.py +++ b/v2realbot/controller/services.py @@ -22,7 +22,7 @@ import pandas as pd from traceback import format_exc from datetime import timedelta, time from threading import Lock -from v2realbot.common.db import conn +from v2realbot.common.db import pool #adding lock to ensure thread safety of TinyDB (in future will be migrated to proper db) lock = Lock() @@ -602,9 +602,14 @@ def edit_archived_runners(runner_id: UUID, archChange: RunArchiveChange): #returns number of deleted elements def delete_archive_detail_byID(id: UUID): - c = conn.cursor() - res = c.execute(f"DELETE from runner_detail WHERE runner_id='{str(id)}';") - print("deleted", res.rowcount) + conn = pool.get_connection() + try: + c = conn.cursor() + res = c.execute(f"DELETE from runner_detail WHERE runner_id='{str(id)}';") + conn.commit() + print("deleted", res.rowcount) + finally: + pool.release_connection(conn) return res.rowcount # def get_all_archived_runners_detail_old(): @@ -612,9 +617,13 @@ def delete_archive_detail_byID(id: UUID): # return 0, res def get_all_archived_runners_detail(): - conn.row_factory = lambda c, r: json.loads(r[0]) - c = conn.cursor() - res = c.execute(f"SELECT data FROM runner_detail") + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + res = c.execute(f"SELECT data FROM runner_detail") + finally: + pool.release_connection(conn) return 0, res.fetchall() # def get_archived_runner_details_byID_old(id: UUID): @@ -626,19 +635,28 @@ def get_all_archived_runners_detail(): #vrátí konkrétní def get_archived_runner_details_byID(id: UUID): - conn.row_factory = lambda c, r: json.loads(r[0]) - c = conn.cursor() - result = c.execute(f"SELECT data FROM runner_detail WHERE runner_id='{str(id)}'") - res= result.fetchone() + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + result = c.execute(f"SELECT data FROM runner_detail WHERE runner_id='{str(id)}'") + res= result.fetchone() + finally: + pool.release_connection(conn) if res==None: return -2, "not found" else: return 0, res def insert_archive_detail(archdetail: RunArchiveDetail): - c = conn.cursor() - json_string = json.dumps(archdetail, default=json_serial) - res = c.execute("INSERT INTO runner_detail VALUES (?,?)",[str(archdetail.id), json_string]) + conn = pool.get_connection() + try: + c = conn.cursor() + json_string = json.dumps(archdetail, default=json_serial) + res = c.execute("INSERT INTO runner_detail VALUES (?,?)",[str(archdetail.id), json_string]) + conn.commit() + finally: + pool.release_connection(conn) return res.rowcount #returns b diff --git a/v2realbot/loader/__pycache__/aggregator.cpython-310.pyc b/v2realbot/loader/__pycache__/aggregator.cpython-310.pyc index 2db0c226997d623cd66e0d992746a9993d5ddff8..8602c0e944cbb1ffce905d6107359431f9adbdba 100644 GIT binary patch delta 755 zcmZ8f&ubGw6n<|vWSiY=nl#yDH=7^ZYOSeC(V#`hNf8f%9ts|c5~LGCHf>fmTNK>T zLk=FaT9}Ji!4|zJ2ucom_770hTktP1caP%ROU9nJxwLFyvQypTytF1zH5OfsM??4Vm?3Ptsf$l}2|W(bwM zC}G)vJ`$pK`ly3fZUcF4CPJ}Aroe3RkMZhxtp?s~&=$)YFiz7K#(QGsLX_7If%IBs|H#sb|$vXikUil=*a`a=Fk)`YaJDa=i(? zDQ}Uu{9){zKecx%i0mUmiwf8ClSmzdxXu&s&kLl_~xORg1z; delta 819 zcmZ`%OKTHR6h3DXI?YUG%zGYbp0#R{$=psc|g3vMu`UC6_P;?RWCx}aT>Y16gXu-M6z326P=bU@)$GJE2N?TDRps)O! zvx|Gm>j+xzyU25lKV@=v%mFLKlog@#sr8UA$>;Dq-;!slU}Xo`F>PrmJ0T_+u!CLE z5$y>_v~q+=W`LwTOZEjFx?(HBpDD>rX;^T?9b_pTC??Gk3(yh+{GraG4%CaDMrVmB zv}YU$gH>kgJP1%PGlf>#-#v*IMVDD-TcTJ(S(C&l8n7Zzg-c8(tKJHEymNIuaFJ=! z9t#rpkR)odhdS6NSAakd{350ZD~ss{1S~xv(8+V!myBZ`gMW?| zHcbPVss#<36no}~#bYDPflvEaV%aRPiGv)u1#rfgOf3nW3x_=rI!|_Q6hbHVzfq)) z&w>IA{IOD13O+dLms8{)luDu0Lz;D%=+>Li9k-{+XCB#ZaYZ%c(hedtDQ&&Pt7`3v zOEG5&6(ZC&$;E>+%?bw4d+@xaK5FoHfqsq`O;`l~ii^ag<-nR^@&*uok5K6*E>fQ0 zuax;W^()WCj7YQp{q6AQ{_Z9I{=}W)b$Ym?aQkkfH459yYmJfAX{@(bH|*thtAd@Y z{6_30&&Q|vkJt>q7VqL0z8}9SjgPB1fr)S?'+ 'dnes ' + format_date(data,false,true)+'' } else { //return local datetime - return format_date(data,false,false) + return '
'+ format_date(data,false,false)+'
' } - - }, }, { @@ -282,7 +284,7 @@ var archiveRecords = { targets: [2], render: function ( data, type, row ) { - return '
'+data+'' + return '
'+data+'
' }, }, { @@ -290,13 +292,13 @@ var archiveRecords = render: function ( data, type, row ) { var res = JSON.stringify(data) const unquoted = res.replace(/"([^"]+)":/g, '$1:') - return '
'+unquoted+'' + return '
'+unquoted+'
' }, }, { targets: [4], render: function ( data, type, row ) { - return '
'+data+'' + return '
'+data+'
' }, }, { diff --git a/v2realbot/strategy/__pycache__/base.cpython-310.pyc b/v2realbot/strategy/__pycache__/base.cpython-310.pyc index 595d2104feb94e8024c98ea0fafec9d5b254c150..70879fbb2612548b989728b872a6f0d12c4b4133 100644 GIT binary patch delta 2260 zcmZvdYfKzf6vw&4vfY9_mI5x6c8jo|Iy*s{B`+?j=z zs--nHlE#MQHr5u5CXFK6*675TNPO2veN@{TYvzkKF?{fmXpHfLN%j8kr0jNu4?oU5 zuQ~VJbMOA8{iEe*+1Z(C{O_R+Gr^3p*K?|*;^I(LSE-?dwLv8k4;!IaSXD;iYTVo* z{h2{5w#79_I?P$u6shj!p>!AU2(TO21N3mn%V{&@7NDE;XEtr!32_}DEFv7POXaYf zx4!paT%bwe?zKd353D5BCZOkxLlkOr7k;ByW_9AY&E#d(JyCT+y*DY!)P zj;2P{7S%h)pnQ}K6b*V8i?q@MQDaZKp$2bD@@vt8L`K=QVx?g(qFjslsv-3Oq8_^v zEJJ_??>-JOi^Fp0h8E+MUoY_%rj`q*J~L~jhh!zCV(UvQ`8nPMMd> z_DQXjf6#J@;nQghU*}s(qfuZJ(9dDHbi;3iqCs6wr`Qi22M!8}MJk31HHa8#D?O}6 z?TyFSZxwg?3|KAzw+C8|Q9sq`Yp}e*WM6sVG{Up_#iQb-35WP?e}?&eYn|fIM%f9U zr|>lVQyO&6ocFy$cArOlCRKGU95+H*lz(B&9}`=#GI;Uo0gBtm&!QLqB+EU-KcUfs z3RMSGszw7UeRqlUBkaoBpPL>-c5=WXF^lLH3D8qQF-||@|7D4Oj>s>7r0UGyE1QV( zJ!tz`U6rTxB1F*$R7*bwJ^@6-KZS%E?HrdDC6Tfd%Q$~3$P3QXe8wnMTX2uMeP<3Jy95-10TfwKTc-=5QBkR~~r z=@i6s9M&CTN&*qp7}A3DLFh+-D?l7bw8q|%d9^x&kp1TO>v|+7Mv!A6cUQyXSc@N}dmyy}8-aAdt{74but%LeT;I^}HeXxL7}W+s27MmJIu1F@eg>+c z5I=hLkP;hrCv~V3rY^QnKS@rTu?-Hg=|`@)bpfA)RVW zjsT){1CYdi#6h88aW6!qc>HCjU9GrsKd^rrit;+(B_OIh!+eeRmtTeQ8t^TLwMk#IjZY?so~bQc#~X%5on^4v?Im7D2L=0f8sscN3D XEJr*Z3W|xRGG7aq|7B#e8{7W_Uql3R delta 2310 zcma)-e{54#6vw%3SGvxPbz@_UvNB+@j_D}d;*WsQu>}bkFf&Npz*^eJ*7dcop7%O_ zX$B@FQ~!YEs(=_xh$0iFAx}+|OvK;d5BwSRCZLIlCWe31D1Z2m_j{kLrP&|uk5A7% z_uTW&Irp5qe~iD33ZuN-KTH0-e&={%YV=G+i>tainK3o8qPj5@P3Lqgnbqw+*PkV9 zfcN-@*hYKOcZjimKIyNk+ANQTOKG+Ip?iRSKJ4$T+yJo^P&@tnq(3-UcukA6SeiCs zLt&GB*+1XS2Klw}akiE3s|enzj;?@36nGHW4y*wRj$+wsU#wW=svD59@W--QEt80D zkCE_5n3UoTfsO@mC_G6^OVipS7d#!fHZKk39TL~`sDL8F_f{YRlo@hjE!l_`iSn@UI;_swl;+W~P1a7hSv#@H!Jtz=p=tynrs zd$3dZp{;W-u*&_=7JBNejd(|)+{^Xaq0n@U!X3|818z%8+~DMo+Fx8O$A6d~Z6Cxb z-?YEwkW{f%jQQw>v4Ie2jHl{jfyW`1NeGW=8CjXl<@!)fakwyw+JOaitgOHl?+7-_ zd)^id)ge;Bl7-RX47Z;Q-pgw8XPhr%Yajo8;bz%>Ya6@(e`oP9a?kCi?BQayab#CiVRd5@y&yOhMurK!FQM9hS6>srQ16W&A# z#>*7`b(w^=tB1%SGGRLj+~`VpMlqYOVCM{PYg$tC3_O%e52T(PJt?1wr}$9QvUYWa zn7aY5e2V|+76AhOkAU>8N6lrcgue3d>C%nF2S;bnS#aXX{FdhAIaaai!R{gMW7%U`|svX zR`xcus4bo2k(RpEsuZWOejoS%P?zu_B*f_W_;^}UE;nIm@h4i$G4*VyiuvL(U5{?cP z5`#1X@i>6-aJu&Zq;cRNa7selq?#$7)~sP8LF=H;0$%_Ykgt}Lr2YN!62_ADmDVjT zFFKY^^FVpE-P?9M8#{;fJHUIudEi|@mAI&`s-&v8s-!4xcsAi4=@Z~n;4|QJ0QX6k zB*IKJYj`2O0oC1rGXvZcS?qlT>wexEsjG3q!`c9xX6N@u+S}ihYvIk3u_I~GbJ$!V z5iWBc0`=CmLK^|{S@Pbt(m+8;HetJuUyVG*j@Y9sJ#6epxcP4|pT(|Hc%1Q4il44) z37-ki`9DvUs+@*{U4XhLq@X56`BDEV!rqlm0&3{4NCV+b?TpcoslG4VWb7=C{wQDG z(G&d=&b|V^#;MuSXu^m`qXi9#Zg$$Y0TMcw76ab`eZWzmxL=@4@=F~xzN8extLbO^ cYR6$$%Ug0KJh@yl;rI$GL!j6LpS*3uzdn!={timestamp_from} AND time <={timestamp_to} ORDER BY time") + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + res = c.execute(f"SELECT data FROM runner_logs WHERE runner_id='{str(runner_id)}' AND time >={timestamp_from} AND time <={timestamp_to} ORDER BY time") + finally: + pool.release_connection(conn) return res.fetchall() #returns number of deleted elements def delete_logs(runner_id: UUID): - c = conn.cursor() - res = c.execute(f"DELETE from runner_logs WHERE runner_id='{str(runner_id)}';") - print(res.rowcount) - conn.commit() + conn = pool.get_connection() + try: + c = conn.cursor() + res = c.execute(f"DELETE from runner_logs WHERE runner_id='{str(runner_id)}';") + print(res.rowcount) + conn.commit() + finally: + pool.release_connection(conn) return res.rowcount - # print(insert_log(str(uuid4()), datetime.now().timestamp(), insert)) # c = conn.cursor() # ts_from = 1683108821.08872