insert via queue to a single writer

David Brazda
2023-06-28 20:14:30 +02:00
parent 3880e145a2
commit f3821a8f4f
10 changed files with 160 additions and 59 deletions
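
The change in a nutshell, as a minimal standalone sketch (illustrative names, not the project's actual module layout): worker threads hand log batches to a queue.Queue, and one dedicated thread owns the SQLite write connection — the usual way to get many producers past SQLite's one-writer-at-a-time limit, especially in WAL mode.

import queue, sqlite3, threading

write_queue = queue.Queue()

def writer(db_file):
    # the only thread that ever writes; the connection is opened inside
    # the thread so the default check_same_thread guard stays on
    conn = sqlite3.connect(db_file)
    conn.execute("CREATE TABLE IF NOT EXISTS logs (msg)")
    while True:
        item = write_queue.get()
        if item is None:            # sentinel ends the thread
            break
        conn.executemany("INSERT INTO logs VALUES (?)", item)
        conn.commit()
        write_queue.task_done()
    conn.close()

threading.Thread(target=writer, args=("demo.db",), daemon=True).start()
write_queue.put([("hello",)])       # any thread may enqueue a batch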

View File

@@ -132,7 +132,7 @@ def migrate():
    res, set = get_all_archived_runners_detail()
    print(f"fetched {len(set)}")
    for row in set:
        insert_archive_detail(row)
        #insert_archive_detail(row)
        print(f"inserted {row['id']}")
    bars = {'high': [],

View File

@@ -1,5 +1,33 @@
from v2realbot.config import DATA_DIR
import sqlite3
import queue
import threading

sqlite_db_file = DATA_DIR + "/v2trading.db"
conn = sqlite3.connect(sqlite_db_file, check_same_thread=False, isolation_level=None)

# Define the connection pool
class ConnectionPool:
    def __init__(self, max_connections):
        self.max_connections = max_connections
        self.connections = queue.Queue(max_connections)
        self.lock = threading.Lock()

    def get_connection(self):
        #hand out a pooled connection, or open a fresh one when the pool is empty
        with self.lock:
            if self.connections.empty():
                return self.create_connection()
            else:
                return self.connections.get()

    def release_connection(self, connection):
        with self.lock:
            #put_nowait instead of put: if more connections were created than
            #the pool holds, a blocking put would deadlock under the lock
            try:
                self.connections.put_nowait(connection)
            except queue.Full:
                connection.close()

    def create_connection(self):
        connection = sqlite3.connect(sqlite_db_file, check_same_thread=False)
        return connection

#for a pool of connections if necessary
pool = ConnectionPool(10)
#for one shared connection (used for writes only in WAL mode)
insert_conn = sqlite3.connect(sqlite_db_file, check_same_thread=False)
insert_queue = queue.Queue()
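
The shared insert_conn only pays off if the database file is in WAL mode, so pooled readers do not block the single writer. The pragma is not part of this hunk, so the first line below is an assumption about how it would be enabled, followed by the read pattern the pool is meant for:

# assumed setup, not shown in this commit:
insert_conn.execute("PRAGMA journal_mode=WAL")

# typical read through the pool:
conn = pool.get_connection()
try:
    rows = conn.execute("SELECT count(*) FROM runner_logs").fetchall()
finally:
    pool.release_connection(conn)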

View File

@@ -22,7 +22,7 @@ import pandas as pd
from traceback import format_exc
from datetime import timedelta, time
from threading import Lock
from v2realbot.common.db import conn
from v2realbot.common.db import pool
#adding a lock to ensure thread safety of TinyDB (will be migrated to a proper db in the future)
lock = Lock()
@@ -602,9 +602,14 @@ def edit_archived_runners(runner_id: UUID, archChange: RunArchiveChange):
#returns number of deleted elements
def delete_archive_detail_byID(id: UUID):
    c = conn.cursor()
    res = c.execute(f"DELETE from runner_detail WHERE runner_id='{str(id)}';")
    print("deleted", res.rowcount)
    conn = pool.get_connection()
    try:
        c = conn.cursor()
        res = c.execute(f"DELETE from runner_detail WHERE runner_id='{str(id)}';")
        conn.commit()
        print("deleted", res.rowcount)
    finally:
        pool.release_connection(conn)
    return res.rowcount
# def get_all_archived_runners_detail_old():
@@ -612,9 +617,13 @@ def delete_archive_detail_byID(id: UUID):
#     return 0, res
def get_all_archived_runners_detail():
    conn.row_factory = lambda c, r: json.loads(r[0])
    c = conn.cursor()
    res = c.execute(f"SELECT data FROM runner_detail")
    conn = pool.get_connection()
    try:
        c = conn.cursor()
        #set the factory on the cursor so the pooled connection goes back unmodified
        c.row_factory = lambda c, r: json.loads(r[0])
        res = c.execute(f"SELECT data FROM runner_detail")
        #fetch before the connection is returned to the pool
        rows = res.fetchall()
    finally:
        pool.release_connection(conn)
    return 0, rows
# def get_archived_runner_details_byID_old(id: UUID):
@@ -626,19 +635,28 @@ def get_all_archived_runners_detail():
#returns the specific one
def get_archived_runner_details_byID(id: UUID):
    conn.row_factory = lambda c, r: json.loads(r[0])
    c = conn.cursor()
    result = c.execute(f"SELECT data FROM runner_detail WHERE runner_id='{str(id)}'")
    res = result.fetchone()
    conn = pool.get_connection()
    try:
        c = conn.cursor()
        #set the factory on the cursor so the pooled connection goes back unmodified
        c.row_factory = lambda c, r: json.loads(r[0])
        result = c.execute(f"SELECT data FROM runner_detail WHERE runner_id='{str(id)}'")
        res = result.fetchone()
    finally:
        pool.release_connection(conn)
    if res is None:
        return -2, "not found"
    else:
        return 0, res
def insert_archive_detail(archdetail: RunArchiveDetail):
    c = conn.cursor()
    json_string = json.dumps(archdetail, default=json_serial)
    res = c.execute("INSERT INTO runner_detail VALUES (?,?)",[str(archdetail.id), json_string])
    conn = pool.get_connection()
    try:
        c = conn.cursor()
        json_string = json.dumps(archdetail, default=json_serial)
        res = c.execute("INSERT INTO runner_detail VALUES (?,?)",[str(archdetail.id), json_string])
        conn.commit()
    finally:
        pool.release_connection(conn)
    return res.rowcount

#returns b
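
Every helper above now repeats the same get_connection/try/finally dance. Purely as an illustration (this wrapper is not part of the commit), a context manager would fold it into one place:

from contextlib import contextmanager

@contextmanager
def pooled():
    #borrow a connection and guarantee it goes back to the pool
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        pool.release_connection(conn)

#usage, e.g. in delete_archive_detail_byID:
#    with pooled() as conn:
#        res = conn.execute("DELETE from runner_detail WHERE runner_id=?", [str(id)])
#        conn.commit()
#    return res.rowcount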

View File

@@ -198,16 +198,19 @@ class TradeAggregator:
            #we try to stamp the bar confirmation a hair earlier than the time of the next bar, which follows right after it
            #the gui cannot display duplicates, and the RT chart cannot be edited retroactively
            #align to the bar time according to the timeframe (e.g. 5, 10, 15 ...) (ROUND)
            if self.align:
                t = datetime.fromtimestamp(data['t'])
                t = t - timedelta(seconds=t.second % self.timeframe,microseconds=t.microsecond)
            #or use the trade time rounded to seconds (RANDOM)
            else:
                #store its timestamp (the timeframe is computed from it)
                t = datetime.fromtimestamp(int(data['t']))
            #self.newBar['updated'] = float(data['t']) - 0.001
            self.newBar['updated'] = datetime.timestamp(t) - 0.000001
            #HAS TO BE PUT BACK - the adjusted time causes bad fills in BT when this bar triggers a buy
            # if self.align:
            #     t = datetime.fromtimestamp(data['t'])
            #     t = t - timedelta(seconds=t.second % self.timeframe,microseconds=t.microsecond)
            #     #or use the trade time rounded to seconds (RANDOM)
            # else:
            #     #store its timestamp (the timeframe is computed from it)
            #     t = datetime.fromtimestamp(int(data['t']))
            #     #self.newBar['updated'] = float(data['t']) - 0.001
            #     self.newBar['updated'] = datetime.timestamp(t) - 0.000001
            self.newBar['updated'] = data['t']
        #for a standard BAR we keep the original
        else:
            self.newBar['updated'] = data['t']
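
For reference, what the disabled align branch computed, as a worked example on made-up values:

from datetime import datetime, timedelta

timeframe = 5                              # seconds
t = datetime.fromtimestamp(1687975033.73)  # second == 13, microsecond == 730000
t = t - timedelta(seconds=t.second % timeframe, microseconds=t.microsecond)
# 13 % 5 == 3, so the bar time rounds down to second 10 exactly,
# and 'updated' was then stamped one microsecond before that boundary
updated = datetime.timestamp(t) - 0.000001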

View File

@ -26,6 +26,8 @@ import json
from queue import Queue, Empty
from threading import Thread
import asyncio
from v2realbot.common.db import insert_queue, insert_conn
from v2realbot.utils.utils import json_serial
#from asyncio import Queue, QueueEmpty
# install()
@@ -329,14 +331,41 @@ def _get_alpaca_history_bars(symbol: str, datetime_object_from: datetime, dateti
    else:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"No data found {res} {set}")

# Thread function to insert data from the queue into the database
def insert_queue2db():
    print("starting insert_queue2db thread")
    while True:
        # Retrieve data from the queue
        data = insert_queue.get()
        # Unpack the data
        runner_id, loglist = data
        c = insert_conn.cursor()
        insert_data = []
        for i in loglist:
            row = (str(runner_id), i["time"], json.dumps(i, default=json_serial))
            insert_data.append(row)
        c.executemany("INSERT INTO runner_logs VALUES (?,?,?)", insert_data)
        insert_conn.commit()
        # Mark the task as done in the queue
        insert_queue.task_done()

#join - wait for all runners to finish
for i in cs.db.runners:
    i.run_thread.join()
if __name__ == "__main__":
    uvicorn.run("__main__:app", host="0.0.0.0", port=8000, reload=False)
    try:
        #TODO rework this into a standalone thread class and put it in a separate file; a draft already exists on chatgpt
        #start the log-writer thread (we have a single write thread; the others feed it via the queue)
        insert_thread = Thread(target=insert_queue2db)
        insert_thread.start()
        uvicorn.run("__main__:app", host="0.0.0.0", port=8000, reload=False)
    finally:
        print("closing insert_conn connection")
        insert_conn.close()
        print("closed")

##TODO add the option to run on PAPER and LIVE per strategy
# find out whether the order notification websocket can run on both at the same time
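
One gap worth flagging: insert_queue2db has no exit path, so the finally block can close insert_conn while the loop is still blocked in insert_queue.get(). A common fix, sketched here as an assumption (not part of this commit), is a sentinel:

# in insert_queue2db(), right after the get():
#     if data is None:
#         break
# in the finally block of __main__, before insert_conn.close():
#     insert_queue.put(None)
#     insert_thread.join()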

View File

@@ -254,18 +254,20 @@ var archiveRecords =
if (type == "sort") {
    return new Date(data).getTime();
}
var date = new Date(data);
var tit = date.toLocaleString('cs-CZ', {
    timeZone: 'America/New_York',
})
if (isToday(now)) {
    //return local time only
    return 'dnes ' + format_date(data,false,true)
    return '<div title="'+tit+'">'+ 'dnes ' + format_date(data,false,true)+'</div>'
}
else
{
    //return local datetime
    return format_date(data,false,false)
    return '<div title="'+tit+'">'+ format_date(data,false,false)+'</div>'
}
},
},
{
@@ -282,7 +284,7 @@ var archiveRecords =
{
    targets: [2],
    render: function ( data, type, row ) {
        return '<div class="tdname" title="'+data+'">'+data+'</i>'
        return '<div class="tdname" title="'+data+'">'+data+'</div>'
    },
},
{
@@ -290,13 +292,13 @@ var archiveRecords =
    render: function ( data, type, row ) {
        var res = JSON.stringify(data)
        const unquoted = res.replace(/"([^"]+)":/g, '$1:')
        return '<div class="tdmetrics" title="'+unquoted+'">'+unquoted+'</i>'
        return '<div class="tdmetrics" title="'+unquoted+'">'+unquoted+'</div>'
    },
},
{
    targets: [4],
    render: function ( data, type, row ) {
        return '<div class="tdnote" title="'+data+'">'+data+'</i>'
        return '<div class="tdnote" title="'+data+'">'+data+'</div>'
    },
},
{

View File

@@ -4,7 +4,7 @@
from datetime import datetime
from v2realbot.utils.utils import AttributeDict, zoneNY, is_open_rush, is_close_rush, json_serial, print, safe_get, Average
from v2realbot.utils.tlog import tlog
from v2realbot.utils.ilog import insert_log, insert_log_multiple
from v2realbot.utils.ilog import insert_log, insert_log_multiple_queue
from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Order, Account
from v2realbot.config import BT_DELAYS, get_key, HEARTBEAT_TIMEOUT, QUIET_MODE, LOG_RUNNER_EVENTS
import queue
@@ -534,8 +534,8 @@ class Strategy:
        #cleaning the iter_log list
        #TODO add the cleanup outside the RT block too
        if self.ilog_save: insert_log_multiple(self.state.runner_id, self.state.iter_log_list)
        #push it to the queue; the single writer thread picks it up from there
        if self.ilog_save: insert_log_multiple_queue(self.state.runner_id, self.state.iter_log_list)
        #clear the logs
        self.state.iter_log_list = []

View File

@@ -4,7 +4,9 @@ from uuid import UUID, uuid4
import json
from datetime import datetime
from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Account
from v2realbot.common.db import conn
from v2realbot.common.db import pool, insert_queue
import sqlite3

#by default returns a list of tuples, where the tuple members are the columns
#conn.row_factory = lambda c, r: json.loads(r[0])
@@ -22,42 +24,61 @@ from v2realbot.common.db import conn
#insert = dict(time=datetime.now(), side="ddd", rectype=RecordType.BAR, id=uuid4())
#insert_list = [dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4()),dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4()),dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4()),dict(time=datetime.now().timestamp(), side="ddd", rectype=RecordType.BAR, id=uuid4())]
#TODO REWORK THIS INTO AN OBJECT that keeps its own connection per thread
#returns rowcount of inserted rows
def insert_log(runner_id: UUID, time: float, logdict: dict):
    c = conn.cursor()
    json_string = json.dumps(logdict, default=json_serial)
    res = c.execute("INSERT INTO runner_logs VALUES (?,?,?)",[str(runner_id), time, json_string])
    conn.commit()
    conn = pool.get_connection()
    try:
        c = conn.cursor()
        json_string = json.dumps(logdict, default=json_serial)
        res = c.execute("INSERT INTO runner_logs VALUES (?,?,?)",[str(runner_id), time, json_string])
        conn.commit()
    finally:
        pool.release_connection(conn)
    return res.rowcount
#returns rowcount of inserted rows
def insert_log_multiple(runner_id: UUID, loglist: list):
    c = conn.cursor()
    insert_data = []
    for i in loglist:
        row = (str(runner_id), i["time"], json.dumps(i, default=json_serial))
        insert_data.append(row)
    c.executemany("INSERT INTO runner_logs VALUES (?,?,?)", insert_data)
    #conn.commit()
    return c.rowcount

#single connection in WAL mode
def insert_log_multiple_queue(runner_id: UUID, loglist: list):
    insert_queue.put((runner_id, loglist))
# def insert_log_multiple(runner_id: UUID, loglist: list):
# conn = sqlite3.connect(sqlite_db_file, check_same_thread=False)
# c = conn.cursor()
# insert_data = []
# for i in loglist:
# row = (str(runner_id), i["time"], json.dumps(i, default=json_serial))
# insert_data.append(row)
# c.executemany("INSERT INTO runner_logs VALUES (?,?,?)", insert_data)
# conn.commit()
# return c.rowcount
#returns list of ilog jsons
def get_log_window(runner_id: UUID, timestamp_from: float = 0, timestamp_to: float = 9682851459):
    conn.row_factory = lambda c, r: json.loads(r[0])
    c = conn.cursor()
    res = c.execute(f"SELECT data FROM runner_logs WHERE runner_id='{str(runner_id)}' AND time >={timestamp_from} AND time <={timestamp_to} ORDER BY time")
    conn = pool.get_connection()
    try:
        c = conn.cursor()
        #set the factory on the cursor so the pooled connection goes back unmodified
        c.row_factory = lambda c, r: json.loads(r[0])
        res = c.execute(f"SELECT data FROM runner_logs WHERE runner_id='{str(runner_id)}' AND time >={timestamp_from} AND time <={timestamp_to} ORDER BY time")
        #fetch before the connection is returned to the pool
        rows = res.fetchall()
    finally:
        pool.release_connection(conn)
    return rows
#returns number of deleted elements
def delete_logs(runner_id: UUID):
    c = conn.cursor()
    res = c.execute(f"DELETE from runner_logs WHERE runner_id='{str(runner_id)}';")
    print(res.rowcount)
    conn.commit()
    conn = pool.get_connection()
    try:
        c = conn.cursor()
        res = c.execute(f"DELETE from runner_logs WHERE runner_id='{str(runner_id)}';")
        print(res.rowcount)
        conn.commit()
    finally:
        pool.release_connection(conn)
    return res.rowcount
# print(insert_log(str(uuid4()), datetime.now().timestamp(), insert))
# c = conn.cursor()
# ts_from = 1683108821.08872