calendar wrapper with retry, histo bars with retry

This commit is contained in:
David Brazda
2024-02-06 11:14:38 +07:00
parent c1ad713a12
commit 1f8afef042
5 changed files with 162 additions and 33 deletions

View File

@ -3,7 +3,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from v2realbot.strategy.base import StrategyState
from v2realbot.strategy.StrategyOrderLimitVykladaciNormalizedMYSELL import StrategyOrderLimitVykladaciNormalizedMYSELL
from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Account
from v2realbot.utils.utils import zoneNY, print
from v2realbot.utils.utils import zoneNY, print, fetch_calendar_data
from v2realbot.utils.historicals import get_historical_bars
from datetime import datetime, timedelta
from rich import print as printanyway
@ -16,7 +16,6 @@ from v2realbot.strategyblocks.newtrade.signals import signal_search
from v2realbot.strategyblocks.activetrade.activetrade_hub import manage_active_trade
from v2realbot.strategyblocks.inits.init_indicators import initialize_dynamic_indicators
from v2realbot.strategyblocks.inits.init_directives import intialize_directive_conditions
from alpaca.trading.requests import GetCalendarRequest
from alpaca.trading.client import TradingClient
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR, OFFLINE_MODE
from alpaca.trading.models import Calendar
@ -167,10 +166,13 @@ def init(state: StrategyState):
today = time_to.date()
several_days_ago = today - timedelta(days=60)
#printanyway(f"{today=}",f"{several_days_ago=}")
clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False)
#clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False)
#get all market days from here to 40days ago
calendar_request = GetCalendarRequest(start=several_days_ago,end=today)
cal_dates = clientTrading.get_calendar(calendar_request)
#calendar_request = GetCalendarRequest(start=several_days_ago,end=today)
cal_dates = fetch_calendar_data(several_days_ago, today)
#cal_dates = clientTrading.get_calendar(calendar_request)
#find the first market day - 40days ago
#history_datetime_from = zoneNY.localize(cal_dates[0].open)

View File

@ -1,14 +1,13 @@
from v2realbot.loader.aggregator import TradeAggregator, TradeAggregator2List, TradeAggregator2Queue
#from v2realbot.loader.cacher import get_cached_agg_data
from alpaca.trading.requests import GetCalendarRequest
from alpaca.trading.client import TradingClient
from alpaca.data.live import StockDataStream
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR, OFFLINE_MODE
from alpaca.data.enums import DataFeed
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockLatestQuoteRequest, StockBarsRequest, StockTradesRequest
from threading import Thread, current_thread
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data
from v2realbot.utils.tlog import tlog
from datetime import datetime, timedelta, date
from threading import Thread
@ -26,13 +25,14 @@ from tqdm import tqdm
import time
from traceback import format_exc
from collections import defaultdict
import requests
"""
Trade offline data streamer, based on Alpaca historical data.
"""
class Trade_Offline_Streamer(Thread):
#pro BT se pripojujeme vzdy k primarnimu uctu - pouze tahame historicka data + calendar
client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True)
clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False)
#clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False)
def __init__(self, time_from: datetime, time_to: datetime, btdata) -> None:
# Call the Thread class's init function
Thread.__init__(self)
@ -64,6 +64,35 @@ class Trade_Offline_Streamer(Thread):
def stop(self):
pass
def fetch_stock_trades(self, symbol, start, end, max_retries=5, backoff_factor=1):
"""
Attempts to fetch stock trades with exponential backoff. Raises an exception if all retries fail.
:param symbol: The stock symbol to fetch trades for.
:param start: The start time for the trade data.
:param end: The end time for the trade data.
:param max_retries: Maximum number of retries.
:param backoff_factor: Factor to determine the next sleep time.
:return: TradesResponse object.
:raises: ConnectionError if all retries fail.
"""
stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=start, end=end)
last_exception = None
for attempt in range(max_retries):
try:
tradesResponse = self.client.get_stock_trades(stockTradeRequest)
print("Remote Fetch DAY DATA Complete", start, end)
return tradesResponse
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
last_exception = e
time.sleep(backoff_factor * (2 ** attempt))
print("All attempts to fetch data failed.")
send_to_telegram(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {last_exception}")
raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {last_exception}")
# Override the run() function of Thread class
#odebrano async
def main(self):
@ -114,15 +143,9 @@ class Trade_Offline_Streamer(Thread):
bt_day = Calendar(date=den,open="9:30",close="16:00")
cal_dates = [bt_day]
else:
calendar_request = GetCalendarRequest(start=self.time_from,end=self.time_to)
#toto zatim workaround - dat do retry funkce a obecne vymyslet exception handling, abych byl notifikovan a bylo videt okamzite v logu a na frontendu
try:
cal_dates = self.clientTrading.get_calendar(calendar_request)
except Exception as e:
print("CHYBA - retrying in 4s: " + str(e) + format_exc())
time.sleep(5)
cal_dates = self.clientTrading.get_calendar(calendar_request)
start_date = self.time_from # Assuming this is your start date
end_date = self.time_to # Assuming this is your end date
cal_dates = fetch_calendar_data(start_date, end_date)
#zatim podpora pouze main session
@ -213,9 +236,17 @@ class Trade_Offline_Streamer(Thread):
print("Loading from Trade CACHE", file_path)
#daily file doesnt exist
else:
# TODO refactor pro zpracovani vice symbolu najednou(multithreads), nyni predpokladame pouze 1
stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbpole[0], start=day.open,end=day.close)
tradesResponse = self.client.get_stock_trades(stockTradeRequest)
#implement retry mechanism
symbol = symbpole[0] # Assuming symbpole[0] is your target symbol
day_open = day.open # Assuming day.open is the start time
day_close = day.close # Assuming day.close is the end time
tradesResponse = self.fetch_stock_trades(symbol, day_open, day_close)
# # TODO refactor pro zpracovani vice symbolu najednou(multithreads), nyni predpokladame pouze 1
# stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbpole[0], start=day.open,end=day.close)
# tradesResponse = self.client.get_stock_trades(stockTradeRequest)
print("Remote Fetch DAY DATA Complete", day.open, day.close)
#pokud jde o dnešní den a nebyl konec trhu tak cache neukládáme

