diff --git a/v2realbot/controller/services.py b/v2realbot/controller/services.py index f6de492..7294d30 100644 --- a/v2realbot/controller/services.py +++ b/v2realbot/controller/services.py @@ -803,8 +803,13 @@ def archive_runner(runner: Runner, strat: StrategyInstance, inter_batch_params: #get rid of attributes that are links to the models strat.state.vars["loaded_models"] = {} + #ulozime informace o nastavení + # if self.mode in [Mode.BT, Mode.PREP]: + # str(self.dataloader.cache_used) + settings = dict(resolution=strat.state.resolution, rectype=strat.state.rectype, + cache_used=strat.dataloader.cache_used, configs=dict( GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN=GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN, BT_FILL_CONS_TRADES_REQUIRED=BT_FILL_CONS_TRADES_REQUIRED, diff --git a/v2realbot/loader/aggregator.py b/v2realbot/loader/aggregator.py index 42eff1c..0ba466a 100644 --- a/v2realbot/loader/aggregator.py +++ b/v2realbot/loader/aggregator.py @@ -12,6 +12,8 @@ from copy import deepcopy from msgpack import unpackb import os from v2realbot.config import DATA_DIR, GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN, AGG_EXCLUDED_TRADES +import pickle +import dill class TradeAggregator: def __init__(self, @@ -21,7 +23,8 @@ class TradeAggregator: update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, - exthours: bool = False): + exthours: bool = False, + excludes: list = AGG_EXCLUDED_TRADES): """ UPDATED VERSION - vrací více záznamů @@ -40,6 +43,7 @@ class TradeAggregator: self.minsize = minsize self.update_ltp = update_ltp self.exthours = exthours + self.excludes = excludes if mintick >= resolution: print("Mintick musi byt mensi nez resolution") @@ -75,6 +79,10 @@ class TradeAggregator: self.diff_price = True self.preconfBar = {} self.trades_too_close = False + self.cached_object = None + self.cache_output_enabled = False + self.cache_from = None + self.cache_to = None async def ingest_trade(self, indata, symbol): """ @@ -95,7 +103,7 @@ class TradeAggregator: ## přidán W - average price trade, U - Extended hours - sold out of sequence, Z - Sold(Out of sequence) try: for i in data['c']: - if i in AGG_EXCLUDED_TRADES: return [] + if i in self.excludes: return [] except KeyError: pass @@ -694,21 +702,90 @@ class TradeAggregator: else: return [] + def populate_file_name(self, date_from: datetime, date_to: datetime): + #nazev obsahuje i child class + #a take excludes result = ''.join(self.excludes.sort()) + self.excludes.sort() # Sorts the list in place + excludes_str = ''.join(map(str, self.excludes)) # Joins the sorted elements after converting them to strings + cache_file = self.__class__.__name__ + '-' + self.symbol + '-' + str(int(date_from.timestamp())) + '-' + str(int(date_to.timestamp())) + '-' + str(self.rectype) + "-" + str(self.resolution) + "-" + str(self.minsize) + "-" + str(self.align) + '-' + str(self.mintick) + str(self.exthours) + excludes_str + '.cache' + file_path = DATA_DIR + "/aggcache/" + cache_file + #print(file_path) + return file_path + + #returns cached objects for given period + def get_cache(self, date_from: datetime, date_to: datetime): + file_path = self.populate_file_name(date_from, date_to) + if os.path.exists(file_path): + ##daily aggregated file exists + with open (file_path, 'rb') as fp: + cachedobject = dill.load(fp) + print("AGG CACHE loaded ", file_path) + + if isinstance(cachedobject, Queue): + num = cachedobject.qsize() + else: + num = len(cachedobject) + + print("cached items",num) + return cachedobject, file_path + else: + return None, None + + #cachujeme jen na exlicitni zapnuti a jen pro BT + def enable_cache_output(self, date_from: datetime, date_to: datetime): + self.cache_output_enabled = True + self.cache_from = date_from + self.cache_to = date_to + + #lozi storovany cache object self.cache_object do cache + async def store_cached_object(self): + if self.cache_output_enabled is False or self.cached_object is None or self.cache_from is None or self.cache_to is None: + print("cache not enabled or missing data") + return None + + if isinstance(self.cached_object, Queue): + num = self.cached_object.qsize() + else: + num = len(self.cached_object) + + file_path = self.populate_file_name(self.cache_from, self.cache_to) + + with open(file_path, 'wb') as fp: + dill.dump(self.cached_object, fp) + print(f"AGG CACHE stored ({num}) :{file_path}") + print(f"DATES from:{self.cache_from.strftime('%d.%m.%Y %H:%M')} to:{self.cache_to.strftime('%d.%m.%Y %H:%M')}") + + #reset values + self.cached_object = None + self.cache_output_enabled = False + self.cache_from = None + self.cache_to = None + + def send_cache_to_output(self, cachedobject): + #this has to be overriden by each child + pass + class TradeAggregator2Queue(TradeAggregator): """ Child of TradeAggregator - sends items to given queue In the future others will be added - TradeAggToTxT etc. """ - def __init__(self, symbol: str, queue: Queue, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False): - super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours) + def __init__(self, symbol: str, queue: Queue, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False, excludes: list = AGG_EXCLUDED_TRADES): + super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours, excludes=excludes) self.queue = queue self.symbol = symbol + self.cached_object = Queue() - #accepts loaded queue and sents it to given output - async def ingest_cached(self, cached_queue): - for element in cached_queue: - self.queue.put(element) + # #accepts loaded queue and sents it to given output + # async def ingest_cached(self, cached_queue): + # for element in cached_queue: + # self.queue.put(element) + #prime presunuti celeho ulozeneho objektu queue + def send_cache_to_output(self, cachedobject): + self.queue.queue = cachedobject.queue + + #ingest tradu do agregatoru a odeslani do vystupu async def ingest_trade(self, data): #print("ingest ve threadu:",current_thread().name) res = await super().ingest_trade(data, self.symbol) @@ -725,6 +802,12 @@ class TradeAggregator2Queue(TradeAggregator): ##populate secondary resolution if required #print("inserted to queue") self.queue.put(copy) + + if self.cache_output_enabled: + self.cached_object.put(copy) + if copy == 'last': + await self.store_cached_object() + res = [] #print("po insertu",res) @@ -732,18 +815,24 @@ class TradeAggregator2List(TradeAggregator): """" stores records to the list """ - def __init__(self, symbol: str, btdata: list, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False): - super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours) + def __init__(self, symbol: str, btdata: list, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False, excludes: list = AGG_EXCLUDED_TRADES): + super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours, excludes=excludes) self.btdata = btdata self.symbol = symbol + self.cached_object = [] # self.debugfile = DATA_DIR + "/BACprices.txt" # if os.path.exists(self.debugfile): # os.remove(self.debugfile) - #accepts loaded queue and sents it to given output - async def ingest_cached(self, cached_queue): - for element in cached_queue: - self.btdata.append((element['t'],element['p'])) + #prime presunuti celeho ulozeneho objektu queue + def send_cache_to_output(self, cachedobject): + self.btdata.extend(cachedobject) + #TODO teoreticky to muzeme vzdy brat z primarniho objektu jako funkce nize - promyslet + + # #accepts loaded queue and sents it to given output + # async def ingest_cached(self, cached_queue): + # for element in cached_queue: + # self.btdata.append((element['t'],element['p'])) async def ingest_trade(self, data): #print("ted vstoupil do tradeagg2list ingestu") @@ -755,6 +844,13 @@ class TradeAggregator2List(TradeAggregator): copy = obj.copy() else: copy = obj + + if self.cache_output_enabled: + if copy == 'last': + await self.store_cached_object() + else: + self.cached_object.append((copy['t'],copy['p'])) + if obj == 'last': return [] self.btdata.append((copy['t'],copy['p'])) # with open(self.debugfile, "a") as output: diff --git a/v2realbot/loader/cacher.py b/v2realbot/loader/cacher.py deleted file mode 100644 index ba7f676..0000000 --- a/v2realbot/loader/cacher.py +++ /dev/null @@ -1,64 +0,0 @@ -from v2realbot.loader.aggregator import TradeAggregator, TradeAggregator2List, TradeAggregator2Queue -from alpaca.trading.requests import GetCalendarRequest -from alpaca.trading.client import TradingClient -from alpaca.data.live import StockDataStream -from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR, OFFLINE_MODE -from alpaca.data.enums import DataFeed -from alpaca.data.historical import StockHistoricalDataClient -from alpaca.data.requests import StockLatestQuoteRequest, StockBarsRequest, StockTradesRequest -from threading import Thread, current_thread -from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, print -from v2realbot.utils.tlog import tlog -from datetime import datetime, timedelta, date -from threading import Thread -import asyncio -from msgpack.ext import Timestamp -from msgpack import packb -from pandas import to_datetime -import pickle -import os -from rich import print -import queue -from alpaca.trading.models import Calendar -from v2realbot.enums.enums import RecordType, StartBarAlign -from datetime import datetime, timedelta -from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, Queue,is_open_hours,zoneNY -from queue import Queue -from rich import print -from v2realbot.enums.enums import Mode -import threading - -class Cacher: - def __init__(self, - - rectype: RecordType, - resolution: int = 5, - minsize: int = 100, - update_ltp: bool = False, - align: StartBarAlign = StartBarAlign.ROUND, - mintick: int = 0, - exthours: bool = False): - -#vstupuje seznam aggregatoru - obvykle 1 pro queue, 1 pro backtest engine -def get_cached_agg_data(agg_list, open, close): - file_path = DATA_DIR + "/cache/"+populate_file_name(agg_list[0], open, close) - - if os.path.exists(file_path): - ##denní file existuje - #loadujeme ze souboru - #pokud je start_time < trade < end_time - #odesíláme do queue - #jinak pass - with open (file_path, 'rb') as fp: - agg_data = pickle.load(fp) - print("Loading AGGREGATED DATA from CACHE", file_path) - - return agg_data - -def store_cache_agg_data(aggregator, open, close): - pass - #ulozi data do fajlu - -def populate_file_name(aggregator, open, close): - aggregated_file = aggregator.symbol + '-' + str(aggregator.rectype) + "-" + aggregator.timeframe + "-" + aggregator.minsize + "-" + aggregator.align + aggregator.mintick + str(aggregator.exthours) + '-' + str(int(open.timestamp())) + '-' + str(int(close.timestamp())) + '.cache' - return aggregated_file \ No newline at end of file diff --git a/v2realbot/loader/trade_offline_streamer.py b/v2realbot/loader/trade_offline_streamer.py index d57fd5f..c0c8357 100644 --- a/v2realbot/loader/trade_offline_streamer.py +++ b/v2realbot/loader/trade_offline_streamer.py @@ -24,6 +24,7 @@ from alpaca.trading.models import Calendar from tqdm import tqdm import time from traceback import format_exc +from collections import defaultdict """ Trade offline data streamer, based on Alpaca historical data. """ @@ -40,6 +41,7 @@ class Trade_Offline_Streamer(Thread): self.time_from = time_from self.time_to = time_to self.btdata = btdata + self.cache_used = defaultdict(list) def add_stream(self, obj: TradeAggregator): self.streams.append(obj) @@ -102,19 +104,6 @@ class Trade_Offline_Streamer(Thread): #REFACTOR STARTS HERE #print(f"{self.time_from=} {self.time_to=}") - def get_calendar_with_retry(self, calendar_request, max_retries=3, delay=4): - attempts = 0 - while attempts < max_retries: - try: - cal_dates = self.clientTrading.get_calendar(calendar_request) - return cal_dates - except Exception as e: - attempts += 1 - if attempts >= max_retries: - raise - print(f"Attempt {attempts}: Error occurred - {e}. Retrying in {delay} seconds...") - time.sleep(delay) - if OFFLINE_MODE: #just one day - same like time_from den = str(self.time_to.date()) @@ -132,7 +121,7 @@ class Trade_Offline_Streamer(Thread): cal_dates = self.clientTrading.get_calendar(calendar_request) #zatim podpora pouze main session - + #zatim podpora pouze 1 symbolu, predelat na froloop vsech symbolu ze symbpole #minimalni jednotka pro CACHE je 1 den - a to jen marketopen to marketclose (extended hours not supported yet) for day in cal_dates: @@ -161,23 +150,53 @@ class Trade_Offline_Streamer(Thread): continue - #check if we have aggregated data in cache - - #agg dat found, load it from file - #and call cacher - #trade daily file + #check if we have data in aggregator cache - for all streams + #zatim jednoduse + # predpokladame ze [0] jsou btdata a [1] hlavni add_data + # - pokud jsou oba, jedeme z cache ,pokud jeden nebo zadny - jedeme trady + # - cache pokryva cely den - #vstupuje pole agregatoru, open, close daneho dne - #cached_aggregated_data = get_cached_agg_data(self.to_run[symbpole[0]], day.open, day.close) + #musim zajistit, ze BT data tam jdou drive nez cache + # to_rem = [] + # for stream in self.to_run[symbpole[0]]: + # cache = stream.get_cache(day.open, day.close) + # if cache is not None: + # stream.send_cache_to_output(cache) + # to_rem.append(stream) - # if cached_aggregated_data is not None: - # #poslu agregovana data do ingest cache aggregatorů pro přeposlání do jednotlivých kanálů + #cache resime jen kdyz backtestujeme cely den + #pokud ne tak ani necteme, ani nezapisujeme do cache + if self.time_to >= day.close: + #tento odstavec obchazime pokud je nastaveno "dont_use_cache" + stream_btdata = self.to_run[symbpole[0]][0] + cache_btdata, file_btdata = stream_btdata.get_cache(day.open, day.close) + stream_main = self.to_run[symbpole[0]][1] + cache_main, file_main = stream_main.get_cache(day.open, day.close) + if cache_btdata is not None and cache_main is not None: + stream_btdata.send_cache_to_output(cache_btdata) + stream_main.send_cache_to_output(cache_main) + #ukladame nazvy souboru pro pozdejsi ulozeni ke strategii + self.cache_used[str(day.date)].append(file_btdata) + self.cache_used[str(day.date)].append(file_main) + continue + + #TBD pokud se jede na cache a testovaci obdobi je mensi nez den + # - bud disablujeme cashovani + # - nebo nechavame dobehnout cely den + + #pokud cache neexistuje, pak ji zapiname + if day.open < datetime.now().astimezone(zoneNY) < day.close: + print("not saving the aggregating cache, market still open today") + else: + if cache_btdata is None: + stream_btdata.enable_cache_output(day.open, day.close) + if cache_main is None: + stream_main.enable_cache_output(day.open, day.close) - #trade daily file daily_file = str(symbpole[0]) + '-' + str(int(day.open.timestamp())) + '-' + str(int(day.close.timestamp())) + '.cache' print(daily_file) - file_path = DATA_DIR + "/"+daily_file + file_path = DATA_DIR + "/tradecache/"+daily_file if os.path.exists(file_path): ##denní file existuje @@ -187,7 +206,7 @@ class Trade_Offline_Streamer(Thread): #jinak pass with open (file_path, 'rb') as fp: tradesResponse = pickle.load(fp) - print("Loading DATA from CACHE", file_path) + print("Loading from Trade CACHE", file_path) #daily file doesnt exist else: # TODO refactor pro zpracovani vice symbolu najednou(multithreads), nyni predpokladame pouze 1 @@ -197,7 +216,7 @@ class Trade_Offline_Streamer(Thread): #pokud jde o dnešní den a nebyl konec trhu tak cache neukládáme if day.open < datetime.now().astimezone(zoneNY) < day.close: - print("not saving the cache, market still open today") + print("not saving trade cache, market still open today") #ic(datetime.now().astimezone(zoneNY)) #ic(day.open, day.close) else: diff --git a/v2realbot/static/js/archivechart.js b/v2realbot/static/js/archivechart.js index 7a7c340..21fca59 100644 --- a/v2realbot/static/js/archivechart.js +++ b/v2realbot/static/js/archivechart.js @@ -1048,7 +1048,7 @@ function chart_archived_run(archRecord, data, oneMinuteBars) { $("#statusAccount").text(archRecord.account) $("#statusIlog").text("Logged:" + archRecord.ilog_save) $("#statusStratvars").text(((archRecord.strat_json)?archRecord.strat_json:archRecord.stratvars),null,2) - $("#statusSettings").text(JSON.stringify(archRecord.metrics,null,2) + " " + JSON.stringify(archRecord.settings,null,2)+ JSON.stringify(data.ext_data,null,2)) + $("#statusSettings").text(JSON.stringify(archRecord.settings,null,2) + JSON.stringify(data.ext_data,null,2)) //TBD other dynamically created indicators } diff --git a/v2realbot/strategy/base.py b/v2realbot/strategy/base.py index 4842f5b..06eb70c 100644 --- a/v2realbot/strategy/base.py +++ b/v2realbot/strategy/base.py @@ -6,7 +6,7 @@ from v2realbot.utils.utils import AttributeDict, zoneNY, is_open_rush, is_close_ from v2realbot.utils.tlog import tlog from v2realbot.utils.ilog import insert_log, insert_log_multiple_queue from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Order, Account -from v2realbot.config import BT_DELAYS, get_key, HEARTBEAT_TIMEOUT, QUIET_MODE, LOG_RUNNER_EVENTS, ILOG_SAVE_LEVEL_FROM,PROFILING_NEXT_ENABLED, PROFILING_OUTPUT_DIR +from v2realbot.config import BT_DELAYS, get_key, HEARTBEAT_TIMEOUT, QUIET_MODE, LOG_RUNNER_EVENTS, ILOG_SAVE_LEVEL_FROM,PROFILING_NEXT_ENABLED, PROFILING_OUTPUT_DIR, AGG_EXCLUDED_TRADES import queue #from rich import print from v2realbot.loader.aggregator import TradeAggregator2Queue, TradeAggregator2List, TradeAggregator @@ -90,7 +90,8 @@ class Strategy: update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, - exthours: bool = False): + exthours: bool = False, + excludes: list = AGG_EXCLUDED_TRADES): ##TODO vytvorit self.datas_here containing dict - queue - SYMBOL - RecType - ##zatim natvrdo @@ -98,7 +99,7 @@ class Strategy: self.rectype = rectype self.state.rectype = rectype self.state.resolution = resolution - stream = TradeAggregator2Queue(symbol=symbol,queue=self.q1,rectype=rectype,resolution=resolution,update_ltp=update_ltp,align=align,mintick = mintick, exthours=exthours, minsize=minsize) + stream = TradeAggregator2Queue(symbol=symbol,queue=self.q1,rectype=rectype,resolution=resolution,update_ltp=update_ltp,align=align,mintick = mintick, exthours=exthours, minsize=minsize, excludes=excludes) self._streams.append(stream) self.dataloader.add_stream(stream) @@ -418,7 +419,7 @@ class Strategy: #main strat loop - print(self.name, "Waiting for DATA") + print(self.name, "Waiting for DATA",self.q1.qsize()) with tqdm(total=self.q1.qsize()) as pbar: while True: try: