Files
v2realbot/v2realbot/loader/aggregator.py

1061 lines
48 KiB
Python

"""
Aggregator module containing the main aggregation logic for TRADES, BARS and CBAR
"""
from v2realbot.enums.enums import RecordType, StartBarAlign
from datetime import datetime, timedelta
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, Queue,is_open_hours,zoneNY, zoneUTC
from queue import Queue
from rich import print
from v2realbot.enums.enums import Mode
import threading
from copy import deepcopy
from msgpack import unpackb
import os
from v2realbot.config import DATA_DIR
import dill
import gzip
import v2realbot.utils.config_handler as cfh
class TradeAggregator:
def __init__(self,
             rectype: RecordType = RecordType.BAR,
             resolution: int = 5,
             minsize: int = 100,
             update_ltp: bool = False,
             align: StartBarAlign = StartBarAlign.ROUND,
             mintick: int = 0,
             exthours: bool = False,
             excludes: list = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES'),
             skip_cache: bool = False):
    """
    Create a trade aggregator (updated version - a single trade may yield several records).

    The instance accepts trades one by one, processes them and returns records of the
    configured type:
        Trade - the trade itself, one by one (no change)
        Bar   - a finished bar of the given resolution
        CBar  - a continuous bar; a finished bar is marked by its ``confirmed`` status

    Args:
        rectype: type of record to produce
        resolution (number): bar resolution in seconds for time bars; the volume, dollar
            and renko calculations in this class read the same value as their bucket/brick size
        minsize: trades smaller than this size are dropped
        update_ltp (bool): whether to update the global last-trade-price variable
            (usually only one instance does that)
        align: alignment of the first bar. ROUND - according to resolution (5, 10, 15 for
            a 5s resolution), RANDOM - according to the timestamp of the first trade
        mintick: applies to CBAR. Minimum gap after a bar confirmation before the next
            unconfirmed bar is updated (e.g. for 15s bars we may want the first tick
            after 5s); after that updates are real time
        exthours: when False, trades outside of open hours are skipped
        excludes: trade condition codes that cause a trade to be dropped
        skip_cache: when True, ``get_cache`` never loads a cached result

    Raises:
        ValueError: when ``mintick`` is not smaller than a positive ``resolution``
    """
    if resolution > 0 and mintick >= resolution:
        print("Mintick musi byt mensi nez resolution")
        raise ValueError("Mintick musi byt mensi nez resolution")

    self.rectype: RecordType = rectype
    self.resolution = resolution
    # minimum trade size
    self.minsize = minsize
    self.update_ltp = update_ltp
    self.exthours = exthours
    # Keep a private copy: the default list is created once at definition time and is
    # shared by every instance, and populate_file_name() orders the exclusions.
    self.excludes = list(excludes) if excludes is not None else []
    self.skip_cache = skip_cache
    self.mintick = mintick
    self.align = align

    # starting values
    self.iterace = 1
    self.lasttimestamp = 0

    # initial state for the first aggregation
    self.newBar = dict(high=0, low=999999, volume = 0, trades = 0, confirmed = 0, vwap = 0, close=0, index = 1, updated = 0)
    self.openedBar = None
    self.lastConfirmedTime = 0
    self.bar_start = 0
    self.curr_bar_volume = None
    self.current_bar_open = None
    self.tm: datetime = None
    self.firstpass = True
    self.vwaphelper = 0
    self.returnBar = {}
    self.lastBarConfirmed = False
    self.lastConfirmedBar = None
    self.lasthigh = None
    self.lastlow = None

    # last trade price seen by this instance
    self.last_price = 0
    self.barindex = 1
    self.diff_price = True
    self.preconfBar = {}
    self.trades_too_close = False

    # output caching state (see enable_cache_output / store_cached_object)
    self.cached_object = None
    self.cache_output_enabled = False
    self.cache_from = None
    self.cache_to = None
