optimalizace BT runu - offline loader skrz queue

This commit is contained in:
David Brazda
2023-09-20 16:23:56 +02:00
parent d0d2e91468
commit a0cbeb2bc6
12 changed files with 200 additions and 72 deletions
+3 -3
View File
@@ -107,9 +107,9 @@ class TradeAggregator:
except KeyError:
ltp.price[symbol]=data['p']
if float(data['p']) > float(ltp.price[symbol]) + (float(data['p'])/100*pct_off) or float(data['p']) < float(ltp.price[symbol])-(float(data['p'])/100*pct_off):
print("ZLO", data,ltp.price[symbol])
#DOCASNE VYPNUTO - VYMYSLET JINAK
#if float(data['p']) > float(ltp.price[symbol]) + (float(data['p'])/100*pct_off) or float(data['p']) < float(ltp.price[symbol])-(float(data['p'])/100*pct_off):
#print("ZLO", data,ltp.price[symbol])
#nechavame zlo zatim projit
##return []
# with open("cache/wrongtrades.txt", 'a') as fp:
+49 -13
View File
@@ -17,6 +17,8 @@ from msgpack import packb
from pandas import to_datetime
import pickle
import os
from rich import print
import queue
"""
Trade offline data streamer, based on Alpaca historical data.
@@ -55,6 +57,7 @@ class Trade_Offline_Streamer(Thread):
# Override the run() function of Thread class
#odebrano async
def main(self):
trade_queue = queue.Queue()
print(10*"*","Trade OFFLINE streamer STARTED", current_thread().name,10*"*")
if not self.streams:
@@ -91,7 +94,10 @@ class Trade_Offline_Streamer(Thread):
##PREPSAT jednoduse tak, aby podporovalo jen jeden symbol
#agregator2list bude mit vstup list
#datetime.fromtimestamp(data['updated']).astimezone(zoneNY))
#REFACTOR STARTS HERE
#print(f"{self.time_from=} {self.time_to=}")
calendar_request = GetCalendarRequest(start=self.time_from,end=self.time_to)
cal_dates = self.clientTrading.get_calendar(calendar_request)
#ic(cal_dates)
@@ -101,15 +107,20 @@ class Trade_Offline_Streamer(Thread):
#minimalni jednotka pro CACHE je 1 den - a to jen marketopen to marketclose (extended hours not supported yet)
for day in cal_dates:
print("Processing DAY", day.date)
print(day.date)
#print(day.date)
print(day.open)
print(day.close)
#make it offset aware
day.open = day.open.replace(tzinfo=zoneNY)
day.open = zoneNY.localize(day.open)
#day.open.replace(tzinfo=zoneNY)
#add 20 minutes of premarket
#day.open = day.open - timedelta(minutes=20)
day.close = day.close.replace(tzinfo=zoneNY)
day.close = zoneNY.localize(day.close)
#day.close = day.close.replace(tzinfo=zoneNY)
#print(day.open)
#print(day.close)
#print("dayopentimestamp", day.open.timestamp())
#print("dayclosetimestamp", day.close.timestamp())
##pokud datum do je mensi day.open, tak tento den neresime
if self.time_to < day.open:
print("time_to je pred zacatkem marketu. Vynechavame tento den.")
@@ -184,7 +195,14 @@ class Trade_Offline_Streamer(Thread):
#poustime i 20 minut premarketu pro presnejsi populaci slopu v prvnich minutech
# - timedelta(minutes=20)
if self.time_from < to_datetime(t['t']) < self.time_to:
#homogenizace timestampu s online streamem
#tmp = to_datetime(t['t'], utc=True).timestamp()
datum = to_datetime(t['t'], utc=True)
if self.time_from < datum < self.time_to:
#poustime dal, jinak ne
if wait_for_q:
#cekame na Q nebo na O (nekterym dnum chybelo Q)
@@ -194,7 +212,10 @@ class Trade_Offline_Streamer(Thread):
wait_for_q = False
#homogenizace timestampu s online streamem
t['t'] = Timestamp.from_unix(to_datetime(t['t']).timestamp())
t['t'] = Timestamp.from_unix(datum.timestamp())
#print(f"{t['t']}")
#t['t'] = Timestamp.from_unix(to_datetime(t['t']).timestamp())
#print(to_datetime(t['t']).timestamp())
#print("PROGRESS ",cnt,"/",celkem)
#print(t)
@@ -204,23 +225,38 @@ class Trade_Offline_Streamer(Thread):
#print("zaznam",t)
#print("Ingest", s, "zaznam", t)
#await s.ingest_trade(packb(t))
asyncio.run(s.ingest_trade(packb(t)))
trade_queue.put((s,t))
##asyncio.run(s.ingest_trade(packb(t)))
cnt += 1
#protoze jsou serazene, tak prvni ktery je vetsi muze prerusit
elif to_datetime(t['t']) > self.time_to:
print("prerusujeme")
elif datum > self.time_to:
#print(f"{datum=}")
#print(to_datetime(t['t']))
#print(f"{self.time_to=}")
#print("prerusujeme")
break
#vsem streamum posleme last TODO: (tuto celou cast prepsat a zjednodusit)
#po loadovani vsech dnu
print("naloadovane vse posilame last")
for s in self.to_run[symbpole[0]]:
#zde bylo await
asyncio.run(s.ingest_trade(packb("last")))
trade_queue.put((s,"last"))
##asyncio.run(s.ingest_trade(packb("last")))
print("poslano last")
#loop = asyncio.get_running_loop()
print("stoping loop")
#loop.stop()
async def process_trade_queue(trade_queue):
while not trade_queue.empty():
#print("send trade")
s, trade = trade_queue.get()
await s.ingest_trade(packb(trade))
#spusteni asyncio run - tentokrat jednou, ktera spusti proces jez to z queue odesle
#nevyhoda reseni - kdyz je to pres vice dnu, tak se naloaduji vsechny dny do queue
#ale to mi u Classic zatim nevadi - poustim per days
#uvidim jak to ovlivn rychlost
asyncio.run(process_trade_queue(trade_queue))
print("skoncilo zpracovani ASYNCIO RUN TRADE QUEUE - zpracovany vsechny trady v agreagtorech")
print(10*"*","Trade OFFLINE streamer STOPPED", current_thread().name,10*"*")