Compare commits

2 commits: 478a31c459 ... 47450e2740

| Author | SHA1 | Date |
|---|---|---|
|  | 47450e2740 |  |
|  | 5770d8324a |  |
@@ -18,7 +18,11 @@ Modules:
Detailed examples in [tests/data_loader_tryme.ipynb](tests/data_loader_tryme.ipynb)

## load_data
Returns vectorized aggregation of given type. If trades for given period are not cached they are remotely fetched from Alpaca first.
Returns vectorized aggregation of given type.

If aggregated data are already in the agg cache with the same conditions and the same or a wider date span, they are returned from the cache.
Otherwise trade data are aggregated on the fly, saved to the cache and returned.
If trades for the given period are not cached, they are remotely fetched from Alpaca first.

Example:

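A minimal sketch of such a call (the parameter names follow the `load_data` signature visible in the `ttools/loaders.py` hunks further down this diff, the values are illustrative, and the remaining parameters are assumed to keep their defaults; detailed examples live in [tests/data_loader_tryme.ipynb](tests/data_loader_tryme.ipynb)):

```python
from datetime import datetime
from ttools import load_data, AggType, zoneNY

# Illustrative values only; resolution/exclude_conditions semantics are defined in ttools.loaders
ohlcv = load_data(
    symbol="SPY",
    agg_type=AggType.OHLCV,
    resolution=12,
    start_date=zoneNY.localize(datetime(2024, 1, 15, 9, 30)),
    end_date=zoneNY.localize(datetime(2024, 1, 15, 16, 0)),
    main_session_only=True,
)
```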
@@ -130,6 +134,8 @@ exits.tail(20)
```
## Display Plotly figures in a single notebook cell

To display various standalone figures in the same cell.

`figs2cell(figlist)`

Example usage:

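The usage example itself is truncated in this view; as a rough sketch, assuming `figs2cell` takes a list of Plotly figure objects (it is exported from `ttools.vbtutils`, see the `__init__.py` hunk below):

```python
import plotly.graph_objects as go
from ttools import figs2cell

fig1 = go.Figure(go.Scatter(y=[1, 3, 2]))
fig2 = go.Figure(go.Bar(y=[2, 1, 3]))

# Render both standalone figures in one notebook cell
figs2cell([fig1, fig2])
```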
setup.py (2 changes)
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages

setup(
    name='ttools',
    version='0.6.3',
    version='0.6.4',
    packages=find_packages(),
    install_requires=[
        # list your dependencies here

tests/WIP-tradecache_duckdb_approach/hive_cache.ipynb (new file, 178 lines)
@@ -0,0 +1,178 @@
{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Exploring alternative cache storage using duckdb and parquet\n",
    "\n",
    "https://claude.ai/chat/e49491f7-8b18-4fb7-b301-5c9997746079\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env\n",
      "Start loading data... 1730370862.4833238\n"
     ]
    },
    {
     "data": {
      "application/vnd.jupyter.widget-view+json": {
       "model_id": "829f7f3d58a74f1fbfdcfc202c2aaf84",
       "version_major": 2,
       "version_minor": 0
      },
      "text/plain": [
       "FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "fetched parquet -11.310973167419434\n",
      "Loaded 1836460 rows\n"
     ]
    }
   ],
   "source": [
    "from ttools.tradecache import TradeCache\n",
    "from ttools.utils import zoneNY\n",
    "from pathlib import Path\n",
    "from datetime import datetime\n",
    "import logging\n",
    "import duckdb\n",
    "\n",
    "logging.basicConfig(\n",
    "    level=logging.INFO, # Set the minimum level (DEBUG, INFO, WARNING, ERROR, CRITICAL)\n",
    "    format='%(levelname)s: %(message)s' # Simple format showing level and message\n",
    ")\n",
    "\n",
    "cache = TradeCache(\n",
    "    base_path=Path(\"./trade_cache\"),\n",
    "    max_workers=4, # Adjust based on your CPU\n",
    "    cleanup_after_days=7\n",
    ")\n",
    "\n",
    "# Load data\n",
    "df = cache.load_range(\n",
    "    symbol=\"BAC\",\n",
    "    start_date=zoneNY.localize(datetime(2024, 10, 14, 9, 30)),\n",
    "    end_date=zoneNY.localize(datetime(2024, 10, 20, 16, 0)),\n",
    "    #columns=['open', 'high', 'low', 'close', 'volume']\n",
    ")\n",
    "\n",
    "print(f\"Loaded {len(df)} rows\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "DuckDB Schema:\n",
      " column_name column_type null key default extra\n",
      "0 x VARCHAR YES None None None\n",
      "1 p DOUBLE YES None None None\n",
      "2 s BIGINT YES None None None\n",
      "3 i BIGINT YES None None None\n",
      "4 c VARCHAR[] YES None None None\n",
      "5 z VARCHAR YES None None None\n",
      "6 t TIMESTAMP WITH TIME ZONE YES None None None\n",
      "\n",
      "Sample Data:\n",
      " x p s i c z \\\n",
      "0 T 41.870 27 62879146994030 [ , F, T, I] A \n",
      "1 D 41.965 1 71675241580848 [ , I] A \n",
      "2 D 41.965 1 71675241644625 [ , I] A \n",
      "3 D 41.850 1 71675241772360 [ , I] A \n",
      "4 N 41.960 416188 52983525028174 [ , O] A \n",
      "\n",
      " t \n",
      "0 2024-10-14 15:30:00.006480+02:00 \n",
      "1 2024-10-14 15:30:00.395802+02:00 \n",
      "2 2024-10-14 15:30:00.484008+02:00 \n",
      "3 2024-10-14 15:30:00.610005+02:00 \n",
      "4 2024-10-14 15:30:01.041599+02:00 \n",
      "\n",
      "Pandas Info:\n"
     ]
    },
    {
     "ename": "NameError",
     "evalue": "name 'pd' is not defined",
     "output_type": "error",
     "traceback": [
      "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
      "\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)",
      "Cell \u001b[0;32mIn[4], line 25\u001b[0m\n\u001b[1;32m 22\u001b[0m \u001b[38;5;28mprint\u001b[39m(df\u001b[38;5;241m.\u001b[39minfo())\n\u001b[1;32m 24\u001b[0m \u001b[38;5;66;03m# Let's check the schema first\u001b[39;00m\n\u001b[0;32m---> 25\u001b[0m \u001b[43mcheck_parquet_schema\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n",
      "Cell \u001b[0;32mIn[4], line 21\u001b[0m, in \u001b[0;36mcheck_parquet_schema\u001b[0;34m()\u001b[0m\n\u001b[1;32m 19\u001b[0m \u001b[38;5;66;03m# Method 3: Using pandas\u001b[39;00m\n\u001b[1;32m 20\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124mPandas Info:\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[0;32m---> 21\u001b[0m df \u001b[38;5;241m=\u001b[39m \u001b[43mpd\u001b[49m\u001b[38;5;241m.\u001b[39mread_parquet(sample_file)\n\u001b[1;32m 22\u001b[0m \u001b[38;5;28mprint\u001b[39m(df\u001b[38;5;241m.\u001b[39minfo())\n",
      "\u001b[0;31mNameError\u001b[0m: name 'pd' is not defined"
     ]
    }
   ],
   "source": [
    "import duckdb\n",
    "\n",
    "def check_parquet_schema():\n",
    "    # Read one file and print its structure\n",
    "    sample_file = Path(\"./trade_cache\")/\"temp/BAC_20241014.parquet\"\n",
    "    \n",
    "    # Method 1: Using DuckDB describe\n",
    "    print(\"DuckDB Schema:\")\n",
    "    print(duckdb.sql(f\"DESCRIBE SELECT * FROM read_parquet('{sample_file}')\").df())\n",
    "    \n",
    "    # Method 2: Just look at the data\n",
    "    print(\"\\nSample Data:\")\n",
    "    print(duckdb.sql(f\"\"\"\n",
    "        SELECT *\n",
    "        FROM read_parquet('{sample_file}')\n",
    "        LIMIT 5\n",
    "    \"\"\").df())\n",
    "    \n",
    "    # Method 3: Using pandas\n",
    "    print(\"\\nPandas Info:\")\n",
    "    df = pd.read_parquet(sample_file)\n",
    "    print(df.info())\n",
    "\n",
    "# Let's check the schema first\n",
    "check_parquet_schema()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": ".venv",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.10.11"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
tests/WIP-tradecache_duckdb_approach/tradecache.py (new file, 324 lines)
@@ -0,0 +1,324 @@
# this goes to the main directory


from pathlib import Path
from datetime import datetime, date, timedelta
from typing import Optional, List, Set, Dict, Tuple
import pandas as pd
import duckdb
import pandas_market_calendars as mcal
from abc import ABC, abstractmethod
import logging
from ttools.utils import zoneNY
from concurrent.futures import ThreadPoolExecutor
from ttools.loaders import fetch_daily_stock_trades
import time
logger = logging.getLogger(__name__)

class TradeCache:
    def __init__(
        self,
        base_path: Path,
        market: str = 'NYSE',
        max_workers: int = 4,
        cleanup_after_days: int = 7
    ):
        """
        Initialize TradeCache with monthly partitions and temp storage,
        optimized for the new raw-trade schema.

        Args:
            base_path: Base directory for cache
            market: Market calendar to use
            max_workers: Max parallel fetches
            cleanup_after_days: Days after which to clean temp files
        """
        self.base_path = Path(base_path)
        self.temp_path = self.base_path / "temp"
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.temp_path.mkdir(parents=True, exist_ok=True)

        self.calendar = mcal.get_calendar(market)
        self.max_workers = max_workers
        self.cleanup_after_days = cleanup_after_days

        # Initialize DuckDB with schema-specific optimizations
        self.con = duckdb.connect()
        self.con.execute("SET memory_limit='16GB'")
        self.con.execute("SET threads TO 8")

        # Create the schema for our tables
        self.schema = """
            x VARCHAR,
            p DOUBLE,
            s BIGINT,
            i BIGINT,
            c VARCHAR[],
            z VARCHAR,
            t TIMESTAMP WITH TIME ZONE
        """

        self._trading_days_cache: Dict[Tuple[date, date], List[date]] = {}

    def get_partition_path(self, symbol: str, year: int, month: int) -> Path:
        """Get path for a specific partition"""
        return self.base_path / f"symbol={symbol}/year={year}/month={month}"

    def get_temp_path(self, symbol: str, day: date) -> Path:
        """Get temporary file path for a day"""
        return self.temp_path / f"{symbol}_{day:%Y%m%d}.parquet"

    def get_trading_days(self, start_date: datetime, end_date: datetime) -> List[date]:
        """Get trading days with caching"""
        key = (start_date.date(), end_date.date())
        if key not in self._trading_days_cache:
            schedule = self.calendar.schedule(start_date=start_date, end_date=end_date)
            self._trading_days_cache[key] = [d.date() for d in schedule.index]
        return self._trading_days_cache[key]

    def cleanup_temp_files(self):
        """Clean up old temp files"""
        cutoff = datetime.now() - timedelta(days=self.cleanup_after_days)
        for file in self.temp_path.glob("*.parquet"):
            try:
                # Extract date from filename
                date_str = file.stem.split('_')[1]
                file_date = datetime.strptime(date_str, '%Y%m%d')
                if file_date < cutoff:
                    file.unlink()
            except Exception as e:
                logger.warning(f"Error cleaning up {file}: {e}")


    def consolidate_month(self, symbol: str, year: int, month: int) -> bool:
        """
        Consolidate daily files into monthly partition only if we have complete month
        Returns True if consolidation was successful
        """
        # Get all temp files for this symbol and month
        temp_files = list(self.temp_path.glob(f"{symbol}_{year:04d}{month:02d}*.parquet"))

        if not temp_files:
            return False

        try:
            # Get expected trading days for this month
            start_date = zoneNY.localize(datetime(year, month, 1))
            if month == 12:
                end_date = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
            else:
                end_date = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)

            trading_days = self.get_trading_days(start_date, end_date)

            # Check if we have data for all trading days
            temp_dates = set(datetime.strptime(f.stem.split('_')[1], '%Y%m%d').date()
                             for f in temp_files)
            missing_days = set(trading_days) - temp_dates

            # Only consolidate if we have all trading days
            if missing_days:
                logger.info(f"Skipping consolidation for {symbol} {year}-{month}: "
                            f"missing {len(missing_days)} trading days")
                return False

            # Proceed with consolidation since we have complete month
            partition_path = self.get_partition_path(symbol, year, month)
            partition_path.mkdir(parents=True, exist_ok=True)
            file_path = partition_path / "data.parquet"

            files_str = ', '.join(f"'{f}'" for f in temp_files)

            # Modified query to handle the new schema
            self.con.execute(f"""
                COPY (
                    SELECT x, p, s, i, c, z, t
                    FROM read_parquet([{files_str}])
                    ORDER BY t
                )
                TO '{file_path}'
                (FORMAT PARQUET, COMPRESSION 'ZSTD')
            """)

            # Remove temp files only after successful write
            for f in temp_files:
                f.unlink()

            logger.info(f"Successfully consolidated {symbol} {year}-{month} "
                        f"({len(temp_files)} files)")
            return True

        except Exception as e:
            logger.error(f"Error consolidating {symbol} {year}-{month}: {e}")
            return False

    def fetch_remote_day(self, symbol: str, day: date) -> pd.DataFrame:
        """Implement this to fetch single day of data"""
        min_datetime = zoneNY.localize(datetime.combine(day, datetime.min.time()))
        max_datetime = zoneNY.localize(datetime.combine(day, datetime.max.time()))
        return fetch_daily_stock_trades(symbol, min_datetime, max_datetime)

    def _fetch_and_save_day(self, symbol: str, day: date) -> Optional[Path]:
        """Fetch and save a single day, returns file path if successful"""
        try:
            df_day = self.fetch_remote_day(symbol, day)
            if df_day.empty:
                return None

            temp_file = self.get_temp_path(symbol, day)
            df_day.to_parquet(temp_file, compression='ZSTD')
            return temp_file

        except Exception as e:
            logger.error(f"Error fetching {symbol} for {day}: {e}")
            return None

    def load_range(
        self,
        symbol: str,
        start_date: datetime,
        end_date: datetime,
        columns: Optional[List[str]] = None,
        consolidate: bool = False
    ) -> pd.DataFrame:
        """Load data for date range, consolidating when complete months are detected"""
        # self.cleanup_temp_files()

        trading_days = self.get_trading_days(start_date, end_date)

        # Modify column selection for new schema
        col_str = '*' if not columns else ', '.join(columns)

        if consolidate:
            # First check temp files for complete months
            temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
            if temp_files:
                # Group temp files by month
                monthly_temps: Dict[Tuple[int, int], Set[date]] = {}
                for file in temp_files:
                    try:
                        # Extract date from filename
                        date_str = file.stem.split('_')[1]
                        file_date = datetime.strptime(date_str, '%Y%m%d').date()
                        key = (file_date.year, file_date.month)
                        if key not in monthly_temps:
                            monthly_temps[key] = set()
                        monthly_temps[key].add(file_date)
                    except Exception as e:
                        logger.warning(f"Error parsing temp file date {file}: {e}")
                        continue

                # Check each month for completeness and consolidate if complete
                for (year, month), dates in monthly_temps.items():
                    # Get trading days for this month
                    month_start = zoneNY.localize(datetime(year, month, 1))
                    if month == 12:
                        month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
                    else:
                        month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)

                    month_trading_days = set(self.get_trading_days(month_start, month_end))

                    # If we have all trading days for the month, consolidate
                    if month_trading_days.issubset(dates):
                        logger.info(f"Found complete month in temp files for {symbol} {year}-{month}")
                        self.consolidate_month(symbol, year, month)

        # timing the load
        time_start = time.time()
        print("Start loading data...", time_start)
        # Now load data from both consolidated and temp files
        query = f"""
            WITH monthly_data AS (
                SELECT {col_str}
                FROM read_parquet(
                    '{self.base_path}/*/*.parquet',
                    hive_partitioning=1,
                    union_by_name=true
                )
                WHERE t BETWEEN '{start_date}' AND '{end_date}'
            ),
            temp_data AS (
                SELECT {col_str}
                FROM read_parquet(
                    '{self.temp_path}/{symbol}_*.parquet',
                    union_by_name=true
                )
                WHERE t BETWEEN '{start_date}' AND '{end_date}'
            )
            SELECT * FROM (
                SELECT * FROM monthly_data
                UNION ALL
                SELECT * FROM temp_data
            )
            ORDER BY t
        """

        try:
            df_cached = self.con.execute(query).df()
        except Exception as e:
            logger.warning(f"Error reading cached data: {e}")
            df_cached = pd.DataFrame()

        print("fetched parquet", time.time() - time_start)
        if not df_cached.empty:
            cached_days = set(df_cached['t'].dt.date)
            missing_days = [d for d in trading_days if d not in cached_days]
        else:
            missing_days = trading_days

        # Fetch missing days in parallel
        if missing_days:
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_day = {
                    executor.submit(self._fetch_and_save_day, symbol, day): day
                    for day in missing_days
                }

                for future in future_to_day:
                    day = future_to_day[future]
                    try:
                        temp_file = future.result()
                        if temp_file:
                            logger.debug(f"Successfully fetched {symbol} for {day}")
                    except Exception as e:
                        logger.error(f"Error processing {symbol} for {day}: {e}")

            # Check again for complete months after fetching new data
            temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
            if temp_files:
                monthly_temps = {}
                for file in temp_files:
                    try:
                        date_str = file.stem.split('_')[1]
                        file_date = datetime.strptime(date_str, '%Y%m%d').date()
                        key = (file_date.year, file_date.month)
                        if key not in monthly_temps:
                            monthly_temps[key] = set()
                        monthly_temps[key].add(file_date)
                    except Exception as e:
                        logger.warning(f"Error parsing temp file date {file}: {e}")
                        continue

                # Check for complete months again
                for (year, month), dates in monthly_temps.items():
                    month_start = zoneNY.localize(datetime(year, month, 1))
                    if month == 12:
                        month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
                    else:
                        month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)

                    month_trading_days = set(self.get_trading_days(month_start, month_end))

                    if month_trading_days.issubset(dates):
                        logger.info(f"Found complete month after fetching for {symbol} {year}-{month}")
                        self.consolidate_month(symbol, year, month)

        # Load final data including any new fetches
        try:
            df_cached = self.con.execute(query).df()
        except Exception as e:
            logger.warning(f"Error reading final data: {e}")
            df_cached = pd.DataFrame()

        return df_cached.sort_values('t')
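For orientation, `get_partition_path` and `consolidate_month` above lay the cache out as a Hive-style tree, with not-yet-consolidated days kept under `temp/`. A sketch of that layout and of one way such partitions can be queried with DuckDB; the glob below is derived from `get_partition_path` and is only an illustration, not the exact query `load_range` builds:

```python
# trade_cache/
#   symbol=BAC/year=2024/month=10/data.parquet   # consolidated complete month
#   temp/BAC_20241115.parquet                    # daily files awaiting consolidation
import duckdb

# With hive_partitioning the directory keys (symbol, year, month) become queryable columns
rel = duckdb.sql("""
    SELECT symbol, year, month, count(*) AS trades
    FROM read_parquet('trade_cache/symbol=*/year=*/month=*/*.parquet', hive_partitioning=1)
    GROUP BY ALL
    ORDER BY symbol, year, month
""")
print(rel.df())
```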
File diff suppressed because one or more lines are too long

@@ -1,4 +1,4 @@
from .vbtutils import AnchoredIndicator, create_mask_from_window, isrising, isfalling, isrisingc, isfallingc, trades2entries_exits, figs2cell
from .vbtindicators import register_custom_inds
from .utils import find_dotenv, AggType, zoneNY, zonePRG, zoneUTC
from .utils import AggType, zoneNY, zonePRG, zoneUTC
from .loaders import load_data, prepare_trade_cache
@@ -1,14 +1,35 @@

from dotenv import load_dotenv
from appdirs import user_data_dir
from ttools.utils import find_dotenv
import ttools.utils as utils
import os
import pytz
import vectorbtpro as vbt
import pytz
from pathlib import Path
from dotenv import load_dotenv
import os
def find_dotenv():
    """
    Searches for a .env file in the given directory or its parents and returns the path.

    Args:
        start_path: The directory to start searching from.

    Returns:
        Path to the .env file if found, otherwise None.
    """
    try:
        start_path = __file__
    except NameError:
        # print("Notebook probably")
        start_path = os.getcwd()
    # print(start_path)

    current_path = Path(start_path)
    for _ in range(10):  # Limit search depth to 10 levels
        dotenv_path = current_path / '.env'
        if dotenv_path.exists():
            return dotenv_path
        current_path = current_path.parent
    return None

ENV_FILE = find_dotenv()


@@ -1,8 +1,5 @@

from ctypes import Union
from dotenv import load_dotenv
from appdirs import user_data_dir
from ttools.utils import find_dotenv
from ttools.config import *
from datetime import datetime
from alpaca.data.historical import StockHistoricalDataClient
@@ -16,7 +13,7 @@ from time import time as timetime
from concurrent.futures import ThreadPoolExecutor
from alpaca.data.enums import DataFeed
import random
from ttools.utils import AggType, fetch_calendar_data, print, set_verbose
from ttools.utils import AggType, fetch_calendar_data, print, print_matching_files_info, set_verbose, list_matching_files
from tqdm import tqdm
import threading
from typing import List, Union
@@ -393,9 +390,25 @@ def load_data(symbol: Union[str, List[str]],
    excludes_str = ''.join(map(str, exclude_conditions))
    file_ohlcv = AGG_CACHE / f"{symbol}-{str(agg_type)}-{str(resolution)}-{start_date.strftime('%Y-%m-%dT%H-%M-%S')}-{end_date.strftime('%Y-%m-%dT%H-%M-%S')}-{str(excludes_str)}-{minsize}-{main_session_only}.parquet"

    if not force_remote and file_ohlcv.exists():
        ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
        print("Loaded from agg_cache", file_ohlcv)
    # if there are matching files with the same conditions and the same or a wider date span
    matched_files = list_matching_files(
        symbol=symbol,
        agg_type=str(agg_type),
        resolution=str(resolution),
        start_date=start_date,
        end_date=end_date,
        excludes_str=str(excludes_str),
        minsize=minsize,
        main_session_only=main_session_only
    )
    print("matched agg files", len(matched_files))
    print_matching_files_info(matched_files)

    if not force_remote and len(matched_files) > 0:
        ohlcv_df = pd.read_parquet(matched_files[0],
                                   engine='pyarrow',
                                   filters=[('time', '>=', start_date), ('time', '<=', end_date)])
        print("Loaded from agg_cache", matched_files[0])
        return ohlcv_df
    else:
        # Could this be sped up? "Searching cache" displays slowly - is there a bottleneck somewhere?
@@ -411,6 +424,11 @@ def load_data(symbol: Union[str, List[str]],
        ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote)

    if return_vbt:
        try:
            import vectorbtpro as vbt  # Import only when needed
        except ImportError:
            raise RuntimeError("vectorbtpro is required for return_vbt. Please install it.")

        return vbt.Data.from_data(vbt.symbol_dict(ret_dict_df), tz_convert=zoneNY)

    return ret_dict_df

ttools/utils.py (170 changes)
@@ -2,8 +2,10 @@ from pathlib import Path
from enum import Enum
from datetime import datetime, timedelta
from typing import List, Tuple
import re
import pytz
import calendar
from ttools.config import AGG_CACHE
import os
from alpaca.trading.models import Order, TradeUpdate, Calendar
import pandas_market_calendars as mcal
@@ -26,6 +28,147 @@ def set_verbose(value):
    global verbose
    verbose = value

def parse_filename(filename: str) -> dict:
    """Parse filename of AGG_CACHE files into its components using regex.
    https://claude.ai/chat/b869644b-f542-4812-ad58-d4439c15fa78
    """
    pattern = r"""
        ^
        ([A-Z]+)-                               # Symbol
        ([^-]+)-                                # Agg type
        (\d+)-                                  # Resolution
        (\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})-  # Start date
        (\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})-  # End date
        ([A-Z0-9]+)-                            # Excludes string
        (\d+)-                                  # Minsize
        (True|False)                            # Main session flag
        \.parquet$                              # File extension
    """
    match = re.match(pattern, filename, re.VERBOSE)
    if not match:
        return None

    try:
        symbol, agg_type, resolution, start_str, end_str, excludes, minsize, main_session = match.groups()

        return {
            'symbol': symbol,
            'agg_type': agg_type,
            'resolution': resolution,
            'start_date': datetime.strptime(start_str, '%Y-%m-%dT%H-%M-%S'),
            'end_date': datetime.strptime(end_str, '%Y-%m-%dT%H-%M-%S'),
            'excludes_str': excludes,
            'minsize': int(minsize),
            'main_session_only': main_session == 'True'
        }
    except (ValueError, AttributeError):
        return None

def list_matching_files(
    symbol: str = None,
    agg_type: str = None,
    resolution: str = None,
    start_date: datetime = None,
    end_date: datetime = None,
    excludes_str: str = None,
    minsize: int = None,
    main_session_only: bool = None
) -> list[Path]:
    """
    List all aggregated files in the cache directory matching the specified criteria.
    If start_date and end_date are provided, returns files that cover this interval
    (meaning their date range encompasses the requested interval).
    If a parameter is None, it matches any value for that component.

    Example:
    ```python
    # Example with all parameters specified
    specific_files = list_matching_files(
        symbol="SPY",
        agg_type="AggType.OHLCV",
        resolution="12",
        start_date=datetime(2024, 1, 15, 9, 30),
        end_date=datetime(2024, 1, 15, 16, 0),
        excludes_str="4679BCFMOPUVWZ",
        minsize=100,
        main_session_only=True
    )

    print_matching_files_info(specific_files)
    ```
    """
    # make dates naive
    if start_date is not None:
        start_date = start_date.replace(tzinfo=None)
    if end_date is not None:
        end_date = end_date.replace(tzinfo=None)

    agg_cache_dir = AGG_CACHE
    def matches_criteria(file_info: dict) -> bool:
        """Check if file matches all specified criteria."""
        if not file_info:
            return False

        # Check non-date criteria first
        if symbol is not None and file_info['symbol'] != symbol:
            return False
        if agg_type is not None and file_info['agg_type'] != agg_type:
            return False
        if resolution is not None and file_info['resolution'] != resolution:
            return False
        if excludes_str is not None and file_info['excludes_str'] != excludes_str:
            return False
        if minsize is not None and file_info['minsize'] != minsize:
            return False
        if main_session_only is not None and file_info['main_session_only'] != main_session_only:
            return False

        # Check date range coverage if both dates are provided
        if start_date is not None and end_date is not None:
            return (file_info['start_date'] <= start_date and
                    file_info['end_date'] >= end_date)

        # If only start_date is provided
        if start_date is not None:
            return file_info['end_date'] >= start_date

        # If only end_date is provided
        if end_date is not None:
            return file_info['start_date'] <= end_date

        return True

    # Process all files
    matching_files = []
    for file_path in agg_cache_dir.iterdir():
        if not file_path.is_file() or not file_path.name.endswith('.parquet'):
            continue

        file_info = parse_filename(file_path.name)
        if matches_criteria(file_info):
            matching_files.append((file_path, file_info))

    # Sort files by start date and then end date
    matching_files.sort(key=lambda x: (x[1]['start_date'], x[1]['end_date']))

    # Return just the file paths
    return [f[0] for f in matching_files]

def print_matching_files_info(files: list[Path]):
    """Helper function to print detailed information about matching files."""
    for file_path in files:
        file_info = parse_filename(file_path.name)
        if file_info:
            print(f"\nFile: {file_path.name}")
            print(f"Coverage: {file_info['start_date']} to {file_info['end_date']}")
            print(f"Symbol: {file_info['symbol']}")
            print(f"Agg Type: {file_info['agg_type']}")
            print(f"Resolution: {file_info['resolution']}")
            print(f"Excludes: {file_info['excludes_str']}")
            print(f"Minsize: {file_info['minsize']}")
            print(f"Main Session Only: {file_info['main_session_only']}")
            print("-" * 80)

def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]:
    """
    Fetches the trading schedule for the NYSE (New York Stock Exchange) between the specified start and end dates.
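As an illustration of the filename convention these helpers parse (the sample name reuses the values from the `list_matching_files` docstring example above and mirrors how `ttools/loaders.py` builds `file_ohlcv`):

```python
from ttools.utils import parse_filename

name = "SPY-AggType.OHLCV-12-2024-01-15T09-30-00-2024-01-15T16-00-00-4679BCFMOPUVWZ-100-True.parquet"
info = parse_filename(name)
# info == {
#     'symbol': 'SPY', 'agg_type': 'AggType.OHLCV', 'resolution': '12',
#     'start_date': datetime(2024, 1, 15, 9, 30), 'end_date': datetime(2024, 1, 15, 16, 0),
#     'excludes_str': '4679BCFMOPUVWZ', 'minsize': 100, 'main_session_only': True
# }
```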
@@ -109,33 +252,6 @@ def split_range(start: datetime, stop: datetime, period: str = "Y") -> List[Tupl

    return ranges


def find_dotenv():
    """
    Searches for a .env file in the given directory or its parents and returns the path.

    Args:
        start_path: The directory to start searching from.

    Returns:
        Path to the .env file if found, otherwise None.
    """
    try:
        start_path = __file__
    except NameError:
        # print("Notebook probably")
        start_path = os.getcwd()
    # print(start_path)

    current_path = Path(start_path)
    for _ in range(10):  # Limit search depth to 5 levels
        dotenv_path = current_path / '.env'
        if dotenv_path.exists():
            return dotenv_path
        current_path = current_path.parent
    return None


# create enum AGG_TYPE
class AggType(str, Enum):
    """
