Compare commits

...

30 Commits

Author SHA1 Message Date
68ea25d963 class weights for multiclass for training 2024-12-05 11:12:35 +01:00
277de3f73e fix 2024-11-29 13:36:38 +01:00
d99df2402c fix 2024-11-29 13:32:51 +01:00
f52fb2649c fix 2024-11-28 11:23:12 +01:00
30885171c3 fix 2024-11-27 10:27:03 +01:00
623412406e fix 2024-11-27 10:15:22 +01:00
1b3f5b1b79 update 2024-11-27 09:20:24 +01:00
0705b45351 gpu support added 2024-11-26 18:20:29 +01:00
c5e4c03af7 fix 2024-11-26 18:06:41 +01:00
191b58d11d thr conf matrix added 2024-11-21 16:08:07 +01:00
f2066f419e daily 2024-11-21 13:27:17 +01:00
519163efb5 fix 2024-11-21 09:32:20 +01:00
491bfc9feb ml support added 2024-11-21 09:20:36 +01:00
0ff42e2345 ordering of remote files fix 2024-11-21 05:28:08 +01:00
e22fda2f35 readme change 2024-11-19 10:28:32 +01:00
25b5a53774 Lee-Ready method updated - trades_buy_count and trades_sell_count added 2024-11-19 10:24:31 +01:00
169f07563e fix vol bars 2024-11-14 13:48:20 +01:00
fb5b2369e1 fix 2024-11-14 12:55:47 +01:00
b23a772836 remote fetch 2024-11-10 14:08:41 +01:00
cf6bcede48 remote range in utc 2024-11-01 15:41:23 +01:00
2116679dba optimalizations 2024-11-01 11:18:10 +01:00
c3faa53eff fix 2024-10-31 13:20:56 +01:00
47450e2740 agg cache optimized 2024-10-31 13:19:00 +01:00
5770d8324a fix 2024-10-30 14:39:41 +01:00
478a31c459 fix 2024-10-30 14:36:55 +01:00
53443c197b fix 2024-10-30 14:28:28 +01:00
008ab547c7 fix 2024-10-30 14:25:34 +01:00
d316d06f1d update 2024-10-30 14:10:44 +01:00
652cc02f12 Merge branch 'main' of https://github.com/drew2323/ttools into main 2024-10-30 14:09:13 +01:00
ca554cf600 fix 2024-10-30 14:08:59 +01:00
14 changed files with 4629 additions and 981 deletions

View File

@ -3,59 +3,66 @@ A Python library for tools, utilities, and helpers for my trading research workf
## Installation
```python
```bash
pip install git+https://github.com/drew2323/ttools.git
```
or
```bash
pip install git+https://gitea.stratlab.dev/dwker/ttools.git
```
Modules:
# loaders
- remotely fetches daily trade data
- manages trade cache (daily trade files per symbol) and aggregation cache (per symbol and requested period)
- numba compiled aggregator for the required output (time based, dollar, volume bars, renkos...)
- additional columns calculated from tick data and included in bars
- buyvolume, sellvolume - total volume triggered by aggressive orders (estimated by the Lee-Ready algorithm; see the tick-rule sketch below)
- buytrades, selltrades - total number of trades in each bar grouped by the side of the aggressive order
Detailed examples in `tests/data_loader_tryme.ipynb`
Detailed examples in [tests/data_loader_tryme.ipynb](tests/data_loader_tryme.ipynb)
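The side classification follows a simple tick rule (uptick = buy, downtick = sell, an unchanged price inherits the last direction), mirroring the per-bar logic inside the numba aggregators. A minimal standalone sketch (illustrative only, not part of the library API):
```python
import numpy as np

def classify_tick_rule(prices: np.ndarray, sizes: np.ndarray):
    """Split traded volume into buy/sell volume using the tick rule (Lee-Ready style)."""
    buy_volume = sell_volume = 0.0
    last_tick_up = None  # direction of the last price change (None until the first move)
    prev_price = prices[0]
    for price, size in zip(prices, sizes):
        if price > prev_price:
            last_tick_up = True
        elif price < prev_price:
            last_tick_up = False
        # unchanged price keeps the previous direction; skipped while still unknown
        if last_tick_up is True:
            buy_volume += size
        elif last_tick_up is False:
            sell_volume += size
        prev_price = price
    return buy_volume, sell_volume
```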
## load_data
Returns vectorized aggregation of given type. If trades for the given period are not cached, they are remotely fetched from Alpaca first.
Returns vectorized aggregation of the given type.
If aggregated data are already in the agg cache with the same conditions for the same or a wider date span, they are returned from cache.
Otherwise trade data are aggregated on the fly, saved to cache and returned.
If trades for the given period are not cached, they are remotely fetched from Alpaca first.
Example:
```python
from ttools import load_data
#This is how to call LOAD function
symbol = ["BAC"]
#datetime in zoneNY
day_start = datetime(2024, 10, 14, 9, 45, 0)
day_stop = datetime(2024, 10, 16, 15, 1, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)
#requested AGG
resolution = 12 #12s
agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO
exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None to defaults
minsize = 100
main_session_only = True
force_remote = False
data = load_data(symbol = symbol,
agg_type = agg_type,
resolution = resolution,
start_date = day_start,
end_date = day_stop,
#exclude_conditions = None,
minsize = minsize,
main_session_only = main_session_only,
force_remote = force_remote,
return_vbt = True, #returns vbt object
verbose = True
vbt_data = load_data(symbol = ["BAC"],
agg_type = AggType.OHLCV, #aggregation types: AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO,
resolution = 12, #12s (for other agg types this could be brick size etc.)
start_date = datetime(2024, 10, 14, 9, 45, 0),
end_date = datetime(2024, 10, 16, 15, 1, 0),
#exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'],
minsize = 100, #minimum trade size included in aggregation
main_session_only = True, #False for ext hours
force_remote = False, #True always refetches trades remotely
return_vbt = True, #returns vbt object with symbols as columns, otherwise dict keyed by symbols with pd.DataFrame
verbose = True # False = silent mode
)
bac_df = ohlcv_df["BAC"]
basic_data = vbt.Data.from_data(vbt.symbol_dict(ohlcv_df), tz_convert=zoneNY)
vbt.settings['plotting']['auto_rangebreaks'] = True
basic_data.ohlcv.plot()
data.ohlcv.data[symbol[0]].lw.plot()
vbt_data.ohlcv.data[symbol[0]].lw.plot()
vbt_data.data[symbol[0]]
```
### cache
There are 2 caches created:
- trade cache - daily files per symbol with all trades
- agg cache - aggregated output keyed by agg_type, resolution, conditions and date range (file naming sketched below)
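For reference, cache files are named roughly as sketched below, following the naming used in `loaders.py` (the base directories come from the package config; values here are illustrative):
```python
from datetime import datetime

symbol = "BAC"
day = datetime(2024, 10, 14)

# trade cache: one parquet file per symbol and trading day
trade_file = f"{symbol}-{day.date()}.parquet"  # BAC-2024-10-14.parquet

# agg cache: keyed by aggregation parameters and the requested date span
agg_type, resolution, minsize, main_session_only = "AggType.OHLCV", 12, 100, True
excludes_str = "CO4B"  # illustrative; the real key concatenates all excluded conditions
start = datetime(2024, 10, 14, 9, 45)
end = datetime(2024, 10, 16, 15, 1)
agg_file = (f"{symbol}-{agg_type}-{resolution}-"
            f"{start.strftime('%Y-%m-%dT%H-%M-%S')}-{end.strftime('%Y-%m-%dT%H-%M-%S')}-"
            f"{excludes_str}-{minsize}-{main_session_only}.parquet")
```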
### keys
Alpaca API keys are required, either as env variables or in a .env file.
```python
ACCOUNT1_LIVE_API_KEY=api_key
ACCOUNT1_LIVE_SECRET_KEY=secret_key
```
## prepare trade cache
Prepares daily trade cache files for a given period, either via the CLI below or from Python (see the sketch after the CLI example).
@ -93,6 +100,28 @@ python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 20
```
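The cache can also be warmed from Python by calling `fetch_trades_parallel` with `no_return=True` (a sketch based on the function signature in `loaders.py`; the date range is illustrative):
```python
from datetime import datetime
from ttools.loaders import fetch_trades_parallel
from ttools.utils import zoneNY

# Populate the per-day trade cache files without returning the concatenated DataFrame.
fetch_trades_parallel(
    symbol="BAC",
    start_date=zoneNY.localize(datetime(2024, 10, 14)),
    end_date=zoneNY.localize(datetime(2024, 10, 18)),
    main_session_only=True,
    no_return=True,  # only prepare cached files
)
```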
## remote loaders
Remote bars of a given resolution fetched from Alpaca.
Available resolutions: Minute, Hour, Day. It is not possible to limit the included trades.
Use only when precision is not required.
```python
from ttools.external_loaders import load_history_bars
from ttools.config import zoneNY
from datetime import datetime, time
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
symbol = "AAPL"
start_date = zoneNY.localize(datetime(2023, 2, 27, 18, 51, 38))
end_date = zoneNY.localize(datetime(2023, 4, 27, 21, 51, 39))
timeframe = TimeFrame(amount=1,unit=TimeFrameUnit.Minute)
df = load_history_bars(symbol, start_date, end_date, timeframe, main_session_only=True)
df.loc[('AAPL',)]
```
# vbtutils
Contains helpers for vbtpro
@ -132,6 +161,8 @@ exits.tail(20)
```
## display plotly figs in single ntb cells
To display several standalone figures in the same notebook cell.
`figs2cell(figlist)`
Example usage:
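A minimal sketch with two standalone Plotly figures (the figure construction here is illustrative; only `figs2cell` comes from ttools):
```python
import plotly.graph_objects as go
from ttools import figs2cell

fig1 = go.Figure(go.Scatter(y=[1, 3, 2], name="close"))
fig2 = go.Figure(go.Bar(y=[2, 1, 4], name="volume"))

figs2cell([fig1, fig2])  # renders both figures in the same notebook cell
```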

View File

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='ttools',
version='0.6.1',
version='0.7.99',
packages=find_packages(),
install_requires=[
# list your dependencies here

View File

@ -0,0 +1,178 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Exploring alternative cache storage using duckdb and parquet\n",
"\n",
"https://claude.ai/chat/e49491f7-8b18-4fb7-b301-5c9997746079\n"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env\n",
"Start loading data... 1730370862.4833238\n"
]
},
{
"data": {
"application/vnd.jupyter.widget-view+json": {
"model_id": "829f7f3d58a74f1fbfdcfc202c2aaf84",
"version_major": 2,
"version_minor": 0
},
"text/plain": [
"FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))"
]
},
"metadata": {},
"output_type": "display_data"
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"fetched parquet -11.310973167419434\n",
"Loaded 1836460 rows\n"
]
}
],
"source": [
"from ttools.tradecache import TradeCache\n",
"from ttools.utils import zoneNY\n",
"from pathlib import Path\n",
"from datetime import datetime\n",
"import logging\n",
"import duckdb\n",
"\n",
"logging.basicConfig(\n",
" level=logging.INFO, # Set the minimum level (DEBUG, INFO, WARNING, ERROR, CRITICAL)\n",
" format='%(levelname)s: %(message)s' # Simple format showing level and message\n",
")\n",
"\n",
"cache = TradeCache(\n",
" base_path=Path(\"./trade_cache\"),\n",
" max_workers=4, # Adjust based on your CPU\n",
" cleanup_after_days=7\n",
")\n",
"\n",
"# Load data\n",
"df = cache.load_range(\n",
" symbol=\"BAC\",\n",
" start_date=zoneNY.localize(datetime(2024, 10, 14, 9, 30)),\n",
" end_date=zoneNY.localize(datetime(2024, 10, 20, 16, 0)),\n",
" #columns=['open', 'high', 'low', 'close', 'volume']\n",
")\n",
"\n",
"print(f\"Loaded {len(df)} rows\")"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"DuckDB Schema:\n",
" column_name column_type null key default extra\n",
"0 x VARCHAR YES None None None\n",
"1 p DOUBLE YES None None None\n",
"2 s BIGINT YES None None None\n",
"3 i BIGINT YES None None None\n",
"4 c VARCHAR[] YES None None None\n",
"5 z VARCHAR YES None None None\n",
"6 t TIMESTAMP WITH TIME ZONE YES None None None\n",
"\n",
"Sample Data:\n",
" x p s i c z \\\n",
"0 T 41.870 27 62879146994030 [ , F, T, I] A \n",
"1 D 41.965 1 71675241580848 [ , I] A \n",
"2 D 41.965 1 71675241644625 [ , I] A \n",
"3 D 41.850 1 71675241772360 [ , I] A \n",
"4 N 41.960 416188 52983525028174 [ , O] A \n",
"\n",
" t \n",
"0 2024-10-14 15:30:00.006480+02:00 \n",
"1 2024-10-14 15:30:00.395802+02:00 \n",
"2 2024-10-14 15:30:00.484008+02:00 \n",
"3 2024-10-14 15:30:00.610005+02:00 \n",
"4 2024-10-14 15:30:01.041599+02:00 \n",
"\n",
"Pandas Info:\n"
]
},
{
"ename": "NameError",
"evalue": "name 'pd' is not defined",
"output_type": "error",
"traceback": [
"\u001b[0;31m---------------------------------------------------------------------------\u001b[0m",
"\u001b[0;31mNameError\u001b[0m Traceback (most recent call last)",
"Cell \u001b[0;32mIn[4], line 25\u001b[0m\n\u001b[1;32m 22\u001b[0m \u001b[38;5;28mprint\u001b[39m(df\u001b[38;5;241m.\u001b[39minfo())\n\u001b[1;32m 24\u001b[0m \u001b[38;5;66;03m# Let's check the schema first\u001b[39;00m\n\u001b[0;32m---> 25\u001b[0m \u001b[43mcheck_parquet_schema\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n",
"Cell \u001b[0;32mIn[4], line 21\u001b[0m, in \u001b[0;36mcheck_parquet_schema\u001b[0;34m()\u001b[0m\n\u001b[1;32m 19\u001b[0m \u001b[38;5;66;03m# Method 3: Using pandas\u001b[39;00m\n\u001b[1;32m 20\u001b[0m \u001b[38;5;28mprint\u001b[39m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;130;01m\\n\u001b[39;00m\u001b[38;5;124mPandas Info:\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n\u001b[0;32m---> 21\u001b[0m df \u001b[38;5;241m=\u001b[39m \u001b[43mpd\u001b[49m\u001b[38;5;241m.\u001b[39mread_parquet(sample_file)\n\u001b[1;32m 22\u001b[0m \u001b[38;5;28mprint\u001b[39m(df\u001b[38;5;241m.\u001b[39minfo())\n",
"\u001b[0;31mNameError\u001b[0m: name 'pd' is not defined"
]
}
],
"source": [
"import duckdb\n",
"\n",
"def check_parquet_schema():\n",
" # Read one file and print its structure\n",
" sample_file = Path(\"./trade_cache\")/\"temp/BAC_20241014.parquet\"\n",
" \n",
" # Method 1: Using DuckDB describe\n",
" print(\"DuckDB Schema:\")\n",
" print(duckdb.sql(f\"DESCRIBE SELECT * FROM read_parquet('{sample_file}')\").df())\n",
" \n",
" # Method 2: Just look at the data\n",
" print(\"\\nSample Data:\")\n",
" print(duckdb.sql(f\"\"\"\n",
" SELECT *\n",
" FROM read_parquet('{sample_file}')\n",
" LIMIT 5\n",
" \"\"\").df())\n",
" \n",
" # Method 3: Using pandas\n",
" print(\"\\nPandas Info:\")\n",
" df = pd.read_parquet(sample_file)\n",
" print(df.info())\n",
"\n",
"# Let's check the schema first\n",
"check_parquet_schema()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -0,0 +1,324 @@
# this goes to the main directory
from pathlib import Path
from datetime import datetime, date, timedelta
from typing import Optional, List, Set, Dict, Tuple
import pandas as pd
import duckdb
import pandas_market_calendars as mcal
from abc import ABC, abstractmethod
import logging
from ttools.utils import zoneNY
from concurrent.futures import ThreadPoolExecutor
from ttools.loaders import fetch_daily_stock_trades
import time
logger = logging.getLogger(__name__)
class TradeCache:
def __init__(
self,
base_path: Path,
market: str = 'NYSE',
max_workers: int = 4,
cleanup_after_days: int = 7
):
"""
Initialize TradeCache with monthly partitions and temp storage
Args:
base_path: Base directory for cache
market: Market calendar to use
max_workers: Max parallel fetches
cleanup_after_days: Days after which to clean temp files
"""
"""Initialize TradeCache with the same parameters but optimized for the new schema"""
self.base_path = Path(base_path)
self.temp_path = self.base_path / "temp"
self.base_path.mkdir(parents=True, exist_ok=True)
self.temp_path.mkdir(parents=True, exist_ok=True)
self.calendar = mcal.get_calendar(market)
self.max_workers = max_workers
self.cleanup_after_days = cleanup_after_days
# Initialize DuckDB with schema-specific optimizations
self.con = duckdb.connect()
self.con.execute("SET memory_limit='16GB'")
self.con.execute("SET threads TO 8")
# Create the schema for our tables
self.schema = """
x VARCHAR,
p DOUBLE,
s BIGINT,
i BIGINT,
c VARCHAR[],
z VARCHAR,
t TIMESTAMP WITH TIME ZONE
"""
self._trading_days_cache: Dict[Tuple[date, date], List[date]] = {}
def get_partition_path(self, symbol: str, year: int, month: int) -> Path:
"""Get path for a specific partition"""
return self.base_path / f"symbol={symbol}/year={year}/month={month}"
def get_temp_path(self, symbol: str, day: date) -> Path:
"""Get temporary file path for a day"""
return self.temp_path / f"{symbol}_{day:%Y%m%d}.parquet"
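# Resulting cache layout (illustrative example):
#   base_path/symbol=BAC/year=2024/month=10/data.parquet   <- consolidated month
#   base_path/temp/BAC_20241014.parquet                    <- daily temp file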
def get_trading_days(self, start_date: datetime, end_date: datetime) -> List[date]:
"""Get trading days with caching"""
key = (start_date.date(), end_date.date())
if key not in self._trading_days_cache:
schedule = self.calendar.schedule(start_date=start_date, end_date=end_date)
self._trading_days_cache[key] = [d.date() for d in schedule.index]
return self._trading_days_cache[key]
def cleanup_temp_files(self):
"""Clean up old temp files"""
cutoff = datetime.now() - timedelta(days=self.cleanup_after_days)
for file in self.temp_path.glob("*.parquet"):
try:
# Extract date from filename
date_str = file.stem.split('_')[1]
file_date = datetime.strptime(date_str, '%Y%m%d')
if file_date < cutoff:
file.unlink()
except Exception as e:
logger.warning(f"Error cleaning up {file}: {e}")
def consolidate_month(self, symbol: str, year: int, month: int) -> bool:
"""
Consolidate daily files into monthly partition only if we have complete month
Returns True if consolidation was successful
"""
# Get all temp files for this symbol and month
temp_files = list(self.temp_path.glob(f"{symbol}_{year:04d}{month:02d}*.parquet"))
if not temp_files:
return False
try:
# Get expected trading days for this month
start_date = zoneNY.localize(datetime(year, month, 1))
if month == 12:
end_date = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
else:
end_date = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)
trading_days = self.get_trading_days(start_date, end_date)
# Check if we have data for all trading days
temp_dates = set(datetime.strptime(f.stem.split('_')[1], '%Y%m%d').date()
for f in temp_files)
missing_days = set(trading_days) - temp_dates
# Only consolidate if we have all trading days
if missing_days:
logger.info(f"Skipping consolidation for {symbol} {year}-{month}: "
f"missing {len(missing_days)} trading days")
return False
# Proceed with consolidation since we have complete month
partition_path = self.get_partition_path(symbol, year, month)
partition_path.mkdir(parents=True, exist_ok=True)
file_path = partition_path / "data.parquet"
files_str = ', '.join(f"'{f}'" for f in temp_files)
# Modified query to handle the new schema
self.con.execute(f"""
COPY (
SELECT x, p, s, i, c, z, t
FROM read_parquet([{files_str}])
ORDER BY t
)
TO '{file_path}'
(FORMAT PARQUET, COMPRESSION 'ZSTD')
""")
# Remove temp files only after successful write
for f in temp_files:
f.unlink()
logger.info(f"Successfully consolidated {symbol} {year}-{month} "
f"({len(temp_files)} files)")
return True
except Exception as e:
logger.error(f"Error consolidating {symbol} {year}-{month}: {e}")
return False
def fetch_remote_day(self, symbol: str, day: date) -> pd.DataFrame:
"""Implement this to fetch single day of data"""
min_datetime = zoneNY.localize(datetime.combine(day, datetime.min.time()))
max_datetime = zoneNY.localize(datetime.combine(day, datetime.max.time()))
return fetch_daily_stock_trades(symbol, min_datetime, max_datetime)
def _fetch_and_save_day(self, symbol: str, day: date) -> Optional[Path]:
"""Fetch and save a single day, returns file path if successful"""
try:
df_day = self.fetch_remote_day(symbol, day)
if df_day.empty:
return None
temp_file = self.get_temp_path(symbol, day)
df_day.to_parquet(temp_file, compression='ZSTD')
return temp_file
except Exception as e:
logger.error(f"Error fetching {symbol} for {day}: {e}")
return None
def load_range(
self,
symbol: str,
start_date: datetime,
end_date: datetime,
columns: Optional[List[str]] = None,
consolidate: bool = False
) -> pd.DataFrame:
"""Load data for date range, consolidating when complete months are detected"""
#self.cleanup_temp_files()
trading_days = self.get_trading_days(start_date, end_date)
# Modify column selection for new schema
col_str = '*' if not columns else ', '.join(columns)
if consolidate:
# First check temp files for complete months
temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
if temp_files:
# Group temp files by month
monthly_temps: Dict[Tuple[int, int], Set[date]] = {}
for file in temp_files:
try:
# Extract date from filename
date_str = file.stem.split('_')[1]
file_date = datetime.strptime(date_str, '%Y%m%d').date()
key = (file_date.year, file_date.month)
if key not in monthly_temps:
monthly_temps[key] = set()
monthly_temps[key].add(file_date)
except Exception as e:
logger.warning(f"Error parsing temp file date {file}: {e}")
continue
# Check each month for completeness and consolidate if complete
for (year, month), dates in monthly_temps.items():
# Get trading days for this month
month_start = zoneNY.localize(datetime(year, month, 1))
if month == 12:
month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
else:
month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)
month_trading_days = set(self.get_trading_days(month_start, month_end))
# If we have all trading days for the month, consolidate
if month_trading_days.issubset(dates):
logger.info(f"Found complete month in temp files for {symbol} {year}-{month}")
self.consolidate_month(symbol, year, month)
#timing the load
time_start = time.time()
print("Start loading data...", time_start)
# Now load data from both consolidated and temp files
query = f"""
WITH monthly_data AS (
SELECT {col_str}
FROM read_parquet(
'{self.base_path}/*/*.parquet',
hive_partitioning=1,
union_by_name=true
)
WHERE t BETWEEN '{start_date}' AND '{end_date}'
),
temp_data AS (
SELECT {col_str}
FROM read_parquet(
'{self.temp_path}/{symbol}_*.parquet',
union_by_name=true
)
WHERE t BETWEEN '{start_date}' AND '{end_date}'
)
SELECT * FROM (
SELECT * FROM monthly_data
UNION ALL
SELECT * FROM temp_data
)
ORDER BY t
"""
try:
df_cached = self.con.execute(query).df()
except Exception as e:
logger.warning(f"Error reading cached data: {e}")
df_cached = pd.DataFrame()
print("fetched parquet", time_start - time.time())
if not df_cached.empty:
cached_days = set(df_cached['t'].dt.date)
missing_days = [d for d in trading_days if d not in cached_days]
else:
missing_days = trading_days
# Fetch missing days in parallel
if missing_days:
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_day = {
executor.submit(self._fetch_and_save_day, symbol, day): day
for day in missing_days
}
for future in future_to_day:
day = future_to_day[future]
try:
temp_file = future.result()
if temp_file:
logger.debug(f"Successfully fetched {symbol} for {day}")
except Exception as e:
logger.error(f"Error processing {symbol} for {day}: {e}")
# Check again for complete months after fetching new data
temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
if temp_files:
monthly_temps = {}
for file in temp_files:
try:
date_str = file.stem.split('_')[1]
file_date = datetime.strptime(date_str, '%Y%m%d').date()
key = (file_date.year, file_date.month)
if key not in monthly_temps:
monthly_temps[key] = set()
monthly_temps[key].add(file_date)
except Exception as e:
logger.warning(f"Error parsing temp file date {file}: {e}")
continue
# Check for complete months again
for (year, month), dates in monthly_temps.items():
month_start = zoneNY.localize(datetime(year, month, 1))
if month == 12:
month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
else:
month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)
month_trading_days = set(self.get_trading_days(month_start, month_end))
if month_trading_days.issubset(dates):
logger.info(f"Found complete month after fetching for {symbol} {year}-{month}")
self.consolidate_month(symbol, year, month)
# Load final data including any new fetches
try:
df_cached = self.con.execute(query).df()
except Exception as e:
logger.warning(f"Error reading final data: {e}")
df_cached = pd.DataFrame()
return df_cached.sort_values('t')

460
tests/alpaca_loader.ipynb Normal file
View File

@ -0,0 +1,460 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env\n"
]
}
],
"source": [
"from ttools.external_loaders import load_history_bars\n",
"from ttools.config import zoneNY\n",
"from datetime import datetime, time\n",
"from alpaca.data.timeframe import TimeFrame, TimeFrameUnit\n",
"\n",
"symbol = \"AAPL\"\n",
"start_date = zoneNY.localize(datetime(2023, 2, 27, 18, 51, 38))\n",
"end_date = zoneNY.localize(datetime(2023, 4, 27, 21, 51, 39))\n",
"timeframe = TimeFrame(amount=1,unit=TimeFrameUnit.Minute)\n",
"\n",
"df = load_history_bars(symbol, start_date, end_date, timeframe, True)\n",
"df.loc[('AAPL',)]"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th>open</th>\n",
" <th>high</th>\n",
" <th>low</th>\n",
" <th>close</th>\n",
" <th>volume</th>\n",
" <th>trade_count</th>\n",
" <th>vwap</th>\n",
" </tr>\n",
" <tr>\n",
" <th>timestamp</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th>2023-02-28 09:30:00-05:00</th>\n",
" <td>147.050</td>\n",
" <td>147.380</td>\n",
" <td>146.830</td>\n",
" <td>147.2700</td>\n",
" <td>1554100.0</td>\n",
" <td>6447.0</td>\n",
" <td>146.914560</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-28 09:31:00-05:00</th>\n",
" <td>147.250</td>\n",
" <td>147.320</td>\n",
" <td>147.180</td>\n",
" <td>147.2942</td>\n",
" <td>159387.0</td>\n",
" <td>6855.0</td>\n",
" <td>147.252171</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-28 09:32:00-05:00</th>\n",
" <td>147.305</td>\n",
" <td>147.330</td>\n",
" <td>147.090</td>\n",
" <td>147.1600</td>\n",
" <td>214536.0</td>\n",
" <td>7435.0</td>\n",
" <td>147.210128</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-28 09:33:00-05:00</th>\n",
" <td>147.140</td>\n",
" <td>147.230</td>\n",
" <td>147.090</td>\n",
" <td>147.1500</td>\n",
" <td>171487.0</td>\n",
" <td>7235.0</td>\n",
" <td>147.154832</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-28 09:34:00-05:00</th>\n",
" <td>147.160</td>\n",
" <td>147.160</td>\n",
" <td>146.880</td>\n",
" <td>146.9850</td>\n",
" <td>235915.0</td>\n",
" <td>4965.0</td>\n",
" <td>147.001762</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 15:26:00-04:00</th>\n",
" <td>168.400</td>\n",
" <td>168.415</td>\n",
" <td>168.340</td>\n",
" <td>168.3601</td>\n",
" <td>163973.0</td>\n",
" <td>1398.0</td>\n",
" <td>168.368809</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 15:27:00-04:00</th>\n",
" <td>168.360</td>\n",
" <td>168.400</td>\n",
" <td>168.330</td>\n",
" <td>168.3800</td>\n",
" <td>130968.0</td>\n",
" <td>1420.0</td>\n",
" <td>168.364799</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 15:28:00-04:00</th>\n",
" <td>168.380</td>\n",
" <td>168.430</td>\n",
" <td>168.320</td>\n",
" <td>168.3285</td>\n",
" <td>152193.0</td>\n",
" <td>1361.0</td>\n",
" <td>168.372671</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 15:29:00-04:00</th>\n",
" <td>168.325</td>\n",
" <td>168.330</td>\n",
" <td>168.260</td>\n",
" <td>168.2850</td>\n",
" <td>208426.0</td>\n",
" <td>1736.0</td>\n",
" <td>168.297379</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 15:30:00-04:00</th>\n",
" <td>168.280</td>\n",
" <td>168.350</td>\n",
" <td>168.255</td>\n",
" <td>168.3450</td>\n",
" <td>218077.0</td>\n",
" <td>1694.0</td>\n",
" <td>168.308873</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>15162 rows × 7 columns</p>\n",
"</div>"
],
"text/plain": [
" open high low close volume \\\n",
"timestamp \n",
"2023-02-28 09:30:00-05:00 147.050 147.380 146.830 147.2700 1554100.0 \n",
"2023-02-28 09:31:00-05:00 147.250 147.320 147.180 147.2942 159387.0 \n",
"2023-02-28 09:32:00-05:00 147.305 147.330 147.090 147.1600 214536.0 \n",
"2023-02-28 09:33:00-05:00 147.140 147.230 147.090 147.1500 171487.0 \n",
"2023-02-28 09:34:00-05:00 147.160 147.160 146.880 146.9850 235915.0 \n",
"... ... ... ... ... ... \n",
"2023-04-27 15:26:00-04:00 168.400 168.415 168.340 168.3601 163973.0 \n",
"2023-04-27 15:27:00-04:00 168.360 168.400 168.330 168.3800 130968.0 \n",
"2023-04-27 15:28:00-04:00 168.380 168.430 168.320 168.3285 152193.0 \n",
"2023-04-27 15:29:00-04:00 168.325 168.330 168.260 168.2850 208426.0 \n",
"2023-04-27 15:30:00-04:00 168.280 168.350 168.255 168.3450 218077.0 \n",
"\n",
" trade_count vwap \n",
"timestamp \n",
"2023-02-28 09:30:00-05:00 6447.0 146.914560 \n",
"2023-02-28 09:31:00-05:00 6855.0 147.252171 \n",
"2023-02-28 09:32:00-05:00 7435.0 147.210128 \n",
"2023-02-28 09:33:00-05:00 7235.0 147.154832 \n",
"2023-02-28 09:34:00-05:00 4965.0 147.001762 \n",
"... ... ... \n",
"2023-04-27 15:26:00-04:00 1398.0 168.368809 \n",
"2023-04-27 15:27:00-04:00 1420.0 168.364799 \n",
"2023-04-27 15:28:00-04:00 1361.0 168.372671 \n",
"2023-04-27 15:29:00-04:00 1736.0 168.297379 \n",
"2023-04-27 15:30:00-04:00 1694.0 168.308873 \n",
"\n",
"[15162 rows x 7 columns]"
]
},
"execution_count": 5,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df.loc[('AAPL',)]"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [
{
"data": {
"text/html": [
"<div>\n",
"<style scoped>\n",
" .dataframe tbody tr th:only-of-type {\n",
" vertical-align: middle;\n",
" }\n",
"\n",
" .dataframe tbody tr th {\n",
" vertical-align: top;\n",
" }\n",
"\n",
" .dataframe thead th {\n",
" text-align: right;\n",
" }\n",
"</style>\n",
"<table border=\"1\" class=\"dataframe\">\n",
" <thead>\n",
" <tr style=\"text-align: right;\">\n",
" <th></th>\n",
" <th></th>\n",
" <th>open</th>\n",
" <th>high</th>\n",
" <th>low</th>\n",
" <th>close</th>\n",
" <th>volume</th>\n",
" <th>trade_count</th>\n",
" <th>vwap</th>\n",
" </tr>\n",
" <tr>\n",
" <th>symbol</th>\n",
" <th>timestamp</th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" <th></th>\n",
" </tr>\n",
" </thead>\n",
" <tbody>\n",
" <tr>\n",
" <th rowspan=\"11\" valign=\"top\">AAPL</th>\n",
" <th>2023-02-27 18:52:00-05:00</th>\n",
" <td>148.0200</td>\n",
" <td>148.02</td>\n",
" <td>148.0200</td>\n",
" <td>148.02</td>\n",
" <td>112.0</td>\n",
" <td>7.0</td>\n",
" <td>148.020000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-27 18:56:00-05:00</th>\n",
" <td>148.0200</td>\n",
" <td>148.02</td>\n",
" <td>148.0200</td>\n",
" <td>148.02</td>\n",
" <td>175.0</td>\n",
" <td>10.0</td>\n",
" <td>148.020000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-27 19:00:00-05:00</th>\n",
" <td>148.0299</td>\n",
" <td>148.03</td>\n",
" <td>148.0299</td>\n",
" <td>148.03</td>\n",
" <td>1957.0</td>\n",
" <td>10.0</td>\n",
" <td>148.029993</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-27 19:06:00-05:00</th>\n",
" <td>148.0600</td>\n",
" <td>148.06</td>\n",
" <td>148.0600</td>\n",
" <td>148.06</td>\n",
" <td>122.0</td>\n",
" <td>7.0</td>\n",
" <td>148.060000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-02-27 19:09:00-05:00</th>\n",
" <td>148.0500</td>\n",
" <td>148.10</td>\n",
" <td>148.0500</td>\n",
" <td>148.10</td>\n",
" <td>1604.0</td>\n",
" <td>33.0</td>\n",
" <td>148.075109</td>\n",
" </tr>\n",
" <tr>\n",
" <th>...</th>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" <td>...</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 19:54:00-04:00</th>\n",
" <td>167.8000</td>\n",
" <td>167.80</td>\n",
" <td>167.8000</td>\n",
" <td>167.80</td>\n",
" <td>534.0</td>\n",
" <td>15.0</td>\n",
" <td>167.800000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 19:56:00-04:00</th>\n",
" <td>167.8800</td>\n",
" <td>167.88</td>\n",
" <td>167.8800</td>\n",
" <td>167.88</td>\n",
" <td>1386.0</td>\n",
" <td>28.0</td>\n",
" <td>167.880000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 19:57:00-04:00</th>\n",
" <td>167.8000</td>\n",
" <td>167.80</td>\n",
" <td>167.8000</td>\n",
" <td>167.80</td>\n",
" <td>912.0</td>\n",
" <td>60.0</td>\n",
" <td>167.800000</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 19:58:00-04:00</th>\n",
" <td>167.8000</td>\n",
" <td>167.88</td>\n",
" <td>167.8000</td>\n",
" <td>167.88</td>\n",
" <td>3311.0</td>\n",
" <td>22.0</td>\n",
" <td>167.877333</td>\n",
" </tr>\n",
" <tr>\n",
" <th>2023-04-27 19:59:00-04:00</th>\n",
" <td>167.9000</td>\n",
" <td>167.94</td>\n",
" <td>167.9000</td>\n",
" <td>167.94</td>\n",
" <td>1969.0</td>\n",
" <td>64.0</td>\n",
" <td>167.918150</td>\n",
" </tr>\n",
" </tbody>\n",
"</table>\n",
"<p>31217 rows × 7 columns</p>\n",
"</div>"
],
"text/plain": [
" open high low close volume \\\n",
"symbol timestamp \n",
"AAPL 2023-02-27 18:52:00-05:00 148.0200 148.02 148.0200 148.02 112.0 \n",
" 2023-02-27 18:56:00-05:00 148.0200 148.02 148.0200 148.02 175.0 \n",
" 2023-02-27 19:00:00-05:00 148.0299 148.03 148.0299 148.03 1957.0 \n",
" 2023-02-27 19:06:00-05:00 148.0600 148.06 148.0600 148.06 122.0 \n",
" 2023-02-27 19:09:00-05:00 148.0500 148.10 148.0500 148.10 1604.0 \n",
"... ... ... ... ... ... \n",
" 2023-04-27 19:54:00-04:00 167.8000 167.80 167.8000 167.80 534.0 \n",
" 2023-04-27 19:56:00-04:00 167.8800 167.88 167.8800 167.88 1386.0 \n",
" 2023-04-27 19:57:00-04:00 167.8000 167.80 167.8000 167.80 912.0 \n",
" 2023-04-27 19:58:00-04:00 167.8000 167.88 167.8000 167.88 3311.0 \n",
" 2023-04-27 19:59:00-04:00 167.9000 167.94 167.9000 167.94 1969.0 \n",
"\n",
" trade_count vwap \n",
"symbol timestamp \n",
"AAPL 2023-02-27 18:52:00-05:00 7.0 148.020000 \n",
" 2023-02-27 18:56:00-05:00 10.0 148.020000 \n",
" 2023-02-27 19:00:00-05:00 10.0 148.029993 \n",
" 2023-02-27 19:06:00-05:00 7.0 148.060000 \n",
" 2023-02-27 19:09:00-05:00 33.0 148.075109 \n",
"... ... ... \n",
" 2023-04-27 19:54:00-04:00 15.0 167.800000 \n",
" 2023-04-27 19:56:00-04:00 28.0 167.880000 \n",
" 2023-04-27 19:57:00-04:00 60.0 167.800000 \n",
" 2023-04-27 19:58:00-04:00 22.0 167.877333 \n",
" 2023-04-27 19:59:00-04:00 64.0 167.918150 \n",
"\n",
"[31217 rows x 7 columns]"
]
},
"execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"df"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

File diff suppressed because one or more lines are too long

View File

@ -1,69 +0,0 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"The autoreload extension is already loaded. To reload it, use:\n",
" %reload_ext autoreload\n"
]
},
{
"data": {
"text/plain": [
"['CUVWAP', 'DIVRELN']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"%load_ext autoreload\n",
"%autoreload 2\n",
"import vectorbtpro as vbt\n",
"from ttools.vbtindicators import register_custom_inds\n",
"from ttools.indicators import CUVWAP\n",
"\n",
"\n",
"register_custom_inds(None, \"override\")\n",
"#chopiness = vbt.indicator(\"technical:CHOPINESS\").run(s12_data.open, s12_data.high, s12_data.low, s12_data.close, s12_data.volume, window = 100)\n",
"#vwap_cum_roll = vbt.indicator(\"technical:ROLLING_VWAP\").run(s12_data.open, s12_data.high, s12_data.low, s12_data.close, s12_data.volume, window = 100, min_periods = 5)\n",
"#vwap_cum_d = vbt.indicator(\"ttools:CUVWAP\").run(s12_data.high, s12_data.low, s12_data.close, s12_data.volume, anchor=\"D\", drag=50)\n",
"#vwap_lin_angle = vbt.indicator(\"talib:LINEARREG_ANGLE\").run(vwap_cum_d.vwap, timeperiod=2)\n",
"\n",
"vbt.IF.list_indicators(\"ttools\")\n",
"\n",
"\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -1,4 +1,5 @@
from .vbtutils import AnchoredIndicator, create_mask_from_window, isrising, isfalling, isrisingc, isfallingc, trades2entries_exits, figs2cell
from .vbtindicators import register_custom_inds
from .utils import find_dotenv, AggType, zoneNY, zonePRG, zoneUTC
from .loaders import load_data, prepare_trade_cache
from .utils import AggType, zoneNY, zonePRG, zoneUTC
from .loaders import load_data, prepare_trade_cache
from .external_loaders import load_history_bars

View File

@ -10,8 +10,80 @@ Includes fetch (remote/cached) methods and numba aggregator function for TIME BA
"""""
def aggregate_trades_optimized(symbol: str, trades_df: pd.DataFrame, resolution: int, type: AggType = AggType.OHLCV, clear_input: bool = False):
"""
Optimized version of trade aggregation function with reduced memory footprint.
"""
# 1. Get timestamps from index if 't' is not in columns
if 't' not in trades_df.columns:
timestamps = trades_df.index.values
else:
timestamps = trades_df['t'].values
# 2. Select only needed columns for prices and sizes
prices = trades_df['p'].values
sizes = trades_df['s'].values
# Clear input to free up memory
if clear_input:
del trades_df
# 3. Convert timestamps maintaining exact precision
# The direct int64-nanoseconds view (commented out below) caused problems:
#unix_timestamps_s = timestamps.view('int64').astype(np.float64) / 1e6
# so the original, non-optimized conversion is used instead (about 5x slower):
unix_timestamps_s = timestamps.astype('datetime64[ns]').astype(np.float64) / 1e9
# 4. Create the ticks array efficiently: pre-allocate for better memory use
ticks = np.empty((len(timestamps), 3), dtype=np.float64)
ticks[:, 0] = unix_timestamps_s
ticks[:, 1] = prices
ticks[:, 2] = sizes
# 5. Clear memory of intermediate objects
del timestamps, prices, sizes, unix_timestamps_s
# 6. Process based on type using existing pattern
try:
match type:
case AggType.OHLCV:
ohlcv_bars = generate_time_bars_nb(ticks, resolution)
columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades',
'updated', 'vwap', 'buyvolume', 'sellvolume', 'buytrades', 'selltrades']
case AggType.OHLCV_VOL:
ohlcv_bars = generate_volume_bars_nb(ticks, resolution)
columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades',
'updated', 'buyvolume', 'sellvolume', 'buytrades', 'selltrades']
case AggType.OHLCV_DOL:
ohlcv_bars = generate_dollar_bars_nb(ticks, resolution)
columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades',
'amount', 'updated']
case _:
raise ValueError("Invalid AggType type. Supported types are 'time', 'volume' and 'dollar'.")
finally:
# 7. Clear large numpy array as soon as possible
del ticks
# 8. Create DataFrame and handle timestamps - keeping original working approach
ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns)
del ohlcv_bars
# 9. Use the original timestamp handling that we know works
ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY)
ohlcv_df['updated'] = pd.to_datetime(ohlcv_df['updated'], unit="s").dt.tz_localize('UTC').dt.tz_convert(zoneNY)
# 10. Round microseconds as in original
ohlcv_df['updated'] = ohlcv_df['updated'].dt.round('us')
# 11. Set index last, as in original
ohlcv_df.set_index('time', inplace=True)
return ohlcv_df
def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type: AggType = AggType.OHLCV):
""""
Original replaced by optimized version
Accepts dataframe with trades keyed by symbol. Preparess dataframe to
numpy and calls Numba optimized aggregator for given bar type. (time/volume/dollar)
"""""
@ -44,9 +116,13 @@ def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type
columns.append('vwap')
columns.append('buyvolume')
columns.append('sellvolume')
columns.append('buytrades')
columns.append('selltrades')
if type == AggType.OHLCV_VOL:
columns.append('buyvolume')
columns.append('sellvolume')
columns.append('buytrades')
columns.append('selltrades')
ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns)
ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY)
#print(ohlcv_df['updated'])
@ -174,22 +250,24 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
close_price = ticks[0, 1]
volume = 0
trades_count = 0
trades_buy_count = 0
trades_sell_count = 0
current_day = np.floor(ticks[0, 0] / 86400) # Calculate the initial day from the first tick timestamp
bar_time = ticks[0, 0] # Initialize bar time with the time of the first tick
buy_volume = 0 # Volume of buy trades
sell_volume = 0 # Volume of sell trades
prev_price = ticks[0, 1] # Initialize previous price for the first tick
last_tick_up = None
for tick in ticks:
tick_time = tick[0]
price = tick[1]
tick_volume = tick[2]
tick_day = np.floor(tick_time / 86400) # Calculate the day of the current tick
splitted = False
# Check if the new tick is from a different day, then close the current bar
if tick_day != current_day:
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume, trades_buy_count, trades_sell_count])
# Reset for the new day using the current tick data
open_price = price
high_price = price
@ -197,6 +275,8 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
close_price = price
volume = 0
trades_count = 0
trades_buy_count = 0
trades_sell_count = 0
remaining_volume = volume_per_bar
current_day = tick_day
bar_time = tick_time # Update bar time to the current tick time
@ -219,8 +299,21 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
# Update buy and sell volumes
if price > prev_price:
buy_volume += tick_volume
trades_buy_count += 1
last_tick_up = True
elif price < prev_price:
sell_volume += tick_volume
trades_sell_count += 1
last_tick_up = False
else: #same price, use last direction
if last_tick_up is None:
pass
elif last_tick_up:
buy_volume += tick_volume
trades_buy_count += 1
else:
sell_volume += tick_volume
trades_sell_count += 1
tick_volume = 0
else:
@ -233,11 +326,24 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
# Update buy and sell volumes
if price > prev_price:
buy_volume += volume_to_add
trades_buy_count += 1
last_tick_up = True
elif price < prev_price:
sell_volume += volume_to_add
trades_sell_count += 1
last_tick_up = False
else: #same price, use last direction
if last_tick_up is None:
pass
elif last_tick_up:
buy_volume += volume_to_add
trades_buy_count += 1
else:
sell_volume += volume_to_add
trades_sell_count += 1
# Append the completed bar to the list
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume, trades_buy_count, trades_sell_count])
# Reset bar values for the new bar using the current tick data
open_price = price
@ -246,21 +352,26 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
close_price = price
volume = 0
trades_count = 0
trades_buy_count = 0
trades_sell_count = 0
remaining_volume = volume_per_bar
buy_volume = 0
sell_volume = 0
# Increment bar time if splitting a trade
if tick_volume > 0: # If there's remaining volume in the trade, set bar time slightly later
bar_time = tick_time + 1e-6
# if the same trade opened the bar (we are splitting one trade across multiple bars)
# the first split is identified by time, subsequent ones by the flag
if bar_time == tick_time or splitted:
bar_time = bar_time + 1e-6
splitted = True
else:
bar_time = tick_time # Otherwise, set bar time to the tick time
bar_time = tick_time
splitted = False
prev_price = price
# Add the last bar if it contains any trades
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume, trades_buy_count, trades_sell_count])
return np.array(ohlcv_bars)
@ -284,13 +395,15 @@ def generate_time_bars_nb(ticks, resolution):
close_price = 0
volume = 0
trades_count = 0
trades_buy_count = 0
trades_sell_count = 0
vwap_cum_volume_price = 0 # Cumulative volume * price
cum_volume = 0 # Cumulative volume for VWAP
buy_volume = 0 # Volume of buy trades
sell_volume = 0 # Volume of sell trades
prev_price = ticks[0, 1] # Initialize previous price for the first tick
prev_day = np.floor(ticks[0, 0] / 86400) # Calculate the initial day from the first tick timestamp
last_tick_up = None
for tick in ticks:
curr_time = tick[0] #updated time
tick_time = np.floor(tick[0] / resolution) * resolution
@ -307,7 +420,7 @@ def generate_time_bars_nb(ticks, resolution):
if tick_time != start_time + current_bar_index * resolution:
if current_bar_index >= 0 and trades_count > 0: # Save the previous bar if trades happened
vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume, trades_buy_count, trades_sell_count])
# Reset bar values
current_bar_index = int((tick_time - start_time) / resolution)
@ -316,6 +429,8 @@ def generate_time_bars_nb(ticks, resolution):
low_price = price
volume = 0
trades_count = 0
trades_buy_count = 0
trades_sell_count = 0
vwap_cum_volume_price = 0
cum_volume = 0
buy_volume = 0
@ -333,15 +448,28 @@ def generate_time_bars_nb(ticks, resolution):
# Update buy and sell volumes
if price > prev_price:
buy_volume += tick_volume
trades_buy_count += 1
last_tick_up = True
elif price < prev_price:
sell_volume += tick_volume
trades_sell_count += 1
last_tick_up = False
else: #same price, use last direction
if last_tick_up is None:
pass
elif last_tick_up:
buy_volume += tick_volume
trades_buy_count += 1
else:
sell_volume += tick_volume
trades_sell_count += 1
prev_price = price
# Save the last processed bar
if trades_count > 0:
vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume, trades_buy_count, trades_sell_count])
return np.array(ohlcv_bars)

View File

@ -1,14 +1,35 @@
from dotenv import load_dotenv
from appdirs import user_data_dir
from ttools.utils import find_dotenv
import ttools.utils as utils
import os
import pytz
import vectorbtpro as vbt
import pytz
from pathlib import Path
from dotenv import load_dotenv
import os
def find_dotenv():
"""
Searches for a .env file starting from this module's directory (or the current working directory in notebooks) and walking up its parents.
Returns:
Path to the .env file if found, otherwise None.
"""
try:
start_path = __file__
except NameError:
#print("Notebook probably")
start_path = os.getcwd()
#print(start_path)
current_path = Path(start_path)
for _ in range(10): # Limit search depth to 10 levels
dotenv_path = current_path / '.env'
if dotenv_path.exists():
return dotenv_path
current_path = current_path.parent
return None
ENV_FILE = find_dotenv()

View File

@ -0,0 +1,55 @@
from ttools import zoneUTC
from ttools.config import *
from datetime import datetime
from alpaca.data.historical import StockHistoricalDataClient
from ttools.config import ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY
from datetime import timedelta, datetime, time
from alpaca.data.enums import DataFeed
from typing import List, Union
import pandas as pd
from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockBarsRequest
from alpaca.data.enums import DataFeed
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
def load_history_bars(symbol: Union[str, List[str]], datetime_object_from: datetime, datetime_object_to: datetime, timeframe: TimeFrame, main_session_only: bool = True):
"""Returns dataframe fetched remotely from Alpaca.
Args:
symbol: symbol or list of symbols
datetime_object_from: datetime in zoneNY
datetime_object_to: datetime in zoneNY
timeframe: timeframe
main_session_only: boolean to fetch only main session data
Returns:
dataframe
Example:
```python
from ttools.external_loaders import load_history_bars
from ttools.config import zoneNY
from datetime import datetime
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
symbol = "AAPL"
start_date = zoneNY.localize(datetime(2023, 2, 27, 18, 51, 38))
end_date = zoneNY.localize(datetime(2023, 4, 27, 21, 51, 39))
timeframe = TimeFrame(amount=1,unit=TimeFrameUnit.Minute)
df = load_history_bars(symbol, start_date, end_date, timeframe)
```
"""
client = StockHistoricalDataClient(ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY, raw_data=False)
#datetime_object_from = datetime(2023, 2, 27, 18, 51, 38, tzinfo=datetime.timezone.utc)
#datetime_object_to = datetime(2023, 2, 27, 21, 51, 39, tzinfo=datetime.timezone.utc)
bar_request = StockBarsRequest(symbol_or_symbols=symbol,timeframe=timeframe, start=datetime_object_from, end=datetime_object_to, feed=DataFeed.SIP)
#print("before df")
df = client.get_stock_bars(bar_request).df
df.index = df.index.set_levels(df.index.get_level_values(1).tz_convert(zoneNY), level=1)
if main_session_only:
start_time = time(9, 30, 0)
end_time = time(15, 30, 0)
df = df.loc[(df.index.get_level_values(1).time >= start_time) & (df.index.get_level_values(1).time <= end_time)]
return df

View File

@ -1,8 +1,6 @@
from ctypes import Union
from dotenv import load_dotenv
from appdirs import user_data_dir
from ttools.utils import find_dotenv
from ttools import zoneUTC
from ttools.config import *
from datetime import datetime
from alpaca.data.historical import StockHistoricalDataClient
@ -16,12 +14,18 @@ from time import time as timetime
from concurrent.futures import ThreadPoolExecutor
from alpaca.data.enums import DataFeed
import random
from ttools.utils import AggType, fetch_calendar_data, print, set_verbose
from ttools.utils import AggType, fetch_calendar_data, print, print_matching_files_info, set_verbose, list_matching_files
from tqdm import tqdm
import threading
from typing import List, Union
from ttools.aggregator_vectorized import aggregate_trades
from ttools.aggregator_vectorized import aggregate_trades, aggregate_trades_optimized
import numpy as np
import pandas as pd
import pyarrow.dataset as ds
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
import math
import os
"""
Module for fetching stock data. Supports
1) cache management
@ -90,6 +94,8 @@ def convert_dict_to_multiindex_df(tradesResponse, rename_labels = True, keep_sym
final_df.reset_index(inplace=True) # Reset index to remove MultiIndex levels, making them columns
final_df.drop(columns=['symbol'], inplace=True) #remove symbol column
final_df.set_index(timestamp_col, inplace=True) #reindex by timestamp
#print index datetime resolution
#print(final_df.index.dtype)
return final_df
@ -109,6 +115,28 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No
Returns:
df: pd.DataFrame
"""
def fast_filter(df, exclude_conditions):
# Convert arrays to strings once
str_series = df['c'].apply(lambda x: ','.join(x))
# Create mask using vectorized string operations
mask = np.zeros(len(df), dtype=bool)
for cond in exclude_conditions:
mask |= str_series.str.contains(cond, regex=False)
# Apply filter
return df[~mask]
def vectorized_string_sets(df, exclude_conditions):
# Convert exclude_conditions to set for O(1) lookup
exclude_set = set(exclude_conditions)
# Vectorized operation using sets intersection
arrays = df['c'].values
mask = np.array([bool(set(arr) & exclude_set) for arr in arrays])
return df[~mask]
# 9:30 to 16:00
if main_session_only:
@ -123,30 +151,50 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No
#REQUIRED FILTERING
# Create a mask to filter rows within the specified time range
if start is not None and end is not None:
print(f"filtering {start.time()} {end.time()}")
print(f"Trimming {start} {end}")
if symbol_included:
mask = (df.index.get_level_values('t') >= start) & \
(df.index.get_level_values('t') <= end)
df = df[mask]
else:
mask = (df.index >= start) & (df.index <= end)
# Apply the mask to the DataFrame
df = df[mask]
df = df.loc[start:end]
if exclude_conditions is not None:
print(f"excluding {exclude_conditions}")
# Create a mask to exclude rows with any of the specified conditions
mask = df['c'].apply(lambda x: any(cond in exclude_conditions for cond in x))
# Filter out the rows with specified conditions
df = df[~mask]
df = vectorized_string_sets(df, exclude_conditions)
print("exclude done")
if minsize is not None:
print(f"minsize {minsize}")
#exclude conditions
df = df[df['s'] >= minsize]
print("minsize done")
return df
def calculate_optimal_workers(file_count, min_workers=4, max_workers=32):
"""
Calculate optimal number of workers based on file count and system resources
Rules of thumb:
- Minimum of 4 workers to ensure parallelization
- Maximum of 32 workers to avoid thread overhead
- For 100 files, aim for around 16-24 workers
- Scale with CPU count but don't exceed max_workers
"""
cpu_count = os.cpu_count() or 4
# Base calculation: 2-4x CPU count for I/O bound tasks
suggested_workers = cpu_count * 3
# Scale based on file count (1 worker per 4-6 files is a good ratio)
files_based_workers = math.ceil(file_count / 5)
# Take the smaller of the two suggestions
optimal_workers = min(suggested_workers, files_based_workers)
# Clamp between min and max workers
return max(min_workers, min(optimal_workers, max_workers))
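# Worked example (assumed 8-core machine, 100 cached daily files):
# suggested_workers = 8 * 3 = 24, files_based_workers = ceil(100 / 5) = 20,
# optimal_workers = min(24, 20) = 20, clamped to [4, 32] -> 20 workers.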
def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False,force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP, verbose = None):
#doc for this function
"""
@ -155,8 +203,8 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
by using force_remote - forces using remote data always and thus refreshes the cache for these dates
Attributes:
:param symbol: The stock symbol to fetch trades for.
:param start: The start time for the trade data.
:param end: The end time for the trade data.
:param start: The start time for the trade data, in market timezone.
:param end: The end time for the trade data, in market timezone.
:exclude_conditions: list of string conditions to exclude from the data
:minsize minimum size of trade to be included in the data
:no_return: If True, do not return the DataFrame. Used to prepare cached files.
@ -184,24 +232,34 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
#exists in cache?
daily_file = f"{symbol}-{str(start.date())}.parquet"
file_path = TRADE_CACHE / daily_file
if file_path.exists() and (not force_remote or not no_return):
if file_path.exists() and (not force_remote and not no_return):
with trade_cache_lock:
df = pd.read_parquet(file_path)
print("Loaded from CACHE", file_path)
df = filter_trade_df(df, start, end, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only)
return df
day_next = start.date() + timedelta(days=1)
# create day borders in UTC, as Alpaca works with UTC dates
start_date = start.date()
# Create min/max times in NY timezone
ny_day_min = zoneNY.localize(datetime.combine(start_date, time.min))
ny_day_max = zoneNY.localize(datetime.combine(start_date, time.max))
# Convert both to UTC
utc_day_min = ny_day_min.astimezone(zoneUTC)
utc_day_max = ny_day_max.astimezone(zoneUTC)
print("Fetching from remote.")
client = StockHistoricalDataClient(ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY, raw_data=True)
stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=start.date(), end=day_next, feed=data_feed)
stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=utc_day_min, end=utc_day_max, feed=data_feed)
last_exception = None
for attempt in range(max_retries):
try:
tradesResponse = client.get_stock_trades(stockTradeRequest)
print(f"Remote fetched completed.", start.date(), day_next)
print(f"Remote fetched completed whole day", start.date())
print(f"Exact UTC range fetched: {utc_day_min} - {utc_day_max}")
if not tradesResponse[symbol]:
print(f"EMPTY")
return pd.DataFrame()
@ -209,7 +267,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
df = convert_dict_to_multiindex_df(tradesResponse, rename_labels=rename_labels, keep_symbols=keep_symbols)
#if the market is still open today, don't cache - also don't cache for the IEX feed
if datetime.now().astimezone(zoneNY).date() < day_next or data_feed == DataFeed.IEX:
if datetime.now().astimezone(zoneNY).date() < start_date + timedelta(days=1) or data_feed == DataFeed.IEX:
print("not saving trade cache, market still open today or IEX datapoint")
#ic(datetime.now().astimezone(zoneNY))
#ic(day.open, day.close)
@ -230,7 +288,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
print("All attempts to fetch data failed.")
raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}")
def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False, verbose = None):
def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = None, main_session_only = True, force_remote = False, max_workers=None, no_return = False, verbose = None):
"""
Fetch trades between ranges.
@ -284,7 +342,12 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
#speed it up , locals first and then fetches
s_time = timetime()
with trade_cache_lock:
local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache])
file_paths = [f for _, f in days_from_cache]
dataset = ds.dataset(file_paths, format='parquet')
local_df = dataset.to_table().to_pandas()
del dataset
#original version
#local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache])
final_time = timetime() - s_time
print(f"{symbol} All {len(days_from_cache)} split files loaded in", final_time, "seconds")
#the filter is still required (cached daily files cover whole days)
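Reading the cached daily files through a single pyarrow dataset replaces the per-file pd.concat loop and scans all files with pyarrow's multithreaded reader; a standalone sketch with hypothetical cache paths:
```python
import pyarrow.dataset as ds

file_paths = [
    "trade_cache/BAC-2024-10-14.parquet",  # hypothetical daily cache files
    "trade_cache/BAC-2024-10-15.parquet",
]

dataset = ds.dataset(file_paths, format="parquet")  # one logical dataset over all files
local_df = dataset.to_table().to_pandas()           # single columnar read, one conversion to pandas
del dataset
```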
@ -294,6 +357,7 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
#do this only for remotes
if len(days_from_remote) > 0:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures_with_date = []
#for single_date in (start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)):
for market_day in tqdm(days_from_remote, desc=f"{symbol} Remote fetching"):
#start = datetime.combine(single_date, time(9, 30)) # Market opens at 9:30 AM
@ -316,14 +380,19 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
end = min(end_date, max_day_time)
future = executor.submit(fetch_daily_stock_trades, symbol, start, end, exclude_conditions, minsize, main_session_only, no_return, force_remote)
futures.append(future)
futures_with_date.append((future,start))
for future in tqdm(futures, desc=f"{symbol} Receiving trades"):
results_with_dates = []
for future, date in tqdm(futures_with_date, desc=f"{symbol} Receiving trades"):
try:
result = future.result()
results.append(result)
if result is not None:
results_with_dates.append((result,date))
except Exception as e:
print(f"Error fetching data for a day: {e}")
# Sort by date before concatenating
results_with_dates.sort(key=lambda x: x[1])
results = [r for r, _ in results_with_dates]
if not no_return:
# Batch concatenation to improve speed
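Futures complete in arbitrary order, so each one is paired with its trading day and the results are sorted before concatenation; a condensed, runnable sketch of that pattern (fetch_day stands in for fetch_daily_stock_trades):
```python
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

def fetch_day(day):  # stand-in for the real per-day fetch
    return pd.DataFrame({"day": [day]})

days = ["2024-10-14", "2024-10-15", "2024-10-16"]
with ThreadPoolExecutor(max_workers=4) as executor:
    futures_with_date = [(executor.submit(fetch_day, d), d) for d in days]
    results_with_dates = []
    for future, day in futures_with_date:
        result = future.result()
        if result is not None:
            results_with_dates.append((result, day))

results_with_dates.sort(key=lambda x: x[1])   # chronological order regardless of completion order
df = pd.concat([r for r, _ in results_with_dates])
```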
@ -359,8 +428,8 @@ def load_data(symbol: Union[str, List[str]],
symbol (Union[str, list]): Symbol
agg_type (AggType): Type of aggregation
resolution (Union[str, int]): Resolution of aggregation based on agg_type
start_date (datetime):
end_date (datetime):
start_date (datetime): Start of period, timezone aware
end_date (datetime): End of period, timezone aware
exclude_conditions (list, optional): Trade conditions to exclude. Defaults to None.
minsize (_type_, optional): Minimum trade size to include. Defaults to None.
main_session_only (bool, optional): Main session only, or include extended hours. Defaults to True.
@ -379,6 +448,12 @@ def load_data(symbol: Union[str, List[str]],
if verbose is not None:
set_verbose(verbose) # Change global verbose if specified
if start_date.tzinfo is None:
start_date = zoneNY.localize(start_date)
if end_date.tzinfo is None:
end_date = zoneNY.localize(end_date)
if exclude_conditions is None:
exclude_conditions = EXCLUDE_CONDITIONS
@ -387,14 +462,30 @@ def load_data(symbol: Union[str, List[str]],
excludes_str = ''.join(map(str, exclude_conditions))
file_ohlcv = AGG_CACHE / f"{symbol}-{str(agg_type)}-{str(resolution)}-{start_date.strftime('%Y-%m-%dT%H-%M-%S')}-{end_date.strftime('%Y-%m-%dT%H-%M-%S')}-{str(excludes_str)}-{minsize}-{main_session_only}.parquet"
if not force_remote and file_ohlcv.exists():
ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
print("Loaded from agg_cache", file_ohlcv)
#look for cached agg files with the same conditions and a same-or-wider date span
matched_files = list_matching_files(
symbol=symbol,
agg_type=str(agg_type),
resolution=str(resolution),
start_date=start_date,
end_date=end_date,
excludes_str=str(excludes_str),
minsize=minsize,
main_session_only=main_session_only
)
print("matched agg files", len(matched_files))
print_matching_files_info(matched_files)
if not force_remote and len(matched_files) > 0:
ohlcv_df = pd.read_parquet(matched_files[0],
engine='pyarrow',
filters=[('time', '>=', start_date), ('time', '<=', end_date)])
print("Loaded from agg_cache", matched_files[0])
return ohlcv_df
else:
#could this be sped up? "Searching cache" displays slowly - is there a bottleneck?
df = fetch_trades_parallel(symbol, start_date, end_date, minsize=minsize, exclude_conditions=exclude_conditions, main_session_only=main_session_only, force_remote=force_remote) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F'])
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type)
ohlcv_df = aggregate_trades_optimized(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type, clear_input = True)
ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow')
print(f"{symbol} Saved to agg_cache", file_ohlcv)
@ -405,6 +496,11 @@ def load_data(symbol: Union[str, List[str]],
ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote)
if return_vbt:
try:
import vectorbtpro as vbt # Import only when needed
except ImportError:
raise RuntimeError("vectorbtpro is required for return_vbt. Please install it.")
return vbt.Data.from_data(vbt.symbol_dict(ret_dict_df), tz_convert=zoneNY)
return ret_dict_df

ttools/models.py Normal file (2226 lines)

File diff suppressed because it is too large

View File

@ -2,8 +2,10 @@ from pathlib import Path
from enum import Enum
from datetime import datetime, timedelta
from typing import List, Tuple
import re
import pytz
import calendar
from ttools.config import AGG_CACHE
import os
from alpaca.trading.models import Order, TradeUpdate, Calendar
import pandas_market_calendars as mcal
@ -26,6 +28,147 @@ def set_verbose(value):
global verbose
verbose = value
def parse_filename(filename: str) -> dict:
"""Parse filename of AGG_CACHE files into its components using regex.
https://claude.ai/chat/b869644b-f542-4812-ad58-d4439c15fa78
"""
pattern = r"""
^
([A-Z]+)- # Symbol
([^-]+)- # Agg type
(\d+)- # Resolution
(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})- # Start date
(\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})- # End date
([A-Z0-9]+)- # Excludes string
(\d+)- # Minsize
(True|False) # Main session flag
\.parquet$ # File extension
"""
match = re.match(pattern, filename, re.VERBOSE)
if not match:
return None
try:
symbol, agg_type, resolution, start_str, end_str, excludes, minsize, main_session = match.groups()
return {
'symbol': symbol,
'agg_type': agg_type,
'resolution': resolution,
'start_date': datetime.strptime(start_str, '%Y-%m-%dT%H-%M-%S'),
'end_date': datetime.strptime(end_str, '%Y-%m-%dT%H-%M-%S'),
'excludes_str': excludes,
'minsize': int(minsize),
'main_session_only': main_session == 'True'
}
except (ValueError, AttributeError):
return None
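A short usage sketch of parse_filename on a filename following the AGG_CACHE naming scheme (the concrete values are illustrative):
```python
info = parse_filename(
    "SPY-AggType.OHLCV-12-2024-01-15T09-30-00-2024-01-15T16-00-00-4679BCFMOPUVWZ-100-True.parquet"
)
# -> {'symbol': 'SPY', 'agg_type': 'AggType.OHLCV', 'resolution': '12',
#     'start_date': datetime(2024, 1, 15, 9, 30), 'end_date': datetime(2024, 1, 15, 16, 0),
#     'excludes_str': '4679BCFMOPUVWZ', 'minsize': 100, 'main_session_only': True}
# filenames that do not match the pattern return None
```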
def list_matching_files(
symbol: str = None,
agg_type: str = None,
resolution: str = None,
start_date: datetime = None,
end_date: datetime = None,
excludes_str: str = None,
minsize: int = None,
main_session_only: bool = None
) -> list[Path]:
"""
List all aggregated files in the cache directory matching the specified criteria.
If start_date and end_date are provided, returns files that cover this interval
(meaning their date range encompasses the requested interval).
If a parameter is None, it matches any value for that component.
Example:
```python
# Example with all parameters specified
specific_files = list_matching_files(
symbol="SPY",
agg_type="AggType.OHLCV",
resolution="12",
start_date=datetime(2024, 1, 15, 9, 30),
end_date=datetime(2024, 1, 15, 16, 0),
excludes_str="4679BCFMOPUVWZ",
minsize=100,
main_session_only=True
)
print_matching_files_info(specific_files)
```
"""
#make dates naive to match the naive datetimes parsed from filenames
if start_date is not None:
start_date = start_date.replace(tzinfo=None)
if end_date is not None:
end_date = end_date.replace(tzinfo=None)
agg_cache_dir = AGG_CACHE
def matches_criteria(file_info: dict) -> bool:
"""Check if file matches all specified criteria."""
if not file_info:
return False
# Check non-date criteria first
if symbol is not None and file_info['symbol'] != symbol:
return False
if agg_type is not None and file_info['agg_type'] != agg_type:
return False
if resolution is not None and file_info['resolution'] != resolution:
return False
if excludes_str is not None and file_info['excludes_str'] != excludes_str:
return False
if minsize is not None and file_info['minsize'] != minsize:
return False
if main_session_only is not None and file_info['main_session_only'] != main_session_only:
return False
# Check date range coverage if both dates are provided
if start_date is not None and end_date is not None:
return (file_info['start_date'] <= start_date and
file_info['end_date'] >= end_date)
# If only start_date is provided
if start_date is not None:
return file_info['end_date'] >= start_date
# If only end_date is provided
if end_date is not None:
return file_info['start_date'] <= end_date
return True
# Process all files
matching_files = []
for file_path in agg_cache_dir.iterdir():
if not file_path.is_file() or not file_path.name.endswith('.parquet'):
continue
file_info = parse_filename(file_path.name)
if matches_criteria(file_info):
matching_files.append((file_path, file_info))
# Sort files by start date and then end date
matching_files.sort(key=lambda x: (x[1]['start_date'], x[1]['end_date']))
# Return just the file paths
return [f[0] for f in matching_files]
def print_matching_files_info(files: list[Path]):
"""Helper function to print detailed information about matching files."""
for file_path in files:
file_info = parse_filename(file_path.name)
if file_info:
print(f"\nFile: {file_path.name}")
print(f"Coverage: {file_info['start_date']} to {file_info['end_date']}")
print(f"Symbol: {file_info['symbol']}")
print(f"Agg Type: {file_info['agg_type']}")
print(f"Resolution: {file_info['resolution']}")
print(f"Excludes: {file_info['excludes_str']}")
print(f"Minsize: {file_info['minsize']}")
print(f"Main Session Only: {file_info['main_session_only']}")
print("-" * 80)
def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]:
"""
Fetches the trading schedule for the NYSE (New York Stock Exchange) between the specified start and end dates.
@ -109,33 +252,6 @@ def split_range(start: datetime, stop: datetime, period: str = "Y") -> List[Tupl
return ranges
def find_dotenv():
"""
Searches for a .env file in the given directory or its parents and returns the path.
Args:
start_path: The directory to start searching from.
Returns:
Path to the .env file if found, otherwise None.
"""
try:
start_path = __file__
except NameError:
#print("Notebook probably")
start_path = os.getcwd()
#print(start_path)
current_path = Path(start_path)
for _ in range(10): # Limit search depth to 10 levels
dotenv_path = current_path / '.env'
if dotenv_path.exists():
return dotenv_path
current_path = current_path.parent
return None
#create enum AGG_TYPE
class AggType(str, Enum):
"""
@ -157,4 +273,147 @@ class StartBarAlign(str, Enum):
RANDOM = first bar starts when first trade occurs
"""
ROUND = "round"
RANDOM = "random"
RANDOM = "random"
def compare_dataframes(df1, df2, name1="DataFrame 1", name2="DataFrame 2", check_dtype=True):
"""
Compare two DataFrames and provide detailed analysis of their differences.
Parameters:
-----------
df1, df2 : pandas.DataFrame
The DataFrames to compare
name1, name2 : str
Names to identify the DataFrames in the output
check_dtype : bool
Whether to check if dtypes match for columns
Returns:
--------
bool
True if DataFrames are identical (based on check_dtype parameter)
dict
Detailed comparison results
"""
results = {
'are_equal': False,
'shape_match': False,
'column_match': False,
'index_match': False,
'dtype_match': False,
'content_match': False,
'differences': {}
}
# Shape comparison
if df1.shape != df2.shape:
results['differences']['shape'] = {
name1: df1.shape,
name2: df2.shape
}
else:
results['shape_match'] = True
# Column comparison
cols1 = set(df1.columns)
cols2 = set(df2.columns)
if cols1 != cols2:
results['differences']['columns'] = {
f'unique_to_{name1}': list(cols1 - cols2),
f'unique_to_{name2}': list(cols2 - cols1),
'common': list(cols1 & cols2)
}
else:
results['column_match'] = True
# Index comparison
idx1 = set(df1.index)
idx2 = set(df2.index)
if idx1 != idx2:
results['differences']['index'] = {
f'unique_to_{name1}': list(idx1 - idx2),
f'unique_to_{name2}': list(idx2 - idx1),
'common': list(idx1 & idx2)
}
else:
results['index_match'] = True
# dtype comparison
if check_dtype and results['column_match']:
dtype_diff = {}
for col in cols1:
if df1[col].dtype != df2[col].dtype:
dtype_diff[col] = {
name1: str(df1[col].dtype),
name2: str(df2[col].dtype)
}
if dtype_diff:
results['differences']['dtypes'] = dtype_diff
else:
results['dtype_match'] = True
# Content comparison (only for matching columns and indices)
if results['column_match'] and results['index_match']:
common_cols = list(cols1)
common_idx = list(idx1)
value_diff = {}
for col in common_cols:
# Compare values
if not df1[col].equals(df2[col]):
# Find specific differences
mask = df1[col] != df2[col]
if any(mask):
diff_indices = df1.index[mask]
value_diff[col] = {
'different_at_indices': list(diff_indices),
'sample_differences': {
str(idx): {
name1: df1.loc[idx, col],
name2: df2.loc[idx, col]
} for idx in list(diff_indices)[:5] # Show first 5 differences
}
}
if value_diff:
results['differences']['values'] = value_diff
else:
results['content_match'] = True
# Overall equality
results['are_equal'] = all([
results['shape_match'],
results['column_match'],
results['index_match'],
results['content_match'],
(results['dtype_match'] if check_dtype else True)
])
# Print summary
print(f"\nComparison Summary of {name1} vs {name2}:")
print(f"Shape Match: {results['shape_match']} ({df1.shape} vs {df2.shape})")
print(f"Column Match: {results['column_match']}")
print(f"Index Match: {results['index_match']}")
print(f"Dtype Match: {results['dtype_match']}" if check_dtype else "Dtype Check: Skipped")
print(f"Content Match: {results['content_match']}")
print(f"\nOverall Equal: {results['are_equal']}")
# Print detailed differences if any
if not results['are_equal']:
print("\nDetailed Differences:")
for diff_type, diff_content in results['differences'].items():
print(f"\n{diff_type.upper()}:")
if diff_type == 'values':
print(f"Number of columns with differences: {len(diff_content)}")
for col, details in diff_content.items():
print(f"\nColumn '{col}':")
print(f"Number of different values: {len(details['different_at_indices'])}")
print("First few differences:")
for idx, vals in details['sample_differences'].items():
print(f" At index {idx}:")
print(f" {name1}: {vals[name1]}")
print(f" {name2}: {vals[name2]}")
else:
print(diff_content)
return results['are_equal'], results
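A short runnable usage sketch of compare_dataframes:
```python
import pandas as pd

# compare_dataframes as defined above
df_a = pd.DataFrame({"close": [10.0, 10.5, 11.0]})
df_b = pd.DataFrame({"close": [10.0, 10.4, 11.0]})

equal, details = compare_dataframes(df_a, df_b, name1="cached", name2="recomputed")
# equal -> False; details['differences']['values']['close'] points to the mismatch at index 1
```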