View File

@ -0,0 +1,26 @@
from v2realbot.strategyblocks.indicators.custom.classes.indicatorbase import IndicatorBase
import pywt
import numpy as np
class DWT(IndicatorBase):
def __init__(self, state=None, wavelet='db1', levels=2):
super().__init__(state)
self.wavelet = wavelet
self.levels = levels
def next(self, close):
coeffs = pywt.wavedec(close, self.wavelet, level=self.levels)
# Zeroing out all detail coefficients
coeffs = [coeffs[0]] + [np.zeros_like(c) for c in coeffs[1:]]
# Reconstruct the signal using only the approximation coefficients
reconstructed_signal = pywt.waverec(coeffs, self.wavelet)
# Handle length difference
length_difference = len(close) - len(reconstructed_signal)
if length_difference > 0:
reconstructed_signal = np.pad(reconstructed_signal, (0, length_difference), 'constant', constant_values=(0, 0))
self.state.indicators["MultiLevelDWT"] = reconstructed_signal.tolist()
return float(reconstructed_signal[-1])

View File

@ -3,7 +3,7 @@ from alpaca.data.requests import StockLatestQuoteRequest, StockBarsRequest, Stoc
from alpaca.data import Quote, Trade, Snapshot, Bar
from alpaca.data.models import BarSet, QuoteSet, TradeSet
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
from v2realbot.utils.utils import zoneNY
from v2realbot.utils.utils import zoneNY, send_to_telegram
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY
from alpaca.data.enums import DataFeed
from datetime import datetime, timedelta
@ -12,6 +12,7 @@ from rich import print
from collections import defaultdict
from pandas import to_datetime
from msgpack.ext import Timestamp
import time
def convert_historical_bars(daily_bars):
"""Converts a list of daily bars into a dictionary with the specified keys.
@ -80,15 +81,48 @@ def get_todays_open():
pass
##vrati historicke bary v nasem formatu
def get_historical_bars(symbol: str, time_from: datetime, time_to: datetime, timeframe: TimeFrame):
stock_client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True)
# snapshotRequest = StockSnapshotRequest(symbol_or_symbols=[symbol], feed=DataFeed.SIP)
# snapshotResponse = stock_client.get_stock_snapshot(snapshotRequest)
# print("snapshot", snapshotResponse)
# def get_historical_bars(symbol: str, time_from: datetime, time_to: datetime, timeframe: TimeFrame):
# stock_client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True)
# # snapshotRequest = StockSnapshotRequest(symbol_or_symbols=[symbol], feed=DataFeed.SIP)
# # snapshotResponse = stock_client.get_stock_snapshot(snapshotRequest)
# # print("snapshot", snapshotResponse)
bar_request = StockBarsRequest(symbol_or_symbols=symbol,timeframe=timeframe, start=time_from, end=time_to, feed=DataFeed.SIP)
bars: BarSet = stock_client.get_stock_bars(bar_request)
#print("puvodni bars", bars["BAC"])
# bar_request = StockBarsRequest(symbol_or_symbols=symbol,timeframe=timeframe, start=time_from, end=time_to, feed=DataFeed.SIP)
# bars: BarSet = stock_client.get_stock_bars(bar_request)
# #print("puvodni bars", bars["BAC"])
# if bars[symbol][0] is None:
# return None
# return convert_historical_bars(bars[symbol])
def get_historical_bars(symbol: str, time_from: datetime, time_to: datetime, timeframe: TimeFrame, max_retries=5, backoff_factor=1):
"""
Fetches historical bar data with retries on failure.
:param symbol: Stock symbol.
:param time_from: Start time for the data.
:param time_to: End time for the data.
:param timeframe: Timeframe for the data.
:param max_retries: Maximum number of retries.
:param backoff_factor: Factor to determine the next sleep time.
:return: Converted historical bar data.
:raises: Exception if all retries fail.
"""
stock_client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True)
bar_request = StockBarsRequest(symbol_or_symbols=symbol, timeframe=timeframe, start=time_from, end=time_to, feed=DataFeed.SIP)
last_exception = None
for attempt in range(max_retries):
try:
bars = stock_client.get_stock_bars(bar_request)
if bars[symbol][0] is None:
return None
return convert_historical_bars(bars[symbol])
except Exception as e:
print(f"Load historical bars Attempt {attempt + 1} failed: {e}")
last_exception = e
time.sleep(backoff_factor * (2 ** attempt))
print("All attempts to fetch historical bar data failed.")
send_to_telegram(f"Failed to fetch historical bar data after {max_retries} retries. Last exception: {last_exception}")
raise Exception(f"Failed to fetch historical bar data after {max_retries} retries. Last exception: {last_exception}")

View File

@ -13,7 +13,7 @@ from v2realbot.common.model import StrategyInstance, Runner, RunArchive, RunArch
from v2realbot.common.PrescribedTradeModel import Trade, TradeDirection, TradeStatus, TradeStoplossType
from typing import List
import tomli
from v2realbot.config import DATA_DIR, QUIET_MODE,NORMALIZED_TICK_BASE_PRICE
from v2realbot.config import DATA_DIR, QUIET_MODE,NORMALIZED_TICK_BASE_PRICE,ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY
import requests
from uuid import UUID
#from decimal import Decimal
@ -26,6 +26,42 @@ import pandas as pd
from collections import deque
import socket
import numpy as np
from alpaca.trading.requests import GetCalendarRequest
from alpaca.trading.client import TradingClient
import time as timepkg
#Alpaca Calendar wrapper with retry
def fetch_calendar_data(start, end, max_retries=5, backoff_factor=1):
"""
Attempts to fetch calendar data with exponential backoff. Raises an exception if all retries fail.
TODO sem pridat local caching mechanism
:param client: Alpaca API client instance.
:param start: The start date for the calendar data.
:param end: The end date for the calendar data.
:param max_retries: Maximum number of retries.
:param backoff_factor: Factor to determine the next sleep time.
:return: Calendar data.
:raises: ConnectionError if all retries fail.
"""
clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False)
calendar_request = GetCalendarRequest(start=start, end=end)
last_exception = None
for attempt in range(max_retries):
try:
cal_dates = clientTrading.get_calendar(calendar_request)
richprint("Calendar data fetch successful", start, end)
return cal_dates
except Exception as e:
richprint(f"Attempt {attempt + 1} failed: {e}")
last_exception = e
timepkg.sleep(backoff_factor * (2 ** attempt))
richprint("****All attempts to fetch calendar data failed.****")
send_to_telegram(f"FETCH_CALENDER_DATA_FAILED. {last_exception} BACKEST STOPPED" )
raise ConnectionError(f"Failed to fetch calendar data after {max_retries} retries. Last exception: {last_exception}")
def concatenate_weekdays(weekday_filter):
# Mapping of weekdays where 0 is Monday and 6 is Sunday