diff --git a/README.md b/README.md index 9f89894..e049879 100644 --- a/README.md +++ b/README.md @@ -30,28 +30,31 @@ day_start = zoneNY.localize(day_start) day_stop = zoneNY.localize(day_stop) #requested AGG -resolution = 1 +resolution = 12 #12s agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None to defaults minsize = 100 main_session_only = True -force_remote = True +force_remote = False -ohlcv_df = load_data(symbol = symbol, +data = load_data(symbol = symbol, agg_type = agg_type, resolution = resolution, start_date = day_start, end_date = day_stop, #exclude_conditions = None, - minsize = 100, - main_session_only = True, - force_remote = False + minsize = minsize, + main_session_only = main_session_only, + force_remote = force_remote, + return_vbt = True, #returns vbt object + verbose = True ) bac_df = ohlcv_df["BAC"] basic_data = vbt.Data.from_data(vbt.symbol_dict(ohlcv_df), tz_convert=zoneNY) vbt.settings['plotting']['auto_rangebreaks'] = True basic_data.ohlcv.plot() +data.ohlcv.data[symbol[0]].lw.plot() ``` ## prepare trade cache diff --git a/setup.py b/setup.py index bdbafc2..d11dcac 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='ttools', - version='0.6.0', + version='0.6.1', packages=find_packages(), install_requires=[ # list your dependencies here diff --git a/tests/data_loader_tryme.ipynb b/tests/data_loader_tryme.ipynb index 14744a8..cfe3a18 100644 --- a/tests/data_loader_tryme.ipynb +++ b/tests/data_loader_tryme.ipynb @@ -30,7 +30,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env\n" + "TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env\n" ] } ], @@ -69,69 +69,21 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 6, "metadata": {}, "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Contains 3 market days\n" - ] - }, { "name": "stderr", "output_type": "stream", "text": [ - "Processing market days: 100%|██████████| 3/3 [00:00<00:00, 4557.37it/s]\n", - "Processing market days to fetch: 100%|██████████| 3/3 [00:00<00:00, 481.22it/s]\n", - "Fetching data: 0%| | 0/3 [00:00" + "window.cvxqazlq.chart.timeScale().fitContent()</script></body></html>\">" ], "text/plain": [ "" @@ -472,7 +424,7 @@ ], "source": [ "#This is how to call LOAD function\n", - "symbol = [\"BAC\"]\n", + "symbol = [\"BAC\",\"AAPL\"]\n", "#datetime in zoneNY \n", "day_start = datetime(2024, 10, 14, 9, 45, 0)\n", "day_stop = datetime(2024, 10, 16, 15, 1, 0)\n", @@ -487,7 +439,7 @@ "main_session_only = True\n", "force_remote = True\n", "\n", - "ohlcv_df = load_data(symbol = symbol,\n", + "data = load_data(symbol = symbol,\n", " agg_type = agg_type,\n", " resolution = resolution,\n", " start_date = day_start,\n", @@ -495,21 +447,18 @@ " #exclude_conditions = None,\n", " minsize = minsize,\n", " main_session_only = main_session_only,\n", - " force_remote = False\n", + " force_remote = False,\n", + " return_vbt = True, #returns vbt object\n", + " verbose = False\n", " )\n", "\n", - "#returns symbol keyed dict with pd.DataFrame as values\n", - "bac_df = ohlcv_df[\"BAC\"]\n", - "\n", - "basic_data = vbt.Data.from_data(vbt.symbol_dict(ohlcv_df), tz_convert=zoneNY)\n", - "vbt.settings['plotting']['auto_rangebreaks'] = True\n", "#basic_data.ohlcv.plot()\n", 
- "basic_data.ohlcv.data[symbol[0]].lw.plot()\n" + "data.ohlcv.data[symbol[0]].lw.plot()\n" ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 5, "metadata": {}, "outputs": [ { @@ -550,44 +499,44 @@ " \n", " \n", " \n", - " 2024-10-14 09:45:00.080054045-04:00\n", + " 2024-10-14 09:45:00-04:00\n", " 41.9650\n", - " 42.040\n", + " 41.970\n", " 41.950\n", + " 41.9500\n", + " 17895.0\n", + " \n", + " \n", + " 2024-10-14 09:45:12-04:00\n", + " 41.9589\n", + " 41.965\n", + " 41.950\n", + " 41.9650\n", + " 6281.0\n", + " \n", + " \n", + " 2024-10-14 09:45:24-04:00\n", + " 41.9650\n", " 42.005\n", - " 50000.0\n", + " 41.965\n", + " 41.9975\n", + " 3522.0\n", " \n", " \n", - " 2024-10-14 09:46:12.714761019-04:00\n", + " 2024-10-14 09:45:36-04:00\n", + " 41.9900\n", + " 42.005\n", + " 41.990\n", + " 42.0000\n", + " 5960.0\n", + " \n", + " \n", + " 2024-10-14 09:45:48-04:00\n", " 42.0050\n", - " 42.100\n", - " 42.000\n", - " 42.095\n", - " 50000.0\n", - " \n", - " \n", - " 2024-10-14 09:47:20.320812941-04:00\n", - " 42.0950\n", - " 42.115\n", - " 42.065\n", - " 42.105\n", - " 50000.0\n", - " \n", - " \n", - " 2024-10-14 09:48:26.167362928-04:00\n", - " 42.1050\n", - " 42.130\n", - " 42.090\n", - " 42.105\n", - " 50000.0\n", - " \n", - " \n", - " 2024-10-14 09:49:06.589205027-04:00\n", - " 42.1001\n", - " 42.110\n", - " 42.060\n", - " 42.105\n", - " 50000.0\n", + " 42.040\n", + " 42.005\n", + " 42.0300\n", + " 9113.0\n", " \n", " \n", " ...\n", @@ -598,75 +547,75 @@ " ...\n", " \n", " \n", - " 2024-10-16 14:56:44.563248873-04:00\n", - " 42.8900\n", - " 42.890\n", - " 42.880\n", - " 42.880\n", - " 50000.0\n", - " \n", - " \n", - " 2024-10-16 14:57:38.830776930-04:00\n", - " 42.8900\n", + " 2024-10-16 15:00:00-04:00\n", + " 42.9150\n", " 42.915\n", - " 42.880\n", " 42.910\n", - " 50000.0\n", + " 42.9100\n", + " 12872.0\n", " \n", " \n", - " 2024-10-16 14:58:41.628561020-04:00\n", + " 2024-10-16 15:00:12-04:00\n", " 42.9150\n", " 42.920\n", - " 42.905\n", " 42.910\n", - " 50000.0\n", + " 42.9200\n", + " 7574.0\n", " \n", " \n", - " 2024-10-16 14:59:50.505049944-04:00\n", - " 42.9100\n", + " 2024-10-16 15:00:24-04:00\n", + " 42.9200\n", " 42.920\n", " 42.910\n", - " 42.910\n", - " 50000.0\n", + " 42.9200\n", + " 1769.0\n", " \n", " \n", - " 2024-10-16 15:00:47.022783041-04:00\n", - " 42.9100\n", - " 42.910\n", + " 2024-10-16 15:00:36-04:00\n", + " 42.9200\n", + " 42.920\n", + " 42.905\n", + " 42.9050\n", + " 26599.0\n", + " \n", + " \n", + " 2024-10-16 15:00:48-04:00\n", + " 42.9050\n", + " 42.905\n", " 42.880\n", - " 42.880\n", - " 19063.0\n", + " 42.8800\n", + " 9216.0\n", " \n", " \n", "\n", - "
<p>1352 rows × 5 columns</p>\n", + "<p>5480 rows × 5 columns</p>
\n", "" ], "text/plain": [ - " open high low close volume\n", - "time \n", - "2024-10-14 09:45:00.080054045-04:00 41.9650 42.040 41.950 42.005 50000.0\n", - "2024-10-14 09:46:12.714761019-04:00 42.0050 42.100 42.000 42.095 50000.0\n", - "2024-10-14 09:47:20.320812941-04:00 42.0950 42.115 42.065 42.105 50000.0\n", - "2024-10-14 09:48:26.167362928-04:00 42.1050 42.130 42.090 42.105 50000.0\n", - "2024-10-14 09:49:06.589205027-04:00 42.1001 42.110 42.060 42.105 50000.0\n", - "... ... ... ... ... ...\n", - "2024-10-16 14:56:44.563248873-04:00 42.8900 42.890 42.880 42.880 50000.0\n", - "2024-10-16 14:57:38.830776930-04:00 42.8900 42.915 42.880 42.910 50000.0\n", - "2024-10-16 14:58:41.628561020-04:00 42.9150 42.920 42.905 42.910 50000.0\n", - "2024-10-16 14:59:50.505049944-04:00 42.9100 42.920 42.910 42.910 50000.0\n", - "2024-10-16 15:00:47.022783041-04:00 42.9100 42.910 42.880 42.880 19063.0\n", + " open high low close volume\n", + "time \n", + "2024-10-14 09:45:00-04:00 41.9650 41.970 41.950 41.9500 17895.0\n", + "2024-10-14 09:45:12-04:00 41.9589 41.965 41.950 41.9650 6281.0\n", + "2024-10-14 09:45:24-04:00 41.9650 42.005 41.965 41.9975 3522.0\n", + "2024-10-14 09:45:36-04:00 41.9900 42.005 41.990 42.0000 5960.0\n", + "2024-10-14 09:45:48-04:00 42.0050 42.040 42.005 42.0300 9113.0\n", + "... ... ... ... ... ...\n", + "2024-10-16 15:00:00-04:00 42.9150 42.915 42.910 42.9100 12872.0\n", + "2024-10-16 15:00:12-04:00 42.9150 42.920 42.910 42.9200 7574.0\n", + "2024-10-16 15:00:24-04:00 42.9200 42.920 42.910 42.9200 1769.0\n", + "2024-10-16 15:00:36-04:00 42.9200 42.920 42.905 42.9050 26599.0\n", + "2024-10-16 15:00:48-04:00 42.9050 42.905 42.880 42.8800 9216.0\n", "\n", - "[1352 rows x 5 columns]" + "[5480 rows x 5 columns]" ] }, - "execution_count": 6, + "execution_count": 5, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "basic_data.ohlcv.data[symbol[0]]" + "data.ohlcv.data[symbol[0]]" ] }, { @@ -1018,6 +967,28 @@ "\n", "prepare_trade_cache(symbols, day_start, day_stop, force_remote)" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Prepare daily trade cache - cli script\n", + "\n", + "Python script prepares trade cache for specified symbols and date range.\n", + "\n", + "Usually 1 day takes about 35s. It is stored in /tradescache/ directory as daily file keyed by symbol.\n", + "\n", + "To run this script in the background with specific arguments:\n", + "\n", + "```bash\n", + "# Running without forcing remote fetch\n", + "python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 2024-10-18 &\n", + "\n", + "# Running with force_remote set to True\n", + "python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 2024-10-18 --force_remote &\n", + "\n", + "```" + ] } ], "metadata": { diff --git a/ttools/config.py b/ttools/config.py index 51755d4..06ccfba 100644 --- a/ttools/config.py +++ b/ttools/config.py @@ -14,9 +14,9 @@ ENV_FILE = find_dotenv() #NALOADUJEME DOTENV ENV VARIABLES if load_dotenv(ENV_FILE, verbose=True) is False: - print(f"Error loading.env file {ENV_FILE}. Now depending on ENV VARIABLES set externally.") + print(f"TTOOLS: Error loading.env file {ENV_FILE}. 
Now depending on ENV VARIABLES set externally.") else: - print(f"Loaded env variables from file {ENV_FILE}") + print(f"TTOOLS: Loaded env variables from file {ENV_FILE}") ACCOUNT1_LIVE_API_KEY = os.environ.get('ACCOUNT1_LIVE_API_KEY') ACCOUNT1_LIVE_SECRET_KEY = os.environ.get('ACCOUNT1_LIVE_SECRET_KEY') diff --git a/ttools/loaders.py b/ttools/loaders.py index d793a4e..f61a0e3 100644 --- a/ttools/loaders.py +++ b/ttools/loaders.py @@ -16,7 +16,7 @@ from time import time as timetime from concurrent.futures import ThreadPoolExecutor from alpaca.data.enums import DataFeed import random -from ttools.utils import AggType, fetch_calendar_data +from ttools.utils import AggType, fetch_calendar_data, print, set_verbose from tqdm import tqdm import threading from typing import List, Union @@ -147,7 +147,7 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No df = df[df['s'] >= minsize] return df -def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False,force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP): +def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False, force_remote=False, rename_labels=False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP, verbose=None): #doc for this function """ Attempts to fetch stock trades either from cache or remote. When remote, it uses retry mechanism with exponential backoff. @@ -178,6 +178,9 @@ if not is_same_day: raise ValueError("fetch_daily_stock_trades is not implemented for multiple days!") + if verbose is not None: + set_verbose(verbose) # Change global verbose if specified + #exists in cache? daily_file = f"{symbol}-{str(start.date())}.parquet" file_path = TRADE_CACHE / daily_file @@ -227,7 +230,7 @@ print("All attempts to fetch data failed.") raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}") -def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False): +def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False, verbose = None): """ Fetch trades between ranges. @@ -249,25 +252,28 @@ :param no_return: If True, do not return the DataFrame. Used to prepare cached files. :return: DataFrame containing all trades from start_date to end_date. """ + if verbose is not None: + set_verbose(verbose) # Change global verbose if specified + futures = [] results = [] market_open_days = fetch_calendar_data(start_date, end_date) day_count = len(market_open_days) - print("Contains", day_count, " market days") + print(f"{symbol} contains {day_count} market days") max_workers = min(10, max(2, day_count // 2)) if max_workers is None else max_workers # Heuristic: half the days to process, but at least 2 and no more than 10 #which days to fetch? 
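# Cache-aware partition: a day whose daily parquet file already exists in TRADE_CACHE is loaded locally below; all remaining days are fetched remotely from Alpaca via the thread pool.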
days_from_remote = [] days_from_cache = [] if not force_remote: - for market_day in tqdm(market_open_days, desc="Processing market days"): + for market_day in market_open_days: daily_file_new = str(symbol) + '-' + str(market_day.date) + '.parquet' file_path_new = TRADE_CACHE / daily_file_new if file_path_new.exists(): - days_from_remote.append(market_day) - else: days_from_cache.append((market_day,file_path_new)) + else: + days_from_remote.append(market_day) else: days_from_remote = market_open_days @@ -280,16 +286,16 @@ with trade_cache_lock: local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache]) final_time = timetime() - s_time - print(f"All {len(days_from_cache)} split files loaded in", final_time, "seconds") + print(f"{symbol} All {len(days_from_cache)} split files loaded in {final_time:.2f} seconds") #the filter is required local_df = filter_trade_df(local_df, start_date, end_date, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only) - print("local_df filtered") + print(f"{symbol} local_df filtered") #do this only for remotes if len(days_from_remote) > 0: with ThreadPoolExecutor(max_workers=max_workers) as executor: #for single_date in (start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)): - for market_day in tqdm(days_from_remote, desc="Processing market days to fetch"): + for market_day in tqdm(days_from_remote, desc=f"{symbol} Remote fetching"): #start = datetime.combine(single_date, time(9, 30)) # Market opens at 9:30 AM #end = datetime.combine(single_date, time(16, 0)) # Market closes at 4:00 PM @@ -312,7 +318,7 @@ future = executor.submit(fetch_daily_stock_trades, symbol, start, end, exclude_conditions, minsize, main_session_only, no_return, force_remote) futures.append(future) - for future in tqdm(futures, desc="Fetching data"): + for future in tqdm(futures, desc=f"{symbol} Receiving trades"): try: result = future.result() results.append(result) @@ -338,25 +344,43 @@ def load_data(symbol: Union[str, List[str]], exclude_conditions: list = EXCLUDE_CONDITIONS, minsize = None, main_session_only = True, - force_remote=False): + force_remote=False, + return_vbt = False, + verbose = None): """Main function to fetch data. + Returns requested aggregated data for given symbol(s) + - if requested agg data already exists in aggcache, returns it + - otherwise gets the trades (from the trade cache or Alpaca), aggregates them and stores the result to cache + + For location of both caches, see config.py + Args: - symbol (str): Symbol + symbol (Union[str, list]): Symbol + agg_type (AggType): Type of aggregation + resolution (Union[str, int]): Resolution of aggregation based on agg_type start_date (datetime): end_date (datetime): exclude_conditions (list, optional): Trade conditions to exclude. Defaults to None. minsize (_type_, optional): Minimum trade size to include. Defaults to None. main_session_only (bool, optional): Main or ext. hours.. Defaults to True. - force_remote (bool, optional): Force remote fetch and reagreggation. Defaults to False. + force_remote (bool, optional): If True, forces cache reload (doesn't use the cache for either trade or agg data) + return_vbt (bool): If True, returns a vbt Data object with symbols as columns; otherwise returns a symbol-keyed dict of pd.DataFrame + verbose (bool): If True, prints debug messages. None keeps the global setting. 
- Returns: - pd.Dataframe(): Aggregated data + Returns: + dict keyed by symbol with pd.DataFrame values, + or a vbt.Data object (keyed by symbol) when return_vbt is True """ symbols = [symbol] if isinstance(symbol, str) else symbol + if verbose is not None: + set_verbose(verbose) # Change global verbose if specified + if exclude_conditions is None: - exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES') + exclude_conditions = EXCLUDE_CONDITIONS def load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote): exclude_conditions.sort() @@ -373,19 +397,23 @@ ohlcv_df = aggregate_trades(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type) ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow') - print("Saved to agg_cache", file_ohlcv) + print(f"{symbol} saved to agg_cache {file_ohlcv}") return ohlcv_df ret_dict_df = {} for symbol in symbols: ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote) + if return_vbt: + return vbt.Data.from_data(vbt.symbol_dict(ret_dict_df), tz_convert=zoneNY) + return ret_dict_df def prepare_trade_cache(symbol: Union[str, List[str]], start_date: datetime, end_date: datetime, - force_remote=False): + force_remote=False, + verbose = None): """ Fetches trade cache daily files for specified symbols and date range and stores them to trade cache location. @@ -395,7 +423,8 @@ Note: daily trade cache files contain all trades (main+ext hours) for that day Args: - symbols (str): Symbols to fetch + symbol (Union[str, list]): Symbol or list of symbols start_date (datetime): end_date (datetime): force_remote (bool, optional): Force remote fetch. Defaults to False. + verbose (bool, optional): If True, prints debug messages. None keeps the global setting. 
@@ -408,52 +436,5 @@ def prepare_trade_cache(symbol: Union[str, List[str]], for symbol in symbols: #just cache update print(f"Started for {symbol}") - df = fetch_trades_parallel(symbol, start_date, end_date, force_remote=force_remote, no_return=True) - print(f"Finished for {symbol}") - -#this would become new loader in the future -# def load_data_tbd(symbol: Union[str, list], day_start: datetime, day_end: datetime, agg_type: AGG_TYPE, resolution: Union[str, int], excludes: list = EXCLUDE_CONDITIONS, minsize: int = MINSIZE, ext_hours: bool = False, align: StartBarAlign =StartBarAlign.ROUND, as_df: bool = False, force_reload: bool = False) -> None: -# """ -# Returns requested aggregated data for give symbol(s) -# - if requested agg data already exists in cache, returns it -# - otherwise get the trades (from trade cache or Alpaca) and aggregate them and store to cache - -# For location of both caches, see config.py - -# Note both trades and agg cache are daily_files - -# LOCAL_TRADE_CACHE -# LOCAL_AGG_CACHE - -# Parameters -# ---------- -# symbol : Union[str, list] -# Symbol or list of symbols -# day_start : datetime -# Start date, zone aware -# day_end : datetime -# End date, zone aware -# agg_type : AGG_TYPE -# Type of aggregation -# resolution : Union[str, int] -# Resolution of aggregation nased on agg_type -# excludes : list -# List of trade conditions to exclude -# minsize : int -# Minimum size of trade to be included -# ext_hours : bool -# If True, requests extended hours data -# align : StartBarAlign -# How to align first bar RANDOM vs ROUND -# as_df : bool -# If True, returns dataframe, otherwise returns vbt Data object -# force_reload : bool -# If True, forces cache reload (doesnt use cache both for trade and agg data) - -# Returns -# ------- -# DF or vbt.Data object -# """ -# pass - - + df = fetch_trades_parallel(symbol, start_date, end_date, force_remote=force_remote, no_return=True, verbose=verbose) + print(f"Finished for {symbol}") \ No newline at end of file diff --git a/ttools/utils.py b/ttools/utils.py index 4652371..e6fc7aa 100644 --- a/ttools/utils.py +++ b/ttools/utils.py @@ -11,6 +11,20 @@ import pandas_market_calendars as mcal zoneNY = pytz.timezone('US/Eastern') zoneUTC = pytz.utc zonePRG = pytz.timezone('Europe/Amsterdam') +verbose = True #default + +# Save the built-in print function to a different name +built_in_print = print + +# Custom print function that respects the global verbose setting +def print(*args, **kwargs): + if verbose: + built_in_print(*args, **kwargs) + +# Function to set the global verbose variable +def set_verbose(value): + global verbose + verbose = value def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]: """ @@ -114,7 +128,7 @@ def find_dotenv(): #print(start_path) current_path = Path(start_path) - for _ in range(6): # Limit search depth to 5 levels + for _ in range(10): # Limit search depth to 10 levels dotenv_path = current_path / '.env' if dotenv_path.exists(): return dotenv_path
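Taken together, the new `return_vbt` and `verbose` flags change how the loader is driven. A minimal end-to-end sketch against the signatures added in this diff (the symbols, dates, and 12-second resolution are illustrative, not part of the change):

```python
from datetime import datetime

from ttools.loaders import load_data, prepare_trade_cache
from ttools.utils import AggType, zoneNY

symbols = ["BAC", "AAPL"]
day_start = zoneNY.localize(datetime(2024, 10, 14, 9, 45, 0))
day_stop = zoneNY.localize(datetime(2024, 10, 16, 15, 1, 0))

# Optionally warm the daily trade cache first; load_data also fetches on demand
prepare_trade_cache(symbols, day_start, day_stop, force_remote=False, verbose=True)

# return_vbt=True wraps the symbol-keyed dict of DataFrames in a vbt.Data object
data = load_data(
    symbol=symbols,
    agg_type=AggType.OHLCV,
    resolution=12,            # 12-second bars
    start_date=day_start,
    end_date=day_stop,
    minsize=100,
    main_session_only=True,
    force_remote=False,
    return_vbt=True,
    verbose=False,            # flips ttools' module-global verbosity off
)

bac_df = data.ohlcv.data["BAC"]   # plain pd.DataFrame for a single symbol
```

Note that `verbose` works by calling `set_verbose`, which flips the module-global flag in `ttools.utils`, so it affects all subsequent ttools calls until changed again.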