diff --git a/v2realbot/ENTRY_ClassicSL_v01.py b/v2realbot/ENTRY_ClassicSL_v01.py index 5314e07..8807add 100644 --- a/v2realbot/ENTRY_ClassicSL_v01.py +++ b/v2realbot/ENTRY_ClassicSL_v01.py @@ -6,7 +6,7 @@ from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Account, Orde from v2realbot.indicators.indicators import ema from v2realbot.indicators.oscillators import rsi from v2realbot.common.PrescribedTradeModel import Trade, TradeDirection, TradeStatus, TradeStoplossType -from v2realbot.utils.utils import ltp, isrising, isfalling,trunc,AttributeDict, zoneNY, price2dec, print, safe_get, round2five, is_open_rush, is_close_rush, is_window_open, eval_cond_dict, Average, crossed_down, crossed_up, crossed, is_pivot, json_serial +from v2realbot.utils.utils import ltp, isrising, isfalling,trunc,AttributeDict, zoneNY, price2dec, print, safe_get, round2five, is_open_rush, is_close_rush, is_still, is_window_open, eval_cond_dict, Average, crossed_down, crossed_up, crossed, is_pivot, json_serial from v2realbot.utils.directive_utils import get_conditions_from_configuration from v2realbot.common.model import SLHistory from datetime import datetime @@ -130,6 +130,9 @@ def next(data, state: StrategyState): cond[cond_type][directive+"_"+indname+"_"+str(value)] = is_pivot(source=get_source_or_MA(indname), leg_number=value, type="A") elif directive.endswith("pivot_v"): cond[cond_type][directive+"_"+indname+"_"+str(value)] = is_pivot(source=get_source_or_MA(indname), leg_number=value, type="V") + elif directive.endswith("still_for"): + #for 2 decimals + cond[cond_type][directive+"_"+indname+"_"+str(value)] = is_still(get_source_or_MA(indname),value, 2) #PRIPADNE DALSI SPECIFICKE ZDE # elif directive == "buy_if_necospecifckeho": @@ -389,8 +392,8 @@ def next(data, state: StrategyState): #pokud mame aktivni pozice, nastavime lookbackprice a time podle posledniho tradu #pokud se ale dlouho nenakupuje (uplynulo od posledniho nakupu vic nez back_to_standard_after baru), tak se vracime k prumeru - if state.avgp > 0 and state.bars.index[-1] < int(state.vars.lastbuyindex)+back_to_standard_after: - lb_index = -1 - (state.bars.index[-1] - int(state.vars.lastbuyindex)) + if state.avgp > 0 and state.bars.index[-1] < int(state.vars.last_buy_index)+back_to_standard_after: + lb_index = -1 - (state.bars.index[-1] - int(state.vars.last_buy_index)) lookbackprice = state.bars.vwap[lb_index] state.ilog(e=f"IND {name} slope {leftpoint}- LEFT POINT OVERRIDE bereme ajko cenu lastbuy {lookbackprice=} {lookbacktime=} {lb_index=}") else: @@ -719,7 +722,7 @@ def next(data, state: StrategyState): state.ilog(e=f"PROFIT {def_profit=} {normalized_def_profit=}") - return price2dec(data["close"]+normalized_def_profit,3) if int(state.positions) > 0 else price2dec(data["close"]-normalized_def_profit,3) + return price2dec(float(state.avgp)+normalized_def_profit,3) if int(state.positions) > 0 else price2dec(float(state.avgp)-normalized_def_profit,3) def get_max_profit_price(): directive_name = "max_profit" @@ -729,7 +732,7 @@ def next(data, state: StrategyState): state.ilog(e=f"MAX PROFIT {max_profit=} {normalized_max_profit=}") - return price2dec(data["close"]+normalized_max_profit,3) if int(state.positions) > 0 else price2dec(data["close"]-normalized_max_profit,3) + return price2dec(float(state.avgp)+normalized_max_profit,3) if int(state.positions) > 0 else price2dec(float(state.avgp)-normalized_max_profit,3) #TBD pripadne opet dat parsovani pole do INITu @@ -739,6 +742,18 @@ def next(data, state: StrategyState): else: smer = "short" + #get name of strategy + signal_originator = state.vars.activeTrade.generated_by + + if signal_originator is not None: + exit_cond_only_on_confirmed = safe_get(state.vars.signals[signal_originator], "exit_cond_only_on_confirmed", safe_get(state.vars, "exit_cond_only_on_confirmed", False)) + else: + exit_cond_only_on_confirmed = safe_get(state.vars, "exit_cond_only_on_confirmed", False) + + if exit_cond_only_on_confirmed and data['confirmed'] == 0: + state.ilog("EXIT COND ONLY ON CONFIRMED BAR") + return False + #TOTO ZATIM NEMA VYZNAM # options = safe_get(state.vars, 'exit_conditions', None) # if options is None: @@ -909,11 +924,9 @@ def next(data, state: StrategyState): def eval_close_position(): curr_price = float(data['close']) state.ilog(e="Eval CLOSE", price=curr_price, pos=state.positions, avgp=state.avgp, pending=state.vars.pending, activeTrade=str(state.vars.activeTrade)) - + if int(state.positions) != 0 and float(state.avgp)>0 and state.vars.pending is None: - #podivam se zda dana - #pevny target - presunout toto do INIT a pak jen pristupovat goal_price = get_profit_target_price() max_price = get_max_profit_price() @@ -984,6 +997,7 @@ def next(data, state: StrategyState): trade.last_update = datetime.fromtimestamp(state.time).astimezone(zoneNY) state.ilog(e=f"evaluated LONG", trade=json.loads(json.dumps(trade, default=json_serial)), prescrTrades=json.loads(json.dumps(state.vars.prescribedTrades, default=json_serial))) state.vars.activeTrade = trade + state.vars.last_buy_index = data["index"] break #evaluate shorts if not state.vars.activeTrade: @@ -993,6 +1007,7 @@ def next(data, state: StrategyState): trade.status = TradeStatus.ACTIVATED trade.last_update = datetime.fromtimestamp(state.time).astimezone(zoneNY) state.vars.activeTrade = trade + state.vars.last_buy_index = data["index"] break #odeslani ORDER + NASTAVENI STOPLOSS (zatim hardcoded) @@ -1109,6 +1124,15 @@ def next(data, state: StrategyState): def common_go_preconditions_check(signalname: str, options: dict): #ZAKLADNI KONTROLY ATRIBUTU s fallbackem na obecné #check working windows (open - close, in minutes from the start of marker) + + next_signal_offset = safe_get(options, "next_signal_offset_from_last",safe_get(state.vars, "next_signal_offset_from_last",0)) + + if state.vars.last_buy_index is not None: + index_to_compare = int(state.vars.last_buy_index)+int(next_signal_offset) + if index_to_compare > int(data["index"]): + state.ilog(e=f"NEXT SIGNAL OFFSET {next_signal_offset} waiting - TOO SOON", currindex=data["index"], index_to_compare=index_to_compare) + return False + window_open = safe_get(options, "window_open",safe_get(state.vars, "window_open",0)) window_close = safe_get(options, "window_close",safe_get(state.vars, "window_close",390)) @@ -1390,7 +1414,7 @@ def init(state: StrategyState): state.vars.last_50_deltas = [] state.vars.last_tick_volume = 0 state.vars.next_new = 0 - state.vars.lastbuyindex = 0 + state.vars.last_buy_index = None state.vars.last_update_time = 0 state.vars.reverse_position_waiting_amount = 0 #INIT promenne, ktere byly zbytecne ve stratvars diff --git a/v2realbot/controller/services.py b/v2realbot/controller/services.py index 5d0cd8c..3c4ad67 100644 --- a/v2realbot/controller/services.py +++ b/v2realbot/controller/services.py @@ -279,7 +279,7 @@ def save_history(id: UUID, st: object, runner: Runner, reason: str = None): db.save() #Capsule to run the thread in. Needed in order to update db after strat ends for any reason# -def capsule(target: object, db: object): +def capsule(target: object, db: object, inter_batch_params: dict = None): #TODO zde odchytit pripadnou exceptionu a zapsat do history #cil aby padnuti jedne nezpusobilo pad enginu @@ -305,7 +305,7 @@ def capsule(target: object, db: object): #ukladame radek do historie (pozdeji refactor) save_history(id=i.strat_id, st=target, runner=i, reason=reason) #store in archive header and archive detail - archive_runner(runner=i, strat=target) + archive_runner(runner=i, strat=target, inter_batch_params=inter_batch_params) #mazeme runner po skonceni instance db.runners.remove(i) @@ -373,6 +373,9 @@ def batch_run_manager(id: UUID, runReq: RunRequest, testlist: TestList): interval: Intervals cnt_max = len(testlist.dates) cnt = 0 + #promenna pro sdileni mezi runy jednotlivych batchů (např. daily profit) + inter_batch_params = dict(batch_profit=0) + note_from_run_request = runReq.note for intrvl in testlist.dates: cnt += 1 interval = intrvl @@ -385,10 +388,10 @@ def batch_run_manager(id: UUID, runReq: RunRequest, testlist: TestList): #předání atributů datetime.fromisoformat runReq.bt_from = datetime.fromisoformat(interval.start) runReq.bt_to = datetime.fromisoformat(interval.end) - runReq.note = f"Batch {batch_id} run #{cnt}/{cnt_max} Note:{interval.note}" + runReq.note = f"Batch {batch_id} #{cnt}/{cnt_max} {testlist.name} N:{interval.note} {note_from_run_request}" #protoze jsme v ridicim vlaknu, poustime za sebou jednotlive stratiny v synchronnim modu - res, id_val = run_stratin(id=id, runReq=runReq, synchronous=True) + res, id_val = run_stratin(id=id, runReq=runReq, synchronous=True, inter_batch_params=inter_batch_params) if res < 0: print(f"CHyba v runu #{cnt} od:{runReq.bt_from} do {runReq.bt_to} -> {id_val}") break @@ -397,7 +400,7 @@ def batch_run_manager(id: UUID, runReq: RunRequest, testlist: TestList): #stratin run -def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False): +def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False, inter_batch_params: dict = None): if runReq.mode == Mode.BT: if runReq.bt_from is None: return (-1, "start date required for BT") @@ -469,7 +472,7 @@ def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False): print("Starting strategy", instance.name) #vlakno = Thread(target=instance.start, name=instance.name) #pokus na spusteni v kapsli, abychom po skonceni mohli updatnout stratin - vlakno = Thread(target=capsule, args=(instance,db), name=instance.name) + vlakno = Thread(target=capsule, args=(instance,db, inter_batch_params), name=instance.name) vlakno.start() print("Spuštěna", instance.name) ##storing the attributtes - pozor pri stopu je zase odstranit @@ -498,7 +501,10 @@ def run_stratin(id: UUID, runReq: RunRequest, synchronous: bool = False): print(f"waiting for thread {vlakno} to finish") vlakno.join() - return (0, id) + if inter_batch_params is not None: + return (0, inter_batch_params) + else: + return (0, id) except Exception as e: return (-2, "Exception: "+str(e)+format_exc()) return (-2, "not found") @@ -517,7 +523,7 @@ def get_trade_history(symbol: str, timestamp_from: float, timestamp_to:float): except Exception as e: return (-2, f"problem {e}") -def populate_metrics_output_directory(strat: StrategyInstance): +def populate_metrics_output_directory(strat: StrategyInstance, inter_batch_params: dict = None): """ WIP Spocte zakladni metriky pred ulozenim do archivu @@ -560,6 +566,10 @@ def populate_metrics_output_directory(strat: StrategyInstance): #filt = max_positions['side'] == 'OrderSide.BUY' res = dict(zip(max_positions['qty'], max_positions['count'])) + #naplneni batch sum profitu + if inter_batch_params is not None: + res["batch_sum_profit"] = inter_batch_params["batch_profit"] + #metrikz z prescribedTrades, pokud existuji try: long_profit = 0 @@ -605,7 +615,7 @@ def populate_metrics_output_directory(strat: StrategyInstance): return res #archives runner and details -def archive_runner(runner: Runner, strat: StrategyInstance): +def archive_runner(runner: Runner, strat: StrategyInstance, inter_batch_params: dict = None): results_metrics = dict() print("inside archive_runner") try: @@ -625,12 +635,17 @@ def archive_runner(runner: Runner, strat: StrategyInstance): BT_FILL_CONDITION_BUY_LIMIT=BT_FILL_CONDITION_BUY_LIMIT, BT_FILL_CONDITION_SELL_LIMIT=BT_FILL_CONDITION_SELL_LIMIT)) + + #add profit of this batch iteration to batch_sum_profit + if inter_batch_params is not None: + inter_batch_params["batch_profit"] += round(float(strat.state.profit),2) + #WIP #populate result metrics dictionary (max drawdown etc.) #list of maximum positions (2000 2x, 1800 x 1, 900 x 1, 100 x 20) #list of most profitable trades (pos,avgp + cena) #file pro vyvoj: ouptut_metriky_tradeList.py - results_metrics = populate_metrics_output_directory(strat) + results_metrics = populate_metrics_output_directory(strat, inter_batch_params) runArchive: RunArchive = RunArchive(id = runner.id, strat_id = runner.strat_id, @@ -697,7 +712,8 @@ def archive_runner(runner: Runner, strat: StrategyInstance): trades=strat.state.tradeList, ext_data=strat.state.extData) with lock: - resh = db_arch_h.insert(runArchive.__dict__) + #resh = db_arch_h.insert(runArchive.__dict__) + resh = insert_archive_header(runArchive) resd = insert_archive_detail(runArchiveDetail) #resd = db_arch_d.insert(runArchiveDetail.__dict__) print("archive runner finished") @@ -706,37 +722,116 @@ def archive_runner(runner: Runner, strat: StrategyInstance): print("Exception in archive_runner: " + str(e) + format_exc()) return -2, str(e) + format_exc() +# region ARCH HEADER +def migrate_archived_runners() -> list[RunArchive]: + try: + res = db_arch_h.all() + + #migration part + for item in res: + r = insert_archive_header(RunArchive(**item)) + print("migrated",r) + + return 0, r + except Exception as e: + print("Exception in migration: " + str(e) + format_exc()) + return -2, str(e) + format_exc() + def get_all_archived_runners(): - res = db_arch_h.all() - return 0, res + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + res = c.execute(f"SELECT data FROM runner_header") + finally: + conn.row_factory = None + pool.release_connection(conn) + return 0, res.fetchall() + +#vrátí konkrétní +def get_archived_runner_header_byID(id: UUID): + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + result = c.execute(f"SELECT data FROM runner_header WHERE runner_id='{str(id)}'") + res= result.fetchone() + finally: + conn.row_factory = None + pool.release_connection(conn) + if res==None: + return -2, "not found" + else: + return 0, res + +def insert_archive_header(archeader: RunArchive): + conn = pool.get_connection() + try: + c = conn.cursor() + json_string = json.dumps(archeader, default=json_serial) + statement = f"INSERT INTO runner_header VALUES ('{str(archeader.id)}','{json_string}')" + res = c.execute(statement) + conn.commit() + finally: + pool.release_connection(conn) + return res.rowcount + +#edit archived runner note - headers +def edit_archived_runners(runner_id: UUID, archChange: RunArchiveChange): + try: + res, sada = get_archived_runner_header_byID(id=runner_id) + if res == 0: + archOriginal = RunArchive(**sada) + archOriginal.note = archChange.note + try: + conn = pool.get_connection() + c = conn.cursor() + json_string = json.dumps(archOriginal, default=json_serial) + statement = f"UPDATE runner_header SET data = '{json_string}' WHERE runner_id='{str(runner_id)}'" + res = c.execute(statement) + #print(res) + conn.commit() + finally: + pool.release_connection(conn) + return 0, runner_id + else: + return -1, f"Could not find arch runner {runner_id} {res} {sada}" + + except Exception as e: + return -2, str(e) #delete runner in archive and archive detail and runner logs def delete_archived_runners_byID(id: UUID): try: with lock: - print("before header del") - resh = db_arch_h.remove(where('id') == id) - print("before detail del") + print("header del") + resh = delete_archive_header_byID(id) + #resh = db_arch_h.remove(where('id') == id) + print("detail del") #resd = db_arch_d.remove(where('id') == id) resd = delete_archive_detail_byID(id) print("Arch header and detail removed. Log deletition will start.") reslogs = delete_logs(id) - if len(resh) == 0 or resd == 0: + if resh == 0 or resd == 0: return -1, "not found "+str(resh) + " " + str(resd) + " " + str(reslogs) return 0, str(resh) + " " + str(resd) + " " + str(reslogs) except Exception as e: return -2, str(e) - -#edit archived runner note -def edit_archived_runners(runner_id: UUID, archChange: RunArchiveChange): + +#returns number of deleted elements +def delete_archive_header_byID(id: UUID): + conn = pool.get_connection() try: - with lock: - res = db_arch_h.update(set('note', archChange.note), where('id') == str(runner_id)) - if len(res) == 0: - return -1, "not found "+str(runner_id) - return 0, runner_id - except Exception as e: - return -2, str(e) + c = conn.cursor() + res = c.execute(f"DELETE from runner_header WHERE runner_id='{str(id)}';") + conn.commit() + print("deleted", res.rowcount) + finally: + pool.release_connection(conn) + return res.rowcount +# endregion + +# region ARCHIVE DETAIL #returns number of deleted elements def delete_archive_detail_byID(id: UUID): @@ -750,9 +845,6 @@ def delete_archive_detail_byID(id: UUID): pool.release_connection(conn) return res.rowcount -# def get_all_archived_runners_detail_old(): -# res = db_arch_d.all() -# return 0, res def get_all_archived_runners_detail(): conn = pool.get_connection() @@ -798,6 +890,7 @@ def insert_archive_detail(archdetail: RunArchiveDetail): finally: pool.release_connection(conn) return res.rowcount +# endregion # region TESTLISTS db services def get_testlists(): @@ -820,7 +913,8 @@ def get_testlists(): # endregion # region CONFIG db services - +#TODO vytvorit modul pro dotahovani z pythonu (get_from_config(var_name, def_value) {)- stejne jako v js +#TODO zvazit presunuti do TOML z JSONu def get_all_config_items(): conn = pool.get_connection() try: diff --git a/v2realbot/main.py b/v2realbot/main.py index 9167505..f982f87 100644 --- a/v2realbot/main.py +++ b/v2realbot/main.py @@ -1,7 +1,7 @@ import os,sys sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from v2realbot.enums.enums import Mode, Account -from v2realbot.config import WEB_API_KEY +from v2realbot.config import WEB_API_KEY, DATA_DIR from alpaca.data.timeframe import TimeFrame, TimeFrameUnit from datetime import datetime #from icecream import install, ic @@ -271,7 +271,41 @@ def get_trade_history(symbol: str, timestamp_from: float, timestamp_to:float) -> else: raise HTTPException(status_code=404, detail=f"No trades found {res}") +@app.put("/migrate", dependencies=[Depends(api_key_auth)], status_code=status.HTTP_200_OK) +def migrate(): + lock_file = DATA_DIR + "/migr.lock" + + #if lock file not present, we can continue and create the file + if not os.path.exists(lock_file): + + #migration code + print("migration code done") + conn = pool.get_connection() + try: + conn.row_factory = lambda c, r: json.loads(r[0]) + c = conn.cursor() + res = c.execute(f'CREATE TABLE "runner_header" ("runner_id" varchar(32) NOT NULL,"data" json NOT NULL, PRIMARY KEY("runner_id"))') + print(res) + print("table created") + conn.commit() + finally: + conn.row_factory = None + pool.release_connection(conn) + + res, set =cs.migrate_archived_runners() + if res == 0: + open(lock_file, 'w').close() + return set + else: + raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"No data found") + + + else: + raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail=f"Migration lock file present {lock_file}") + + #ARCHIVE RUNNERS SECTION +# region Archive runners #get all archived runners header @app.get("/archived_runners/", dependencies=[Depends(api_key_auth)]) @@ -327,6 +361,8 @@ def _get_archived_runner_log_byID(runner_id: UUID, timestamp_from: float, timest else: raise HTTPException(status_code=404, detail=f"No logs found with id: {runner_id} and between {timestamp_from} and {timestamp_to}") +# endregion + #get alpaca history bars @app.get("/history_bars/", dependencies=[Depends(api_key_auth)]) def _get_alpaca_history_bars(symbol: str, datetime_object_from: datetime, datetime_object_to: datetime, timeframe_amount: int, timeframe_unit: TimeFrameUnit) -> list[Bar]: diff --git a/v2realbot/static/index.html b/v2realbot/static/index.html index ee74251..68a45d8 100644 --- a/v2realbot/static/index.html +++ b/v2realbot/static/index.html @@ -25,6 +25,12 @@ + + + +