async def ingest_trade(self, indata, symbol):
"""
Aggregator logic for trade record
Args:
indata (dict): online or offline record
"""
data = unpackb(indata)
#last item signal
if data == "last": return [data]
#print(data)
##implementing fitlers - zatim natvrdo a jen tyto: size: 1, cond in [O,C,4] opening,closed a derivately priced,
## 22.3. - dal jsem pryc i contingency trades [' ', '7', 'V'] - nasel jsem obchod o 30c mimo
## dán pryč P - prior reference time + 25centu mimo, {'t': '2023-04-12T19:45:08.63257344Z', 'x': 'D', 'p': 28.68, 's': 1000, 'c': [' ', 'P'], 'i': 71693108525109, 'z': 'A'},
## Q - jsou v pohode, oteviraci trady, ale O jsou jejich duplikaty
## přidán W - average price trade, U - Extended hours - sold out of sequence, Z - Sold(Out of sequence)
try:
for i in data['c']:
if i in self.excludes: return []
except KeyError:
pass
#EXPERIMENT zkusime vyhodit vsechny pod 50 #puv if int(data['s']) == 1: return []
#zatim nechavame - výsledek je naprosto stejný jako v tradingview
if int(data['s']) < self.minsize: return []
#{'t': 1678982075.242897, 'x': 'D', 'p': 29.1333, 's': 18000, 'c': [' ', '7', 'V'], 'i': 79372107591749, 'z': 'A', 'u': 'incorrect'}
if 'u' in data: return []
#pokud projde TRADE s cenou 0.33% rozdilna oproti predchozi, pak vyhazujeme v ramci cisteni dat (cca 10ticku na 30USD)
pct_off = 0.33
##ic(ltp.price)
##ic(ltp.price[symbol])
try:
ltp.price[symbol]
except KeyError:
ltp.price[symbol]=data['p']
#DOCASNE VYPNUTO - VYMYSLET JINAK
#if float(data['p']) > float(ltp.price[symbol]) + (float(data['p'])/100*pct_off) or float(data['p']) < float(ltp.price[symbol])-(float(data['p'])/100*pct_off):
#print("ZLO", data,ltp.price[symbol])
#nechavame zlo zatim projit
##return []
# with open("cache/wrongtrades.txt", 'a') as fp:
# fp.write(str(data) + 'predchozi:'+str(ltp.price[symbol])+'\n')
#timestampy jsou v UTC
#TIMESTAMP format is different for online and offline trade streams
#offline trade
#{'t': '2023-02-17T14:30:00.16111744Z', 'x': 'J', 'p': 35.14, 's': 20, 'c': [' ', 'F', 'I'], 'i': 52983525027938, 'z': 'A'}
#websocket trade
#{'T': 't', 'S': 'MSFT', 'i': 372, 'x': 'V', 'p': 264.58, 's': 25, 'c': ['@', 'I'], 'z': 'C', 't': Timestamp(seconds=1678973696, nanoseconds=67312449), 'r': Timestamp(seconds=1678973696, nanoseconds=72865209)}
#parse alpaca timestamp
# tzn. na offline mohu pouzit >>> datetime.fromisoformat(d).timestamp() 1676644200.161117
#orizne sice nanosekundy ale to nevadi
#print("tady", self.mode, data['t'])
# if self.mode == Mode.BT:
# data['t'] = datetime.fromisoformat(str(data['t'])).timestamp()
# else:
data['t'] = parse_alpaca_timestamp(data['t'])
if not is_open_hours(datetime.fromtimestamp(data['t'], tz=zoneUTC)) and self.exthours is False:
#print("AGG: trade not in open hours skipping", datetime.fromtimestamp(data['t']).astimezone(zoneNY))
return []
#tady bude vzdycky posledni cena a posledni cas
if self.update_ltp:
ltp.price[symbol] = data['p']
ltp.time[symbol] = data['t']
#if data['p'] < self.last_price - 0.02: print("zlo:",data)
if self.rectype == RecordType.TRADE: return [data]
#print("agr přišel trade", datetime.fromtimestamp(data['t']),data)
#OPIC pokud bude vadit, ze prvni bar neni kompletni - pak zapnout tuto opicarnu
#kddyz jde o prvni iteraci a pozadujeme align, cekame na kulaty cas (pro 5s 0,5,10..)
# if self.lasttimestamp ==0 and self.align:
# if self.firstpass:
# self.tm = datetime.fromtimestamp(data['t'])
# self.tm += timedelta(seconds=self.resolution)
# self.tm = self.tm - timedelta(seconds=self.tm.second % self.resolution,microseconds=self.tm.microsecond)
# self.firstpass = False
# print("trade: ",datetime.fromtimestamp(data['t']))
# print("required",self.tm)
# if self.tm > datetime.fromtimestamp(data['t']):
# return
# else: pass
# if self.rectype in (RecordType.BAR, RecordType.CBAR):
# return await self.calculate_time_bar(data, symbol)
# if self.rectype == RecordType.CBARVOLUME:
# return await self.calculate_volume_bar(data, symbol)
# if self.rectype == RecordType.CBARVOLUME:
# return await self.calculate_volume_bar(data, symbol)
# if self.rectype == RecordType.CBARRENKO:
# return await self.calculate_renko_bar(data, symbol)
match self.rectype:
case RecordType.BAR | RecordType.CBAR:
return await self.calculate_time_bar(data, symbol)
case RecordType.CBARVOLUME:
return await self.calculate_volume_bar(data, symbol)
case RecordType.CBARDOLLAR:
return await self.calculate_dollar_bar(data, symbol)
case RecordType.CBARRENKO:
return await self.calculate_renko_bar(data, symbol)
async def calculate_time_bar(self, data, symbol):
    """
    Aggregate one trade into time-based bars (BAR / CBAR).

    A trade whose whole-second timestamp lies at least ``self.resolution`` seconds past
    ``self.bar_start`` confirms the bar in progress and starts a new one.

    Args:
        data (dict): trade with 't' (epoch timestamp), 'p' (price) and 's' (size)
        symbol: not used by this method

    Returns:
        list: ``[pre-confirmation bar (optional), confirmed bar, new bar]`` when the trade
        confirmed a bar; ``[current bar]`` for a CBAR update; ``[]`` otherwise.
    """
    trade_ts = data['t']
    trade_price = data['p']

    issamebar = int(trade_ts) - self.bar_start < self.resolution

    # Flush the previous bar and initialise a new one (except on the very first trade).
    if not issamebar and self.lasttimestamp != 0:
        finished = self.newBar
        finished['confirmed'] = 1
        finished['vwap'] = self.vwaphelper / finished['volume']

        # A confirmed CBAR may hide trades that did not change the price (diff_price still
        # holds the verdict for the previous trade here). Those are flushed first as an
        # unconfirmed snapshot stamped with the previous trade's time, so the confirmation
        # itself carries no new data.
        if self.rectype == RecordType.CBAR and self.diff_price is False:
            snapshot = deepcopy(finished)
            snapshot['updated'] = self.lasttimestamp
            snapshot['confirmed'] = 0
            self.preconfBar = snapshot

        # the confirmation is stamped with the time of the trade that triggered it
        finished['updated'] = trade_ts

        # remembered for the mintick protection window
        self.lastBarConfirmed = True
        # previous (confirmed) bar is kept to be returned below
        self.returnBar = finished

        # initialise state for the new bar
        self.vwaphelper = 0
        self.barindex += 1
        self.newBar = dict(
            close=0,
            high=0,
            low=99999999,
            volume=0,
            trades=0,
            hlcc4=0,
            confirmed=0,
            updated=0,
            vwap=0,
            index=self.barindex,
        )

    # Is the price unchanged since the previous trade?
    self.diff_price = self.last_price != trade_price
    self.last_price = trade_price

    # Trades arriving closer together than the configured threshold are grouped.
    gap = float(trade_ts) - float(self.lasttimestamp)
    self.trades_too_close = gap < cfh.config_handler.get_val('GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN')

    # running sum of price * size, used for vwap
    self.vwaphelper += (trade_price * data['s'])

    bar = self.newBar
    bar['updated'] = trade_ts
    bar['close'] = trade_price
    bar['high'] = max(bar['high'], trade_price)
    bar['low'] = min(bar['low'], trade_price)
    bar['volume'] = bar['volume'] + data['s']
    bar['trades'] = bar['trades'] + 1
    bar['hlcc4'] = round((bar['high']+bar['low']+bar['close']+bar['close'])/4,3)

    # The trade opens a new bar: store its starting values.
    if not issamebar:
        bar['open'] = trade_price
        if self.align == StartBarAlign.ROUND and self.bar_start == 0:
            # only the first bar is aligned to the resolution grid (e.g. 5, 10, 15 ...)
            aligned = datetime.fromtimestamp(trade_ts, tz=zoneUTC)
            t = aligned - timedelta(seconds=aligned.second % self.resolution,microseconds=aligned.microsecond)
            self.bar_start = t.timestamp()
        else:
            # otherwise the bar starts at the trade time truncated to whole seconds
            t = datetime.fromtimestamp(int(trade_ts), tz=zoneUTC)
            self.bar_start = int(trade_ts)
        bar['time'] = t
        bar['resolution'] = self.resolution
        bar['confirmed'] = 0

    # remember for the next call
    self.lasttimestamp = trade_ts
    self.iterace += 1

    # A confirmed bar is pending: return it together with the new bar that triggered it,
    # preceded by the pre-confirmation snapshot when there is one.
    if self.returnBar:
        return_set = []
        if self.preconfBar:
            return_set.append(self.preconfBar)
            self.preconfBar = {}
        return_set.append(self.returnBar)
        self.returnBar = []
        # running vwap of the new bar
        bar['vwap'] = self.vwaphelper / bar['volume']
        # TODO: mintick is not honoured here - the new bar is sent regardless
        return_set.append(bar)
        return return_set

    if self.rectype != RecordType.CBAR:
        return []

    # Continuous bars are sent immediately, subject to the mintick window: after a
    # confirmation no update is sent until mintick seconds have passed since the bar start.
    if self.mintick != 0 and self.lastBarConfirmed:
        window_end = bar['time'] + timedelta(seconds=self.mintick)
        if window_end > datetime.fromtimestamp(trade_ts, tz=zoneUTC):
            return []
        self.lastBarConfirmed = False

    # running vwap
    bar['vwap'] = self.vwaphelper / bar['volume']

    # Same-price updates are let through, but bursts of grouped trades are suppressed.
    return [bar] if self.trades_too_close is False else []
