David Brazda
2024-10-30 14:08:59 +01:00
parent ffb3938209
commit ca554cf600
6 changed files with 223 additions and 254 deletions

View File

@@ -30,28 +30,31 @@ day_start = zoneNY.localize(day_start)
 day_stop = zoneNY.localize(day_stop)
 #requested AGG
-resolution = 1
+resolution = 12 #12s
 agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO
 exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None to defaults
 minsize = 100
 main_session_only = True
-force_remote = True
-ohlcv_df = load_data(symbol = symbol,
+force_remote = False
+data = load_data(symbol = symbol,
                 agg_type = agg_type,
                 resolution = resolution,
                 start_date = day_start,
                 end_date = day_stop,
                 #exclude_conditions = None,
-                minsize = 100,
-                main_session_only = True,
-                force_remote = False
+                minsize = minsize,
+                main_session_only = main_session_only,
+                force_remote = force_remote,
+                return_vbt = True, #returns vbt object
+                verbose = True
                 )
 bac_df = ohlcv_df["BAC"]
 basic_data = vbt.Data.from_data(vbt.symbol_dict(ohlcv_df), tz_convert=zoneNY)
 vbt.settings['plotting']['auto_rangebreaks'] = True
 basic_data.ohlcv.plot()
+data.ohlcv.data[symbol[0]].lw.plot()
 ```
 ## prepare trade cache
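A minimal sketch of the cache warm-up step the heading above refers to, based on the `prepare_trade_cache` signature added later in this commit; the import path, symbols and date range are illustrative assumptions, not taken from the source:

```python
from datetime import datetime
from ttools import prepare_trade_cache  # assumed import path
from ttools.utils import zoneNY

# illustrative symbols and date range (not from the source)
symbols = ["BAC", "AAPL"]
start = zoneNY.localize(datetime(2024, 10, 1, 9, 30))
end = zoneNY.localize(datetime(2024, 10, 4, 16, 0))

# downloads and filters the daily trade files into the trade cache;
# nothing is returned here, the point is to have the parquet files
# ready so later load_data() calls can aggregate from cache
prepare_trade_cache(symbols, start, end, force_remote=False, verbose=True)
```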

View File

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 setup(
     name='ttools',
-    version='0.6.0',
+    version='0.6.1',
     packages=find_packages(),
     install_requires=[
         # list your dependencies here

File diff suppressed because one or more lines are too long

View File

@@ -14,9 +14,9 @@ ENV_FILE = find_dotenv()
 #LOAD THE DOTENV ENV VARIABLES
 if load_dotenv(ENV_FILE, verbose=True) is False:
-    print(f"Error loading .env file {ENV_FILE}. Now depending on ENV VARIABLES set externally.")
+    print(f"TTOOLS: Error loading .env file {ENV_FILE}. Now depending on ENV VARIABLES set externally.")
 else:
-    print(f"Loaded env variables from file {ENV_FILE}")
+    print(f"TTOOLS: Loaded env variables from file {ENV_FILE}")
 ACCOUNT1_LIVE_API_KEY = os.environ.get('ACCOUNT1_LIVE_API_KEY')
 ACCOUNT1_LIVE_SECRET_KEY = os.environ.get('ACCOUNT1_LIVE_SECRET_KEY')

View File

@@ -16,7 +16,7 @@ from time import time as timetime
 from concurrent.futures import ThreadPoolExecutor
 from alpaca.data.enums import DataFeed
 import random
-from ttools.utils import AggType, fetch_calendar_data
+from ttools.utils import AggType, fetch_calendar_data, print, set_verbose
 from tqdm import tqdm
 import threading
 from typing import List, Union
@@ -147,7 +147,7 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No
         df = df[df['s'] >= minsize]
     return df
-def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False,force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP):
+def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False,force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP, verbose = None):
     #doc for this function
     """
     Attempts to fetch stock trades either from cache or remote. When remote, it uses retry mechanism with exponential backoff.
@@ -178,6 +178,9 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
     if not is_same_day:
         raise ValueError("fetch_daily_stock_trades is not implemented for multiple days!")
+    if verbose is not None:
+        set_verbose(verbose) # Change global verbose if specified
     #exists in cache?
     daily_file = f"{symbol}-{str(start.date())}.parquet"
     file_path = TRADE_CACHE / daily_file
@@ -227,7 +230,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
     print("All attempts to fetch data failed.")
     raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}")
-def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False):
+def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False, verbose = None):
     """
     Fetch trades between ranges.
@@ -249,25 +252,28 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
     :param no_return: If True, do not return the DataFrame. Used to prepare cached files.
     :return: DataFrame containing all trades from start_date to end_date.
     """
+    if verbose is not None:
+        set_verbose(verbose) # Change global verbose if specified
     futures = []
     results = []
     market_open_days = fetch_calendar_data(start_date, end_date)
     day_count = len(market_open_days)
-    print("Contains", day_count, " market days")
+    print(f"{symbol} Contains", day_count, " market days")
     max_workers = min(10, max(2, day_count // 2)) if max_workers is None else max_workers # Heuristic: half the days to process, but at least 1 and no more than 10
     #which days to fetch?
     days_from_remote = []
     days_from_cache = []
     if not force_remote:
-        for market_day in tqdm(market_open_days, desc="Processing market days"):
+        for market_day in market_open_days:
             daily_file_new = str(symbol) + '-' + str(market_day.date) + '.parquet'
             file_path_new = TRADE_CACHE / daily_file_new
             if file_path_new.exists():
-                days_from_remote.append(market_day)
-            else:
                 days_from_cache.append((market_day,file_path_new))
+            else:
+                days_from_remote.append(market_day)
     else:
         days_from_remote = market_open_days
@@ -280,16 +286,16 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
         with trade_cache_lock:
             local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache])
         final_time = timetime() - s_time
-        print(f"All {len(days_from_cache)} split files loaded in", final_time, "seconds")
+        print(f"{symbol} All {len(days_from_cache)} split files loaded in", final_time, "seconds")
         #the filter is required
         local_df = filter_trade_df(local_df, start_date, end_date, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only)
-        print("local_df filtered")
+        print(f"{symbol} filtered")
     #do this only for remotes
     if len(days_from_remote) > 0:
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
             #for single_date in (start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)):
-            for market_day in tqdm(days_from_remote, desc="Processing market days to fetch"):
+            for market_day in tqdm(days_from_remote, desc=f"{symbol} Remote fetching"):
                 #start = datetime.combine(single_date, time(9, 30)) # Market opens at 9:30 AM
                 #end = datetime.combine(single_date, time(16, 0)) # Market closes at 4:00 PM
@@ -312,7 +318,7 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
                 future = executor.submit(fetch_daily_stock_trades, symbol, start, end, exclude_conditions, minsize, main_session_only, no_return, force_remote)
                 futures.append(future)
-    for future in tqdm(futures, desc="Fetching data"):
+    for future in tqdm(futures, desc=f"{symbol} Receiving trades"):
         try:
             result = future.result()
             results.append(result)
@@ -338,25 +344,43 @@ def load_data(symbol: Union[str, List[str]],
               exclude_conditions: list = EXCLUDE_CONDITIONS,
               minsize = None,
               main_session_only = True,
-              force_remote=False):
+              force_remote=False,
+              return_vbt = False,
+              verbose = None):
     """Main function to fetch data.
+    Returns requested aggregated data for given symbol(s)
+    - if the requested agg data already exists in the agg cache, returns it
+    - otherwise gets the trades (from trade cache or Alpaca), aggregates them and stores the result to cache
+    For the location of both caches, see config.py
     Args:
-        symbol (str): Symbol
+        symbol (Union[str, list]): Symbol
+        agg_type (AggType): Type of aggregation
+        resolution (Union[str, int]): Resolution of aggregation based on agg_type
         start_date (datetime):
         end_date (datetime):
         exclude_conditions (list, optional): Trade conditions to exclude. Defaults to None.
         minsize (_type_, optional): Minimum trade size to include. Defaults to None.
         main_session_only (bool, optional): Main or ext. hours. Defaults to True.
-        force_remote (bool, optional): Force remote fetch and reaggregation. Defaults to False.
+        force_remote (bool, optional): If True, forces cache reload (doesn't use the cache for either trade or agg data)
+        return_vbt (bool): If True, returns a vbt Data object with symbols as columns, otherwise returns a symbol-keyed dict of pd.DataFrame
+        verbose (bool): If True, prints debug messages. None means keep the global setting.
-    Returns:
-        pd.DataFrame: Aggregated data
+    Returns
+    -------
+    dict keyed by SYMBOL with pd.DataFrame as value
+    OR
+    vbt.Data object (keyed by SYMBOL)
     """
     symbols = [symbol] if isinstance(symbol, str) else symbol
+    if verbose is not None:
+        set_verbose(verbose) # Change global verbose if specified
     if exclude_conditions is None:
-        exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES')
+        exclude_conditions = EXCLUDE_CONDITIONS
     def load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote):
         exclude_conditions.sort()
@@ -373,19 +397,23 @@ def load_data(symbol: Union[str, List[str]],
         ohlcv_df = aggregate_trades(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type)
         ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow')
-        print("Saved to agg_cache", file_ohlcv)
+        print(f"{symbol} Saved to agg_cache", file_ohlcv)
         return ohlcv_df
     ret_dict_df = {}
     for symbol in symbols:
         ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote)
+    if return_vbt:
+        return vbt.Data.from_data(vbt.symbol_dict(ret_dict_df), tz_convert=zoneNY)
     return ret_dict_df
 def prepare_trade_cache(symbol: Union[str, List[str]],
                         start_date: datetime,
                         end_date: datetime,
-                        force_remote=False):
+                        force_remote=False,
+                        verbose = None):
     """
     Fetches trade cache daily files for specified symbols and date range and stores
     them to trade cache location.
@@ -395,7 +423,7 @@ def prepare_trade_cache(symbol: Union[str, List[str]],
     Note: daily trade cache files contain all trades (main+ext hours) for that day
     Args:
-        symbols (str): Symbols to fetch
+        symbol (Union[str, list]): Symbol or list of symbols
         start_date (datetime):
         end_date (datetime):
         force_remote (bool, optional): Force remote fetch. Defaults to False.
@@ -408,52 +436,5 @@ def prepare_trade_cache(symbol: Union[str, List[str]],
     for symbol in symbols:
         #just cache update
         print(f"Started for {symbol}")
-        df = fetch_trades_parallel(symbol, start_date, end_date, force_remote=force_remote, no_return=True)
+        df = fetch_trades_parallel(symbol, start_date, end_date, force_remote=force_remote, no_return=True, verbose=verbose)
         print(f"Finished for {symbol}")
-#this would become new loader in the future
-# def load_data_tbd(symbol: Union[str, list], day_start: datetime, day_end: datetime, agg_type: AGG_TYPE, resolution: Union[str, int], excludes: list = EXCLUDE_CONDITIONS, minsize: int = MINSIZE, ext_hours: bool = False, align: StartBarAlign =StartBarAlign.ROUND, as_df: bool = False, force_reload: bool = False) -> None:
-#     """
-#     Returns requested aggregated data for give symbol(s)
-#     - if requested agg data already exists in cache, returns it
-#     - otherwise get the trades (from trade cache or Alpaca) and aggregate them and store to cache
-#     For location of both caches, see config.py
-#     Note both trades and agg cache are daily_files
-#     LOCAL_TRADE_CACHE
-#     LOCAL_AGG_CACHE
-#     Parameters
-#     ----------
-#     symbol : Union[str, list]
-#         Symbol or list of symbols
-#     day_start : datetime
-#         Start date, zone aware
-#     day_end : datetime
-#         End date, zone aware
-#     agg_type : AGG_TYPE
-#         Type of aggregation
-#     resolution : Union[str, int]
-#         Resolution of aggregation nased on agg_type
-#     excludes : list
-#         List of trade conditions to exclude
-#     minsize : int
-#         Minimum size of trade to be included
-#     ext_hours : bool
-#         If True, requests extended hours data
-#     align : StartBarAlign
-#         How to align first bar RANDOM vs ROUND
-#     as_df : bool
-#         If True, returns dataframe, otherwise returns vbt Data object
-#     force_reload : bool
-#         If True, forces cache reload (doesnt use cache both for trade and agg data)
-#     Returns
-#     -------
-#     DF or vbt.Data object
-#     """
-#     pass
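The reworked `load_data` above now has two return modes plus a verbosity switch. A minimal usage sketch follows; the import path, symbols and dates are assumptions, not taken from the source:

```python
from datetime import datetime
from ttools import load_data  # assumed import path
from ttools.utils import AggType, zoneNY

start = zoneNY.localize(datetime(2024, 10, 29, 9, 30))
end = zoneNY.localize(datetime(2024, 10, 29, 16, 0))

# default: dict keyed by symbol, one aggregated DataFrame per symbol
dfs = load_data(symbol=["BAC", "AAPL"], agg_type=AggType.OHLCV, resolution=12,
                start_date=start, end_date=end, minsize=100,
                main_session_only=True, force_remote=False, verbose=False)
bac_df = dfs["BAC"]

# return_vbt=True: a single vbt.Data object with symbols as columns
data = load_data(symbol=["BAC", "AAPL"], agg_type=AggType.OHLCV, resolution=12,
                 start_date=start, end_date=end, return_vbt=True)
data.ohlcv.plot()
```

Note that `verbose=` is applied via `set_verbose`, so it flips the module-wide flag rather than affecting only the single call.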

View File

@@ -11,6 +11,20 @@ import pandas_market_calendars as mcal
 zoneNY = pytz.timezone('US/Eastern')
 zoneUTC = pytz.utc
 zonePRG = pytz.timezone('Europe/Amsterdam')
+verbose = True #default
+# Save the built-in print function to a different name
+built_in_print = print
+# Custom print function that respects the global verbose setting
+def print(*args, **kwargs):
+    if verbose:
+        built_in_print(*args, **kwargs)
+# Function to set the global verbose variable
+def set_verbose(value):
+    global verbose
+    verbose = value
 def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]:
     """
@@ -114,7 +128,7 @@ def find_dotenv():
     #print(start_path)
     current_path = Path(start_path)
-    for _ in range(6): # Limit search depth to 5 levels
+    for _ in range(10): # Limit search depth to 10 levels
         dotenv_path = current_path / '.env'
         if dotenv_path.exists():
             return dotenv_path
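A short sketch of how the module-level verbose switch added above behaves once imported; the printed strings are illustrative:

```python
from ttools.utils import print, set_verbose  # the shadowed print and its global toggle

print("visible")         # verbose defaults to True, so this is forwarded to the built-in print
set_verbose(False)       # flip the module-wide flag
print("now suppressed")  # the wrapper checks the global and prints nothing
set_verbose(True)        # fetch/load functions do the same internally when verbose= is passed
print("visible again")
```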