draft nove classis strategie

This commit is contained in:
David Brazda
2023-08-24 21:12:45 +02:00
parent 2c7d279303
commit 88bd8c30c2
27 changed files with 1395 additions and 47 deletions
+2 -2
View File
@@ -11,7 +11,7 @@ import threading
from copy import deepcopy
from msgpack import unpackb
import os
from config import DATA_DIR, GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN
from v2realbot.config import DATA_DIR, GROUP_TRADES_WITH_TIMESTAMP_LESS_THAN
class TradeAggregator:
def __init__(self,
@@ -383,7 +383,7 @@ class TradeAggregator2Queue(TradeAggregator):
copy = obj
##populate secondary resolution if required
#print("inserted to queue")
self.queue.put(copy)
res = []
#print("po insertu",res)
+20 -10
View File
@@ -42,17 +42,19 @@ class Trade_Offline_Streamer(Thread):
pass
def run(self):
#create new asyncio loop in the thread
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.create_task(self.main())
loop.run_forever()
self.main()
# #create new asyncio loop in the thread
# loop = asyncio.new_event_loop()
# asyncio.set_event_loop(loop)
# loop.create_task(self.main())
# loop.run_forever()
def stop(self):
pass
# Override the run() function of Thread class
async def main(self):
#odebrano async
def main(self):
print(10*"*","Trade OFFLINE streamer STARTED", current_thread().name,10*"*")
if not self.streams:
@@ -194,16 +196,24 @@ class Trade_Offline_Streamer(Thread):
for s in self.to_run[symbol]:
#print("zaznam",t)
#print("Ingest", s, "zaznam", t)
await s.ingest_trade(packb(t))
#await s.ingest_trade(packb(t))
asyncio.run(s.ingest_trade(packb(t)))
cnt += 1
#protoze jsou serazene, tak prvni ktery je vetsi muze prerusit
elif to_datetime(t['t']) > self.time_to:
print("prerusujeme")
break
#vsem streamum posleme last TODO: (tuto celou cast prepsat a zjednodusit)
#po loadovani vsech dnu
print("naloadovane vse posilame last")
for s in self.to_run[symbpole[0]]:
await s.ingest_trade(packb("last"))
#zde bylo await
asyncio.run(s.ingest_trade(packb("last")))
print("poslano last")
loop = asyncio.get_running_loop()
#loop = asyncio.get_running_loop()
print("stoping loop")
loop.stop()
#loop.stop()
print(10*"*","Trade OFFLINE streamer STOPPED", current_thread().name,10*"*")