async def calculate_volume_bar(self, data, symbol):
    """
    Aggregate one trade into VOLUME bars of ``self.resolution`` units each.

    Main variables:
        - self.openedBar (dict): stateful, the active unconfirmed bar (or None)
        - confirmedBars (list): local, bars confirmed by this trade, flushed on return

    A trade that does not fit into the open bar tops it up to the bucket size and
    confirms it; the rest of the trade creates as many full confirmed bars as fit and
    the remainder opens a new unconfirmed bar. ``data['s']`` and ``data['t']`` are
    modified in place while the trade is being split.

    Args:
        data (dict): trade with 't' (epoch timestamp), 'p' (price) and 's' (size)
        symbol: not used by this method

    Returns:
        list: confirmed bars followed by the open bar (if any) when the trade confirmed
        something; ``[open bar]`` for a plain update; ``[]`` when the update is suppressed.
    """
    volume_bucket = self.resolution
    # confirmed bars prepared to be returned
    confirmedBars = []

    def bump_time():
        # Each bar created from the same trade gets a timestamp one microsecond later.
        data['t'] = round((data['t']) + 0.000001,6)

    def fresh_bar(size, confirmed):
        # single-trade bar built from the current price and time
        return {
            "close": data['p'],
            "high": data['p'],
            "low": data['p'],
            "open": data['p'],
            "volume": size,
            "trades": 1,
            "hlcc4": data['p'],
            "confirmed": confirmed,
            "time": datetime.fromtimestamp(data['t'], tz=zoneUTC),
            "updated": data['t'],
            "vwap": data['p'],
            "index": self.barindex,
            "resolution": volume_bucket
        }

    def confirm_existing():
        # confirm the open bar and queue a copy of it for return
        self.openedBar['confirmed'] = 1
        self.openedBar['vwap'] = self.vwaphelper / self.openedBar['volume']
        self.vwaphelper = 0
        self.openedBar['updated'] = data['t']
        confirmedBars.append(deepcopy(self.openedBar))
        self.openedBar = None

    def initialize_unconfirmed(size):
        # open a new unconfirmed bar (the caller guarantees size < bucket)
        self.vwaphelper += (data['p'] * size)
        self.barindex += 1
        self.openedBar = fresh_bar(size, 0)

    def update_unconfirmed(size):
        # add `size` units of the current trade to the open bar
        self.vwaphelper += (data['p'] * size)
        self.openedBar['updated'] = data['t']
        self.openedBar['close'] = data['p']
        self.openedBar['high'] = max(self.openedBar['high'],data['p'])
        self.openedBar['low'] = min(self.openedBar['low'],data['p'])
        self.openedBar['volume'] = self.openedBar['volume'] + size
        self.openedBar['trades'] = self.openedBar['trades'] + 1
        self.openedBar['vwap'] = self.vwaphelper / self.openedBar['volume']
        self.openedBar['hlcc4'] = round((self.openedBar['high']+self.openedBar['low']+self.openedBar['close']+self.openedBar['close'])/4,3)

    def initialize_confirmed(size):
        # create a bar that is filled and confirmed by this trade alone
        self.barindex += 1
        confirmedBars.append(fresh_bar(size, 1))

    if self.openedBar is not None and int(data['s']) + self.openedBar['volume'] < volume_bucket:
        # the whole trade fits into the open bar without reaching the bucket size
        update_unconfirmed(int(data['s']))
    else:
        # 1) an open bar exists - top it up to the bucket size and confirm it
        if self.openedBar is not None:
            bucket_left = volume_bucket - self.openedBar['volume']
            update_unconfirmed(bucket_left)
            confirm_existing()
            # the rest of the size goes on to further processing
            data['s'] = int(data['s']) - bucket_left
            bump_time()

        # 2) distribute what is left: every full bucket becomes a confirmed bar. Counting
        #    with floor division also covers sizes that are exact multiples of the bucket
        #    (range(bucket, size, bucket) produced one bar too few for them).
        full_buckets, remainder = divmod(int(data['s']), volume_bucket)
        for _ in range(full_buckets):
            initialize_confirmed(volume_bucket)
            bump_time()

        # the remainder opens a new unconfirmed bar; nothing left means no empty bar
        if remainder > 0:
            initialize_unconfirmed(remainder)

    # Is the price unchanged since the previous trade?
    self.diff_price = self.last_price != data['p']
    self.last_price = data['p']

    # Trades arriving closer together than the configured threshold are grouped.
    self.trades_too_close = float(data['t']) - float(self.lasttimestamp) < cfh.config_handler.get_val('GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN')

    # remember for the next call
    self.lasttimestamp = data['t']
    self.iterace += 1

    # confirmed bars are flushed together with the open bar, if one was created
    if confirmedBars:
        return confirmedBars + ([self.openedBar] if self.openedBar is not None else [])

    # No confirmation: flush the open bar regardless of a price change, but suppress
    # bursts of grouped trades.
    if self.openedBar is not None and self.rectype == RecordType.CBARVOLUME:
        return [self.openedBar] if self.trades_too_close is False else []
    return []
#WIP - review the code and test it
async def calculate_dollar_bar(self, data, symbol):
    """
    Aggregate one trade into DOLLAR bars worth ``self.resolution`` each.

    Main variables:
        - self.openedBar (dict): stateful, the active unconfirmed bar (or None)
        - confirmedBars (list): local, bars confirmed by this trade, flushed on return

    The value of the open bar is estimated as ``volume * close``. A trade that does not
    fit tops the open bar up and confirms it; the rest creates as many full confirmed
    bars as fit and the remainder opens a new unconfirmed bar. ``data['s']`` and
    ``data['t']`` are modified in place while the trade is being split.

    Args:
        data (dict): trade with 't' (epoch timestamp), 'p' (price) and 's' (size)
        symbol: not used by this method

    Returns:
        list: confirmed bars followed by the open bar (if any) when the trade confirmed
        something; ``[open bar]`` for a plain update; ``[]`` when the update is suppressed.
    """
    dollar_bucket = self.resolution
    # confirmed bars prepared to be returned
    confirmedBars = []

    def bump_time():
        # Each bar created from the same trade gets a timestamp one microsecond later.
        data['t'] = round((data['t']) + 0.000001,6)

    def fresh_bar(size, confirmed):
        # single-trade bar built from the current price and time
        return {
            "close": data['p'],
            "high": data['p'],
            "low": data['p'],
            "open": data['p'],
            "volume": size,
            "trades": 1,
            "hlcc4": data['p'],
            "confirmed": confirmed,
            "time": datetime.fromtimestamp(data['t'], tz=zoneUTC),
            "updated": data['t'],
            "vwap": data['p'],
            "index": self.barindex,
            "resolution": dollar_bucket
        }

    def confirm_existing():
        # confirm the open bar and queue a copy of it for return
        self.openedBar['confirmed'] = 1
        self.openedBar['vwap'] = self.vwaphelper / self.openedBar['volume']
        self.vwaphelper = 0
        self.openedBar['updated'] = data['t']
        confirmedBars.append(deepcopy(self.openedBar))
        self.openedBar = None

    def initialize_unconfirmed(size):
        # open a new unconfirmed bar holding `size` shares
        self.vwaphelper += (data['p'] * size)
        self.barindex += 1
        self.openedBar = fresh_bar(size, 0)

    def update_unconfirmed(size):
        # add `size` shares of the current trade to the open bar
        self.vwaphelper += (data['p'] * size)
        self.openedBar['updated'] = data['t']
        self.openedBar['close'] = data['p']
        self.openedBar['high'] = max(self.openedBar['high'],data['p'])
        self.openedBar['low'] = min(self.openedBar['low'],data['p'])
        self.openedBar['volume'] = self.openedBar['volume'] + size
        self.openedBar['trades'] = self.openedBar['trades'] + 1
        self.openedBar['vwap'] = self.vwaphelper / self.openedBar['volume']
        self.openedBar['hlcc4'] = round((self.openedBar['high']+self.openedBar['low']+self.openedBar['close']+self.openedBar['close'])/4,3)

    def initialize_confirmed(size):
        # create a bar that is filled and confirmed by this trade alone
        self.barindex += 1
        confirmedBars.append(fresh_bar(size, 1))

    # dollar value of the current trade
    trade_dollar_val = int(data['s'])*float(data['p'])

    if self.openedBar is not None and trade_dollar_val + self.openedBar['volume']*self.openedBar['close'] < dollar_bucket:
        # the whole trade fits into the open bar without reaching the bucket value
        update_unconfirmed(int(data['s']))
    else:
        # 1) an open bar exists - top it up to the bucket value and confirm it
        if self.openedBar is not None:
            opened_bar_dollar_val = self.openedBar['volume']*self.openedBar['close']
            # Number of shares still missing. The estimate volume*close may already exceed
            # the bucket after a price jump; never take shares back out of the bar.
            bucket_left = max(0, int((dollar_bucket - opened_bar_dollar_val)/float(data['p'])))
            update_unconfirmed(bucket_left)
            confirm_existing()
            # the rest of the size goes on to further processing
            data['s'] = int(data['s']) - bucket_left
            bump_time()

        # 2) distribute what is left of the trade
        price = float(data['p'])
        remaining_shares = int(data['s'])
        remaining_value = remaining_shares * price
        if remaining_value < dollar_bucket:
            # fits into a new unconfirmed bar; nothing left means no empty bar
            if remaining_shares > 0:
                initialize_unconfirmed(remaining_shares)
        else:
            # Every full bucket of value becomes a confirmed bar. Counting with floor
            # division covers exact multiples of the bucket and cannot hit a zero range
            # step when a single share costs more than the bucket.
            shares_per_bucket = dollar_bucket / price
            for _ in range(int(remaining_value // dollar_bucket)):
                initialize_confirmed(shares_per_bucket)
                bump_time()
            # convert the leftover value back to whole shares for the new open bar
            leftover_shares = int((remaining_value % dollar_bucket) / price)
            if leftover_shares > 0:
                initialize_unconfirmed(leftover_shares)

    # Is the price unchanged since the previous trade?
    self.diff_price = self.last_price != data['p']
    self.last_price = data['p']

    # Trades arriving closer together than the configured threshold are grouped.
    self.trades_too_close = float(data['t']) - float(self.lasttimestamp) < cfh.config_handler.get_val('GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN')

    # remember for the next call
    self.lasttimestamp = data['t']
    self.iterace += 1

    # confirmed bars are flushed together with the open bar, if one was created
    if confirmedBars:
        return confirmedBars + ([self.openedBar] if self.openedBar is not None else [])

    # No confirmation: flush the open bar regardless of a price change, but suppress
    # bursts of grouped trades.
    if self.openedBar is not None and self.rectype == RecordType.CBARDOLLAR:
        return [self.openedBar] if self.trades_too_close is False else []
    return []
async def calculate_renko_bar(self, data, symbol):
    """
    Aggregate one trade into RENKO bars of the configured brick size.

    Main variables:
        - self.openedBar (dict): stateful, the active unconfirmed bar (or None)
        - confirmedBars (list): local, bars confirmed by this trade, flushed on return

    A negative ``self.resolution`` is read as a percentage of the last confirmed close
    (or of the current price when nothing has been confirmed yet); otherwise it is the
    absolute brick size.

    Limitation: a strategy running on CBARs needs the real tick-by-tick course, while
    true Renko bars discard parts of the price course. The bars therefore keep high and
    low, and the open price is corrected just before confirmation. Open and close are
    correct in a confirmed bar, high/low show the total price movement within the bar.
    A strategy must expect that the open of an unconfirmed bar is not final.

    Args:
        data (dict): trade with 't' (epoch timestamp), 'p' (price) and 's' (size)
        symbol: not used by this method

    Returns:
        list: ``[confirmed bar]`` when the trade confirmed the open bar; ``[open bar]``
        for a plain update; ``[]`` when the update is suppressed.
    """
    price = data['p']
    size = int(data['s'])

    if self.resolution < 0:
        # percentage variant
        if self.lastConfirmedBar is not None:
            reference_price = self.lastConfirmedBar['close']
        else:
            reference_price = float(price)
        brick_size = abs(self.resolution) * reference_price / 100.0
    else:
        # absolute variant
        brick_size = self.resolution

    # confirmed bars prepared to be returned
    confirmedBars = []

    def confirm_existing():
        # confirm the open bar, queue a copy for return and remember it as last confirmed
        bar = self.openedBar
        bar['confirmed'] = 1
        bar['vwap'] = self.vwaphelper / bar['volume']
        self.vwaphelper = 0
        bar['updated'] = data['t']
        frozen = deepcopy(bar)
        confirmedBars.append(frozen)
        self.lastConfirmedBar = frozen
        self.openedBar = None

    def initialize_unconfirmed():
        # open a new unconfirmed bar from the current trade
        self.vwaphelper += (price * size)
        self.barindex += 1
        self.openedBar = dict(
            close=price,
            high=price,
            low=price,
            open=price,
            volume=size,
            trades=1,
            hlcc4=price,
            confirmed=0,
            time=datetime.fromtimestamp(data['t'], tz=zoneUTC),
            updated=data['t'],
            vwap=price,
            index=self.barindex,
            resolution=self.resolution,
        )

    def update_unconfirmed(open_price=None):
        # fold the current trade into the open bar, optionally correcting its open
        bar = self.openedBar
        if open_price is not None:
            bar['open'] = open_price
        self.vwaphelper += (price * size)
        bar['updated'] = data['t']
        bar['close'] = price
        bar['high'] = max(bar['high'], price)
        bar['low'] = min(bar['low'], price)
        bar['volume'] = bar['volume'] + size
        bar['trades'] = bar['trades'] + 1
        bar['vwap'] = self.vwaphelper / bar['volume']
        bar['hlcc4'] = round((bar['high']+bar['low']+bar['close']+bar['close'])/4,3)

    # Boundaries come from the body of the last confirmed bar, or from the open of the
    # unconfirmed bar when nothing has been confirmed yet.
    if self.lastConfirmedBar is not None:
        body = (self.lastConfirmedBar["open"], self.lastConfirmedBar["close"])
        top_boundary = max(body)
        low_boundary = min(body)
    elif self.openedBar is not None:
        top_boundary = low_boundary = self.openedBar["open"]

    if self.openedBar is None:
        initialize_unconfirmed()
    else:
        # Has the price moved by a whole brick? The breaking tick still belongs to the
        # old bar (it may hide same-price or grouped trades); the next tick opens the
        # new bar. The open is moved to the boundary it belongs to.
        if price >= top_boundary + brick_size:
            corrected_open = top_boundary
        elif price <= low_boundary - brick_size:
            corrected_open = low_boundary
        else:
            corrected_open = None
        update_unconfirmed(corrected_open)
        if corrected_open is not None:
            confirm_existing()

    # Is the price unchanged since the previous trade?
    self.diff_price = self.last_price != price
    self.last_price = price

    # Trades arriving closer together than the configured threshold are grouped.
    gap = float(data['t']) - float(self.lasttimestamp)
    self.trades_too_close = gap < cfh.config_handler.get_val('GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN')

    # remember for the next call
    self.lasttimestamp = data['t']
    self.iterace += 1

    # confirmed bars are flushed together with the open bar, if there is one
    if confirmedBars:
        return confirmedBars + ([self.openedBar] if self.openedBar is not None else [])

    # No confirmation: flush the open bar regardless of a price change, but suppress
    # bursts of grouped trades.
    if self.openedBar is None or self.rectype != RecordType.CBARRENKO:
        return []
    return [self.openedBar] if self.trades_too_close is False else []
def populate_file_name(self, date_from: datetime, date_to: datetime):
    """
    Build the path of the aggregation cache file for the given period.

    The name is composed of the (child) class name, symbol, both period boundaries as
    integer epoch seconds, record type, resolution, minsize, alignment, mintick,
    exthours and the sorted exclusion codes.

    Args:
        date_from: start of the cached period
        date_to: end of the cached period

    Returns:
        str: path of the cache file under ``DATA_DIR + "/aggcache/"``
    """
    # Sort a copy: building a name must not reorder the instance's exclusion list,
    # which may be the default list shared by all instances.
    excludes_str = ''.join(map(str, sorted(self.excludes)))
    # NOTE: the layout (including the missing separators after mintick and exthours)
    # is kept as is so that already existing cache files are still found.
    cache_file = self.__class__.__name__ + '-' + self.symbol + '-' + str(int(date_from.timestamp())) + '-' + str(int(date_to.timestamp())) + '-' + str(self.rectype) + "-" + str(self.resolution) + "-" + str(self.minsize) + "-" + str(self.align) + '-' + str(self.mintick) + str(self.exthours) + excludes_str + '.cache.gz'
    file_path = DATA_DIR + "/aggcache/" + cache_file
    return file_path
#returns cached objects for given period
def get_cache(self, date_from: datetime, date_to: datetime):
    """Load the cached aggregator output for the given period, if present.

    Args:
        date_from: Start of the requested period.
        date_to: End of the requested period.

    Returns:
        ``(cached_object, file_path)`` when ``self.skip_cache`` is False and
        the cache file exists, otherwise ``(None, None)``.
    """
    file_path = self.populate_file_name(date_from, date_to)
    if self.skip_cache is not False or not os.path.exists(file_path):
        return None, None

    # the aggregated file for this period exists - unpickle it
    # NOTE(review): dill.load runs whatever the file contains, so the
    # cache directory has to be trusted.
    with gzip.open(file_path, 'rb') as cache_fp:
        cachedobject = dill.load(cache_fp)
    print("AGG CACHE loaded ", file_path)

    # a Queue has no len(), its size is read through qsize()
    num = cachedobject.qsize() if isinstance(cachedobject, Queue) else len(cachedobject)
    print("cached items", num)
    return cachedobject, file_path
#output is cached only when explicitly enabled, and only for BT (presumably backtest)
def enable_cache_output(self, date_from: datetime, date_to: datetime):
    """Switch on caching of the produced output for the given period.

    Args:
        date_from: Start of the period; stored in ``self.cache_from``.
        date_to: End of the period; stored in ``self.cache_to``.
    """
    self.cache_from, self.cache_to = date_from, date_to
    self.cache_output_enabled = True
#stores the accumulated cache object (self.cached_object) into the cache file
async def store_cached_object(self):
    """Persist the accumulated output (``self.cached_object``) to its cache file.

    When cache output is not enabled, or the cached object or one of the
    period boundaries is missing, only a notice is printed and nothing is
    written. After a successful write the caching state (object, enabled
    flag and period boundaries) is reset.

    Returns:
        None

    Raises:
        Whatever ``gzip.open``, ``dill.dump`` or ``os.replace`` raise; in
        that case no (partial) cache file is left behind.
    """
    if (self.cache_output_enabled is False
            or self.cached_object is None
            or self.cache_from is None
            or self.cache_to is None):
        print("cache not enabled or missing data")
        return None

    # a Queue has no len(), its size is read through qsize()
    if isinstance(self.cached_object, Queue):
        num = self.cached_object.qsize()
    else:
        num = len(self.cached_object)

    file_path = self.populate_file_name(self.cache_from, self.cache_to)

    # Dump into a temporary sibling file and move it into place afterwards.
    # get_cache() only checks that the file exists, so a dump interrupted
    # half-way must never be visible under the final name.
    tmp_path = file_path + '.tmp'
    try:
        with gzip.open(tmp_path, 'wb') as fp:
            dill.dump(self.cached_object, fp)
        os.replace(tmp_path, file_path)
    finally:
        # after a successful replace the temporary file no longer exists
        if os.path.exists(tmp_path):
            os.remove(tmp_path)

    print(f"AGG CACHE stored ({num}) :{file_path}")
    print(f"DATES from:{self.cache_from.strftime('%d.%m.%Y %H:%M')} to:{self.cache_to.strftime('%d.%m.%Y %H:%M')}")

    # reset values
    self.cached_object = None
    self.cache_output_enabled = False
    self.cache_from = None
    self.cache_to = None
def send_cache_to_output(self, cachedobject):
    """Hand a previously cached object over to the output.

    The base implementation does nothing; each child class has to override
    this method.
    """
    return None
class TradeAggregator2Queue(TradeAggregator):
    """
    TradeAggregator child that sends the aggregated items to a given queue.

    Further output variants (TradeAggToTxT etc.) may be added in the future.
    """

    def __init__(
        self,
        symbol: str,
        queue: Queue,
        rectype: RecordType = RecordType.BAR,
        resolution: int = 5,
        minsize: int = 100,
        update_ltp: bool = False,
        align: StartBarAlign = StartBarAlign.ROUND,
        mintick: int = 0,
        exthours: bool = False,
        excludes: list = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES'),
        skip_cache: bool = False,
    ):
        """Initialise the base aggregator and remember the symbol and output queue."""
        super().__init__(
            rectype=rectype,
            resolution=resolution,
            minsize=minsize,
            update_ltp=update_ltp,
            align=align,
            mintick=mintick,
            exthours=exthours,
            excludes=excludes,
            skip_cache=skip_cache,
        )
        self.symbol = symbol
        self.queue = queue
        # receives every item sent to the output while cache output is enabled
        self.cached_object = Queue()

    def send_cache_to_output(self, cachedobject):
        """Move a whole stored queue object straight into the output queue.

        The underlying container of ``self.queue`` is replaced by the one
        held by ``cachedobject``; items are not re-inserted one by one.
        """
        stored_items = cachedobject.queue
        self.queue.queue = stored_items

    async def ingest_trade(self, data):
        """Feed one trade into the aggregator and forward the results to the queue.

        Every item returned by the base ``ingest_trade`` is put on
        ``self.queue``. While cache output is enabled the same item is also
        put on ``self.cached_object`` and, once the item equals ``'last'``,
        the collected cache is written out through ``store_cached_object``.
        """
        produced = await super().ingest_trade(data, self.symbol)
        for obj in produced:
            # when a larger dictionary was being filled rapidly the items
            # got overwritten - solved by sending a copy
            item = obj.copy() if isinstance(obj, dict) else obj
            self.queue.put(item)

            if not self.cache_output_enabled:
                continue
            self.cached_object.put(item)
            if item == 'last':
                await self.store_cached_object()
class TradeAggregator2List(TradeAggregator):
    """
    TradeAggregator child that stores the records into a given list.
    """

    def __init__(
        self,
        symbol: str,
        btdata: list,
        rectype: RecordType = RecordType.BAR,
        resolution: int = 5,
        minsize: int = 100,
        update_ltp: bool = False,
        align: StartBarAlign = StartBarAlign.ROUND,
        mintick: int = 0,
        exthours: bool = False,
        excludes: list = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES'),
        skip_cache: bool = False,
    ):
        """Initialise the base aggregator and remember the symbol and output list."""
        super().__init__(
            rectype=rectype,
            resolution=resolution,
            minsize=minsize,
            update_ltp=update_ltp,
            align=align,
            mintick=mintick,
            exthours=exthours,
            excludes=excludes,
            skip_cache=skip_cache,
        )
        self.symbol = symbol
        self.btdata = btdata
        # receives a (t, p) tuple for every record while cache output is enabled
        self.cached_object = []

    def send_cache_to_output(self, cachedobject):
        """Append all items of a stored cache object to the output list."""
        # TODO: in theory this could always be taken from the primary
        # object - to be thought through
        self.btdata.extend(cachedobject)

    async def ingest_trade(self, data):
        """Feed one trade into the aggregator and append the results to the list.

        Every item returned by the base ``ingest_trade`` is appended to
        ``self.btdata`` as a ``(item['t'], item['p'])`` tuple; while cache
        output is enabled the same tuple is also appended to
        ``self.cached_object``. An item equal to ``'last'`` is not stored:
        it triggers ``store_cached_object`` (when cache output is enabled)
        and ends the processing.

        Returns:
            ``[]`` when the ``'last'`` item is met, otherwise ``None``.
        """
        produced = await super().ingest_trade(data, self.symbol)
        for obj in produced:
            # when a larger dictionary was being filled rapidly the items
            # got overwritten - solved by working with a copy
            item = obj.copy() if isinstance(obj, dict) else obj

            if item == 'last':
                if self.cache_output_enabled:
                    await self.store_cached_object()
                return []

            if self.cache_output_enabled:
                self.cached_object.append((item['t'], item['p']))
            self.btdata.append((item['t'], item['p']))