Scheduler support #24sched

This commit is contained in:
David Brazda
2024-02-22 23:05:49 +07:00
parent ed6285dcf5
commit d3cb2fa760
28 changed files with 4096 additions and 72 deletions

View File

@ -1,7 +1,7 @@
import os,sys
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
os.environ["KERAS_BACKEND"] = "jax"
from v2realbot.config import WEB_API_KEY, DATA_DIR, MEDIA_DIRECTORY, LOG_FILE, MODEL_DIR
from v2realbot.config import WEB_API_KEY, DATA_DIR, MEDIA_DIRECTORY, LOG_PATH, MODEL_DIR
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
from datetime import datetime
from rich import print
@ -11,7 +11,7 @@ import uvicorn
from uuid import UUID
import v2realbot.controller.services as cs
from v2realbot.utils.ilog import get_log_window
from v2realbot.common.model import StrategyInstance, RunnerView, RunRequest, Trade, RunArchive, RunArchiveView, RunArchiveViewPagination, RunArchiveDetail, Bar, RunArchiveChange, TestList, ConfigItem, InstantIndicator, DataTablesRequest, AnalyzerInputs
from v2realbot.common.model import RunManagerRecord, StrategyInstance, RunnerView, RunRequest, Trade, RunArchive, RunArchiveView, RunArchiveViewPagination, RunArchiveDetail, Bar, RunArchiveChange, TestList, ConfigItem, InstantIndicator, DataTablesRequest, AnalyzerInputs
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends, HTTPException, status, WebSocketException, Cookie, Query
from fastapi.responses import FileResponse, StreamingResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
@ -39,6 +39,9 @@ import shutil
from starlette.responses import JSONResponse
import mlroom
import mlroom.utils.mlutils as ml
from typing import List
import v2realbot.controller.run_manager as rm
import v2realbot.scheduler.ap_scheduler as aps
#from async io import Queue, QueueEmpty
#
# install()
@ -249,11 +252,13 @@ def _run_stratin(stratin_id: UUID, runReq: RunRequest):
runReq.bt_to = zoneNY.localize(runReq.bt_to)
#pokud jedeme nad test intervaly anebo je požadováno více dní - pouštíme jako batch day by day
#do budoucna dát na FE jako flag
if runReq.mode != Mode.LIVE and runReq.test_batch_id is not None or (runReq.bt_from.date() != runReq.bt_to.date()):
print(runReq)
if runReq.mode not in [Mode.LIVE, Mode.PAPER] and (runReq.test_batch_id is not None or (runReq.bt_from is not None and runReq.bt_to is not None and runReq.bt_from.date() != runReq.bt_to.date())):
res, id = cs.run_batch_stratin(id=stratin_id, runReq=runReq)
else:
if runReq.weekdays_filter is not None:
raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail=f"Weekday only for backtest mode with batch (not single day)")
#not necessary for live/paper the weekdays are simply ignored, in the future maybe add validation if weekdays are presented
#if runReq.weekdays_filter is not None:
# raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail=f"Weekday only for backtest mode with batch (not single day)")
res, id = cs.run_stratin(id=stratin_id, runReq=runReq)
if res == 0: return id
elif res < 0:
@ -555,27 +560,30 @@ def _get_archived_runner_log_byID(runner_id: UUID, timestamp_from: float, timest
# endregion
# A simple function to read the last lines of a file
def tail(file_path, n=10, buffer_size=1024):
with open(file_path, 'rb') as f:
f.seek(0, 2) # Move to the end of the file
file_size = f.tell()
lines = []
buffer = bytearray()
try:
with open(file_path, 'rb') as f:
f.seek(0, 2) # Move to the end of the file
file_size = f.tell()
lines = []
buffer = bytearray()
for i in range(file_size // buffer_size + 1):
read_start = max(-buffer_size * (i + 1), -file_size)
f.seek(read_start, 2)
read_size = min(buffer_size, file_size - buffer_size * i)
buffer[0:0] = f.read(read_size) # Prepend to buffer
for i in range(file_size // buffer_size + 1):
read_start = max(-buffer_size * (i + 1), -file_size)
f.seek(read_start, 2)
read_size = min(buffer_size, file_size - buffer_size * i)
buffer[0:0] = f.read(read_size) # Prepend to buffer
if buffer.count(b'\n') >= n + 1:
break
if buffer.count(b'\n') >= n + 1:
break
lines = buffer.decode(errors='ignore').splitlines()[-n:]
return lines
lines = buffer.decode(errors='ignore').splitlines()[-n:]
return lines
except Exception as e:
return [str(e) + format_exc()]
@app.get("/log", dependencies=[Depends(api_key_auth)])
def read_log(lines: int = 10):
log_path = LOG_FILE
def read_log(lines: int = 700, logfile: str = "strat.log"):
log_path = LOG_PATH / logfile
return {"lines": tail(log_path, lines)}
#get alpaca history bars
@ -674,7 +682,7 @@ def get_testlists():
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"No data found")
# API endpoint to retrieve a single record by ID
@app.get('/testlists/{record_id}')
@app.get('/testlists/{record_id}', dependencies=[Depends(api_key_auth)])
def get_testlist(record_id: str):
res, testlist = cs.get_testlist_byID(record_id=record_id)
@ -684,7 +692,7 @@ def get_testlist(record_id: str):
raise HTTPException(status_code=404, detail='Record not found')
# API endpoint to update a record
@app.put('/testlists/{record_id}')
@app.put('/testlists/{record_id}', dependencies=[Depends(api_key_auth)])
def update_testlist(record_id: str, testlist: TestList):
# Check if the record exists
conn = pool.get_connection()
@ -704,7 +712,7 @@ def update_testlist(record_id: str, testlist: TestList):
return testlist
# API endpoint to delete a record
@app.delete('/testlists/{record_id}')
@app.delete('/testlists/{record_id}', dependencies=[Depends(api_key_auth)])
def delete_testlist(record_id: str):
# Check if the record exists
conn = pool.get_connection()
@ -788,6 +796,66 @@ def delete_item(item_id: int) -> dict:
# endregion
# region scheduler
# 1. Fetch All RunManagerRecords
@app.get("/run_manager_records/", dependencies=[Depends(api_key_auth)], response_model=List[RunManagerRecord])
#TODO zvazit rozsireni vystupu o strat_status (running/stopped)
def get_all_run_manager_records():
result, records = rm.fetch_all_run_manager_records()
if result != 0:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error fetching records")
return records
# 2. Fetch RunManagerRecord by ID
@app.get("/run_manager_records/{record_id}", dependencies=[Depends(api_key_auth)], response_model=RunManagerRecord)
#TODO zvazit rozsireni vystupu o strat_status (running/stopped)
def get_run_manager_record(record_id: UUID):
result, record = rm.fetch_run_manager_record_by_id(record_id)
if result == -2: # Record not found
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="Record not found")
elif result != 0:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Error fetching record")
return record
# 3. Update RunManagerRecord
@app.patch("/run_manager_records/{record_id}", dependencies=[Depends(api_key_auth)], status_code=status.HTTP_200_OK)
def update_run_manager_record(record_id: UUID, update_data: RunManagerRecord):
#make dates zone aware zoneNY
# if update_data.valid_from is not None:
# update_data.valid_from = zoneNY.localize(update_data.valid_from)
# if update_data.valid_to is not None:
# update_data.valid_to = zoneNY.localize(update_data.valid_to)
result, message = rm.update_run_manager_record(record_id, update_data)
if result == -2: # Update failed
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
elif result != 0:
raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=f"Error during update {result} {message}")
return {"message": "Record updated successfully"}
# 4. Delete RunManagerRecord
@app.delete("/run_manager_records/{record_id}", dependencies=[Depends(api_key_auth)], status_code=status.HTTP_200_OK)
def delete_run_manager_record(record_id: UUID):
result, message = rm.delete_run_manager_record(record_id)
if result == -2: # Delete failed
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=message)
elif result != 0:
raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail=f"Error during deletion {result} {message}")
return {"message": "Record deleted successfully"}
@app.post("/run_manager_records/", status_code=status.HTTP_201_CREATED)
def create_run_manager_record(new_record: RunManagerRecord, api_key_auth: Depends = Depends(api_key_auth)):
#make date zone aware - convert to zoneNY
# if new_record.valid_from is not None:
# new_record.valid_from = zoneNY.localize(new_record.valid_from)
# if new_record.valid_to is not None:
# new_record.valid_to = zoneNY.localize(new_record.valid_to)
result, record_id = rm.add_run_manager_record(new_record)
if result != 0:
raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE, detail=f"Error during record creation: {result} {record_id}")
return {"id": record_id}
# endregion
#model section
#UPLOAD MODEL
@app.post("/model/upload_model", dependencies=[Depends(api_key_auth)])
@ -924,7 +992,22 @@ if __name__ == "__main__":
insert_thread = Thread(target=insert_queue2db)
insert_thread.start()
#attach debugGER to be able to debug scheduler jobs (run in separate threads)
# debugpy.listen(('localhost', 5678))
# print("Waiting for debugger to attach...")
# debugpy.wait_for_client() # Script will pause here until debugger is attached
#init scheduled tasks from schedule table
#Add APS scheduler job refresh
res, result = aps.initialize_jobs()
if res < 0:
#raise exception
raise Exception(f"Error {res} initializing APS jobs, error {result}")
uvicorn.run("__main__:app", host="0.0.0.0", port=8000, reload=False)
except Exception as e:
print("Error intializing app: " + str(e) + format_exc())
aps.scheduler.shutdown(wait=False)
finally:
print("closing insert_conn connection")
insert_conn.close()