first commit

This commit is contained in:
David Brazda
2023-04-12 21:00:03 +02:00
commit af9e944928
158 changed files with 19422 additions and 0 deletions
View File
+329
View File
@@ -0,0 +1,329 @@
"""
Aggregator mdoule containing main aggregator logic for TRADES, BARS and CBAR
"""
from v2realbot.enums.enums import RecordType, StartBarAlign
from datetime import datetime, timedelta
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, Queue,is_open_hours,zoneNY
from queue import Queue
from rich import print
from v2realbot.enums.enums import Mode
import threading
from copy import deepcopy
from msgpack import unpackb
import os
from config import DATA_DIR
class TradeAggregator:
def __init__(self,
rectype: RecordType = RecordType.BAR,
timeframe: int = 5,
minsize: int = 100,
update_ltp: bool = False,
align: StartBarAlign = StartBarAlign.ROUND,
mintick: int = 0,
exthours: bool = False):
"""
Create trade agregator. Instance accepts trades one by one and process them and returns output type
Trade - return trade one by one (no change)
Bar - return finished bar in given timeframe
CBar - returns continuous bar, finished bar is marked by confirmed status
Args:
timeframe (number): Resolution of bar in seconds
update_ltp (bool): Whether to update global variable with price (usually only one instance does that)
align: Defines alignement of first bar. ROUND - according to timeframe( 5,10,15 - for 5s timeframe), RANDOM - according to timestamp of first trade
mintick: Applies for CBAR. Minimální mezera po potvrzeni baru a aktualizaci dalsiho nepotvrzeneho (např. pro 15s, muzeme chtit prvni tick po 5s). po teto dobe realtime.
"""
self.rectype: RecordType = rectype
self.timeframe = timeframe
self.minsize = minsize
self.update_ltp = update_ltp
self.exthours = exthours
if mintick >= timeframe:
print("Mintick musi byt mensi nez timeframe")
raise Exception
self.mintick = mintick
#class variables = starters
self.iterace = 1
self.lasttimestamp = 0
#inicalizace pro prvni agregaci
self.newBar = dict(high=0, low=999999, volume = 0, trades = 0, confirmed = 0, vwap = 0, close=0, index = 1, updated = 0)
self.bar_start = 0
self.align = align
self.tm: datetime = None
self.firstpass = True
self.vwaphelper = 0
self.returnBar = {}
self.lastBarConfirmed = False
#min trade size
self.minsize = minsize
#instance variable to hold last trade price
self.last_price = 0
self.barindex = 1
async def ingest_trade(self, indata, symbol):
"""
Aggregator logic for trade record
Args:
indata (dict): online or offline record
"""
data = unpackb(indata)
#last item signal
if data == "last": return data
#print(data)
##implementing fitlers - zatim natvrdo a jen tyto: size: 1, cond in [O,C,4] opening,closed a derivately priced,
## 22.3. - dal jsem pryc i contingency trades [' ', '7', 'V'] - nasel jsem obchod o 30c mimo
## Q - jsou v pohode, oteviraci trady, ale O jsou jejich duplikaty
try:
for i in data['c']:
if i in ('C','O','4','B','7','V'): return 0
except KeyError:
pass
#EXPERIMENT zkusime vyhodit vsechny pod 50 #puv if int(data['s']) == 1: return 0
#zatim nechavame - výsledek je naprosto stejný jako v tradingview
if int(data['s']) < self.minsize: return 0
#{'t': 1678982075.242897, 'x': 'D', 'p': 29.1333, 's': 18000, 'c': [' ', '7', 'V'], 'i': 79372107591749, 'z': 'A', 'u': 'incorrect'}
if 'u' in data: return 0
#pokud projde TRADE s cenou 0.33% rozdilna oproti predchozi, pak vyhazujeme v ramci cisteni dat (cca 10ticku na 30USD)
pct_off = 0.33
#ic(ltp.price)
#ic(ltp.price[symbol])
try:
ltp.price[symbol]
except KeyError:
ltp.price[symbol]=data['p']
if float(data['p']) > float(ltp.price[symbol]) + (float(data['p'])/100*pct_off) or float(data['p']) < float(ltp.price[symbol])-(float(data['p'])/100*pct_off):
print("ZLO", data,ltp.price[symbol])
#nechavame zlo zatim projit
##return 0
# with open("cache/wrongtrades.txt", 'a') as fp:
# fp.write(str(data) + 'predchozi:'+str(ltp.price[symbol])+'\n')
#timestampy jsou v UTC
#TIMESTAMP format is different for online and offline trade streams
#offline trade
#{'t': '2023-02-17T14:30:00.16111744Z', 'x': 'J', 'p': 35.14, 's': 20, 'c': [' ', 'F', 'I'], 'i': 52983525027938, 'z': 'A'}
#websocket trade
#{'T': 't', 'S': 'MSFT', 'i': 372, 'x': 'V', 'p': 264.58, 's': 25, 'c': ['@', 'I'], 'z': 'C', 't': Timestamp(seconds=1678973696, nanoseconds=67312449), 'r': Timestamp(seconds=1678973696, nanoseconds=72865209)}
#parse alpaca timestamp
# tzn. na offline mohu pouzit >>> datetime.fromisoformat(d).timestamp() 1676644200.161117
#orizne sice nanosekundy ale to nevadi
#print("tady", self.mode, data['t'])
# if self.mode == Mode.BT:
# data['t'] = datetime.fromisoformat(str(data['t'])).timestamp()
# else:
data['t'] = parse_alpaca_timestamp(data['t'])
if not is_open_hours(datetime.fromtimestamp(data['t'])) and self.exthours is False:
#print("AGG: trade not in open hours skipping", datetime.fromtimestamp(data['t']).astimezone(zoneNY))
return 0
#tady bude vzdycky posledni cena a posledni cas
if self.update_ltp:
ltp.price[symbol] = data['p']
ltp.time[symbol] = data['t']
#if data['p'] < self.last_price - 0.02: print("zlo:",data)
if self.rectype == RecordType.TRADE: return data
#print("agr přišel trade", datetime.fromtimestamp(data['t']),data)
#OPIC pokud bude vadit, ze prvni bar neni kompletni - pak zapnout tuto opicarnu
#kddyz jde o prvni iteraci a pozadujeme align, cekame na kulaty cas (pro 5s 0,5,10..)
# if self.lasttimestamp ==0 and self.align:
# if self.firstpass:
# self.tm = datetime.fromtimestamp(data['t'])
# self.tm += timedelta(seconds=self.timeframe)
# self.tm = self.tm - timedelta(seconds=self.tm.second % self.timeframe,microseconds=self.tm.microsecond)
# self.firstpass = False
# print("trade: ",datetime.fromtimestamp(data['t']))
# print("required",self.tm)
# if self.tm > datetime.fromtimestamp(data['t']):
# return
# else: pass
#print("barstart",datetime.fromtimestamp(self.bar_start))
#print("oriznute data z tradu", datetime.fromtimestamp(int(data['t'])))
#print("timeframe",self.timeframe)
if int(data['t']) - self.bar_start < self.timeframe:
issamebar = True
else:
issamebar = False
##flush předchozí bar a incializace (krom prvni iterace)
if self.lasttimestamp ==0: pass
else:
self.newBar['confirmed'] = 1
self.newBar['vwap'] = self.vwaphelper / self.newBar['volume']
#updatujeme čas - obsahuje datum tradu, který confirm triggeroval
self.newBar['updated'] = data['t']
#ulozime datum akt.tradu pro mintick
self.lastBarConfirmed = True
#ukládám si předchozí (confirmed)bar k vrácení
self.returnBar = self.newBar
#print(self.returnBar)
#inicializuji pro nový bar
self.vwaphelper = 0
# return self.newBar
##flush CONFIRMED bar to queue
#self.q.put(self.newBar)
##TODO pridat prubezne odesilani pokud je pozadovano
self.barindex +=1
self.newBar = {
"close": 0,
"high": 0,
"low": 99999999,
"volume": 0,
"trades": 0,
"hlcc4": 0,
"confirmed": 0,
"updated": 0,
"vwap": 0,
"index": self.barindex
}
self.last_price = data['p']
#spočteme vwap - potřebujeme předchozí hodnoty
self.vwaphelper += (data['p'] * data['s'])
self.newBar['updated'] = data['t']
self.newBar['close'] = data['p']
self.newBar['high'] = max(self.newBar['high'],data['p'])
self.newBar['low'] = min(self.newBar['low'],data['p'])
self.newBar['volume'] = self.newBar['volume'] + data['s']
self.newBar['trades'] = self.newBar['trades'] + 1
#pohrat si s timto round
self.newBar['hlcc4'] = round((self.newBar['high']+self.newBar['low']+self.newBar['close']+self.newBar['close'])/4,3)
#predchozi bar byl v jine vterine, tzn. ukladame do noveho (aktualniho) pocatecni hodnoty
if (issamebar == False):
#zaciname novy bar
self.newBar['open'] = data['p']
#zarovname time prvniho baru podle timeframu kam patří (např. 5, 10, 15 ...) (ROUND)
if self.align:
t = datetime.fromtimestamp(data['t'])
t = t - timedelta(seconds=t.second % self.timeframe,microseconds=t.microsecond)
self.bar_start = datetime.timestamp(t)
#nebo pouzijeme datum tradu zaokrouhlene na vteriny (RANDOM)
else:
#ulozime si jeho timestamp (odtum pocitame timeframe)
t = datetime.fromtimestamp(int(data['t']))
#timestamp
self.bar_start = int(data['t'])
self.newBar['time'] = t
self.newBar['resolution'] = self.timeframe
self.newBar['confirmed'] = 0
#uložíme do předchozí hodnoty (poznáme tak open a close)
self.lasttimestamp = data['t']
self.iterace += 1
# print(self.iterace, data)
#je tu maly bug pro CBAR - kdy prvni trade, který potvrzuje predchozi bar
#odesle potvrzeni predchoziho baru a nikoliv open stávajícího, ten posle až druhý trade
#což asi nevadí
#pokud je pripraveny, vracíme předchozí confirmed bar
if len(self.returnBar) > 0:
self.tmp = self.returnBar
self.returnBar = {}
#print(self.tmp)
return self.tmp
#pro cont bar posilame ihned (TBD vwap a min bar tick value)
if self.rectype == RecordType.CBAR:
#pokud je mintick nastavený a předchozí bar byl potvrzený
if self.mintick != 0 and self.lastBarConfirmed:
#d zacatku noveho baru musi ubehnout x sekund nez posilame updazte
#pocatek noveho baru + Xs musi byt vetsi nez aktualni trade
if (self.newBar['time'] + timedelta(seconds=self.mintick)) > datetime.fromtimestamp(data['t']):
#print("waiting for mintick")
return 0
else:
self.lastBarConfirmed = False
#doplnime prubezny vwap
self.newBar['vwap'] = self.vwaphelper / self.newBar['volume']
#print(self.newBar)
return self.newBar
else:
return 0
class TradeAggregator2Queue(TradeAggregator):
"""
Child of TradeAggregator - sends items to given queue
In the future others will be added - TradeAggToTxT etc.
"""
def __init__(self, symbol: str, queue: Queue, rectype: RecordType = RecordType.BAR, timeframe: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False):
super().__init__(rectype=rectype, timeframe=timeframe, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours)
self.queue = queue
self.symbol = symbol
async def ingest_trade(self, data):
#print("ingest ve threadu:",current_thread().name)
res = await super().ingest_trade(data, self.symbol)
if res != 0:
#print(res)
#pri rychlem plneni vetsiho dictionary se prepisovali - vyreseno kopií
if isinstance(res, dict):
copy = res.copy()
else:
copy = res
self.queue.put(copy)
res = {}
#print("po insertu",res)
class TradeAggregator2List(TradeAggregator):
""""
stores records to the list
"""
def __init__(self, symbol: str, btdata: list, rectype: RecordType = RecordType.BAR, timeframe: int = 5, minsize: int = 100, update_ltp: bool = False, align: StartBarAlign = StartBarAlign.ROUND, mintick: int = 0, exthours: bool = False):
super().__init__(rectype=rectype, timeframe=timeframe, minsize=minsize, update_ltp=update_ltp, align=align, mintick=mintick, exthours=exthours)
self.btdata = btdata
self.symbol = symbol
# self.debugfile = DATA_DIR + "/BACprices.txt"
# if os.path.exists(self.debugfile):
# os.remove(self.debugfile)
async def ingest_trade(self, data):
#print("ted vstoupil do tradeagg2list ingestu")
res1 = await super().ingest_trade(data, self.symbol)
#print("ted je po zpracovani", res1)
if res1 != 0:
#pri rychlem plneni vetsiho dictionary se prepisovali - vyreseno kopií
if isinstance(res1, dict):
copy = res1.copy()
else:
copy = res1
if res1 == 'last': return 0
self.btdata.append((copy['t'],copy['p']))
# with open(self.debugfile, "a") as output:
# output.write(str(copy['t']) + ' ' + str(datetime.fromtimestamp(copy['t']).astimezone(zoneNY)) + ' ' + str(copy['p']) + '\n')
res1 = {}
#print("po insertu",res)
@@ -0,0 +1,44 @@
from threading import Thread
from alpaca.trading.stream import TradingStream
from v2realbot.config import Keys
#jelikoz Alpaca podporuje pripojeni libovolneho poctu websocket instanci na order updates
#vytvorime pro kazdou bezici instanci vlastni webservisu (jinak bychom museli delat instanci pro kombinaci ACCOUNT1 - LIVE, ACCOUNT1 - PAPER, ACCOUNT2 - PAPER ..)
#bude jednodussi mit jednu instanci pokazde
"""""
Connects to Alpaca websocket, listens to trade updates
of given account. All notifications of given SYMBOL
routes to strategy callback.
As Alpaca supports connecting of any number of trade updates clients
new instance of this websocket thread is created for each strategy instance.
"""""
class LiveOrderUpdatesStreamer(Thread):
def __init__(self, key: Keys, name: str) -> None:
self.key = key
self.strategy = None
self.client = TradingStream(api_key=key.API_KEY, secret_key=key.SECRET_KEY, paper=key.PAPER)
Thread.__init__(self, name=name)
#notif dispatcher - pouze 1 strategie
async def distributor(self,data):
if self.strategy.symbol == data.order.symbol: await self.strategy.order_updates(data)
# connects callback to interface object - responses for given symbol are routed to interface callback
def connect_callback(self, st):
self.strategy = st
def disconnect_callback(self, st):
print("*"*10, "WS Order Update Streamer stopping for", self.strategy.name, "*"*10)
self.strategy = None
self.client.stop()
def run(self):
## spusti webservice
if self.strategy is None:
print("connect strategy first")
return
self.client.subscribe_trade_updates(self.distributor)
print("*"*10, "WS Order Update Streamer started for", self.strategy.name, "*"*10)
self.client.run()
+207
View File
@@ -0,0 +1,207 @@
from v2realbot.loader.aggregator import TradeAggregator, TradeAggregator2List, TradeAggregator2Queue
from alpaca.trading.requests import GetCalendarRequest
from alpaca.trading.client import TradingClient
from alpaca.data.live import StockDataStream
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR
from alpaca.data.enums import DataFeed
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockLatestQuoteRequest, StockBarsRequest, StockTradesRequest
from threading import Thread, current_thread
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, print
from v2realbot.utils.tlog import tlog
from datetime import datetime, timedelta
from threading import Thread
import asyncio
from msgpack.ext import Timestamp
from msgpack import packb
from pandas import to_datetime
import pickle
import os
"""
Trade offline data streamer, based on Alpaca historical data.
"""
class Trade_Offline_Streamer(Thread):
#pro BT se pripojujeme vzdy k primarnimu uctu - pouze tahame historicka data + calendar
client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True)
clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False)
def __init__(self, time_from: datetime, time_to: datetime, btdata) -> None:
# Call the Thread class's init function
Thread.__init__(self)
self.uniquesymbols = set()
self.streams = []
self.to_run = dict()
self.time_from = time_from
self.time_to = time_to
self.btdata = btdata
def add_stream(self, obj: TradeAggregator):
self.streams.append(obj)
def remove_stream(self, obj):
pass
def run(self):
#create new asyncio loop in the thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(self.main())
loop.run_forever()
def stop(self):
pass
# Override the run() function of Thread class
async def main(self):
print(10*"*","Trade OFFLINE streamer STARTED", current_thread().name,10*"*")
if not self.streams:
print("call add streams to queue first")
return 0
#iterujeme nad streamy
for i in self.streams:
self.uniquesymbols.add(i.symbol)
ic(self.uniquesymbols)
##z unikatnich symbolu naplnime keys pro dictionary
# for i in self.uniquesymbols:
# self.to_run
#vytvorime prazdne dict oklicovane unik.symboly a obsahujici prazdne pole
self.to_run = {key: [] for key in self.uniquesymbols}
#stejne tak pro glob.tridu last price
ltp.price = {key: 0 for key in self.uniquesymbols}
#pro kazdy symbol do toho pole ulozime instance na spusteni
print(self.to_run)
for i in self.streams:
self.to_run[i.symbol].append(i)
ic(self.to_run)
#prepare data
symbpole = []
for key in self.uniquesymbols:
symbpole.append(key)
#print(symbpole))
ic(self.time_from.astimezone(tz=zoneNY))
ic(self.time_to.astimezone(tz=zoneNY))
##PREPSAT jednoduse tak, aby podporovalo jen jeden symbol
#agregator2list bude mit vstup list
#REFACTOR STARTS HERE
calendar_request = GetCalendarRequest(start=self.time_from,end=self.time_to)
cal_dates = self.clientTrading.get_calendar(calendar_request)
ic(cal_dates)
#zatim podpora pouze main session
#zatim podpora pouze 1 symbolu, predelat na froloop vsech symbolu ze symbpole
#minimalni jednotka pro CACHE je 1 den - a to jen marketopen to marketclose (extended hours not supported yet)
for day in cal_dates:
print("Processing DAY", day.date)
print(day.date)
print(day.open)
print(day.close)
#make it offset aware
day.open = day.open.replace(tzinfo=zoneNY)
day.close = day.close.replace(tzinfo=zoneNY)
##pokud datum do je mensi day.open, tak tento den neresime
if self.time_to < day.open:
print("time_to je pred zacatkem marketu. Vynechavame tento den.")
continue
daily_file = str(symbpole[0]) + '-' + str(int(day.open.timestamp())) + '-' + str(int(day.close.timestamp())) + '.cache'
print(daily_file)
file_path = DATA_DIR + "/"+daily_file
if os.path.exists(file_path):
##denní file existuje
#loadujeme ze souboru
#pokud je start_time < trade < end_time
#odesíláme do queue
#jinak pass
with open (file_path, 'rb') as fp:
tradesResponse = pickle.load(fp)
print("Loading DATA from CACHE", file_path)
#daily file doesnt exist
else:
# TODO refactor pro zpracovani vice symbolu najednou(multithreads), nyni predpokladame pouze 1
stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbpole[0], start=day.open,end=day.close)
tradesResponse = self.client.get_stock_trades(stockTradeRequest)
print("Remote Fetch DAY DATA Complete", day.open, day.close)
#pokud jde o dnešní den a nebyl konec trhu tak cache neukládáme
if day.open < datetime.now().astimezone(zoneNY) < day.close:
print("not saving the cache, market still open today")
ic(datetime.now().astimezone(zoneNY))
ic(day.open, day.close)
else:
with open(file_path, 'wb') as fp:
pickle.dump(tradesResponse, fp)
#zde už máme daily data
#pokud je start_time < trade < end_time
#odesíláme do queue
#jinak ne
#TODO pokud data zahrnuji open (tzn. bud cely den(jednotest nebo v ramci vice dni) a nebo jednotest se zacatkem v 9:30 nebo driv.
#- pockame na trade Q a od nej budeme pocitat
# abychom meli zarovnano s tradingview
#- zaroven pak cekame na M(market close) a od nej uz take nic dál nepoustime (NOT IMPLEMENTED YET)
#protze mi chodi data jen v main sessione, pak jediné, kdy nečekáme na Q, je když time_from je větší než day.open
# (např. požadovaná start až od 10:00)
if self.time_from > day.open:
wait_for_q = False
else:
wait_for_q = True
ic(wait_for_q)
# v tradesResponse je dict = Trades identifikovane symbolem
for symbol in tradesResponse:
#print(tradesResponse[symbol])
celkem = len(tradesResponse[symbol])
ic(symbol, celkem)
#print("POCET: ", celkem)
cnt = 1
for t in tradesResponse[symbol]:
#protoze je zde cely den, poustime dal, jen ty relevantni
#pokud je start_time < trade < end_time
#datetime.fromtimestamp(parse_alpaca_timestamp(t['t']))
#ic(t['t'])
if self.time_from < to_datetime(t['t']) < self.time_to:
#poustime dal, jinak ne
if wait_for_q:
if 'Q' not in t['c']: continue
else:
ic("Q found poustime dal")
wait_for_q = False
#homogenizace timestampu s online streamem
t['t'] = Timestamp.from_unix(to_datetime(t['t']).timestamp())
#print("PROGRESS ",cnt,"/",celkem)
#print(t)
#na rozdil od wwebsocketu zde nemame v zaznamu symbol ['S']
#vsem streamum na tomto symbolu posilame data - tbd mozna udelat i per stream vlakno
for s in self.to_run[symbol]:
#print("zaznam",t)
#print("Ingest", s, "zaznam", t)
await s.ingest_trade(packb(t))
cnt += 1
#vsem streamum posleme last TODO: (tuto celou cast prepsat a zjednodusit)
#po loadovani vsech dnu
for s in self.to_run[symbpole[0]]:
await s.ingest_trade(packb("last"))
loop = asyncio.get_running_loop()
print("stoping loop")
loop.stop()
print(10*"*","Trade OFFLINE streamer STOPPED", current_thread().name,10*"*")
+136
View File
@@ -0,0 +1,136 @@
"""
Classes for streamers (websocket and offline)
currently only streams are Trades
"""
from v2realbot.loader.aggregator import TradeAggregator2Queue
from alpaca.data.live import StockDataStream
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, ACCOUNT1_PAPER_FEED
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockLatestQuoteRequest, StockBarsRequest, StockTradesRequest
from threading import Thread, current_thread
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp
from datetime import datetime, timedelta
from threading import Thread, Lock
from msgpack import packb
"""
Shared streamer (can be shared amongst concurrently running strategies)
Connects to alpaca websocket client and subscribe for trades on symbols requested
by strategies
"""
class Trade_WS_Streamer(Thread):
##tento ws streamer je pouze jeden pro vsechny, tzn. vyuziváme natvrdo placena data primarniho uctu (nezalezi jestli paper nebo live)
client = StockDataStream(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True, websocket_params={}, feed=ACCOUNT1_PAPER_FEED)
#uniquesymbols = set()
_streams = []
#to_run = dict()
#lock = Lock()
def __init__(self, name: str) -> None:
# Call the Thread class's init function
Thread.__init__(self, name=name)
def symbol_exists(self, symbol):
for i in Trade_WS_Streamer._streams:
if i.symbol == symbol:
return True
return False
def add_stream(self, obj: TradeAggregator2Queue):
print("stav pred pridavanim", Trade_WS_Streamer._streams)
Trade_WS_Streamer._streams.append(obj)
if Trade_WS_Streamer.client._running is False:
print("websocket zatim nebezi, pouze pridavame do pole")
else:
print("websocket client bezi")
if self.symbol_exists(obj.symbol):
print("Symbol",obj.symbol,"již je subscribnuty")
return
Trade_WS_Streamer.client.subscribe_trades(self.datahandler, obj.symbol)
def remove_stream(self, obj: TradeAggregator2Queue):
#delete added stream
try:
Trade_WS_Streamer._streams.remove(obj)
except ValueError:
print("value not found in _streams")
return
#if it is the last item at all, stop the client from running
if len(Trade_WS_Streamer._streams) == 0:
print("removed last item from WS, stopping the client")
Trade_WS_Streamer.client.stop()
return
if not self.symbol_exists(obj.symbol):
Trade_WS_Streamer.client.unsubscribe_trades(obj.symbol)
print("symbol no longer used, unsubscribed from ", obj.symbol)
# dispatch for all streams
@classmethod
async def datahandler(cls, data):
#REFACTOR nemuze byt takto? vyzkouset, pripadne se zbavit to_run dict
#overit i kvuli performance
for i in cls._streams:
if i.symbol == data['S']:
await i.ingest_trade(packb(data))
#pro každý symbol volat příslušné agregátory pro symbol
# for i in self.to_run[data['S']]:
# #print("ingest pro", data['S'], "volano", i)
# await i.ingest_trade(packb(data))
# #print("*"*40)
#zatim vracime do jedne queue - dodelat dynamicky
# Override the run() function of Thread class
def run(self):
if len(Trade_WS_Streamer._streams)==0:
print("call add streams to queue")
print("*"*10, "WS Streamer - run", current_thread().name,"*"*10)
#iterujeme nad streamy
unique = set()
for i in self._streams:
print("symbol ve streams", i.symbol)
unique.add(i.symbol)
##z unikatnich symbolu naplnime keys pro dictionary
#print(self.uniquesymbols)
# for i in self.uniquesymbols:
# self.to_run
#TODO nejspis s lockem? kdyz menime pri bezici strategii
#vytvorime prazdne dict oklicovane unik.symboly a obsahujici prazdne pole
#with self.lock:
##self.to_run = {key: [] for key in self.uniquesymbols}
#stejne tak pro glob.tridu last price
#TODO predelat pro concurrency
#ltp.price = {key: 0 for key in self.uniquesymbols}
#pro kazdy symbol do toho pole ulozime instance na spusteni
# print(self.to_run)
# for i in self._streams:
# self.to_run[i.symbol].append(i)
# print ("promenna to_run:",self.to_run)
# sub for unique symbols
for i in unique:
Trade_WS_Streamer.client.subscribe_trades(Trade_WS_Streamer.datahandler, i)
print("subscribed to",i)
#timto se spusti jenom poprve v 1 vlaknu
#ostatni pouze vyuzivaji
if Trade_WS_Streamer.client._running is False:
print(self.name, "it is not running, starting by calling RUN")
print("*"*10, "WS Streamer STARTED", "*"*10)
Trade_WS_Streamer.client.run()
print("*"*10, "WS Streamer STOPPED", "*"*10)
#tímto se spustí pouze 1.vlakno, nicmene subscribe i pripadny unsubscribe zafunguji
else:
print("Websocket client is running, not calling RUN this time")