migrace z tinydb, nove direktivy

This commit is contained in:
David Brazda
2023-09-06 10:03:45 +02:00
parent a22aedf978
commit 76411c991a
11 changed files with 277 additions and 54 deletions

View File

@ -279,7 +279,7 @@ def save_history(id: UUID, st: object, runner: Runner, reason: str = None):
db.save()
#Capsule to run the thread in. Needed in order to update db after strat ends for any reason#
def capsule(target: object, db: object):
def capsule(target: object, db: object, inter_batch_params: dict = None):
#TODO zde odchytit pripadnou exceptionu a zapsat do history
#cil aby padnuti jedne nezpusobilo pad enginu
@ -305,7 +305,7 @@ def capsule(target: object, db: object):
#ukladame radek do historie (pozdeji refactor)
save_history(id=i.strat_id, st=target, runner=i, reason=reason)
#store in archive header and archive detail
archive_runner(runner=i, strat=target)
archive_runner(runner=i, strat=target, inter_batch_params=inter_batch_params)
#mazeme runner po skonceni instance
db.runners.remove(i)
@ -373,6 +373,9 @@ def batch_run_manager(id: UUID, runReq: RunRequest, testlist: TestList):
interval: Intervals
cnt_max = len(testlist.dates)
cnt = 0
#promenna pro sdileni mezi runy jednotlivych batchů (např. daily profit)
inter_batch_params = dict(batch_profit=0)
note_from_run_request = runReq.note
for intrvl in testlist.dates:
cnt += 1
interval = intrvl
@ -385,10 +388,10 @@ def batch_run_manager(id: UUID, runReq: RunRequest, testlist: TestList):
#předání atributů datetime.fromisoformat
runReq.bt_from = datetime.fromisoformat(interval.start)
runReq.bt_to = datetime.fromisoformat(interval.end)
runReq.note = f"Batch {batch_id} run #{cnt}/{cnt_max} Note:{interval.note}"
runReq.note = f"Batch {batch_id} #{cnt}/{cnt_max} {testlist.name} N:{interval.note} {note_from_run_request}"
#protoze jsme v ridicim vlaknu, poustime za sebou jednotlive stratiny v synchronnim modu
res, id_val = run_stratin(id=id, runReq=runReq, synchronous=True)
res, id_val = run_stratin(id=id, runReq=runReq, synchronous=True, inter_batch_params=inter_batch_params)
if res < 0:
print(f"CHyba v runu #{cnt} od:{runReq.bt_from} do {runReq.bt_to} -> {id_val}")
break
@ -397,7 +400,7 @@ def batch_run_manager(id: UUID, runReq: RunRequest, testlist: TestList):
#stratin run
def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False):
def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False, inter_batch_params: dict = None):
if runReq.mode == Mode.BT:
if runReq.bt_from is None:
return (-1, "start date required for BT")
@ -469,7 +472,7 @@ def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False):
print("Starting strategy", instance.name)
#vlakno = Thread(target=instance.start, name=instance.name)
#pokus na spusteni v kapsli, abychom po skonceni mohli updatnout stratin
vlakno = Thread(target=capsule, args=(instance,db), name=instance.name)
vlakno = Thread(target=capsule, args=(instance,db, inter_batch_params), name=instance.name)
vlakno.start()
print("Spuštěna", instance.name)
##storing the attributtes - pozor pri stopu je zase odstranit
@ -498,7 +501,10 @@ def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False):
print(f"waiting for thread {vlakno} to finish")
vlakno.join()
return (0, id)
if inter_batch_params is not None:
return (0, inter_batch_params)
else:
return (0, id)
except Exception as e:
return (-2, "Exception: "+str(e)+format_exc())
return (-2, "not found")
@ -517,7 +523,7 @@ def get_trade_history(symbol: str, timestamp_from: float, timestamp_to:float):
except Exception as e:
return (-2, f"problem {e}")
def populate_metrics_output_directory(strat: StrategyInstance):
def populate_metrics_output_directory(strat: StrategyInstance, inter_batch_params: dict = None):
"""
WIP
Spocte zakladni metriky pred ulozenim do archivu
@ -560,6 +566,10 @@ def populate_metrics_output_directory(strat: StrategyInstance):
#filt = max_positions['side'] == 'OrderSide.BUY'
res = dict(zip(max_positions['qty'], max_positions['count']))
#naplneni batch sum profitu
if inter_batch_params is not None:
res["batch_sum_profit"] = inter_batch_params["batch_profit"]
#metrikz z prescribedTrades, pokud existuji
try:
long_profit = 0
@ -605,7 +615,7 @@ def populate_metrics_output_directory(strat: StrategyInstance):
return res
#archives runner and details
def archive_runner(runner: Runner, strat: StrategyInstance):
def archive_runner(runner: Runner, strat: StrategyInstance, inter_batch_params: dict = None):
results_metrics = dict()
print("inside archive_runner")
try:
@ -625,12 +635,17 @@ def archive_runner(runner: Runner, strat: StrategyInstance):
BT_FILL_CONDITION_BUY_LIMIT=BT_FILL_CONDITION_BUY_LIMIT,
BT_FILL_CONDITION_SELL_LIMIT=BT_FILL_CONDITION_SELL_LIMIT))
#add profit of this batch iteration to batch_sum_profit
if inter_batch_params is not None:
inter_batch_params["batch_profit"] += round(float(strat.state.profit),2)
#WIP
#populate result metrics dictionary (max drawdown etc.)
#list of maximum positions (2000 2x, 1800 x 1, 900 x 1, 100 x 20)
#list of most profitable trades (pos,avgp + cena)
#file pro vyvoj: ouptut_metriky_tradeList.py
results_metrics = populate_metrics_output_directory(strat)
results_metrics = populate_metrics_output_directory(strat, inter_batch_params)
runArchive: RunArchive = RunArchive(id = runner.id,
strat_id = runner.strat_id,
@ -697,7 +712,8 @@ def archive_runner(runner: Runner, strat: StrategyInstance):
trades=strat.state.tradeList,
ext_data=strat.state.extData)
with lock:
resh = db_arch_h.insert(runArchive.__dict__)
#resh = db_arch_h.insert(runArchive.__dict__)
resh = insert_archive_header(runArchive)
resd = insert_archive_detail(runArchiveDetail)
#resd = db_arch_d.insert(runArchiveDetail.__dict__)
print("archive runner finished")
@ -706,37 +722,116 @@ def archive_runner(runner: Runner, strat: StrategyInstance):
print("Exception in archive_runner: " + str(e) + format_exc())
return -2, str(e) + format_exc()
# region ARCH HEADER
def migrate_archived_runners() -> list[RunArchive]:
try:
res = db_arch_h.all()
#migration part
for item in res:
r = insert_archive_header(RunArchive(**item))
print("migrated",r)
return 0, r
except Exception as e:
print("Exception in migration: " + str(e) + format_exc())
return -2, str(e) + format_exc()
def get_all_archived_runners():
res = db_arch_h.all()
return 0, res
conn = pool.get_connection()
try:
conn.row_factory = lambda c, r: json.loads(r[0])
c = conn.cursor()
res = c.execute(f"SELECT data FROM runner_header")
finally:
conn.row_factory = None
pool.release_connection(conn)
return 0, res.fetchall()
#vrátí konkrétní
def get_archived_runner_header_byID(id: UUID):
conn = pool.get_connection()
try:
conn.row_factory = lambda c, r: json.loads(r[0])
c = conn.cursor()
result = c.execute(f"SELECT data FROM runner_header WHERE runner_id='{str(id)}'")
res= result.fetchone()
finally:
conn.row_factory = None
pool.release_connection(conn)
if res==None:
return -2, "not found"
else:
return 0, res
def insert_archive_header(archeader: RunArchive):
conn = pool.get_connection()
try:
c = conn.cursor()
json_string = json.dumps(archeader, default=json_serial)
statement = f"INSERT INTO runner_header VALUES ('{str(archeader.id)}','{json_string}')"
res = c.execute(statement)
conn.commit()
finally:
pool.release_connection(conn)
return res.rowcount
#edit archived runner note - headers
def edit_archived_runners(runner_id: UUID, archChange: RunArchiveChange):
try:
res, sada = get_archived_runner_header_byID(id=runner_id)
if res == 0:
archOriginal = RunArchive(**sada)
archOriginal.note = archChange.note
try:
conn = pool.get_connection()
c = conn.cursor()
json_string = json.dumps(archOriginal, default=json_serial)
statement = f"UPDATE runner_header SET data = '{json_string}' WHERE runner_id='{str(runner_id)}'"
res = c.execute(statement)
#print(res)
conn.commit()
finally:
pool.release_connection(conn)
return 0, runner_id
else:
return -1, f"Could not find arch runner {runner_id} {res} {sada}"
except Exception as e:
return -2, str(e)
#delete runner in archive and archive detail and runner logs
def delete_archived_runners_byID(id: UUID):
try:
with lock:
print("before header del")
resh = db_arch_h.remove(where('id') == id)
print("before detail del")
print("header del")
resh = delete_archive_header_byID(id)
#resh = db_arch_h.remove(where('id') == id)
print("detail del")
#resd = db_arch_d.remove(where('id') == id)
resd = delete_archive_detail_byID(id)
print("Arch header and detail removed. Log deletition will start.")
reslogs = delete_logs(id)
if len(resh) == 0 or resd == 0:
if resh == 0 or resd == 0:
return -1, "not found "+str(resh) + " " + str(resd) + " " + str(reslogs)
return 0, str(resh) + " " + str(resd) + " " + str(reslogs)
except Exception as e:
return -2, str(e)
#edit archived runner note
def edit_archived_runners(runner_id: UUID, archChange: RunArchiveChange):
#returns number of deleted elements
def delete_archive_header_byID(id: UUID):
conn = pool.get_connection()
try:
with lock:
res = db_arch_h.update(set('note', archChange.note), where('id') == str(runner_id))
if len(res) == 0:
return -1, "not found "+str(runner_id)
return 0, runner_id
except Exception as e:
return -2, str(e)
c = conn.cursor()
res = c.execute(f"DELETE from runner_header WHERE runner_id='{str(id)}';")
conn.commit()
print("deleted", res.rowcount)
finally:
pool.release_connection(conn)
return res.rowcount
# endregion
# region ARCHIVE DETAIL
#returns number of deleted elements
def delete_archive_detail_byID(id: UUID):
@ -750,9 +845,6 @@ def delete_archive_detail_byID(id: UUID):
pool.release_connection(conn)
return res.rowcount
# def get_all_archived_runners_detail_old():
# res = db_arch_d.all()
# return 0, res
def get_all_archived_runners_detail():
conn = pool.get_connection()
@ -798,6 +890,7 @@ def insert_archive_detail(archdetail: RunArchiveDetail):
finally:
pool.release_connection(conn)
return res.rowcount
# endregion
# region TESTLISTS db services
def get_testlists():
@ -820,7 +913,8 @@ def get_testlists():
# endregion
# region CONFIG db services
#TODO vytvorit modul pro dotahovani z pythonu (get_from_config(var_name, def_value) {)- stejne jako v js
#TODO zvazit presunuti do TOML z JSONu
def get_all_config_items():
conn = pool.get_connection()
try: