diff --git a/README.md b/README.md
index 4b59c69..01fe412 100644
--- a/README.md
+++ b/README.md
@@ -18,7 +18,11 @@ Modules:
 Detailed examples in [tests/data_loader_tryme.ipynb](tests/data_loader_tryme.ipynb)
 
 ## load_data
-Returns vectorized aggregation of given type. If trades for given period are not cached they are remotely fetched from Alpaca first.
+Returns a vectorized aggregation of the given type.
+
+If aggregated data with the same conditions and the same or a wider date span are already in the agg cache, they are returned from the cache.
+Otherwise trade data are aggregated on the fly, saved to the cache, and returned.
+If trades for the given period are not cached, they are remotely fetched from Alpaca first.
 
 Example:
diff --git a/setup.py b/setup.py
index 38f6219..a4f9078 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 setup(
     name='ttools',
-    version='0.6.3',
+    version='0.6.4',
     packages=find_packages(),
     install_requires=[
         # list your dependencies here
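To illustrate the caching rules described in the README hunk above — a sketch only; the argument names follow the notebook example later in this patch, and the exact `load_data` signature and return shape may differ:

```python
from datetime import datetime
from ttools import load_data, AggType, zoneNY

start = zoneNY.localize(datetime(2024, 1, 15, 9, 30))
end = zoneNY.localize(datetime(2024, 10, 20, 16, 0))

# First call: aggregates trades (fetching from Alpaca if the trade cache is
# cold) and writes the aggregation to the agg cache.
data = load_data("SPY", agg_type=AggType.OHLCV, resolution=1,
                 start_date=start, end_date=end)

# Second call: same conditions, narrower span. It matches the wider cached
# file and is served from parquet with date filters, not re-aggregated.
subset = load_data("SPY", agg_type=AggType.OHLCV, resolution=1,
                   start_date=zoneNY.localize(datetime(2024, 2, 1, 9, 30)),
                   end_date=zoneNY.localize(datetime(2024, 2, 15, 16, 0)))
```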
diff --git a/tests/WIP-tradecache_duckdb_approach/hive_cache.ipynb b/tests/WIP-tradecache_duckdb_approach/hive_cache.ipynb
new file mode 100644
index 0000000..4d2195d
--- /dev/null
+++ b/tests/WIP-tradecache_duckdb_approach/hive_cache.ipynb
@@ -0,0 +1,178 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Exploring alternative cache storage using duckdb and parquet\n",
+    "\n",
+    "https://claude.ai/chat/e49491f7-8b18-4fb7-b301-5c9997746079\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 1,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env\n",
+      "Start loading data... 1730370862.4833238\n"
+     ]
+    },
+    {
+     "data": {
+      "application/vnd.jupyter.widget-view+json": {
+       "model_id": "829f7f3d58a74f1fbfdcfc202c2aaf84",
+       "version_major": 2,
+       "version_minor": 0
+      },
+      "text/plain": [
+       "FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "fetched parquet -11.310973167419434\n",
+      "Loaded 1836460 rows\n"
+     ]
+    }
+   ],
+   "source": [
+    "from ttools.tradecache import TradeCache\n",
+    "from ttools.utils import zoneNY\n",
+    "from pathlib import Path\n",
+    "from datetime import datetime\n",
+    "import logging\n",
+    "import duckdb\n",
+    "\n",
+    "logging.basicConfig(\n",
+    "    level=logging.INFO, # Set the minimum level (DEBUG, INFO, WARNING, ERROR, CRITICAL)\n",
+    "    format='%(levelname)s: %(message)s' # Simple format showing level and message\n",
+    ")\n",
+    "\n",
+    "cache = TradeCache(\n",
+    "    base_path=Path(\"./trade_cache\"),\n",
+    "    max_workers=4, # Adjust based on your CPU\n",
+    "    cleanup_after_days=7\n",
+    ")\n",
+    "\n",
+    "# Load data\n",
+    "df = cache.load_range(\n",
+    "    symbol=\"BAC\",\n",
+    "    start_date=zoneNY.localize(datetime(2024, 10, 14, 9, 30)),\n",
+    "    end_date=zoneNY.localize(datetime(2024, 10, 20, 16, 0)),\n",
+    "    #columns=['open', 'high', 'low', 'close', 'volume']\n",
+    ")\n",
+    "\n",
+    "print(f\"Loaded {len(df)} rows\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 4,
+   "metadata": {},
+   "outputs": [
+    {
+     "name": "stdout",
+     "output_type": "stream",
+     "text": [
+      "DuckDB Schema:\n",
+      "  column_name               column_type null   key default extra\n",
+      "0           x                   VARCHAR  YES  None    None  None\n",
+      "1           p                    DOUBLE  YES  None    None  None\n",
+      "2           s                    BIGINT  YES  None    None  None\n",
+      "3           i                    BIGINT  YES  None    None  None\n",
+      "4           c                 VARCHAR[]  YES  None    None  None\n",
+      "5           z                   VARCHAR  YES  None    None  None\n",
+      "6           t  TIMESTAMP WITH TIME ZONE  YES  None    None  None\n",
+      "\n",
+      "Sample Data:\n",
+      "   x       p       s               i             c  z  \\\n",
+      "0  T  41.870      27  62879146994030  [ , F, T, I]  A   \n",
+      "1  D  41.965       1  71675241580848        [ , I]  A   \n",
+      "2  D  41.965       1  71675241644625        [ , I]  A   \n",
+      "3  D  41.850       1  71675241772360        [ , I]  A   \n",
+      "4  N  41.960  416188  52983525028174        [ , O]  A   \n",
+      "\n",
+      "                                 t  \n",
+      "0 2024-10-14 15:30:00.006480+02:00  \n",
+      "1 2024-10-14 15:30:00.395802+02:00  \n",
+      "2 2024-10-14 15:30:00.484008+02:00  \n",
+      "3 2024-10-14 15:30:00.610005+02:00  \n",
+      "4 2024-10-14 15:30:01.041599+02:00  \n",
+      "\n",
+      "Pandas Info:\n"
+     ]
+    }
+   ],
+   "source": [
+    "import duckdb\n",
+    "import pandas as pd\n",
+    "from pathlib import Path\n",
+    "\n",
+    "def check_parquet_schema():\n",
+    "    # Read one file and print its structure\n",
+    "    sample_file = Path(\"./trade_cache\")/\"temp/BAC_20241014.parquet\"\n",
+    "    \n",
+    "    # Method 1: Using DuckDB describe\n",
+    "    print(\"DuckDB Schema:\")\n",
+    "    print(duckdb.sql(f\"DESCRIBE SELECT * FROM read_parquet('{sample_file}')\").df())\n",
+    "    \n",
+    "    # Method 2: Just look at the data\n",
+    "    print(\"\\nSample Data:\")\n",
+    "    print(duckdb.sql(f\"\"\"\n",
+    "        SELECT *\n",
+    "        FROM read_parquet('{sample_file}')\n",
+    "        LIMIT 5\n",
+    "    \"\"\").df())\n",
+    "    \n",
+    "    # Method 3: Using pandas\n",
+    "    print(\"\\nPandas Info:\")\n",
+    "    df = pd.read_parquet(sample_file)\n",
+    "    print(df.info())\n",
+    "\n",
+    "# Let's check the schema first\n",
+    "check_parquet_schema()"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": ".venv",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.10.11"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
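For context on the layout the notebook explores: once daily files are consolidated into `symbol=.../year=.../month=...` partitions, DuckDB can aggregate straight off the parquet files. A rough sketch under the WIP trade schema (`x, p, s, i, c, z, t`); the glob path is an assumption about the final partition layout:

```python
import duckdb

con = duckdb.connect()

# 1-minute OHLCV bars computed directly from the hive-partitioned trade cache.
bars = con.execute("""
    SELECT
        date_trunc('minute', t)  AS minute,
        first(p ORDER BY t)      AS open,
        max(p)                   AS high,
        min(p)                   AS low,
        last(p ORDER BY t)       AS close,
        sum(s)                   AS volume
    FROM read_parquet('trade_cache/symbol=BAC/*/*/*.parquet', hive_partitioning=1)
    WHERE t BETWEEN '2024-10-14' AND '2024-10-21'
    GROUP BY 1
    ORDER BY 1
""").df()
print(bars.head())
```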
diff --git a/tests/WIP-tradecache_duckdb_approach/tradecache.py b/tests/WIP-tradecache_duckdb_approach/tradecache.py
new file mode 100644
index 0000000..86cadbe
--- /dev/null
+++ b/tests/WIP-tradecache_duckdb_approach/tradecache.py
@@ -0,0 +1,324 @@
+#this goes to the main directory
+
+
+from pathlib import Path
+from datetime import datetime, date, timedelta
+from typing import Optional, List, Set, Dict, Tuple
+import pandas as pd
+import duckdb
+import pandas_market_calendars as mcal
+import logging
+from ttools.utils import zoneNY
+from concurrent.futures import ThreadPoolExecutor
+from ttools.loaders import fetch_daily_stock_trades
+import time
+logger = logging.getLogger(__name__)
+
+class TradeCache:
+    def __init__(
+        self,
+        base_path: Path,
+        market: str = 'NYSE',
+        max_workers: int = 4,
+        cleanup_after_days: int = 7
+    ):
+        """
+        Initialize TradeCache with monthly partitions and temp storage,
+        optimized for the new trade schema.
+
+        Args:
+            base_path: Base directory for cache
+            market: Market calendar to use
+            max_workers: Max parallel fetches
+            cleanup_after_days: Days after which to clean temp files
+        """
+        self.base_path = Path(base_path)
+        self.temp_path = self.base_path / "temp"
+        self.base_path.mkdir(parents=True, exist_ok=True)
+        self.temp_path.mkdir(parents=True, exist_ok=True)
+
+        self.calendar = mcal.get_calendar(market)
+        self.max_workers = max_workers
+        self.cleanup_after_days = cleanup_after_days
+
+        # Initialize DuckDB with schema-specific optimizations
+        self.con = duckdb.connect()
+        self.con.execute("SET memory_limit='16GB'")
+        self.con.execute("SET threads TO 8")
+
+        # Create the schema for our tables
+        self.schema = """
+            x VARCHAR,
+            p DOUBLE,
+            s BIGINT,
+            i BIGINT,
+            c VARCHAR[],
+            z VARCHAR,
+            t TIMESTAMP WITH TIME ZONE
+        """
+
+        self._trading_days_cache: Dict[Tuple[date, date], List[date]] = {}
+
+    def get_partition_path(self, symbol: str, year: int, month: int) -> Path:
+        """Get path for a specific partition"""
+        return self.base_path / f"symbol={symbol}/year={year}/month={month}"
+
+    def get_temp_path(self, symbol: str, day: date) -> Path:
+        """Get temporary file path for a day"""
+        return self.temp_path / f"{symbol}_{day:%Y%m%d}.parquet"
+
+    def get_trading_days(self, start_date: datetime, end_date: datetime) -> List[date]:
+        """Get trading days with caching"""
+        key = (start_date.date(), end_date.date())
+        if key not in self._trading_days_cache:
+            schedule = self.calendar.schedule(start_date=start_date, end_date=end_date)
+            self._trading_days_cache[key] = [d.date() for d in schedule.index]
+        return self._trading_days_cache[key]
+
+    def cleanup_temp_files(self):
+        """Clean up old temp files"""
+        cutoff = datetime.now() - timedelta(days=self.cleanup_after_days)
+        for file in self.temp_path.glob("*.parquet"):
+            try:
+                # Extract date from filename
+                date_str = file.stem.split('_')[1]
+                file_date = datetime.strptime(date_str, '%Y%m%d')
+                if file_date < cutoff:
+                    file.unlink()
+            except Exception as e:
+                logger.warning(f"Error cleaning up {file}: {e}")
+
+    def consolidate_month(self, symbol: str, year: int, month: int) -> bool:
+        """
+        Consolidate daily files into a monthly partition, but only if we have the complete month.
+        Returns True if consolidation was successful.
+        """
+        # Get all temp files for this symbol and month
+        temp_files = list(self.temp_path.glob(f"{symbol}_{year:04d}{month:02d}*.parquet"))
+
+        if not temp_files:
+            return False
+
+        try:
+            # Get expected trading days for this month
+            start_date = zoneNY.localize(datetime(year, month, 1))
+            if month == 12:
+                end_date = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
+            else:
+                end_date = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)
+
+            trading_days = self.get_trading_days(start_date, end_date)
+
+            # Check if we have data for all trading days
+            temp_dates = set(datetime.strptime(f.stem.split('_')[1], '%Y%m%d').date()
+                             for f in temp_files)
+            missing_days = set(trading_days) - temp_dates
+
+            # Only consolidate if we have all trading days
+            if missing_days:
+                logger.info(f"Skipping consolidation for {symbol} {year}-{month}: "
+                            f"missing {len(missing_days)} trading days")
+                return False
+
+            # Proceed with consolidation since we have the complete month
+            partition_path = self.get_partition_path(symbol, year, month)
+            partition_path.mkdir(parents=True, exist_ok=True)
+            file_path = partition_path / "data.parquet"
+
+            files_str = ', '.join(f"'{f}'" for f in temp_files)
+
+            # Modified query to handle the new schema
+            self.con.execute(f"""
+                COPY (
+                    SELECT x, p, s, i, c, z, t
+                    FROM read_parquet([{files_str}])
+                    ORDER BY t
+                )
+                TO '{file_path}'
+                (FORMAT PARQUET, COMPRESSION 'ZSTD')
+            """)
+
+            # Remove temp files only after successful write
+            for f in temp_files:
+                f.unlink()
+
+            logger.info(f"Successfully consolidated {symbol} {year}-{month} "
+                        f"({len(temp_files)} files)")
+            return True
+
+        except Exception as e:
+            logger.error(f"Error consolidating {symbol} {year}-{month}: {e}")
+            return False
+
+    def fetch_remote_day(self, symbol: str, day: date) -> pd.DataFrame:
+        """Fetch a single day of trades for a symbol from the remote source (Alpaca)."""
+        min_datetime = zoneNY.localize(datetime.combine(day, datetime.min.time()))
+        max_datetime = zoneNY.localize(datetime.combine(day, datetime.max.time()))
+        return fetch_daily_stock_trades(symbol, min_datetime, max_datetime)
+
+    def _fetch_and_save_day(self, symbol: str, day: date) -> Optional[Path]:
+        """Fetch and save a single day, returns file path if successful"""
+        try:
+            df_day = self.fetch_remote_day(symbol, day)
+            if df_day.empty:
+                return None
+
+            temp_file = self.get_temp_path(symbol, day)
+            df_day.to_parquet(temp_file, compression='ZSTD')
+            return temp_file
+
+        except Exception as e:
+            logger.error(f"Error fetching {symbol} for {day}: {e}")
+            return None
+
+    def load_range(
+        self,
+        symbol: str,
+        start_date: datetime,
+        end_date: datetime,
+        columns: Optional[List[str]] = None,
+        consolidate: bool = False
+    ) -> pd.DataFrame:
+        """Load data for a date range, consolidating when complete months are detected"""
+        #self.cleanup_temp_files()
+
+        trading_days = self.get_trading_days(start_date, end_date)
+
+        # Modify column selection for the new schema
+        col_str = '*' if not columns else ', '.join(columns)
+
+        if consolidate:
+            # First check temp files for complete months
+            temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
+            if temp_files:
+                # Group temp files by month
+                monthly_temps: Dict[Tuple[int, int], Set[date]] = {}
+                for file in temp_files:
+                    try:
+                        # Extract date from filename
+                        date_str = file.stem.split('_')[1]
+                        file_date = datetime.strptime(date_str, '%Y%m%d').date()
+                        key = (file_date.year, file_date.month)
+                        if key not in monthly_temps:
+                            monthly_temps[key] = set()
+                        monthly_temps[key].add(file_date)
+                    except Exception as e:
+                        logger.warning(f"Error parsing temp file date {file}: {e}")
+                        continue
+
+                # Check each month for completeness and consolidate if complete
+                for (year, month), dates in monthly_temps.items():
+                    # Get trading days for this month
+                    month_start = zoneNY.localize(datetime(year, month, 1))
+                    if month == 12:
+                        month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
+                    else:
+                        month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)
+
+                    month_trading_days = set(self.get_trading_days(month_start, month_end))
+
+                    # If we have all trading days for the month, consolidate
+                    if month_trading_days.issubset(dates):
+                        logger.info(f"Found complete month in temp files for {symbol} {year}-{month}")
+                        self.consolidate_month(symbol, year, month)
+
+        #timing the load
+        time_start = time.time()
+        print("Start loading data...", time_start)
+        # Now load data from both consolidated and temp files
+        # (match only consolidated monthly partitions, not the temp dir)
+        query = f"""
+            WITH monthly_data AS (
+                SELECT {col_str}
+                FROM read_parquet(
+                    '{self.base_path}/symbol=*/year=*/month=*/*.parquet',
+                    hive_partitioning=1,
+                    union_by_name=true
+                )
+                WHERE t BETWEEN '{start_date}' AND '{end_date}'
+            ),
+            temp_data AS (
+                SELECT {col_str}
+                FROM read_parquet(
+                    '{self.temp_path}/{symbol}_*.parquet',
+                    union_by_name=true
+                )
+                WHERE t BETWEEN '{start_date}' AND '{end_date}'
+            )
+            SELECT * FROM (
+                SELECT * FROM monthly_data
+                UNION ALL
+                SELECT * FROM temp_data
+            )
+            ORDER BY t
+        """
+
+        try:
+            df_cached = self.con.execute(query).df()
+        except Exception as e:
+            logger.warning(f"Error reading cached data: {e}")
+            df_cached = pd.DataFrame()
+
+        print("fetched parquet", time.time() - time_start)  # elapsed seconds
+        if not df_cached.empty:
+            cached_days = set(df_cached['t'].dt.date)
+            missing_days = [d for d in trading_days if d not in cached_days]
+        else:
+            missing_days = trading_days
+
+        # Fetch missing days in parallel
+        if missing_days:
+            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
+                future_to_day = {
+                    executor.submit(self._fetch_and_save_day, symbol, day): day
+                    for day in missing_days
+                }
+
+                for future in future_to_day:
+                    day = future_to_day[future]
+                    try:
+                        temp_file = future.result()
+                        if temp_file:
+                            logger.debug(f"Successfully fetched {symbol} for {day}")
+                    except Exception as e:
+                        logger.error(f"Error processing {symbol} for {day}: {e}")
+
+            # Check again for complete months after fetching new data
+            temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
+            if temp_files:
+                monthly_temps = {}
+                for file in temp_files:
+                    try:
+                        date_str = file.stem.split('_')[1]
+                        file_date = datetime.strptime(date_str, '%Y%m%d').date()
+                        key = (file_date.year, file_date.month)
+                        if key not in monthly_temps:
+                            monthly_temps[key] = set()
+                        monthly_temps[key].add(file_date)
+                    except Exception as e:
+                        logger.warning(f"Error parsing temp file date {file}: {e}")
+                        continue
+
+                # Check for complete months again
+                for (year, month), dates in monthly_temps.items():
+                    month_start = zoneNY.localize(datetime(year, month, 1))
+                    if month == 12:
+                        month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
+                    else:
+                        month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)
+
+                    month_trading_days = set(self.get_trading_days(month_start, month_end))
+
+                    if month_trading_days.issubset(dates):
+                        logger.info(f"Found complete month after fetching for {symbol} {year}-{month}")
+                        self.consolidate_month(symbol, year, month)
+
+        # Load final data including any new fetches
+        try:
+            df_cached = self.con.execute(query).df()
+        except Exception as e:
+            logger.warning(f"Error reading final data: {e}")
+            df_cached = pd.DataFrame()
+
+        return df_cached.sort_values('t')
\ No newline at end of file
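One observation on `load_range` above: the temp-file-to-month grouping logic appears twice (before and after fetching). A small helper along these lines — hypothetical, not part of the patch — would remove the duplication:

```python
from collections import defaultdict
from datetime import datetime, date
from pathlib import Path
from typing import Dict, List, Set, Tuple

def group_temp_files_by_month(temp_files: List[Path]) -> Dict[Tuple[int, int], Set[date]]:
    """Map (year, month) -> set of dates that have a daily temp file."""
    monthly: Dict[Tuple[int, int], Set[date]] = defaultdict(set)
    for file in temp_files:
        try:
            # Filenames look like SYMBOL_YYYYMMDD.parquet
            file_date = datetime.strptime(file.stem.split('_')[1], '%Y%m%d').date()
        except (IndexError, ValueError):
            continue  # skip files that do not match the naming scheme
        monthly[(file_date.year, file_date.month)].add(file_date)
    return dict(monthly)
```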
"output_type": "stream", - "text": [ - "/Users/davidbrazda/Documents/Development/python/ttools/.venv/lib/python3.10/site-packages/vectorbtpro/data/base.py:1905: UserWarning: Symbols have mismatching index. Setting missing data points to NaN.\n", - " data = cls_or_self.align_index(data, missing=missing_index, silence_warnings=silence_warnings)\n" - ] - }, { "data": { "text/html": [ - "" + " .dataframe thead th {\n", + " text-align: right;\n", + " }\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
openhighlowclosevolume
time
2024-02-15 09:30:00-05:00499.29499.41499.2900499.3200161900.0
2024-02-15 09:30:01-05:00499.32499.41499.3000499.400010900.0
2024-02-15 09:30:02-05:00499.36499.40499.3550499.38007040.0
2024-02-15 09:30:03-05:00499.39499.42499.3800499.40008717.0
2024-02-15 09:30:04-05:00499.40499.40499.3500499.35003265.0
..................
2024-03-18 15:59:55-04:00512.94512.94512.8600512.89007345.0
2024-03-18 15:59:56-04:00512.90512.90512.8700512.88002551.0
2024-03-18 15:59:57-04:00512.89512.91512.8500512.870118063.0
2024-03-18 15:59:58-04:00512.87512.90512.8496512.90007734.0
2024-03-18 15:59:59-04:00512.92512.92512.8200512.870037159.0
\n", + "

417345 rows × 5 columns

\n", + "" ], "text/plain": [ - "" + " open high low close volume\n", + "time \n", + "2024-02-15 09:30:00-05:00 499.29 499.41 499.2900 499.3200 161900.0\n", + "2024-02-15 09:30:01-05:00 499.32 499.41 499.3000 499.4000 10900.0\n", + "2024-02-15 09:30:02-05:00 499.36 499.40 499.3550 499.3800 7040.0\n", + "2024-02-15 09:30:03-05:00 499.39 499.42 499.3800 499.4000 8717.0\n", + "2024-02-15 09:30:04-05:00 499.40 499.40 499.3500 499.3500 3265.0\n", + "... ... ... ... ... ...\n", + "2024-03-18 15:59:55-04:00 512.94 512.94 512.8600 512.8900 7345.0\n", + "2024-03-18 15:59:56-04:00 512.90 512.90 512.8700 512.8800 2551.0\n", + "2024-03-18 15:59:57-04:00 512.89 512.91 512.8500 512.8701 18063.0\n", + "2024-03-18 15:59:58-04:00 512.87 512.90 512.8496 512.9000 7734.0\n", + "2024-03-18 15:59:59-04:00 512.92 512.92 512.8200 512.8700 37159.0\n", + "\n", + "[417345 rows x 5 columns]" ] }, + "execution_count": 5, "metadata": {}, - "output_type": "display_data" + "output_type": "execute_result" } ], "source": [ "#This is how to call LOAD function\n", - "symbol = [\"BAC\",\"AAPL\"]\n", + "symbol = [\"SPY\"]\n", "#datetime in zoneNY \n", - "day_start = datetime(2024, 10, 14, 9, 45, 0)\n", - "day_stop = datetime(2024, 10, 16, 15, 1, 0)\n", + "day_start = datetime(2024, 2, 15, 9, 30, 0)\n", + "day_stop = datetime(2024, 3, 18, 16, 0, 0)\n", "day_start = zoneNY.localize(day_start)\n", "day_stop = zoneNY.localize(day_stop)\n", "\n", "#requested AGG\n", - "resolution = 12 #12s bars\n", + "resolution = 1 #12s bars\n", "agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO\n", "exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None to defaults\n", "minsize = 100 #min trade size to include\n", @@ -445,8 +254,8 @@ " return_vbt = True, #returns vbt object\n", " verbose = False\n", " )\n", - "\n", - "data.ohlcv.data[symbol[0]].lw.plot()\n" + "data.ohlcv.data[symbol[0]]\n", + "#data.ohlcv.data[symbol[0]].lw.plot()\n" ] }, { @@ -657,6 +466,340 @@ "\n", "```" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Aggregated data are stored per symbol, date range and conditions. 
+   "Aggregated data are stored per symbol, date range, and aggregation conditions. If the requested dates fall within existing stored data with the same conditions but a wider date span, they are loaded from that file.\n",
+   "\n",
+   "This is the matching part:"
+  ]
+ },
+ {
+  "cell_type": "code",
+  "execution_count": 4,
+  "metadata": {},
+  "outputs": [
+   {
+    "name": "stdout",
+    "output_type": "stream",
+    "text": [
+     "\n",
+     "File: SPY-AggType.OHLCV-12-2024-01-15T09-30-00-2024-10-20T16-00-00-4679BCFMOPUVWZ-100-True.parquet\n",
+     "Coverage: 2024-01-15 09:30:00 to 2024-10-20 16:00:00\n",
+     "Symbol: SPY\n",
+     "Agg Type: AggType.OHLCV\n",
+     "Resolution: 12\n",
+     "Excludes: 4679BCFMOPUVWZ\n",
+     "Minsize: 100\n",
+     "Main Session Only: True\n",
+     "--------------------------------------------------------------------------------\n"
+    ]
+   }
+  ],
+  "source": [
+   "from ttools.utils import list_matching_files, print_matching_files_info, zoneNY\n",
+   "from datetime import datetime\n",
+   "from ttools.config import AGG_CACHE\n",
+   "\n",
+   "# Find all files covering January 15, 2024 9:30 to 16:00\n",
+   "files = list_matching_files(\n",
+   "    symbol='SPY',\n",
+   "    resolution=\"1\",\n",
+   "    agg_type='AggType.OHLCV',\n",
+   "    start_date=datetime(2024, 1, 15, 9, 30),\n",
+   "    end_date=datetime(2024, 1, 15, 16, 0)\n",
+   ")\n",
+   "\n",
+   "#print_matching_files_info(files)\n",
+   "\n",
+   "# Example with all parameters specified\n",
+   "specific_files = list_matching_files(\n",
+   "    symbol=\"SPY\",\n",
+   "    agg_type=\"AggType.OHLCV\",\n",
+   "    resolution=\"12\",\n",
+   "    start_date=zoneNY.localize(datetime(2024, 1, 15, 9, 30)),\n",
+   "    end_date=zoneNY.localize(datetime(2024, 1, 15, 16, 0)),\n",
+   "    excludes_str=\"4679BCFMOPUVWZ\",\n",
+   "    minsize=100,\n",
+   "    main_session_only=True\n",
+   ")\n",
+   "\n",
+   "print_matching_files_info(specific_files)\n"
+  ]
+ },
+ {
+  "cell_type": "markdown",
+  "metadata": {},
+  "source": [
+   "And the date subset is loaded from parquet. Usually this is all done by `load_data` in the loader."
+  ]
+ },
+ {
+  "cell_type": "code",
+  "execution_count": 7,
+  "metadata": {},
+  "outputs": [
+   {
+    "data": {
+     "text/html": [
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
openhighlowclosevolumetradesupdatedvwapbuyvolumesellvolume
time
2024-01-16 09:30:00-05:00475.250475.3600475.20475.285255386.093.02024-01-16 09:30:01.002183-05:00475.2517253692.0242756.0
2024-01-16 09:30:01-05:00475.335475.3350475.23475.26015161.0100.02024-01-16 09:30:02.007313-05:00475.2833904386.04944.0
2024-01-16 09:30:02-05:00475.250475.3000475.24475.3006993.039.02024-01-16 09:30:03.008912-05:00475.2625071900.02256.0
2024-01-16 09:30:03-05:00475.290475.3200475.24475.2708497.047.02024-01-16 09:30:04.201093-05:00475.2752801300.03200.0
2024-01-16 09:30:04-05:00475.250475.2700475.22475.2705367.037.02024-01-16 09:30:05.004980-05:00475.2343531613.01247.0
.................................
2024-10-18 15:59:55-04:00584.520584.5800584.51584.58010357.047.02024-10-18 15:59:56.008928-04:00584.5438701600.01100.0
2024-10-18 15:59:56-04:00584.570584.6091584.55584.5506527.032.02024-10-18 15:59:57.007658-04:00584.5666431525.01002.0
2024-10-18 15:59:57-04:00584.560584.6100584.56584.6005068.023.02024-10-18 15:59:58.000435-04:00584.5962491960.0900.0
2024-10-18 15:59:58-04:00584.590584.6200584.56584.5608786.023.02024-10-18 15:59:59.041984-04:00584.5922172859.03921.0
2024-10-18 15:59:59-04:00584.560584.6100584.56584.57012583.069.02024-10-18 15:59:59.982132-04:00584.5831315303.01980.0
\n", + "

3384529 rows × 10 columns

\n", + "
" + ], + "text/plain": [ + " open high low close volume \\\n", + "time \n", + "2024-01-16 09:30:00-05:00 475.250 475.3600 475.20 475.285 255386.0 \n", + "2024-01-16 09:30:01-05:00 475.335 475.3350 475.23 475.260 15161.0 \n", + "2024-01-16 09:30:02-05:00 475.250 475.3000 475.24 475.300 6993.0 \n", + "2024-01-16 09:30:03-05:00 475.290 475.3200 475.24 475.270 8497.0 \n", + "2024-01-16 09:30:04-05:00 475.250 475.2700 475.22 475.270 5367.0 \n", + "... ... ... ... ... ... \n", + "2024-10-18 15:59:55-04:00 584.520 584.5800 584.51 584.580 10357.0 \n", + "2024-10-18 15:59:56-04:00 584.570 584.6091 584.55 584.550 6527.0 \n", + "2024-10-18 15:59:57-04:00 584.560 584.6100 584.56 584.600 5068.0 \n", + "2024-10-18 15:59:58-04:00 584.590 584.6200 584.56 584.560 8786.0 \n", + "2024-10-18 15:59:59-04:00 584.560 584.6100 584.56 584.570 12583.0 \n", + "\n", + " trades updated \\\n", + "time \n", + "2024-01-16 09:30:00-05:00 93.0 2024-01-16 09:30:01.002183-05:00 \n", + "2024-01-16 09:30:01-05:00 100.0 2024-01-16 09:30:02.007313-05:00 \n", + "2024-01-16 09:30:02-05:00 39.0 2024-01-16 09:30:03.008912-05:00 \n", + "2024-01-16 09:30:03-05:00 47.0 2024-01-16 09:30:04.201093-05:00 \n", + "2024-01-16 09:30:04-05:00 37.0 2024-01-16 09:30:05.004980-05:00 \n", + "... ... ... \n", + "2024-10-18 15:59:55-04:00 47.0 2024-10-18 15:59:56.008928-04:00 \n", + "2024-10-18 15:59:56-04:00 32.0 2024-10-18 15:59:57.007658-04:00 \n", + "2024-10-18 15:59:57-04:00 23.0 2024-10-18 15:59:58.000435-04:00 \n", + "2024-10-18 15:59:58-04:00 23.0 2024-10-18 15:59:59.041984-04:00 \n", + "2024-10-18 15:59:59-04:00 69.0 2024-10-18 15:59:59.982132-04:00 \n", + "\n", + " vwap buyvolume sellvolume \n", + "time \n", + "2024-01-16 09:30:00-05:00 475.251725 3692.0 242756.0 \n", + "2024-01-16 09:30:01-05:00 475.283390 4386.0 4944.0 \n", + "2024-01-16 09:30:02-05:00 475.262507 1900.0 2256.0 \n", + "2024-01-16 09:30:03-05:00 475.275280 1300.0 3200.0 \n", + "2024-01-16 09:30:04-05:00 475.234353 1613.0 1247.0 \n", + "... ... ... ... 
\n", + "2024-10-18 15:59:55-04:00 584.543870 1600.0 1100.0 \n", + "2024-10-18 15:59:56-04:00 584.566643 1525.0 1002.0 \n", + "2024-10-18 15:59:57-04:00 584.596249 1960.0 900.0 \n", + "2024-10-18 15:59:58-04:00 584.592217 2859.0 3921.0 \n", + "2024-10-18 15:59:59-04:00 584.583131 5303.0 1980.0 \n", + "\n", + "[3384529 rows x 10 columns]" + ] + }, + "execution_count": 7, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "start = zoneNY.localize(datetime(2024, 1, 15, 9, 30))\n", + "end = zoneNY.localize(datetime(2024, 10, 20, 16, 00))\n", + "\n", + "ohlcv_df = pd.read_parquet(\n", + " AGG_CACHE / \"SPY-AggType.OHLCV-1-2024-01-15T09-30-00-2024-10-20T16-00-00-4679BCFMOPUVWZ-100-True.parquet\", \n", + " engine='pyarrow',\n", + " filters=[('time', '>=', start), \n", + " ('time', '<=', end)]\n", + ")\n", + "\n", + "ohlcv_df" + ] } ], "metadata": { diff --git a/ttools/__init__.py b/ttools/__init__.py index 84a24c9..bf5d974 100644 --- a/ttools/__init__.py +++ b/ttools/__init__.py @@ -1,4 +1,4 @@ from .vbtutils import AnchoredIndicator, create_mask_from_window, isrising, isfalling, isrisingc, isfallingc, trades2entries_exits, figs2cell from .vbtindicators import register_custom_inds -from .utils import find_dotenv, AggType, zoneNY, zonePRG, zoneUTC +from .utils import AggType, zoneNY, zonePRG, zoneUTC from .loaders import load_data, prepare_trade_cache \ No newline at end of file diff --git a/ttools/config.py b/ttools/config.py index 06ccfba..28dd443 100644 --- a/ttools/config.py +++ b/ttools/config.py @@ -1,14 +1,35 @@ from dotenv import load_dotenv from appdirs import user_data_dir -from ttools.utils import find_dotenv +import ttools.utils as utils import os import pytz -import vectorbtpro as vbt -import pytz from pathlib import Path from dotenv import load_dotenv -import os +def find_dotenv(): + """ + Searches for a .env file in the given directory or its parents and returns the path. + + Args: + start_path: The directory to start searching from. + + Returns: + Path to the .env file if found, otherwise None. 
diff --git a/ttools/loaders.py b/ttools/loaders.py
index df4da15..84e1bee 100644
--- a/ttools/loaders.py
+++ b/ttools/loaders.py
@@ -1,8 +1,5 @@
 from ctypes import Union
-from dotenv import load_dotenv
-from appdirs import user_data_dir
-from ttools.utils import find_dotenv
 from ttools.config import *
 from datetime import datetime
 from alpaca.data.historical import StockHistoricalDataClient
@@ -16,7 +13,7 @@ from time import time as timetime
 from concurrent.futures import ThreadPoolExecutor
 from alpaca.data.enums import DataFeed
 import random
-from ttools.utils import AggType, fetch_calendar_data, print, set_verbose
+from ttools.utils import AggType, fetch_calendar_data, print, print_matching_files_info, set_verbose, list_matching_files
 from tqdm import tqdm
 import threading
 from typing import List, Union
@@ -393,9 +390,25 @@ def load_data(symbol: Union[str, List[str]],
     excludes_str = ''.join(map(str, exclude_conditions))
     file_ohlcv = AGG_CACHE / f"{symbol}-{str(agg_type)}-{str(resolution)}-{start_date.strftime('%Y-%m-%dT%H-%M-%S')}-{end_date.strftime('%Y-%m-%dT%H-%M-%S')}-{str(excludes_str)}-{minsize}-{main_session_only}.parquet"
 
-    if not force_remote and file_ohlcv.exists():
-        ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
-        print("Loaded from agg_cache", file_ohlcv)
+    #find files with the same conditions and the same or a wider date span
+    matched_files = list_matching_files(
+        symbol=symbol,
+        agg_type=str(agg_type),
+        resolution=str(resolution),
+        start_date=start_date,
+        end_date=end_date,
+        excludes_str=str(excludes_str),
+        minsize=minsize,
+        main_session_only=main_session_only
+    )
+    print("matched agg files", len(matched_files))
+    print_matching_files_info(matched_files)
+
+    if not force_remote and len(matched_files) > 0:
+        ohlcv_df = pd.read_parquet(matched_files[0],
+                                   engine='pyarrow',
+                                   filters=[('time', '>=', start_date), ('time', '<=', end_date)])
+        print("Loaded from agg_cache", matched_files[0])
         return ohlcv_df
     else:
        #could this be sped up? "Searching cache" displays slowly - is there a bottleneck?
@@ -411,6 +424,11 @@ def load_data(symbol: Union[str, List[str]],
         ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote)
 
     if return_vbt:
+        try:
+            import vectorbtpro as vbt  # Import only when needed
+        except ImportError:
+            raise RuntimeError("vectorbtpro is required for return_vbt. Please install it.")
+
         return vbt.Data.from_data(vbt.symbol_dict(ret_dict_df), tz_convert=zoneNY)
 
     return ret_dict_df
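Since the cache hit now hinges on the filename convention parsed in `ttools/utils.py` below, a small illustration may help. The filename is taken verbatim from the notebook output above; the dict shape follows `parse_filename`'s return value:

```python
from ttools.utils import parse_filename

info = parse_filename(
    "SPY-AggType.OHLCV-12-2024-01-15T09-30-00-2024-10-20T16-00-00-4679BCFMOPUVWZ-100-True.parquet"
)
# info['start_date'] / info['end_date'] are naive datetimes; a file "covers" a
# request when start_date <= requested start and end_date >= requested end,
# with every other component equal.
print(info['symbol'], info['resolution'], info['start_date'], info['end_date'])
```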
diff --git a/ttools/utils.py b/ttools/utils.py
index e6fc7aa..f40ad4e 100644
--- a/ttools/utils.py
+++ b/ttools/utils.py
@@ -2,8 +2,10 @@ from pathlib import Path
 from enum import Enum
 from datetime import datetime, timedelta
 from typing import List, Tuple
+import re
 import pytz
 import calendar
+from ttools.config import AGG_CACHE
 import os
 from alpaca.trading.models import Order, TradeUpdate, Calendar
 import pandas_market_calendars as mcal
@@ -26,6 +28,147 @@ def set_verbose(value):
     global verbose
     verbose = value
 
+def parse_filename(filename: str) -> dict:
+    """Parse filename of AGG_CACHE files into its components using regex.
+    https://claude.ai/chat/b869644b-f542-4812-ad58-d4439c15fa78
+    """
+    pattern = r"""
+        ^
+        ([A-Z]+)-                                # Symbol
+        ([^-]+)-                                 # Agg type
+        (\d+)-                                   # Resolution
+        (\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})-   # Start date
+        (\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})-   # End date
+        ([A-Z0-9]+)-                             # Excludes string
+        (\d+)-                                   # Minsize
+        (True|False)                             # Main session flag
+        \.parquet$                               # File extension
+    """
+    match = re.match(pattern, filename, re.VERBOSE)
+    if not match:
+        return None
+
+    try:
+        symbol, agg_type, resolution, start_str, end_str, excludes, minsize, main_session = match.groups()
+
+        return {
+            'symbol': symbol,
+            'agg_type': agg_type,
+            'resolution': resolution,
+            'start_date': datetime.strptime(start_str, '%Y-%m-%dT%H-%M-%S'),
+            'end_date': datetime.strptime(end_str, '%Y-%m-%dT%H-%M-%S'),
+            'excludes_str': excludes,
+            'minsize': int(minsize),
+            'main_session_only': main_session == 'True'
+        }
+    except (ValueError, AttributeError):
+        return None
+
+def list_matching_files(
+    symbol: str = None,
+    agg_type: str = None,
+    resolution: str = None,
+    start_date: datetime = None,
+    end_date: datetime = None,
+    excludes_str: str = None,
+    minsize: int = None,
+    main_session_only: bool = None
+) -> list[Path]:
+    """
+    List all aggregated files in the cache directory matching the specified criteria.
+    If start_date and end_date are provided, returns files that cover this interval
+    (meaning their date range encompasses the requested interval).
+    If a parameter is None, it matches any value for that component.
+
+    Example:
+    ```python
+    # Example with all parameters specified
+    specific_files = list_matching_files(
+        symbol="SPY",
+        agg_type="AggType.OHLCV",
+        resolution="12",
+        start_date=datetime(2024, 1, 15, 9, 30),
+        end_date=datetime(2024, 1, 15, 16, 0),
+        excludes_str="4679BCFMOPUVWZ",
+        minsize=100,
+        main_session_only=True
+    )
+
+    print_matching_files_info(specific_files)
+    ```
+    """
+    #make dates naive
+    if start_date is not None:
+        start_date = start_date.replace(tzinfo=None)
+    if end_date is not None:
+        end_date = end_date.replace(tzinfo=None)
+
+    agg_cache_dir = AGG_CACHE
+    def matches_criteria(file_info: dict) -> bool:
+        """Check if file matches all specified criteria."""
+        if not file_info:
+            return False
+
+        # Check non-date criteria first
+        if symbol is not None and file_info['symbol'] != symbol:
+            return False
+        if agg_type is not None and file_info['agg_type'] != agg_type:
+            return False
+        if resolution is not None and file_info['resolution'] != resolution:
+            return False
+        if excludes_str is not None and file_info['excludes_str'] != excludes_str:
+            return False
+        if minsize is not None and file_info['minsize'] != minsize:
+            return False
+        if main_session_only is not None and file_info['main_session_only'] != main_session_only:
+            return False
+
+        # Check date range coverage if both dates are provided
+        if start_date is not None and end_date is not None:
+            return (file_info['start_date'] <= start_date and
+                    file_info['end_date'] >= end_date)
+
+        # If only start_date is provided
+        if start_date is not None:
+            return file_info['end_date'] >= start_date
+
+        # If only end_date is provided
+        if end_date is not None:
+            return file_info['start_date'] <= end_date
+
+        return True
+
+    # Process all files
+    matching_files = []
+    for file_path in agg_cache_dir.iterdir():
+        if not file_path.is_file() or not file_path.name.endswith('.parquet'):
+            continue
+
+        file_info = parse_filename(file_path.name)
+        if matches_criteria(file_info):
+            matching_files.append((file_path, file_info))
+
+    # Sort files by start date and then end date
+    matching_files.sort(key=lambda x: (x[1]['start_date'], x[1]['end_date']))
+
+    # Return just the file paths
+    return [f[0] for f in matching_files]
+
+def print_matching_files_info(files: list[Path]):
+    """Helper function to print detailed information about matching files."""
+    for file_path in files:
+        file_info = parse_filename(file_path.name)
+        if file_info:
+            print(f"\nFile: {file_path.name}")
+            print(f"Coverage: {file_info['start_date']} to {file_info['end_date']}")
+            print(f"Symbol: {file_info['symbol']}")
+            print(f"Agg Type: {file_info['agg_type']}")
+            print(f"Resolution: {file_info['resolution']}")
+            print(f"Excludes: {file_info['excludes_str']}")
+            print(f"Minsize: {file_info['minsize']}")
+            print(f"Main Session Only: {file_info['main_session_only']}")
+            print("-" * 80)
+
 def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]:
     """
     Fetches the trading schedule for the NYSE (New York Stock Exchange) between the specified start and end dates.
@@ -109,33 +252,6 @@ def split_range(start: datetime, stop: datetime, period: str = "Y") -> List[Tupl
 
     return ranges
 
-
-def find_dotenv():
-    """
-    Searches for a .env file in the given directory or its parents and returns the path.
-
-    Args:
-        start_path: The directory to start searching from.
-
-    Returns:
-        Path to the .env file if found, otherwise None.
-    """
-    try:
-        start_path = __file__
-    except NameError:
-        #print("Notebook probably")
-        start_path = os.getcwd()
-    #print(start_path)
-
-    current_path = Path(start_path)
-    for _ in range(10): # Limit search depth to 5 levels
-        dotenv_path = current_path / '.env'
-        if dotenv_path.exists():
-            return dotenv_path
-        current_path = current_path.parent
-    return None
-
-
 #create enum AGG_TYPE
 class AggType(str, Enum):
     """