aggcache introduced

This commit is contained in:
David Brazda
2023-11-23 10:24:07 +01:00
parent c7a6ba9695
commit 159302d168
6 changed files with 167 additions and 110 deletions

View File

@ -12,6 +12,8 @@ from copy import deepcopy
from msgpack import unpackb
import os
from v2realbot.config import DATA_DIR, GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN, AGG_EXCLUDED_TRADES
import pickle
import dill
class TradeAggregator:
def __init__(self,
@ -21,7 +23,8 @@ class TradeAggregator:
update_ltp: bool = False,
align: StartBarAlign = StartBarAlign.ROUND,
mintick: int = 0,
exthours: bool = False):
exthours: bool = False,
excludes: list = AGG_EXCLUDED_TRADES):
"""
UPDATED VERSION - vrací více záznamů
@ -40,6 +43,7 @@ class TradeAggregator:
self.minsize = minsize
self.update_ltp = update_ltp
self.exthours = exthours
self.excludes = excludes
if mintick >= resolution:
print("Mintick musi byt mensi nez resolution")
@ -75,6 +79,10 @@ class TradeAggregator:
self.diff_price = True
self.preconfBar = {}
self.trades_too_close = False
self.cached_object = None
self.cache_output_enabled = False
self.cache_from = None
self.cache_to = None
async def ingest_trade(self, indata, symbol):
"""
@ -95,7 +103,7 @@ class TradeAggregator:
## přidán W - average price trade, U - Extended hours - sold out of sequence, Z - Sold(Out of sequence)
try:
for i in data['c']:
if i in AGG_EXCLUDED_TRADES: return []
if i in self.excludes: return []
except KeyError:
pass
@ -694,21 +702,90 @@ class TradeAggregator:
else:
return []
def populate_file_name(self, date_from: datetime, date_to: datetime):
#nazev obsahuje i child class
#a take excludes result = ''.join(self.excludes.sort())
self.excludes.sort() # Sorts the list in place
excludes_str = ''.join(map(str, self.excludes)) # Joins the sorted elements after converting them to strings
cache_file = self.__class__.__name__ + '-' + self.symbol + '-' + str(int(date_from.timestamp())) + '-' + str(int(date_to.timestamp())) + '-' + str(self.rectype) + "-" + str(self.resolution) + "-" + str(self.minsize) + "-" + str(self.align) + '-' + str(self.mintick) + str(self.exthours) + excludes_str + '.cache'
file_path = DATA_DIR + "/aggcache/" + cache_file
#print(file_path)
return file_path
#returns cached objects for given period
def get_cache(self, date_from: datetime, date_to: datetime):
file_path = self.populate_file_name(date_from, date_to)
if os.path.exists(file_path):
##daily aggregated file exists
with open (file_path, 'rb') as fp:
cachedobject = dill.load(fp)
print("AGG CACHE loaded ", file_path)
if isinstance(cachedobject, Queue):
num = cachedobject.qsize()
else:
num = len(cachedobject)
print("cached items",num)
return cachedobject, file_path
else:
return None, None
#cachujeme jen na exlicitni zapnuti a jen pro BT
def enable_cache_output(self, date_from: datetime, date_to: datetime):
self.cache_output_enabled = True
self.cache_from = date_from
self.cache_to = date_to
#lozi storovany cache object self.cache_object do cache
async def store_cached_object(self):
if self.cache_output_enabled is False or self.cached_object is None or self.cache_from is None or self.cache_to is None:
print("cache not enabled or missing data")
return None
if isinstance(self.cached_object, Queue):
num = self.cached_object.qsize()
else:
num = len(self.cached_object)
file_path = self.populate_file_name(self.cache_from, self.cache_to)
with open(file_path, 'wb') as fp:
dill.dump(self.cached_object, fp)
print(f"AGG CACHE stored ({num}) :{file_path}")
print(f"DATES from:{self.cache_from.strftime('%d.%m.%Y %H:%M')} to:{self.cache_to.strftime('%d.%m.%Y %H:%M')}")
#reset values
self.cached_object = None
self.cache_output_enabled = False
self.cache_from = None
self.cache_to = None
def send_cache_to_output(self, cachedobject):
#this has to be overriden by each child
pass
class TradeAggregator2Queue(TradeAggregator):
"""
Child of TradeAggregator - sends items to given queue
In the future others will be added - TradeAggToTxT etc.
"""
def __init__(self, symbol: str, queue: Queue, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False):
super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours)
def __init__(self, symbol: str, queue: Queue, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False, excludes: list = AGG_EXCLUDED_TRADES):
super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours, excludes=excludes)
self.queue = queue
self.symbol = symbol
self.cached_object = Queue()
#accepts loaded queue and sents it to given output
async def ingest_cached(self, cached_queue):
for element in cached_queue:
self.queue.put(element)
# #accepts loaded queue and sents it to given output
# async def ingest_cached(self, cached_queue):
# for element in cached_queue:
# self.queue.put(element)
#prime presunuti celeho ulozeneho objektu queue
def send_cache_to_output(self, cachedobject):
self.queue.queue = cachedobject.queue
#ingest tradu do agregatoru a odeslani do vystupu
async def ingest_trade(self, data):
#print("ingest ve threadu:",current_thread().name)
res = await super().ingest_trade(data, self.symbol)
@ -725,6 +802,12 @@ class TradeAggregator2Queue(TradeAggregator):
##populate secondary resolution if required
#print("inserted to queue")
self.queue.put(copy)
if self.cache_output_enabled:
self.cached_object.put(copy)
if copy == 'last':
await self.store_cached_object()
res = []
#print("po insertu",res)
@ -732,18 +815,24 @@ class TradeAggregator2List(TradeAggregator):
""""
stores records to the list
"""
def __init__(self, symbol: str, btdata: list, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False):
super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours)
def __init__(self, symbol: str, btdata: list, rectype: RecordType = RecordType.BAR, resolution: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False, excludes: list = AGG_EXCLUDED_TRADES):
super().__init__(rectype=rectype, resolution=resolution, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours, excludes=excludes)
self.btdata = btdata
self.symbol = symbol
self.cached_object = []
# self.debugfile = DATA_DIR + "/BACprices.txt"
# if os.path.exists(self.debugfile):
# os.remove(self.debugfile)
#accepts loaded queue and sents it to given output
async def ingest_cached(self, cached_queue):
for element in cached_queue:
self.btdata.append((element['t'],element['p']))
#prime presunuti celeho ulozeneho objektu queue
def send_cache_to_output(self, cachedobject):
self.btdata.extend(cachedobject)
#TODO teoreticky to muzeme vzdy brat z primarniho objektu jako funkce nize - promyslet
# #accepts loaded queue and sents it to given output
# async def ingest_cached(self, cached_queue):
# for element in cached_queue:
# self.btdata.append((element['t'],element['p']))
async def ingest_trade(self, data):
#print("ted vstoupil do tradeagg2list ingestu")
@ -755,6 +844,13 @@ class TradeAggregator2List(TradeAggregator):
copy = obj.copy()
else:
copy = obj
if self.cache_output_enabled:
if copy == 'last':
await self.store_cached_object()
else:
self.cached_object.append((copy['t'],copy['p']))
if obj == 'last': return []
self.btdata.append((copy['t'],copy['p']))
# with open(self.debugfile, "a") as output: