fix
README.md (15 lines changed)

@@ -30,28 +30,31 @@ day_start = zoneNY.localize(day_start)
 day_stop = zoneNY.localize(day_stop)
 
 #requested AGG
-resolution = 1
+resolution = 12 #12s
 agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO
 exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None to defaults
 minsize = 100
 main_session_only = True
-force_remote = True
+force_remote = False
 
-ohlcv_df = load_data(symbol = symbol,
+data = load_data(symbol = symbol,
                 agg_type = agg_type,
                 resolution = resolution,
                 start_date = day_start,
                 end_date = day_stop,
                 #exclude_conditions = None,
-                minsize = 100,
-                main_session_only = True,
-                force_remote = False
+                minsize = minsize,
+                main_session_only = main_session_only,
+                force_remote = force_remote,
+                return_vbt = True, #returns vbt object
+                verbose = True
                 )
-bac_df = ohlcv_df["BAC"]
 
-basic_data = vbt.Data.from_data(vbt.symbol_dict(ohlcv_df), tz_convert=zoneNY)
 vbt.settings['plotting']['auto_rangebreaks'] = True
-basic_data.ohlcv.plot()
+data.ohlcv.data[symbol[0]].lw.plot()
 ```
 
 ## prepare trade cache
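The README example above exercises the new `return_vbt` and `verbose` arguments of `load_data`. A minimal, self-contained sketch of the two return modes described in the docstring later in this commit; the import path of `load_data` and the dates are assumptions for illustration, since the diff suppresses the module's file name:

```python
# Sketch of both return modes of load_data, per the docstring in this commit.
# `ttools.loader` is an assumed module path; `AggType` is shown in this diff.
import pytz
from datetime import datetime

from ttools.utils import AggType
from ttools.loader import load_data  # module name assumed, not shown in diff

zoneNY = pytz.timezone('US/Eastern')
day_start = zoneNY.localize(datetime(2024, 1, 8, 9, 30))
day_stop = zoneNY.localize(datetime(2024, 1, 8, 16, 0))

# Default return_vbt=False: a dict keyed by symbol with pd.DataFrame values
data_dict = load_data(symbol=["BAC", "SPY"], agg_type=AggType.OHLCV,
                      resolution=12, start_date=day_start, end_date=day_stop,
                      minsize=100, main_session_only=True)
bac_df = data_dict["BAC"]

# return_vbt=True: one vbt.Data object wrapping all symbols, tz-converted to NY
data = load_data(symbol=["BAC", "SPY"], agg_type=AggType.OHLCV,
                 resolution=12, start_date=day_start, end_date=day_stop,
                 minsize=100, main_session_only=True,
                 return_vbt=True, verbose=True)
print(data.ohlcv.data["BAC"].head())
```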
setup.py (2 lines changed)

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 
 setup(
     name='ttools',
-    version='0.6.0',
+    version='0.6.1',
     packages=find_packages(),
     install_requires=[
         # list your dependencies here
File diff suppressed because one or more lines are too long

@@ -14,9 +14,9 @@ ENV_FILE = find_dotenv()
 
 #LOAD DOTENV ENV VARIABLES
 if load_dotenv(ENV_FILE, verbose=True) is False:
-    print(f"Error loading.env file {ENV_FILE}. Now depending on ENV VARIABLES set externally.")
+    print(f"TTOOLS: Error loading.env file {ENV_FILE}. Now depending on ENV VARIABLES set externally.")
 else:
-    print(f"Loaded env variables from file {ENV_FILE}")
+    print(f"TTOOLS: Loaded env variables from file {ENV_FILE}")
 
 ACCOUNT1_LIVE_API_KEY = os.environ.get('ACCOUNT1_LIVE_API_KEY')
 ACCOUNT1_LIVE_SECRET_KEY = os.environ.get('ACCOUNT1_LIVE_SECRET_KEY')
 
@@ -16,7 +16,7 @@ from time import time as timetime
 from concurrent.futures import ThreadPoolExecutor
 from alpaca.data.enums import DataFeed
 import random
-from ttools.utils import AggType, fetch_calendar_data
+from ttools.utils import AggType, fetch_calendar_data, print, set_verbose
 from tqdm import tqdm
 import threading
 from typing import List, Union
@@ -147,7 +147,7 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No
         df = df[df['s'] >= minsize]
     return df
 
-def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False, force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP):
+def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False, force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP, verbose = None):
     #doc for this function
     """
     Attempts to fetch stock trades either from cache or remote. When remote, it uses a retry mechanism with exponential backoff.
@@ -178,6 +178,9 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
     if not is_same_day:
         raise ValueError("fetch_daily_stock_trades is not implemented for multiple days!")
 
+    if verbose is not None:
+        set_verbose(verbose) # Change global verbose if specified
+
     #exists in cache?
     daily_file = f"{symbol}-{str(start.date())}.parquet"
     file_path = TRADE_CACHE / daily_file
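The `fetch_daily_stock_trades` docstring promises a retry mechanism with exponential backoff, but the loop itself falls between these hunks. A generic sketch of that pattern, matching the `max_retries`/`backoff_factor` parameters and the `ConnectionError` raised in the next hunk; the `fetch_once` callable is a hypothetical stand-in for the actual Alpaca request:

```python
# Sketch only: the real retry loop is not shown in this diff.
import random
from time import sleep
from traceback import format_exc

def fetch_with_backoff(fetch_once, max_retries=5, backoff_factor=1):
    """Retry fetch_once with exponential backoff, re-raising after max_retries."""
    last_exception, last_tb = None, ""
    for attempt in range(max_retries):
        try:
            return fetch_once()
        except Exception as e:
            last_exception = e
            last_tb = format_exc()  # capture traceback while still handling
            # Wait backoff_factor * 2^attempt seconds, plus a little jitter
            delay = backoff_factor * (2 ** attempt) + random.uniform(0, 0.5)
            sleep(delay)
    raise ConnectionError(
        f"Failed to fetch stock trades after {max_retries} retries. "
        f"Last exception: {last_exception} and {last_tb}")
```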
@@ -227,7 +230,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
     print("All attempts to fetch data failed.")
     raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}")
 
-def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False):
+def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False, verbose = None):
     """
     Fetch trades between ranges.
 
@@ -249,25 +252,28 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
     :param no_return: If True, do not return the DataFrame. Used to prepare cached files.
     :return: DataFrame containing all trades from start_date to end_date.
     """
+    if verbose is not None:
+        set_verbose(verbose) # Change global verbose if specified
+
     futures = []
     results = []
 
     market_open_days = fetch_calendar_data(start_date, end_date)
     day_count = len(market_open_days)
-    print("Contains", day_count, " market days")
+    print(f"{symbol} Contains", day_count, " market days")
     max_workers = min(10, max(2, day_count // 2)) if max_workers is None else max_workers # Heuristic: half the days to process, but at least 2 and no more than 10
 
     #which days to fetch?
     days_from_remote = []
     days_from_cache = []
     if not force_remote:
-        for market_day in tqdm(market_open_days, desc="Processing market days"):
+        for market_day in market_open_days:
             daily_file_new = str(symbol) + '-' + str(market_day.date) + '.parquet'
             file_path_new = TRADE_CACHE / daily_file_new
             if file_path_new.exists():
-                days_from_remote.append(market_day)
-            else:
-                days_from_cache.append((market_day,file_path_new))
+                days_from_cache.append((market_day,file_path_new))
+            else:
+                days_from_remote.append(market_day)
     else:
         days_from_remote = market_open_days
 
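To make the `max_workers` heuristic above concrete (roughly half the number of market days, clamped between 2 and 10):

```python
def pick_max_workers(day_count: int) -> int:
    # Same heuristic as the hunk above: half the days, at least 2, at most 10.
    return min(10, max(2, day_count // 2))

assert pick_max_workers(1) == 2    # tiny ranges still get two workers
assert pick_max_workers(8) == 4    # 8 market days -> 4 workers
assert pick_max_workers(60) == 10  # capped at 10 for long ranges
```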
@@ -280,16 +286,16 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
         with trade_cache_lock:
             local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache])
         final_time = timetime() - s_time
-        print(f"All {len(days_from_cache)} split files loaded in", final_time, "seconds")
+        print(f"{symbol} All {len(days_from_cache)} split files loaded in", final_time, "seconds")
         #the filter is required
         local_df = filter_trade_df(local_df, start_date, end_date, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only)
-        print("local_df filtered")
+        print(f"{symbol} filtered")
 
     #do this only for remotes
     if len(days_from_remote) > 0:
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
             #for single_date in (start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)):
-            for market_day in tqdm(days_from_remote, desc="Processing market days to fetch"):
+            for market_day in tqdm(days_from_remote, desc=f"{symbol} Remote fetching"):
                 #start = datetime.combine(single_date, time(9, 30)) # Market opens at 9:30 AM
                 #end = datetime.combine(single_date, time(16, 0)) # Market closes at 4:00 PM
 
@@ -312,7 +318,7 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
                 future = executor.submit(fetch_daily_stock_trades, symbol, start, end, exclude_conditions, minsize, main_session_only, no_return, force_remote)
                 futures.append(future)
 
-    for future in tqdm(futures, desc="Fetching data"):
+    for future in tqdm(futures, desc=f"{symbol} Receiving trades"):
         try:
             result = future.result()
             results.append(result)
@@ -338,25 +344,43 @@ def load_data(symbol: Union[str, List[str]],
               exclude_conditions: list = EXCLUDE_CONDITIONS,
               minsize = None,
               main_session_only = True,
-              force_remote=False):
+              force_remote=False,
+              return_vbt = False,
+              verbose = None):
     """Main function to fetch data.
 
     Returns requested aggregated data for given symbol(s)
     - if requested agg data already exists in aggcache, returns it
     - otherwise gets the trades (from trade cache or Alpaca), aggregates them and stores to cache
 
     For location of both caches, see config.py
 
     Args:
-        symbol (str): Symbol
+        symbol (Union[str, list]): Symbol
         agg_type (AggType): Type of aggregation
         resolution (Union[str, int]): Resolution of aggregation based on agg_type
         start_date (datetime):
         end_date (datetime):
         exclude_conditions (list, optional): Trade conditions to exclude. Defaults to None.
         minsize (_type_, optional): Minimum trade size to include. Defaults to None.
         main_session_only (bool, optional): Main or extended hours. Defaults to True.
-        force_remote (bool, optional): Force remote fetch and re-aggregation. Defaults to False.
+        force_remote (bool, optional): If True, forces cache reload (doesn't use cache for either trade or agg data)
+        return_vbt (bool): If True, returns a vbt Data object with symbols as columns, otherwise returns a symbol-keyed dict of pd.DataFrame
+        verbose (bool): If True, prints debug messages. None means keep global settings.
 
-    Returns:
-        pd.DataFrame(): Aggregated data
+    Returns
+    -------
+    dict keyed by SYMBOL with pd.DataFrame as value
+    OR
+    vbt.Data object (keyed by SYMBOL)
     """
     symbols = [symbol] if isinstance(symbol, str) else symbol
 
+    if verbose is not None:
+        set_verbose(verbose) # Change global verbose if specified
+
     if exclude_conditions is None:
-        exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES')
+        exclude_conditions = EXCLUDE_CONDITIONS
 
     def load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote):
         exclude_conditions.sort()
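`load_data_single` sorts `exclude_conditions` before the agg-cache lookup; the key construction itself falls outside this hunk. Presumably sorting canonicalizes the list so the same filter set always maps to the same cache file regardless of input order. A sketch of that idea with a hypothetical key format (the real naming scheme is not shown in this diff):

```python
# Hypothetical cache-key sketch: sorting makes ['O','C'] and ['C','O']
# produce the same file name. The real format lives outside this hunk.
def agg_cache_key(symbol, resolution, exclude_conditions, minsize):
    conds = "".join(sorted(exclude_conditions)) if exclude_conditions else "none"
    return f"{symbol}-{resolution}-{conds}-{minsize}.parquet"

assert agg_cache_key("BAC", 12, ["O", "C"], 100) == agg_cache_key("BAC", 12, ["C", "O"], 100)
```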
@@ -373,19 +397,23 @@ def load_data(symbol: Union[str, List[str]],
         ohlcv_df = aggregate_trades(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type)
 
         ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow')
-        print("Saved to agg_cache", file_ohlcv)
+        print(f"{symbol} Saved to agg_cache", file_ohlcv)
         return ohlcv_df
 
     ret_dict_df = {}
     for symbol in symbols:
         ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote)
 
+    if return_vbt:
+        return vbt.Data.from_data(vbt.symbol_dict(ret_dict_df), tz_convert=zoneNY)
+
     return ret_dict_df
 
 def prepare_trade_cache(symbol: Union[str, List[str]],
                         start_date: datetime,
                         end_date: datetime,
-                        force_remote=False):
+                        force_remote=False,
+                        verbose = None):
     """
     Fetches trade cache daily files for specified symbols and date range and stores
     them to trade cache location.
@@ -395,7 +423,7 @@ def prepare_trade_cache(symbol: Union[str, List[str]],
     Note: daily trade cache files contain all trades (main+ext hours) for that day
 
     Args:
-        symbols (str): Symbols to fetch
+        symbol (Union[str, list]): Symbol or list of symbols
        start_date (datetime):
        end_date (datetime):
        force_remote (bool, optional): Force remote fetch. Defaults to False.
@@ -408,52 +436,5 @@ def prepare_trade_cache(symbol: Union[str, List[str]],
     for symbol in symbols:
         #just cache update
         print(f"Started for {symbol}")
-        df = fetch_trades_parallel(symbol, start_date, end_date, force_remote=force_remote, no_return=True)
+        df = fetch_trades_parallel(symbol, start_date, end_date, force_remote=force_remote, no_return=True, verbose=verbose)
         print(f"Finished for {symbol}")
-
-#this would become new loader in the future
-# def load_data_tbd(symbol: Union[str, list], day_start: datetime, day_end: datetime, agg_type: AGG_TYPE, resolution: Union[str, int], excludes: list = EXCLUDE_CONDITIONS, minsize: int = MINSIZE, ext_hours: bool = False, align: StartBarAlign =StartBarAlign.ROUND, as_df: bool = False, force_reload: bool = False) -> None:
-#     """
-#     Returns requested aggregated data for give symbol(s)
-#     - if requested agg data already exists in cache, returns it
-#     - otherwise get the trades (from trade cache or Alpaca) and aggregate them and store to cache
-
-#     For location of both caches, see config.py
-
-#     Note both trades and agg cache are daily_files
-
-#     LOCAL_TRADE_CACHE
-#     LOCAL_AGG_CACHE
-
-#     Parameters
-#     ----------
-#     symbol : Union[str, list]
-#         Symbol or list of symbols
-#     day_start : datetime
-#         Start date, zone aware
-#     day_end : datetime
-#         End date, zone aware
-#     agg_type : AGG_TYPE
-#         Type of aggregation
-#     resolution : Union[str, int]
-#         Resolution of aggregation nased on agg_type
-#     excludes : list
-#         List of trade conditions to exclude
-#     minsize : int
-#         Minimum size of trade to be included
-#     ext_hours : bool
-#         If True, requests extended hours data
-#     align : StartBarAlign
-#         How to align first bar RANDOM vs ROUND
-#     as_df : bool
-#         If True, returns dataframe, otherwise returns vbt Data object
-#     force_reload : bool
-#         If True, forces cache reload (doesnt use cache both for trade and agg data)
-
-#     Returns
-#     -------
-#     DF or vbt.Data object
-#     """
-#     pass
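A minimal usage sketch for the updated `prepare_trade_cache`; the import path and dates are assumptions, since the diff suppresses the module's file name:

```python
import pytz
from datetime import datetime

# Module path assumed for illustration; not shown in this diff.
from ttools.loader import prepare_trade_cache

zoneNY = pytz.timezone('US/Eastern')

# Warm the daily trade cache for two symbols; nothing is returned
# (fetch_trades_parallel runs with no_return=True internally).
prepare_trade_cache(["BAC", "SPY"],
                    start_date=zoneNY.localize(datetime(2024, 1, 2)),
                    end_date=zoneNY.localize(datetime(2024, 1, 31)),
                    force_remote=False,
                    verbose=True)
```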
@@ -11,6 +11,20 @@ import pandas_market_calendars as mcal
 zoneNY = pytz.timezone('US/Eastern')
 zoneUTC = pytz.utc
 zonePRG = pytz.timezone('Europe/Amsterdam')
+verbose = True #default
+
+# Save the built-in print function to a different name
+built_in_print = print
+
+# Custom print function that respects the global verbose setting
+def print(*args, **kwargs):
+    if verbose:
+        built_in_print(*args, **kwargs)
+
+# Function to set the global verbose variable
+def set_verbose(value):
+    global verbose
+    verbose = value
+
 def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]:
     """
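This module-level `print` override silences every `print` call in modules that import it, as the loader hunk earlier does via `from ttools.utils import ... print, set_verbose`. A quick sketch of how the toggle behaves, assuming these utils are importable:

```python
from ttools.utils import print, set_verbose  # shadows the builtin in this module

set_verbose(False)
print("this line is swallowed")    # no output while verbose is False
set_verbose(True)
print("this line reaches stdout")  # built_in_print is called again
```

Shadowing the builtin keeps every existing call site unchanged, at the cost of surprising a reader who expects `print` to always emit output; the trade-off is that one `set_verbose(False)` quiets the whole package.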
@@ -114,7 +128,7 @@ def find_dotenv():
     #print(start_path)
 
     current_path = Path(start_path)
-    for _ in range(6): # Limit search depth to 5 levels
+    for _ in range(10): # Limit search depth to 10 levels
         dotenv_path = current_path / '.env'
         if dotenv_path.exists():
             return dotenv_path