diff --git a/testy/migrace/migracesql.py b/testy/migrace/migracesql.py index 3bdb177..7965878 100644 --- a/testy/migrace/migracesql.py +++ b/testy/migrace/migracesql.py @@ -132,7 +132,7 @@ def migrate(): res, set = get_all_archived_runners_detail() print(f"fetched {len(set)}") for row in set: - insert_archive_detail(row) + #insert_archive_detail(row) print(f"inserted {row['id']}") bars = {'high': [], diff --git a/v2realbot/common/db.py b/v2realbot/common/db.py index 4e2d43a..91ffa32 100644 --- a/v2realbot/common/db.py +++ b/v2realbot/common/db.py @@ -1,5 +1,33 @@ from v2realbot.config import DATA_DIR import sqlite3 +import queue +import threading sqlite_db_file = DATA_DIR + "/v2trading.db" -conn = sqlite3.connect(sqlite_db_file, check_same_thread=False, isolation_level=None) \ No newline at end of file +# Define the connection pool +class ConnectionPool: + def __init__(self, max_connections): + self.max_connections = max_connections + self.connections = queue.Queue(max_connections) + self.lock = threading.Lock() + + def get_connection(self): + with self.lock: + if self.connections.empty(): + return self.create_connection() + else: + return self.connections.get() + + def release_connection(self, connection): + with self.lock: + self.connections.put(connection) + + def create_connection(self): + connection = sqlite3.connect(sqlite_db_file, check_same_thread=False) + return connection + +#for pool of connections if necessary +pool = ConnectionPool(10) +#for one shared connection (used for writes only in WAL mode) +insert_conn = sqlite3.connect(sqlite_db_file, check_same_thread=False) +insert_queue = queue.Queue() \ No newline at end of file diff --git a/v2realbot/controller/services.py b/v2realbot/controller/services.py index 4886dc9..2243c86 100644 --- a/v2realbot/controller/services.py +++ b/v2realbot/controller/services.py @@ -22,7 +22,7 @@ import pandas as pd from traceback import format_exc from datetime import timedelta, time from threading import Lock -from v2realbot.common.db import conn +from v2realbot.common.db import pool #adding lock to ensure thread safety of TinyDB (in future will be migrated to proper db) lock = Lock() @@ -602,9 +602,14 @@ def edit_archived_runners(runner_id: UUID, archChange: RunArchiveChange): #returns number of deleted elements def delete_archive_detail_byID(id: UUID): - c = conn.cursor() - res = c.execute(f"DELETE from runner_detail WHERE runner_id='{str(id)}';") - print("deleted", res.rowcount) + conn = pool.get_connection() + try: + c = conn.cursor() + res = c.execute(f"DELETE from runner_detail WHERE runner_id='{str(id)}';") + conn.commit() + print("deleted", res.rowcount) + finally: + pool.release_connection(conn) return res.rowcount # def get_all_archived_runners_detail_old(): @@ -612,9 +617,13 @@ def delete_archive_detail_byID(id: UUID): # return 0, res def get_all_archived_runners_detail(): - conn.row_factory = lambda c, r: json.loads(r[0]) - c = conn.cursor() - res = c.execute(f"SELECT data FROM runner_detail") + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + res = c.execute(f"SELECT data FROM runner_detail") + finally: + pool.release_connection(conn) return 0, res.fetchall() # def get_archived_runner_details_byID_old(id: UUID): @@ -626,19 +635,28 @@ def get_all_archived_runners_detail(): #vrátí konkrétní def get_archived_runner_details_byID(id: UUID): - conn.row_factory = lambda c, r: json.loads(r[0]) - c = conn.cursor() - result = c.execute(f"SELECT data FROM runner_detail WHERE runner_id='{str(id)}'") - res= result.fetchone() + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + result = c.execute(f"SELECT data FROM runner_detail WHERE runner_id='{str(id)}'") + res= result.fetchone() + finally: + pool.release_connection(conn) if res==None: return -2, "not found" else: return 0, res def insert_archive_detail(archdetail: RunArchiveDetail): - c = conn.cursor() - json_string = json.dumps(archdetail, default=json_serial) - res = c.execute("INSERT INTO runner_detail VALUES (?,?)",[str(archdetail.id), json_string]) + conn = pool.get_connection() + try: + c = conn.cursor() + json_string = json.dumps(archdetail, default=json_serial) + res = c.execute("INSERT INTO runner_detail VALUES (?,?)",[str(archdetail.id), json_string]) + conn.commit() + finally: + pool.release_connection(conn) return res.rowcount #returns b diff --git a/v2realbot/loader/__pycache__/aggregator.cpython-310.pyc b/v2realbot/loader/__pycache__/aggregator.cpython-310.pyc index 2db0c22..8602c0e 100644 Binary files a/v2realbot/loader/__pycache__/aggregator.cpython-310.pyc and b/v2realbot/loader/__pycache__/aggregator.cpython-310.pyc differ diff --git a/v2realbot/loader/aggregator.py b/v2realbot/loader/aggregator.py index 8d8e669..1403f37 100644 --- a/v2realbot/loader/aggregator.py +++ b/v2realbot/loader/aggregator.py @@ -198,16 +198,19 @@ class TradeAggregator: #zkousime potvrzeni baru dat o chlup mensi cas nez cas noveho baru, ktery jde hned za nim #gui neumi zobrazit duplicity a v RT grafu nejde upravovat zpetne #zarovname na cas baru podle timeframu(např. 5, 10, 15 ...) (ROUND) - if self.align: - t = datetime.fromtimestamp(data['t']) - t = t - timedelta(seconds=t.second % self.timeframe,microseconds=t.microsecond) - #nebo pouzijeme datum tradu zaokrouhlene na vteriny (RANDOM) - else: - #ulozime si jeho timestamp (odtum pocitame timeframe) - t = datetime.fromtimestamp(int(data['t'])) - #self.newBar['updated'] = float(data['t']) - 0.001 - self.newBar['updated'] = datetime.timestamp(t) - 0.000001 + #MUSIME VRATIT ZPET - ten upraveny cas způsobuje spatne plneni v BT, kdyz tento bar triggeruje nakup + # if self.align: + # t = datetime.fromtimestamp(data['t']) + # t = t - timedelta(seconds=t.second % self.timeframe,microseconds=t.microsecond) + # #nebo pouzijeme datum tradu zaokrouhlene na vteriny (RANDOM) + # else: + # #ulozime si jeho timestamp (odtum pocitame timeframe) + # t = datetime.fromtimestamp(int(data['t'])) + + # #self.newBar['updated'] = float(data['t']) - 0.001 + # self.newBar['updated'] = datetime.timestamp(t) - 0.000001 + self.newBar['updated'] = data['t'] #PRO standardní BAR nechavame puvodni else: self.newBar['updated'] = data['t'] diff --git a/v2realbot/main.py b/v2realbot/main.py index 4710279..bc5e37a 100644 --- a/v2realbot/main.py +++ b/v2realbot/main.py @@ -26,6 +26,8 @@ import json from queue import Queue, Empty from threading import Thread import asyncio +from v2realbot.common.db import insert_queue, insert_conn +from v2realbot.utils.utils import json_serial #from async io import Queue, QueueEmpty # install() @@ -329,14 +331,41 @@ def _get_alpaca_history_bars(symbol: str, datetime_object_from: datetime, dateti else: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"No data found {res} {set}") +# Thread function to insert data from the queue into the database +def insert_queue2db(): + print("starting insert_queue2db thread") + while True: + # Retrieve data from the queue + data = insert_queue.get() + + # Unpack the data + runner_id, loglist = data + + c = insert_conn.cursor() + insert_data = [] + for i in loglist: + row = (str(runner_id), i["time"], json.dumps(i, default=json_serial)) + insert_data.append(row) + c.executemany("INSERT INTO runner_logs VALUES (?,?,?)", insert_data) + insert_conn.commit() + # Mark the task as done in the queue #join cekej na dokonceni vsech for i in cs.db.runners: i.run_thread.join() if __name__ == "__main__": - uvicorn.run("__main__:app", host="0.0.0.0", port=8000, reload=False) + try: + #TOTO predelat na samostatnou tridu typu vlakna a dat do separatniho souboru, draft jiz na chatgpt + #spusteni vlakna pro zapis logů (mame single write vlakno, thready dodávají pres queue) + insert_thread = Thread(target=insert_queue2db) + insert_thread.start() + uvicorn.run("__main__:app", host="0.0.0.0", port=8000, reload=False) + finally: + print("closing insert_conn connection") + insert_conn.close() + print("closed") ##TODO pridat moznost behu na PAPER a LIVE per strategie # zjistit zda order notification websocket muze bezet na obou soucasne diff --git a/v2realbot/static/js/archivetables.js b/v2realbot/static/js/archivetables.js index 10bd6bc..44eddfc 100644 --- a/v2realbot/static/js/archivetables.js +++ b/v2realbot/static/js/archivetables.js @@ -254,18 +254,20 @@ var archiveRecords = if (type == "sort") { return new Date(data).getTime(); } - + var date = new Date(data); + tit = date.toLocaleString('cs-CZ', { + timeZone: 'America/New_York', + }) + if (isToday(now)) { //return local time only - return 'dnes ' + format_date(data,false,true) + return '
'+ 'dnes ' + format_date(data,false,true)+'
' } else { //return local datetime - return format_date(data,false,false) + return '
'+ format_date(data,false,false)+'
' } - - }, }, { @@ -282,7 +284,7 @@ var archiveRecords = { targets: [2], render: function ( data, type, row ) { - return '
'+data+'' + return '
'+data+'
' }, }, { @@ -290,13 +292,13 @@ var archiveRecords = render: function ( data, type, row ) { var res = JSON.stringify(data) const unquoted = res.replace(/"([^"]+)":/g, '$1:') - return '
'+unquoted+'' + return '
'+unquoted+'
' }, }, { targets: [4], render: function ( data, type, row ) { - return '
'+data+'' + return '
'+data+'
' }, }, { diff --git a/v2realbot/strategy/__pycache__/base.cpython-310.pyc b/v2realbot/strategy/__pycache__/base.cpython-310.pyc index 595d210..70879fb 100644 Binary files a/v2realbot/strategy/__pycache__/base.cpython-310.pyc and b/v2realbot/strategy/__pycache__/base.cpython-310.pyc differ diff --git a/v2realbot/strategy/base.py b/v2realbot/strategy/base.py index 1c4b1ed..4da838f 100644 --- a/v2realbot/strategy/base.py +++ b/v2realbot/strategy/base.py @@ -4,7 +4,7 @@ from datetime import datetime from v2realbot.utils.utils import AttributeDict, zoneNY, is_open_rush, is_close_rush, json_serial, print, safe_get, Average from v2realbot.utils.tlog import tlog -from v2realbot.utils.ilog import insert_log, insert_log_multiple +from v2realbot.utils.ilog import insert_log, insert_log_multiple_queue from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Order, Account from v2realbot.config import BT_DELAYS, get_key, HEARTBEAT_TIMEOUT, QUIET_MODE, LOG_RUNNER_EVENTS import queue @@ -534,8 +534,8 @@ class Strategy: #cleaning iterlog lsit #TODO pridat cistku i mimo RT blok - - if self.ilog_save: insert_log_multiple(self.state.runner_id, self.state.iter_log_list) + #vlozime do queue, odtud si to bere single zapisovaci thread + if self.ilog_save: insert_log_multiple_queue(self.state.runner_id, self.state.iter_log_list) #smazeme logy self.state.iter_log_list = [] diff --git a/v2realbot/utils/ilog.py b/v2realbot/utils/ilog.py index 5d322e8..251c85b 100644 --- a/v2realbot/utils/ilog.py +++ b/v2realbot/utils/ilog.py @@ -4,7 +4,9 @@ from uuid import UUID, uuid4 import json from datetime import datetime from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Account -from v2realbot.common.db import conn +from v2realbot.common.db import pool, insert_queue +import sqlite3 + #standardne vraci pole tuplů, kde clen tuplu jsou sloupce #conn.row_factory = lambda c, r: json.loads(r[0]) @@ -22,42 +24,61 @@ from v2realbot.common.db import conn #insert = dict(time=datetime.now(), side="ddd", rectype=RecordType.BAR, id=uuid4()) #insert_list = [dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4()),dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4()),dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4()),dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4())] + +#TOTO PREDELAT NA OBJEKT, který bude mít per thread svoji connectionu + #returns rowcount of inserted rows def insert_log(runner_id: UUID, time: float, logdict: dict): - c = conn.cursor() - json_string = json.dumps(logdict, default=json_serial) - res = c.execute("INSERT INTO runner_logs VALUES (?,?,?)",[str(runner_id), time, json_string]) - conn.commit() + conn = pool.get_connection() + try: + c = conn.cursor() + json_string = json.dumps(logdict, default=json_serial) + res = c.execute("INSERT INTO runner_logs VALUES (?,?,?)",[str(runner_id), time, json_string]) + conn.commit() + finally: + pool.release_connection(conn) return res.rowcount #returns rowcount of inserted rows -def insert_log_multiple(runner_id: UUID, loglist: list): - c = conn.cursor() - insert_data = [] - for i in loglist: - row = (str(runner_id), i["time"], json.dumps(i, default=json_serial)) - insert_data.append(row) - c.executemany("INSERT INTO runner_logs VALUES (?,?,?)", insert_data) - #conn.commit() - return c.rowcount +#single connection in WAL mode +def insert_log_multiple_queue(runner_id:UUID, loglist: list): + insert_queue.put((runner_id, loglist)) + +# def insert_log_multiple(runner_id: UUID, loglist: list): +# conn = sqlite3.connect(sqlite_db_file, check_same_thread=False) +# c = conn.cursor() +# insert_data = [] +# for i in loglist: +# row = (str(runner_id), i["time"], json.dumps(i, default=json_serial)) +# insert_data.append(row) +# c.executemany("INSERT INTO runner_logs VALUES (?,?,?)", insert_data) +# conn.commit() +# return c.rowcount #returns list of ilog jsons def get_log_window(runner_id: UUID, timestamp_from: float = 0, timestamp_to: float = 9682851459): - conn.row_factory = lambda c, r: json.loads(r[0]) - c = conn.cursor() - res = c.execute(f"SELECT data FROM runner_logs WHERE runner_id='{str(runner_id)}' AND time >={timestamp_from} AND time <={timestamp_to} ORDER BY time") + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + res = c.execute(f"SELECT data FROM runner_logs WHERE runner_id='{str(runner_id)}' AND time >={timestamp_from} AND time <={timestamp_to} ORDER BY time") + finally: + pool.release_connection(conn) return res.fetchall() #returns number of deleted elements def delete_logs(runner_id: UUID): - c = conn.cursor() - res = c.execute(f"DELETE from runner_logs WHERE runner_id='{str(runner_id)}';") - print(res.rowcount) - conn.commit() + conn = pool.get_connection() + try: + c = conn.cursor() + res = c.execute(f"DELETE from runner_logs WHERE runner_id='{str(runner_id)}';") + print(res.rowcount) + conn.commit() + finally: + pool.release_connection(conn) return res.rowcount - # print(insert_log(str(uuid4()), datetime.now().timestamp(), insert)) # c = conn.cursor() # ts_from = 1683108821.08872