diff --git a/v2realbot/ENTRY_ClassicSL_v01.py b/v2realbot/ENTRY_ClassicSL_v01.py index 7eed9d3..dcace2b 100644 --- a/v2realbot/ENTRY_ClassicSL_v01.py +++ b/v2realbot/ENTRY_ClassicSL_v01.py @@ -3,7 +3,7 @@ sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from v2realbot.strategy.base import StrategyState from v2realbot.strategy.StrategyOrderLimitVykladaciNormalizedMYSELL import StrategyOrderLimitVykladaciNormalizedMYSELL from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Account -from v2realbot.utils.utils import zoneNY, print +from v2realbot.utils.utils import zoneNY, print, fetch_calendar_data from v2realbot.utils.historicals import get_historical_bars from datetime import datetime, timedelta from rich import print as printanyway @@ -16,7 +16,6 @@ from v2realbot.strategyblocks.newtrade.signals import signal_search from v2realbot.strategyblocks.activetrade.activetrade_hub import manage_active_trade from v2realbot.strategyblocks.inits.init_indicators import initialize_dynamic_indicators from v2realbot.strategyblocks.inits.init_directives import intialize_directive_conditions -from alpaca.trading.requests import GetCalendarRequest from alpaca.trading.client import TradingClient from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR, OFFLINE_MODE from alpaca.trading.models import Calendar @@ -167,10 +166,13 @@ def init(state: StrategyState): today = time_to.date() several_days_ago = today - timedelta(days=60) #printanyway(f"{today=}",f"{several_days_ago=}") - clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False) + #clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False) #get all market days from here to 40days ago - calendar_request = GetCalendarRequest(start=several_days_ago,end=today) - cal_dates = clientTrading.get_calendar(calendar_request) + + #calendar_request = GetCalendarRequest(start=several_days_ago,end=today) + + cal_dates = fetch_calendar_data(several_days_ago, today) + #cal_dates = clientTrading.get_calendar(calendar_request) #find the first market day - 40days ago #history_datetime_from = zoneNY.localize(cal_dates[0].open) diff --git a/v2realbot/loader/trade_offline_streamer.py b/v2realbot/loader/trade_offline_streamer.py index 69bce01..a5dc936 100644 --- a/v2realbot/loader/trade_offline_streamer.py +++ b/v2realbot/loader/trade_offline_streamer.py @@ -1,14 +1,13 @@ from v2realbot.loader.aggregator import TradeAggregator, TradeAggregator2List, TradeAggregator2Queue #from v2realbot.loader.cacher import get_cached_agg_data from alpaca.trading.requests import GetCalendarRequest -from alpaca.trading.client import TradingClient from alpaca.data.live import StockDataStream from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR, OFFLINE_MODE from alpaca.data.enums import DataFeed from alpaca.data.historical import StockHistoricalDataClient from alpaca.data.requests import StockLatestQuoteRequest, StockBarsRequest, StockTradesRequest from threading import Thread, current_thread -from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY +from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data from v2realbot.utils.tlog import tlog from datetime import datetime, timedelta, date from threading import Thread @@ -26,13 +25,14 @@ from tqdm import tqdm import time from traceback import format_exc from collections import defaultdict +import requests """ Trade offline data streamer, based on Alpaca historical data. """ class Trade_Offline_Streamer(Thread): #pro BT se pripojujeme vzdy k primarnimu uctu - pouze tahame historicka data + calendar client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True) - clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False) + #clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False) def __init__(self, time_from: datetime, time_to: datetime, btdata) -> None: # Call the Thread class's init function Thread.__init__(self) @@ -64,6 +64,35 @@ class Trade_Offline_Streamer(Thread): def stop(self): pass + def fetch_stock_trades(self, symbol, start, end, max_retries=5, backoff_factor=1): + """ + Attempts to fetch stock trades with exponential backoff. Raises an exception if all retries fail. + + :param symbol: The stock symbol to fetch trades for. + :param start: The start time for the trade data. + :param end: The end time for the trade data. + :param max_retries: Maximum number of retries. + :param backoff_factor: Factor to determine the next sleep time. + :return: TradesResponse object. + :raises: ConnectionError if all retries fail. + """ + stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=start, end=end) + last_exception = None + + for attempt in range(max_retries): + try: + tradesResponse = self.client.get_stock_trades(stockTradeRequest) + print("Remote Fetch DAY DATA Complete", start, end) + return tradesResponse + except Exception as e: + print(f"Attempt {attempt + 1} failed: {e}") + last_exception = e + time.sleep(backoff_factor * (2 ** attempt)) + + print("All attempts to fetch data failed.") + send_to_telegram(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {last_exception}") + raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {last_exception}") + # Override the run() function of Thread class #odebrano async def main(self): @@ -114,15 +143,9 @@ class Trade_Offline_Streamer(Thread): bt_day = Calendar(date=den,open="9:30",close="16:00") cal_dates = [bt_day] else: - calendar_request = GetCalendarRequest(start=self.time_from,end=self.time_to) - - #toto zatim workaround - dat do retry funkce a obecne vymyslet exception handling, abych byl notifikovan a bylo videt okamzite v logu a na frontendu - try: - cal_dates = self.clientTrading.get_calendar(calendar_request) - except Exception as e: - print("CHYBA - retrying in 4s: " + str(e) + format_exc()) - time.sleep(5) - cal_dates = self.clientTrading.get_calendar(calendar_request) + start_date = self.time_from # Assuming this is your start date + end_date = self.time_to # Assuming this is your end date + cal_dates = fetch_calendar_data(start_date, end_date) #zatim podpora pouze main session @@ -213,9 +236,17 @@ class Trade_Offline_Streamer(Thread): print("Loading from Trade CACHE", file_path) #daily file doesnt exist else: - # TODO refactor pro zpracovani vice symbolu najednou(multithreads), nyni predpokladame pouze 1 - stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbpole[0], start=day.open,end=day.close) - tradesResponse = self.client.get_stock_trades(stockTradeRequest) + + #implement retry mechanism + symbol = symbpole[0] # Assuming symbpole[0] is your target symbol + day_open = day.open # Assuming day.open is the start time + day_close = day.close # Assuming day.close is the end time + + tradesResponse = self.fetch_stock_trades(symbol, day_open, day_close) + + # # TODO refactor pro zpracovani vice symbolu najednou(multithreads), nyni predpokladame pouze 1 + # stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbpole[0], start=day.open,end=day.close) + # tradesResponse = self.client.get_stock_trades(stockTradeRequest) print("Remote Fetch DAY DATA Complete", day.open, day.close) #pokud jde o dnešní den a nebyl konec trhu tak cache neukládáme diff --git a/v2realbot/strategyblocks/indicators/custom/classes/DWT.py b/v2realbot/strategyblocks/indicators/custom/classes/DWT.py new file mode 100644 index 0000000..38b6b64 --- /dev/null +++ b/v2realbot/strategyblocks/indicators/custom/classes/DWT.py @@ -0,0 +1,26 @@ +from v2realbot.strategyblocks.indicators.custom.classes.indicatorbase import IndicatorBase +import pywt +import numpy as np + +class DWT(IndicatorBase): + def __init__(self, state=None, wavelet='db1', levels=2): + super().__init__(state) + self.wavelet = wavelet + self.levels = levels + + def next(self, close): + coeffs = pywt.wavedec(close, self.wavelet, level=self.levels) + # Zeroing out all detail coefficients + coeffs = [coeffs[0]] + [np.zeros_like(c) for c in coeffs[1:]] + + # Reconstruct the signal using only the approximation coefficients + reconstructed_signal = pywt.waverec(coeffs, self.wavelet) + + # Handle length difference + length_difference = len(close) - len(reconstructed_signal) + if length_difference > 0: + reconstructed_signal = np.pad(reconstructed_signal, (0, length_difference), 'constant', constant_values=(0, 0)) + + self.state.indicators["MultiLevelDWT"] = reconstructed_signal.tolist() + + return float(reconstructed_signal[-1]) diff --git a/v2realbot/utils/historicals.py b/v2realbot/utils/historicals.py index 44c0a46..5ee3356 100644 --- a/v2realbot/utils/historicals.py +++ b/v2realbot/utils/historicals.py @@ -3,7 +3,7 @@ from alpaca.data.requests import StockLatestQuoteRequest, StockBarsRequest, Stoc from alpaca.data import Quote, Trade, Snapshot, Bar from alpaca.data.models import BarSet, QuoteSet, TradeSet from alpaca.data.timeframe import TimeFrame, TimeFrameUnit -from v2realbot.utils.utils import zoneNY +from v2realbot.utils.utils import zoneNY, send_to_telegram from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY from alpaca.data.enums import DataFeed from datetime import datetime, timedelta @@ -12,6 +12,7 @@ from rich import print from collections import defaultdict from pandas import to_datetime from msgpack.ext import Timestamp +import time def convert_historical_bars(daily_bars): """Converts a list of daily bars into a dictionary with the specified keys. @@ -80,15 +81,48 @@ def get_todays_open(): pass ##vrati historicke bary v nasem formatu -def get_historical_bars(symbol: str, time_from: datetime, time_to: datetime, timeframe: TimeFrame): - stock_client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True) - # snapshotRequest = StockSnapshotRequest(symbol_or_symbols=[symbol], feed=DataFeed.SIP) - # snapshotResponse = stock_client.get_stock_snapshot(snapshotRequest) - # print("snapshot", snapshotResponse) +# def get_historical_bars(symbol: str, time_from: datetime, time_to: datetime, timeframe: TimeFrame): +# stock_client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True) +# # snapshotRequest = StockSnapshotRequest(symbol_or_symbols=[symbol], feed=DataFeed.SIP) +# # snapshotResponse = stock_client.get_stock_snapshot(snapshotRequest) +# # print("snapshot", snapshotResponse) - bar_request = StockBarsRequest(symbol_or_symbols=symbol,timeframe=timeframe, start=time_from, end=time_to, feed=DataFeed.SIP) - bars: BarSet = stock_client.get_stock_bars(bar_request) - #print("puvodni bars", bars["BAC"]) - if bars[symbol][0] is None: - return None - return convert_historical_bars(bars[symbol]) +# bar_request = StockBarsRequest(symbol_or_symbols=symbol,timeframe=timeframe, start=time_from, end=time_to, feed=DataFeed.SIP) +# bars: BarSet = stock_client.get_stock_bars(bar_request) +# #print("puvodni bars", bars["BAC"]) +# if bars[symbol][0] is None: +# return None +# return convert_historical_bars(bars[symbol]) + +def get_historical_bars(symbol: str, time_from: datetime, time_to: datetime, timeframe: TimeFrame, max_retries=5, backoff_factor=1): + """ + Fetches historical bar data with retries on failure. + + :param symbol: Stock symbol. + :param time_from: Start time for the data. + :param time_to: End time for the data. + :param timeframe: Timeframe for the data. + :param max_retries: Maximum number of retries. + :param backoff_factor: Factor to determine the next sleep time. + :return: Converted historical bar data. + :raises: Exception if all retries fail. + """ + stock_client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=True) + bar_request = StockBarsRequest(symbol_or_symbols=symbol, timeframe=timeframe, start=time_from, end=time_to, feed=DataFeed.SIP) + + last_exception = None + + for attempt in range(max_retries): + try: + bars = stock_client.get_stock_bars(bar_request) + if bars[symbol][0] is None: + return None + return convert_historical_bars(bars[symbol]) + except Exception as e: + print(f"Load historical bars Attempt {attempt + 1} failed: {e}") + last_exception = e + time.sleep(backoff_factor * (2 ** attempt)) + + print("All attempts to fetch historical bar data failed.") + send_to_telegram(f"Failed to fetch historical bar data after {max_retries} retries. Last exception: {last_exception}") + raise Exception(f"Failed to fetch historical bar data after {max_retries} retries. Last exception: {last_exception}") diff --git a/v2realbot/utils/utils.py b/v2realbot/utils/utils.py index d8fce84..e00c0c7 100644 --- a/v2realbot/utils/utils.py +++ b/v2realbot/utils/utils.py @@ -13,7 +13,7 @@ from v2realbot.common.model import StrategyInstance, Runner, RunArchive, RunArch from v2realbot.common.PrescribedTradeModel import Trade, TradeDirection, TradeStatus, TradeStoplossType from typing import List import tomli -from v2realbot.config import DATA_DIR, QUIET_MODE,NORMALIZED_TICK_BASE_PRICE +from v2realbot.config import DATA_DIR, QUIET_MODE,NORMALIZED_TICK_BASE_PRICE,ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY import requests from uuid import UUID #from decimal import Decimal @@ -26,6 +26,42 @@ import pandas as pd from collections import deque import socket import numpy as np +from alpaca.trading.requests import GetCalendarRequest +from alpaca.trading.client import TradingClient +import time as timepkg + +#Alpaca Calendar wrapper with retry +def fetch_calendar_data(start, end, max_retries=5, backoff_factor=1): + """ + Attempts to fetch calendar data with exponential backoff. Raises an exception if all retries fail. + + TODO sem pridat local caching mechanism + + :param client: Alpaca API client instance. + :param start: The start date for the calendar data. + :param end: The end date for the calendar data. + :param max_retries: Maximum number of retries. + :param backoff_factor: Factor to determine the next sleep time. + :return: Calendar data. + :raises: ConnectionError if all retries fail. + """ + clientTrading = TradingClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False) + calendar_request = GetCalendarRequest(start=start, end=end) + last_exception = None + + for attempt in range(max_retries): + try: + cal_dates = clientTrading.get_calendar(calendar_request) + richprint("Calendar data fetch successful", start, end) + return cal_dates + except Exception as e: + richprint(f"Attempt {attempt + 1} failed: {e}") + last_exception = e + timepkg.sleep(backoff_factor * (2 ** attempt)) + + richprint("****All attempts to fetch calendar data failed.****") + send_to_telegram(f"FETCH_CALENDER_DATA_FAILED. {last_exception} BACKEST STOPPED" ) + raise ConnectionError(f"Failed to fetch calendar data after {max_retries} retries. Last exception: {last_exception}") def concatenate_weekdays(weekday_filter): # Mapping of weekdays where 0 is Monday and 6 is Sunday