Compare commits: 652cc02f12...main (28 commits)
| SHA1 |
|---|
| 68ea25d963 |
| 277de3f73e |
| d99df2402c |
| f52fb2649c |
| 30885171c3 |
| 623412406e |
| 1b3f5b1b79 |
| 0705b45351 |
| c5e4c03af7 |
| 191b58d11d |
| f2066f419e |
| 519163efb5 |
| 491bfc9feb |
| 0ff42e2345 |
| e22fda2f35 |
| 25b5a53774 |
| 169f07563e |
| fb5b2369e1 |
| b23a772836 |
| cf6bcede48 |
| 2116679dba |
| c3faa53eff |
| 47450e2740 |
| 5770d8324a |
| 478a31c459 |
| 53443c197b |
| 008ab547c7 |
| d316d06f1d |
README.md (99 lines changed)

@@ -3,59 +3,66 @@ A Python library for tools, utilities, and helpers for my trading research workflow

## Installation

```bash
pip install git+https://github.com/drew2323/ttools.git
```

or

```bash
pip install git+https://gitea.stratlab.dev/dwker/ttools.git
```

Modules:

# loaders

- remotely fetches daily trade data
- manages trade cache (daily trade files per symbol) and aggregation cache (per symbol and requested period)
- numba compiled aggregator for the required output (time based, dollar, volume bars, renkos...)
- additional columns calculated from tick data and included in bars
  - buyvolume, sellvolume - total volume triggered by aggressive orders (estimated by the Lee-Ready algorithm; see the sketch below)
  - buytrades, selltrades - total number of trades in each bar, grouped by the side of the aggressive order
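A minimal sketch of the tick-rule side classification behind these columns (price upticks count as buys, downticks as sells, unchanged prices inherit the last direction, mirroring the aggregator logic in `loaders.py`). Illustrative only, not the library's implementation:

```python
import pandas as pd

def classify_side(prices: pd.Series) -> pd.Series:
    """Label each trade 'buy' or 'sell' from the direction of the price change;
    unchanged prices reuse the previous direction (simplified Lee-Ready tick rule)."""
    diff = prices.diff()
    side = pd.Series(index=prices.index, dtype="object")
    side[diff > 0] = "buy"
    side[diff < 0] = "sell"
    return side.ffill()  # zero ticks inherit the last known direction

# Tiny made-up example
trades = pd.DataFrame({"price": [41.87, 41.88, 41.88, 41.86], "size": [100, 50, 200, 75]})
trades["side"] = classify_side(trades["price"])
buyvolume = trades.loc[trades["side"] == "buy", "size"].sum()
sellvolume = trades.loc[trades["side"] == "sell", "size"].sum()
```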
Detailed examples in [tests/data_loader_tryme.ipynb](tests/data_loader_tryme.ipynb)

## load_data
Returns a vectorized aggregation of the given type.

If aggregated data are already in the agg cache with the same conditions for the same or a wider date span, they are returned from the cache.
Otherwise trade data are aggregated on the fly, saved to the cache and returned.
If trades for the given period are not cached, they are remotely fetched from Alpaca first. This flow is sketched below.
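In pseudocode (illustrative only, the names below are not the actual internals):

```python
# Pseudocode of the load_data cache flow described above; all names are illustrative.
def load_data_flow(request):
    cached = agg_cache.lookup(request)              # same agg settings, same or wider date span
    if cached is not None:
        return cached                               # served straight from the agg cache
    trades = trade_cache.get(request.days) or fetch_from_alpaca(request.days)  # daily trade files
    bars = aggregate(trades, request)               # numba aggregator (time/volume/dollar/renko)
    agg_cache.save(request, bars)
    return bars
```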
Example:

```python
from datetime import datetime
from ttools import load_data, AggType

#This is how to call the LOAD function
symbol = ["BAC"]
vbt_data = load_data(symbol = symbol,
    agg_type = AggType.OHLCV, #aggregation types: AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO
    resolution = 12, #12s (for other types might be bricksize etc.)
    start_date = datetime(2024, 10, 14, 9, 45, 0),
    end_date = datetime(2024, 10, 16, 15, 1, 0),
    #exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'],
    minsize = 100, #minimum trade size included in aggregation
    main_session_only = True, #False for ext hours
    force_remote = False, #always refetches trades remotely
    return_vbt = True, #returns vbt object with symbols as columns, otherwise dict keyed by symbols with pd.DataFrame
    verbose = True #False = silent mode
    )

vbt_data.ohlcv.data[symbol[0]].lw.plot()
vbt_data.data[symbol[0]]
```

### cache
There are 2 caches created (a hypothetical layout is sketched below):

- trade cache - daily files per symbol with all trades
- agg cache - aggregated output keyed by agg type, resolution, conditions and ranges
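For orientation only, a hypothetical on-disk layout (file and directory names are illustrative, not the library's actual paths):

```
trade_cache/
  BAC_20241014.parquet    # all trades for BAC on 2024-10-14
  BAC_20241015.parquet
agg_cache/
  BAC-OHLCV-12s-minsize100-mainsession-20241014_20241016.parquet
```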

### keys
Required Alpaca API keys in env variables or a .env file.

```
ACCOUNT1_LIVE_API_KEY=api_key
ACCOUNT1_LIVE_SECRET_KEY=secret_key
```

## prepare trade cache

To prepare daily trade cache files for a given period:
@@ -93,6 +100,28 @@

```bash
python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 20...
```
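The same preparation is also exposed as `prepare_trade_cache` in `ttools` (see the package exports below). Its exact signature is not shown in this diff, so the call below only assumes it mirrors the CLI flags; the parameter names are hypothetical:

```python
from datetime import datetime
from ttools import prepare_trade_cache
from ttools.config import zoneNY

# Hypothetical call mirroring the CLI flags above; check the actual signature in ttools.loaders.
prepare_trade_cache(
    symbols=["BAC", "AAPL"],
    day_start=zoneNY.localize(datetime(2024, 10, 14)),
    day_stop=zoneNY.localize(datetime(2024, 10, 18)),
)
```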

## remote loaders

Remote bars of given resolutions from Alpaca.

Available resolutions are Minute, Hour and Day. It is not possible to limit the included trades.
Use it only when precision is not required.

```python
from ttools.external_loaders import load_history_bars
from ttools.config import zoneNY
from datetime import datetime, time
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit

symbol = "AAPL"
start_date = zoneNY.localize(datetime(2023, 2, 27, 18, 51, 38))
end_date = zoneNY.localize(datetime(2023, 4, 27, 21, 51, 39))
timeframe = TimeFrame(amount=1, unit=TimeFrameUnit.Minute)

df = load_history_bars(symbol, start_date, end_date, timeframe, main_session_only=True)
df.loc[('AAPL',)]
```

# vbtutils

Contains helpers for vbtpro

@@ -132,6 +161,8 @@

```python
exits.tail(20)
```

## display plotly figs in single ntb cells

To display various standalone figures in the same cell.

`figs2cell(figlist)`

Example usage:
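The original usage example is not included in this hunk; a minimal sketch, assuming `figs2cell` accepts a list of plotly figures and is called from a notebook cell:

```python
import plotly.graph_objects as go
from ttools import figs2cell

# Two standalone figures; normally only the last expression of a cell is rendered.
fig1 = go.Figure(go.Scatter(y=[1, 3, 2], name="close"))
fig2 = go.Figure(go.Bar(y=[10, 5, 7], name="volume"))

figs2cell([fig1, fig2])  # display both figures in the same cell
```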

setup.py (2 lines changed)

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages

```diff
 setup(
     name='ttools',
-    version='0.6.1',
+    version='0.7.99',
     packages=find_packages(),
     install_requires=[
         # list your dependencies here
```
tests/WIP-tradecache_duckdb_approach/hive_cache.ipynb (new file, 178 lines)

@@ -0,0 +1,178 @@
Markdown cell:

> Exploring alternative cache storage using duckdb and parquet
> https://claude.ai/chat/e49491f7-8b18-4fb7-b301-5c9997746079

Code cell 1:

```python
from ttools.tradecache import TradeCache
from ttools.utils import zoneNY
from pathlib import Path
from datetime import datetime
import logging
import duckdb

logging.basicConfig(
    level=logging.INFO,  # Set the minimum level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
    format='%(levelname)s: %(message)s'  # Simple format showing level and message
)

cache = TradeCache(
    base_path=Path("./trade_cache"),
    max_workers=4,  # Adjust based on your CPU
    cleanup_after_days=7
)

# Load data
df = cache.load_range(
    symbol="BAC",
    start_date=zoneNY.localize(datetime(2024, 10, 14, 9, 30)),
    end_date=zoneNY.localize(datetime(2024, 10, 20, 16, 0)),
    #columns=['open', 'high', 'low', 'close', 'volume']
)

print(f"Loaded {len(df)} rows")
```

Output:

```
TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env
Start loading data... 1730370862.4833238
fetched parquet -11.310973167419434
Loaded 1836460 rows
```

Code cell 2:

```python
import duckdb
import pandas as pd  # needed for the pandas read below; missing in the committed cell, which raised NameError

def check_parquet_schema():
    # Read one file and print its structure
    sample_file = Path("./trade_cache") / "temp/BAC_20241014.parquet"

    # Method 1: Using DuckDB describe
    print("DuckDB Schema:")
    print(duckdb.sql(f"DESCRIBE SELECT * FROM read_parquet('{sample_file}')").df())

    # Method 2: Just look at the data
    print("\nSample Data:")
    print(duckdb.sql(f"""
        SELECT *
        FROM read_parquet('{sample_file}')
        LIMIT 5
    """).df())

    # Method 3: Using pandas
    print("\nPandas Info:")
    df = pd.read_parquet(sample_file)
    print(df.info())

# Let's check the schema first
check_parquet_schema()
```

Output (schema and sample of the per-day trade parquet files):

```
DuckDB Schema:
  column_name               column_type null  key default extra
0           x                   VARCHAR  YES None    None  None
1           p                    DOUBLE  YES None    None  None
2           s                    BIGINT  YES None    None  None
3           i                    BIGINT  YES None    None  None
4           c                 VARCHAR[]  YES None    None  None
5           z                   VARCHAR  YES None    None  None
6           t  TIMESTAMP WITH TIME ZONE  YES None    None  None

Sample Data:
   x       p       s               i             c  z                                 t
0  T  41.870      27  62879146994030  [ , F, T, I]  A  2024-10-14 15:30:00.006480+02:00
1  D  41.965       1  71675241580848        [ , I]  A  2024-10-14 15:30:00.395802+02:00
2  D  41.965       1  71675241644625        [ , I]  A  2024-10-14 15:30:00.484008+02:00
3  D  41.850       1  71675241772360        [ , I]  A  2024-10-14 15:30:00.610005+02:00
4  N  41.960  416188  52983525028174        [ , O]  A  2024-10-14 15:30:01.041599+02:00

Pandas Info:
NameError: name 'pd' is not defined   (the committed cell did not import pandas)
```
tests/WIP-tradecache_duckdb_approach/tradecache.py (new file, 324 lines)

@@ -0,0 +1,324 @@
```python
# this goes to the main directory


from pathlib import Path
from datetime import datetime, date, timedelta
from typing import Optional, List, Set, Dict, Tuple
import pandas as pd
import duckdb
import pandas_market_calendars as mcal
from abc import ABC, abstractmethod
import logging
from ttools.utils import zoneNY
from concurrent.futures import ThreadPoolExecutor
from ttools.loaders import fetch_daily_stock_trades
import time

logger = logging.getLogger(__name__)


class TradeCache:
    def __init__(
        self,
        base_path: Path,
        market: str = 'NYSE',
        max_workers: int = 4,
        cleanup_after_days: int = 7
    ):
        """
        Initialize TradeCache with monthly partitions and temp storage

        Args:
            base_path: Base directory for cache
            market: Market calendar to use
            max_workers: Max parallel fetches
            cleanup_after_days: Days after which to clean temp files
        """
        self.base_path = Path(base_path)
        self.temp_path = self.base_path / "temp"
        self.base_path.mkdir(parents=True, exist_ok=True)
        self.temp_path.mkdir(parents=True, exist_ok=True)

        self.calendar = mcal.get_calendar(market)
        self.max_workers = max_workers
        self.cleanup_after_days = cleanup_after_days

        # Initialize DuckDB with schema-specific optimizations
        self.con = duckdb.connect()
        self.con.execute("SET memory_limit='16GB'")
        self.con.execute("SET threads TO 8")

        # Create the schema for our tables
        self.schema = """
            x VARCHAR,
            p DOUBLE,
            s BIGINT,
            i BIGINT,
            c VARCHAR[],
            z VARCHAR,
            t TIMESTAMP WITH TIME ZONE
        """

        self._trading_days_cache: Dict[Tuple[date, date], List[date]] = {}

    def get_partition_path(self, symbol: str, year: int, month: int) -> Path:
        """Get path for a specific partition"""
        return self.base_path / f"symbol={symbol}/year={year}/month={month}"

    def get_temp_path(self, symbol: str, day: date) -> Path:
        """Get temporary file path for a day"""
        return self.temp_path / f"{symbol}_{day:%Y%m%d}.parquet"

    def get_trading_days(self, start_date: datetime, end_date: datetime) -> List[date]:
        """Get trading days with caching"""
        key = (start_date.date(), end_date.date())
        if key not in self._trading_days_cache:
            schedule = self.calendar.schedule(start_date=start_date, end_date=end_date)
            self._trading_days_cache[key] = [d.date() for d in schedule.index]
        return self._trading_days_cache[key]

    def cleanup_temp_files(self):
        """Clean up old temp files"""
        cutoff = datetime.now() - timedelta(days=self.cleanup_after_days)
        for file in self.temp_path.glob("*.parquet"):
            try:
                # Extract date from filename
                date_str = file.stem.split('_')[1]
                file_date = datetime.strptime(date_str, '%Y%m%d')
                if file_date < cutoff:
                    file.unlink()
            except Exception as e:
                logger.warning(f"Error cleaning up {file}: {e}")

    def consolidate_month(self, symbol: str, year: int, month: int) -> bool:
        """
        Consolidate daily files into monthly partition only if we have a complete month.
        Returns True if consolidation was successful.
        """
        # Get all temp files for this symbol and month
        temp_files = list(self.temp_path.glob(f"{symbol}_{year:04d}{month:02d}*.parquet"))

        if not temp_files:
            return False

        try:
            # Get expected trading days for this month
            start_date = zoneNY.localize(datetime(year, month, 1))
            if month == 12:
                end_date = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
            else:
                end_date = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)

            trading_days = self.get_trading_days(start_date, end_date)

            # Check if we have data for all trading days
            temp_dates = set(datetime.strptime(f.stem.split('_')[1], '%Y%m%d').date()
                             for f in temp_files)
            missing_days = set(trading_days) - temp_dates

            # Only consolidate if we have all trading days
            if missing_days:
                logger.info(f"Skipping consolidation for {symbol} {year}-{month}: "
                            f"missing {len(missing_days)} trading days")
                return False

            # Proceed with consolidation since we have a complete month
            partition_path = self.get_partition_path(symbol, year, month)
            partition_path.mkdir(parents=True, exist_ok=True)
            file_path = partition_path / "data.parquet"

            files_str = ', '.join(f"'{f}'" for f in temp_files)

            # Modified query to handle the new schema
            self.con.execute(f"""
                COPY (
                    SELECT x, p, s, i, c, z, t
                    FROM read_parquet([{files_str}])
                    ORDER BY t
                )
                TO '{file_path}'
                (FORMAT PARQUET, COMPRESSION 'ZSTD')
            """)

            # Remove temp files only after successful write
            for f in temp_files:
                f.unlink()

            logger.info(f"Successfully consolidated {symbol} {year}-{month} "
                        f"({len(temp_files)} files)")
            return True

        except Exception as e:
            logger.error(f"Error consolidating {symbol} {year}-{month}: {e}")
            return False

    def fetch_remote_day(self, symbol: str, day: date) -> pd.DataFrame:
        """Fetch a single day of data"""
        min_datetime = zoneNY.localize(datetime.combine(day, datetime.min.time()))
        max_datetime = zoneNY.localize(datetime.combine(day, datetime.max.time()))
        return fetch_daily_stock_trades(symbol, min_datetime, max_datetime)

    def _fetch_and_save_day(self, symbol: str, day: date) -> Optional[Path]:
        """Fetch and save a single day, returns file path if successful"""
        try:
            df_day = self.fetch_remote_day(symbol, day)
            if df_day.empty:
                return None

            temp_file = self.get_temp_path(symbol, day)
            df_day.to_parquet(temp_file, compression='ZSTD')
            return temp_file

        except Exception as e:
            logger.error(f"Error fetching {symbol} for {day}: {e}")
            return None

    def load_range(
        self,
        symbol: str,
        start_date: datetime,
        end_date: datetime,
        columns: Optional[List[str]] = None,
        consolidate: bool = False
    ) -> pd.DataFrame:
        """Load data for date range, consolidating when complete months are detected"""
        #self.cleanup_temp_files()

        trading_days = self.get_trading_days(start_date, end_date)

        # Modify column selection for new schema
        col_str = '*' if not columns else ', '.join(columns)

        if consolidate:
            # First check temp files for complete months
            temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
            if temp_files:
                # Group temp files by month
                monthly_temps: Dict[Tuple[int, int], Set[date]] = {}
                for file in temp_files:
                    try:
                        # Extract date from filename
                        date_str = file.stem.split('_')[1]
                        file_date = datetime.strptime(date_str, '%Y%m%d').date()
                        key = (file_date.year, file_date.month)
                        if key not in monthly_temps:
                            monthly_temps[key] = set()
                        monthly_temps[key].add(file_date)
                    except Exception as e:
                        logger.warning(f"Error parsing temp file date {file}: {e}")
                        continue

                # Check each month for completeness and consolidate if complete
                for (year, month), dates in monthly_temps.items():
                    # Get trading days for this month
                    month_start = zoneNY.localize(datetime(year, month, 1))
                    if month == 12:
                        month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
                    else:
                        month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)

                    month_trading_days = set(self.get_trading_days(month_start, month_end))

                    # If we have all trading days for the month, consolidate
                    if month_trading_days.issubset(dates):
                        logger.info(f"Found complete month in temp files for {symbol} {year}-{month}")
                        self.consolidate_month(symbol, year, month)

        # timing the load
        time_start = time.time()
        print("Start loading data...", time_start)
        # Now load data from both consolidated and temp files
        query = f"""
            WITH monthly_data AS (
                SELECT {col_str}
                FROM read_parquet(
                    '{self.base_path}/*/*.parquet',
                    hive_partitioning=1,
                    union_by_name=true
                )
                WHERE t BETWEEN '{start_date}' AND '{end_date}'
            ),
            temp_data AS (
                SELECT {col_str}
                FROM read_parquet(
                    '{self.temp_path}/{symbol}_*.parquet',
                    union_by_name=true
                )
                WHERE t BETWEEN '{start_date}' AND '{end_date}'
            )
            SELECT * FROM (
                SELECT * FROM monthly_data
                UNION ALL
                SELECT * FROM temp_data
            )
            ORDER BY t
        """

        try:
            df_cached = self.con.execute(query).df()
        except Exception as e:
            logger.warning(f"Error reading cached data: {e}")
            df_cached = pd.DataFrame()

        print("fetched parquet", time_start - time.time())
        if not df_cached.empty:
            cached_days = set(df_cached['t'].dt.date)
            missing_days = [d for d in trading_days if d not in cached_days]
        else:
            missing_days = trading_days

        # Fetch missing days in parallel
        if missing_days:
            with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
                future_to_day = {
                    executor.submit(self._fetch_and_save_day, symbol, day): day
                    for day in missing_days
                }

                for future in future_to_day:
                    day = future_to_day[future]
                    try:
                        temp_file = future.result()
                        if temp_file:
                            logger.debug(f"Successfully fetched {symbol} for {day}")
                    except Exception as e:
                        logger.error(f"Error processing {symbol} for {day}: {e}")

            # Check again for complete months after fetching new data
            temp_files = list(self.temp_path.glob(f"{symbol}_*.parquet"))
            if temp_files:
                monthly_temps = {}
                for file in temp_files:
                    try:
                        date_str = file.stem.split('_')[1]
                        file_date = datetime.strptime(date_str, '%Y%m%d').date()
                        key = (file_date.year, file_date.month)
                        if key not in monthly_temps:
                            monthly_temps[key] = set()
                        monthly_temps[key].add(file_date)
                    except Exception as e:
                        logger.warning(f"Error parsing temp file date {file}: {e}")
                        continue

                # Check for complete months again
                for (year, month), dates in monthly_temps.items():
                    month_start = zoneNY.localize(datetime(year, month, 1))
                    if month == 12:
                        month_end = zoneNY.localize(datetime(year + 1, 1, 1)) - timedelta(days=1)
                    else:
                        month_end = zoneNY.localize(datetime(year, month + 1, 1)) - timedelta(days=1)

                    month_trading_days = set(self.get_trading_days(month_start, month_end))

                    if month_trading_days.issubset(dates):
                        logger.info(f"Found complete month after fetching for {symbol} {year}-{month}")
                        self.consolidate_month(symbol, year, month)

            # Load final data including any new fetches
            try:
                df_cached = self.con.execute(query).df()
            except Exception as e:
                logger.warning(f"Error reading final data: {e}")
                df_cached = pd.DataFrame()

        return df_cached.sort_values('t')
```
tests/alpaca_loader.ipynb (new file, 460 lines)

@@ -0,0 +1,460 @@
Code cell 1:

```python
from ttools.external_loaders import load_history_bars
from ttools.config import zoneNY
from datetime import datetime, time
from alpaca.data.timeframe import TimeFrame, TimeFrameUnit

symbol = "AAPL"
start_date = zoneNY.localize(datetime(2023, 2, 27, 18, 51, 38))
end_date = zoneNY.localize(datetime(2023, 4, 27, 21, 51, 39))
timeframe = TimeFrame(amount=1, unit=TimeFrameUnit.Minute)

df = load_history_bars(symbol, start_date, end_date, timeframe, True)
df.loc[('AAPL',)]
```

Output:

```
TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env
```

Code cell 2:

```python
df.loc[('AAPL',)]
```

Output: a DataFrame of 15162 AAPL minute bars indexed by timestamp, with columns open, high, low, close, volume, trade_count and vwap, covering 2023-02-28 09:30 to 2023-04-27 15:30 (main session).

Code cell 3:

```python
df
```

Output: the full MultiIndex (symbol, timestamp) DataFrame with 31217 rows and the same columns, covering 2023-02-27 18:52 to 2023-04-27 19:59 (extended hours included).
File diff suppressed because one or more lines are too long
@@ -1,69 +0,0 @@ (deleted notebook)
Removed code cell:

```python
%load_ext autoreload
%autoreload 2
import vectorbtpro as vbt
from ttools.vbtindicators import register_custom_inds
from ttools.indicators import CUVWAP


register_custom_inds(None, "override")
#chopiness = vbt.indicator("technical:CHOPINESS").run(s12_data.open, s12_data.high, s12_data.low, s12_data.close, s12_data.volume, window = 100)
#vwap_cum_roll = vbt.indicator("technical:ROLLING_VWAP").run(s12_data.open, s12_data.high, s12_data.low, s12_data.close, s12_data.volume, window = 100, min_periods = 5)
#vwap_cum_d = vbt.indicator("ttools:CUVWAP").run(s12_data.high, s12_data.low, s12_data.close, s12_data.volume, anchor="D", drag=50)
#vwap_lin_angle = vbt.indicator("talib:LINEARREG_ANGLE").run(vwap_cum_d.vwap, timeperiod=2)

vbt.IF.list_indicators("ttools")
```

Output:

```
The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload
['CUVWAP', 'DIVRELN']
```
@@ -1,4 +1,5 @@

```diff
 from .vbtutils import AnchoredIndicator, create_mask_from_window, isrising, isfalling, isrisingc, isfallingc, trades2entries_exits, figs2cell
 from .vbtindicators import register_custom_inds
-from .utils import find_dotenv, AggType, zoneNY, zonePRG, zoneUTC
+from .utils import AggType, zoneNY, zonePRG, zoneUTC
 from .loaders import load_data, prepare_trade_cache
+from .external_loaders import load_history_bars
```
@@ -10,8 +10,80 @@ Includes fetch (remote/cached) methods and numba aggregator function for TIME BASED

```python
def aggregate_trades_optimized(symbol: str, trades_df: pd.DataFrame, resolution: int, type: AggType = AggType.OHLCV, clear_input: bool = False):
    """
    Optimized version of trade aggregation function with reduced memory footprint.
    """
    # 1. Get timestamps from index if 't' is not in columns
    if 't' not in trades_df.columns:
        timestamps = trades_df.index.values
    else:
        timestamps = trades_df['t'].values

    # 2. Select only needed columns for prices and sizes
    prices = trades_df['p'].values
    sizes = trades_df['s'].values

    # Clear input to free up memory
    if clear_input:
        del trades_df

    # 3. Convert timestamps maintaining exact precision
    # Convert directly to int64 nanoseconds, then to float seconds - there was a problem
    #unix_timestamps_s = timestamps.view('int64').astype(np.float64) / 1e6
    # original, not optimized, kept in case of issues (5x slower)
    unix_timestamps_s = timestamps.astype('datetime64[ns]').astype(np.float64) / 1e9

    # 4. Create ticks array efficiently; pre-allocate for better memory efficiency
    ticks = np.empty((len(timestamps), 3), dtype=np.float64)
    ticks[:, 0] = unix_timestamps_s
    ticks[:, 1] = prices
    ticks[:, 2] = sizes

    # 5. Clear memory of intermediate objects
    del timestamps, prices, sizes, unix_timestamps_s

    # 6. Process based on type using existing pattern
    try:
        match type:
            case AggType.OHLCV:
                ohlcv_bars = generate_time_bars_nb(ticks, resolution)
                columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades',
                           'updated', 'vwap', 'buyvolume', 'sellvolume', 'buytrades', 'selltrades']
            case AggType.OHLCV_VOL:
                ohlcv_bars = generate_volume_bars_nb(ticks, resolution)
                columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades',
                           'updated', 'buyvolume', 'sellvolume', 'buytrades', 'selltrades']
            case AggType.OHLCV_DOL:
                ohlcv_bars = generate_dollar_bars_nb(ticks, resolution)
                columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades',
                           'amount', 'updated']
            case _:
                raise ValueError("Invalid AggType type. Supported types are 'time', 'volume' and 'dollar'.")
    finally:
        # 7. Clear large numpy array as soon as possible
        del ticks

    # 8. Create DataFrame and handle timestamps - keeping original working approach
    ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns)
    del ohlcv_bars

    # 9. Use the original timestamp handling that we know works
    ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY)
    ohlcv_df['updated'] = pd.to_datetime(ohlcv_df['updated'], unit="s").dt.tz_localize('UTC').dt.tz_convert(zoneNY)

    # 10. Round microseconds as in original
    ohlcv_df['updated'] = ohlcv_df['updated'].dt.round('us')

    # 11. Set index last, as in original
    ohlcv_df.set_index('time', inplace=True)

    return ohlcv_df


def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type: AggType = AggType.OHLCV):
    """
    Original, replaced by the optimized version.
    Accepts dataframe with trades keyed by symbol. Prepares the dataframe as
    numpy and calls the Numba optimized aggregator for the given bar type (time/volume/dollar).
    """
```
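A rough illustration of calling the new aggregator (the trade frame below is made up; the `p`/`s` column names and the timestamp-index convention follow the function above):

```python
import numpy as np
import pandas as pd

# Hypothetical raw trades: price 'p' and size 's', indexed by trade timestamp,
# which is the layout aggregate_trades_optimized expects when no 't' column is present.
trades = pd.DataFrame(
    {"p": [41.87, 41.88, 41.86, 41.90], "s": [100, 250, 50, 300]},
    index=pd.to_datetime([
        "2024-10-14 13:45:00.100", "2024-10-14 13:45:02.500",
        "2024-10-14 13:45:07.000", "2024-10-14 13:45:15.300",
    ]),
)

# 12-second time bars; the symbol argument is informational in this signature.
bars = aggregate_trades_optimized("BAC", trades, resolution=12, type=AggType.OHLCV)
print(bars[["open", "high", "low", "close", "volume", "buyvolume", "sellvolume"]])
```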
@ -44,9 +116,13 @@ def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type
|
|||||||
columns.append('vwap')
|
columns.append('vwap')
|
||||||
columns.append('buyvolume')
|
columns.append('buyvolume')
|
||||||
columns.append('sellvolume')
|
columns.append('sellvolume')
|
||||||
|
columns.append('buytrades')
|
||||||
|
columns.append('selltrades')
|
||||||
if type == AggType.OHLCV_VOL:
|
if type == AggType.OHLCV_VOL:
|
||||||
columns.append('buyvolume')
|
columns.append('buyvolume')
|
||||||
columns.append('sellvolume')
|
columns.append('sellvolume')
|
||||||
|
columns.append('buytrades')
|
||||||
|
columns.append('selltrades')
|
||||||
ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns)
|
ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns)
|
||||||
ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY)
|
ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY)
|
||||||
#print(ohlcv_df['updated'])
|
#print(ohlcv_df['updated'])
|
||||||
@ -174,22 +250,24 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
|
|||||||
close_price = ticks[0, 1]
|
close_price = ticks[0, 1]
|
||||||
volume = 0
|
volume = 0
|
||||||
trades_count = 0
|
     trades_count = 0
+    trades_buy_count = 0
+    trades_sell_count = 0
     current_day = np.floor(ticks[0, 0] / 86400)  # Calculate the initial day from the first tick timestamp
     bar_time = ticks[0, 0]  # Initialize bar time with the time of the first tick
     buy_volume = 0  # Volume of buy trades
     sell_volume = 0  # Volume of sell trades
     prev_price = ticks[0, 1]  # Initialize previous price for the first tick
+    last_tick_up = None
     for tick in ticks:
         tick_time = tick[0]
         price = tick[1]
         tick_volume = tick[2]
         tick_day = np.floor(tick_time / 86400)  # Calculate the day of the current tick
+        splitted = False
         # Check if the new tick is from a different day, then close the current bar
         if tick_day != current_day:
             if trades_count > 0:
-                ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
+                ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume, trades_buy_count, trades_sell_count])
             # Reset for the new day using the current tick data
             open_price = price
             high_price = price
@@ -197,6 +275,8 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
             close_price = price
             volume = 0
             trades_count = 0
+            trades_buy_count = 0
+            trades_sell_count = 0
             remaining_volume = volume_per_bar
             current_day = tick_day
             bar_time = tick_time  # Update bar time to the current tick time
@@ -219,8 +299,21 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
             # Update buy and sell volumes
             if price > prev_price:
                 buy_volume += tick_volume
+                trades_buy_count += 1
+                last_tick_up = True
             elif price < prev_price:
                 sell_volume += tick_volume
+                trades_sell_count += 1
+                last_tick_up = False
+            else:  # same price, use last direction
+                if last_tick_up is None:
+                    pass
+                elif last_tick_up:
+                    buy_volume += tick_volume
+                    trades_buy_count += 1
+                else:
+                    sell_volume += tick_volume
+                    trades_sell_count += 1
+
             tick_volume = 0
         else:
@@ -233,11 +326,24 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
             # Update buy and sell volumes
             if price > prev_price:
                 buy_volume += volume_to_add
+                trades_buy_count += 1
+                last_tick_up = True
             elif price < prev_price:
                 sell_volume += volume_to_add
+                trades_sell_count += 1
+                last_tick_up = False
+            else:  # same price, use last direction
+                if last_tick_up is None:
+                    pass
+                elif last_tick_up:
+                    buy_volume += volume_to_add
+                    trades_buy_count += 1
+                else:
+                    sell_volume += volume_to_add
+                    trades_sell_count += 1
+
             # Append the completed bar to the list
-            ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
+            ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume, trades_buy_count, trades_sell_count])

             # Reset bar values for the new bar using the current tick data
             open_price = price
@@ -246,21 +352,26 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
             close_price = price
             volume = 0
             trades_count = 0
+            trades_buy_count = 0
+            trades_sell_count = 0
             remaining_volume = volume_per_bar
             buy_volume = 0
             sell_volume = 0

-            # Increment bar time if splitting a trade
-            if tick_volume > 0:  # If there's remaining volume in the trade, set bar time slightly later
-                bar_time = tick_time + 1e-6
+            # if the same trade opened the bar (we are splitting one trade into more bars)
+            # first split is identified by time, subsequent splits by the flag
+            if bar_time == tick_time or splitted:
+                bar_time = bar_time + 1e-6
+                splitted = True
             else:
-                bar_time = tick_time  # Otherwise, set bar time to the tick time
+                bar_time = tick_time
+                splitted = False

         prev_price = price

     # Add the last bar if it contains any trades
     if trades_count > 0:
-        ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
+        ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume, trades_buy_count, trades_sell_count])

     return np.array(ohlcv_bars)
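The buy/sell split added above is a tick-rule classification (the README describes it as Lee-Ready style): upticks count as buys, downticks as sells, and zero-ticks inherit the last known direction via `last_tick_up`. A minimal standalone sketch of that rule, using hypothetical toy prices rather than the library's tick arrays:

```python
import numpy as np

def classify_tick_rule(prices):
    """Tick-rule side classification: +1 = buy, -1 = sell, 0 = unknown.
    Zero-ticks (unchanged price) inherit the previous direction."""
    sides = np.zeros(len(prices), dtype=np.int8)
    last = 0
    for i in range(1, len(prices)):
        if prices[i] > prices[i - 1]:
            last = 1
        elif prices[i] < prices[i - 1]:
            last = -1
        # unchanged price keeps `last`
        sides[i] = last
    return sides

# toy example (hypothetical data)
print(classify_tick_rule(np.array([10.0, 10.01, 10.01, 9.99, 9.99])))  # [ 0  1  1 -1 -1]
```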
@@ -284,13 +395,15 @@ def generate_time_bars_nb(ticks, resolution):
     close_price = 0
     volume = 0
     trades_count = 0
+    trades_buy_count = 0
+    trades_sell_count = 0
     vwap_cum_volume_price = 0  # Cumulative volume * price
     cum_volume = 0  # Cumulative volume for VWAP
     buy_volume = 0  # Volume of buy trades
     sell_volume = 0  # Volume of sell trades
     prev_price = ticks[0, 1]  # Initialize previous price for the first tick
     prev_day = np.floor(ticks[0, 0] / 86400)  # Calculate the initial day from the first tick timestamp
+    last_tick_up = None
     for tick in ticks:
         curr_time = tick[0]  # updated time
         tick_time = np.floor(tick[0] / resolution) * resolution
@@ -307,7 +420,7 @@ def generate_time_bars_nb(ticks, resolution):
         if tick_time != start_time + current_bar_index * resolution:
             if current_bar_index >= 0 and trades_count > 0:  # Save the previous bar if trades happened
                 vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
-                ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
+                ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume, trades_buy_count, trades_sell_count])

             # Reset bar values
             current_bar_index = int((tick_time - start_time) / resolution)
@@ -316,6 +429,8 @@ def generate_time_bars_nb(ticks, resolution):
             low_price = price
             volume = 0
             trades_count = 0
+            trades_buy_count = 0
+            trades_sell_count = 0
             vwap_cum_volume_price = 0
             cum_volume = 0
             buy_volume = 0
@@ -333,15 +448,28 @@ def generate_time_bars_nb(ticks, resolution):
         # Update buy and sell volumes
         if price > prev_price:
             buy_volume += tick_volume
+            trades_buy_count += 1
+            last_tick_up = True
         elif price < prev_price:
             sell_volume += tick_volume
+            trades_sell_count += 1
+            last_tick_up = False
+        else:  # same price, use last direction
+            if last_tick_up is None:
+                pass
+            elif last_tick_up:
+                buy_volume += tick_volume
+                trades_buy_count += 1
+            else:
+                sell_volume += tick_volume
+                trades_sell_count += 1

         prev_price = price

     # Save the last processed bar
     if trades_count > 0:
         vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
-        ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
+        ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume, trades_buy_count, trades_sell_count])

     return np.array(ohlcv_bars)
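Time bars assign each tick to a bucket by flooring its timestamp to the resolution grid and compute per-bar VWAP from cumulative price*volume. A minimal sketch of that bucketing and VWAP arithmetic on hypothetical toy data (not the library's numba-compiled path):

```python
import numpy as np

resolution = 12  # seconds per bar, matching the README example
ts = np.array([100.0, 103.5, 111.9, 112.0])   # epoch seconds (toy data)
px = np.array([10.0, 10.2, 10.1, 10.3])
vol = np.array([50, 20, 30, 10])

buckets = np.floor(ts / resolution) * resolution  # bar start each tick belongs to
for b in np.unique(buckets):
    m = buckets == b
    vwap = np.sum(px[m] * vol[m]) / np.sum(vol[m])  # per-bar VWAP
    print(b, round(vwap, 4), vol[m].sum())
```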
@@ -1,14 +1,35 @@
 from dotenv import load_dotenv
 from appdirs import user_data_dir
-from ttools.utils import find_dotenv
+import ttools.utils as utils
 import os
 import pytz
-import vectorbtpro as vbt
-import pytz
 from pathlib import Path
 from dotenv import load_dotenv
-import os
+def find_dotenv():
+    """
+    Searches for a .env file in the given directory or its parents and returns the path.
+
+    Args:
+        start_path: The directory to start searching from.
+
+    Returns:
+        Path to the .env file if found, otherwise None.
+    """
+    try:
+        start_path = __file__
+    except NameError:
+        #print("Notebook probably")
+        start_path = os.getcwd()
+    #print(start_path)
+
+    current_path = Path(start_path)
+    for _ in range(10):  # Limit search depth to 5 levels
+        dotenv_path = current_path / '.env'
+        if dotenv_path.exists():
+            return dotenv_path
+        current_path = current_path.parent
+    return None
+
 ENV_FILE = find_dotenv()
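`find_dotenv` now lives next to `ENV_FILE` in the config module (the later hunk removes it from `ttools/utils.py`), so the config can locate credentials whether the code runs as an installed package or from a notebook. A hedged usage sketch showing how the discovered path is typically handed to python-dotenv; whether this module calls `load_dotenv` exactly this way is not shown in the diff:

```python
from dotenv import load_dotenv

env_file = find_dotenv()  # walks up from this file (or the notebook CWD) looking for .env
if env_file is not None:
    load_dotenv(dotenv_path=env_file)  # exposes keys such as ACCOUNT1_LIVE_API_KEY via os.environ
else:
    print("No .env file found within 10 parent directories")
```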
ttools/external_loaders.py (new file, 55 lines)
@@ -0,0 +1,55 @@
+from ctypes import Union
+from ttools import zoneUTC
+from ttools.config import *
+from datetime import datetime
+from alpaca.data.historical import StockHistoricalDataClient
+from ttools.config import ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY
+from datetime import timedelta, datetime, time
+from alpaca.data.enums import DataFeed
+from typing import List, Union
+import pandas as pd
+from alpaca.data.historical import StockHistoricalDataClient
+from alpaca.data.requests import StockBarsRequest
+from alpaca.data.enums import DataFeed
+from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
+
+def load_history_bars(symbol: Union[str, List[str]], datetime_object_from: datetime, datetime_object_to: datetime, timeframe: TimeFrame, main_session_only: bool = True):
+    """Returns dataframe fetched remotely from Alpaca.
+
+    Args:
+        symbol: symbol or list of symbols
+        datetime_object_from: datetime in zoneNY
+        datetime_object_to: datetime in zoneNY
+        timeframe: timeframe
+        main_session_only: boolean to fetch only main session data
+
+    Returns:
+        dataframe
+
+    Example:
+    ```python
+    from ttools.external_loaders import load_history_bars
+    from ttools.config import zoneNY
+    from datetime import datetime
+    from alpaca.data.timeframe import TimeFrame, TimeFrameUnit
+
+    symbol = "AAPL"
+    start_date = zoneNY.localize(datetime(2023, 2, 27, 18, 51, 38))
+    end_date = zoneNY.localize(datetime(2023, 4, 27, 21, 51, 39))
+    timeframe = TimeFrame(amount=1, unit=TimeFrameUnit.Minute)
+
+    df = load_history_bars(symbol, start_date, end_date, timeframe)
+    ```
+    """
+    client = StockHistoricalDataClient(ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY, raw_data=False)
+    #datetime_object_from = datetime(2023, 2, 27, 18, 51, 38, tzinfo=datetime.timezone.utc)
+    #datetime_object_to = datetime(2023, 2, 27, 21, 51, 39, tzinfo=datetime.timezone.utc)
+    bar_request = StockBarsRequest(symbol_or_symbols=symbol, timeframe=timeframe, start=datetime_object_from, end=datetime_object_to, feed=DataFeed.SIP)
+    #print("before df")
+    df = client.get_stock_bars(bar_request).df
+    df.index = df.index.set_levels(df.index.get_level_values(1).tz_convert(zoneNY), level=1)
+    if main_session_only:
+        start_time = time(9, 30, 0)
+        end_time = time(15, 30, 0)
+        df = df.loc[(df.index.get_level_values(1).time >= start_time) & (df.index.get_level_values(1).time <= end_time)]
+    return df
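The bars come back with a (symbol, timestamp) MultiIndex, so the main-session filter above compares only the time component of index level 1 (note the cutoff shown in this file is 15:30, not the 16:00 close). A hedged sketch of the same filtering idea on a toy frame; the values are hypothetical, the real function applies this to Alpaca's response:

```python
import pandas as pd
from datetime import time

# toy MultiIndex frame shaped like the Alpaca bars response (hypothetical values)
idx = pd.MultiIndex.from_tuples(
    [("AAPL", pd.Timestamp("2024-10-14 09:15", tz="America/New_York")),
     ("AAPL", pd.Timestamp("2024-10-14 10:00", tz="America/New_York")),
     ("AAPL", pd.Timestamp("2024-10-14 16:30", tz="America/New_York"))],
    names=["symbol", "timestamp"])
df = pd.DataFrame({"close": [230.0, 231.5, 229.9]}, index=idx)

times = df.index.get_level_values(1).time
df_main = df[(times >= time(9, 30)) & (times <= time(15, 30))]  # keeps only the 10:00 bar
print(df_main)
```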
@@ -1,8 +1,6 @@
 from ctypes import Union
-from dotenv import load_dotenv
-from appdirs import user_data_dir
-from ttools.utils import find_dotenv
+from ttools import zoneUTC
 from ttools.config import *
 from datetime import datetime
 from alpaca.data.historical import StockHistoricalDataClient
@@ -16,12 +14,18 @@ from time import time as timetime
 from concurrent.futures import ThreadPoolExecutor
 from alpaca.data.enums import DataFeed
 import random
-from ttools.utils import AggType, fetch_calendar_data, print, set_verbose
+from ttools.utils import AggType, fetch_calendar_data, print, print_matching_files_info, set_verbose, list_matching_files
 from tqdm import tqdm
 import threading
 from typing import List, Union
-from ttools.aggregator_vectorized import aggregate_trades
+from ttools.aggregator_vectorized import aggregate_trades, aggregate_trades_optimized
+import numpy as np
+import pandas as pd
+import pyarrow.dataset as ds
+import pandas as pd
+from concurrent.futures import ThreadPoolExecutor
+import math
+import os
 """
 Module for fetching stock data. Supports
 1) cache management
@@ -90,6 +94,8 @@ def convert_dict_to_multiindex_df(tradesResponse, rename_labels = True, keep_sym
     final_df.reset_index(inplace=True)  # Reset index to remove MultiIndex levels, making them columns
     final_df.drop(columns=['symbol'], inplace=True)  # remove symbol column
     final_df.set_index(timestamp_col, inplace=True)  # reindex by timestamp
+    #print index datetime resolution
+    #print(final_df.index.dtype)

     return final_df

@@ -109,6 +115,28 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No
     Returns:
         df: pd.DataFrame
     """
+    def fast_filter(df, exclude_conditions):
+        # Convert arrays to strings once
+        str_series = df['c'].apply(lambda x: ','.join(x))
+
+        # Create mask using vectorized string operations
+        mask = np.zeros(len(df), dtype=bool)
+        for cond in exclude_conditions:
+            mask |= str_series.str.contains(cond, regex=False)
+
+        # Apply filter
+        return df[~mask]
+
+    def vectorized_string_sets(df, exclude_conditions):
+        # Convert exclude_conditions to set for O(1) lookup
+        exclude_set = set(exclude_conditions)
+
+        # Vectorized operation using sets intersection
+        arrays = df['c'].values
+        mask = np.array([bool(set(arr) & exclude_set) for arr in arrays])
+
+        return df[~mask]
+
     # 9:30 to 16:00
     if main_session_only:

@@ -123,30 +151,50 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No
     #REQUIRED FILTERING
     # Create a mask to filter rows within the specified time range
     if start is not None and end is not None:
-        print(f"filtering {start.time()} {end.time()}")
+        print(f"Trimming {start} {end}")
         if symbol_included:
             mask = (df.index.get_level_values('t') >= start) & \
                    (df.index.get_level_values('t') <= end)
+            df = df[mask]
         else:
-            mask = (df.index >= start) & (df.index <= end)
-
-        # Apply the mask to the DataFrame
-        df = df[mask]
+            df = df.loc[start:end]

     if exclude_conditions is not None:
         print(f"excluding {exclude_conditions}")
-        # Create a mask to exclude rows with any of the specified conditions
-        mask = df['c'].apply(lambda x: any(cond in exclude_conditions for cond in x))
-
-        # Filter out the rows with specified conditions
-        df = df[~mask]
+        df = vectorized_string_sets(df, exclude_conditions)
+        print("exclude done")

     if minsize is not None:
         print(f"minsize {minsize}")
         #exclude conditions
         df = df[df['s'] >= minsize]
+        print("minsize done")
     return df

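The new exclusion filter classifies each trade with a set intersection against the excluded condition codes instead of a per-row `any(...)` over the condition list, so the condition list is no longer re-scanned for every code of every trade. A toy example of the set-based approach on a hypothetical `c` column of condition codes:

```python
import numpy as np
import pandas as pd

# hypothetical trades with Alpaca-style condition-code lists in column 'c'
df = pd.DataFrame({"p": [10.0, 10.1, 10.2], "c": [["@"], ["@", "O"], ["@", "F", "I"]]})
exclude_set = {"O", "4", "B", "7", "V", "P", "W", "U", "Z", "F"}

# keep rows whose condition codes share nothing with the exclude set
mask = np.array([bool(set(codes) & exclude_set) for codes in df["c"].values])
print(df[~mask])  # only the first row survives
```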
+def calculate_optimal_workers(file_count, min_workers=4, max_workers=32):
+    """
+    Calculate optimal number of workers based on file count and system resources
+
+    Rules of thumb:
+    - Minimum of 4 workers to ensure parallelization
+    - Maximum of 32 workers to avoid thread overhead
+    - For 100 files, aim for around 16-24 workers
+    - Scale with CPU count but don't exceed max_workers
+    """
+    cpu_count = os.cpu_count() or 4
+
+    # Base calculation: 2-4x CPU count for I/O bound tasks
+    suggested_workers = cpu_count * 3
+
+    # Scale based on file count (1 worker per 4-6 files is a good ratio)
+    files_based_workers = math.ceil(file_count / 5)
+
+    # Take the smaller of the two suggestions
+    optimal_workers = min(suggested_workers, files_based_workers)
+
+    # Clamp between min and max workers
+    return max(min_workers, min(optimal_workers, max_workers))
+
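Worked example of what that sizing rule yields, assuming a hypothetical 8-core machine and 100 cached daily files:

```python
import math

# assumed inputs: 8 CPU cores, 100 daily parquet files
cpu_count = 8
suggested = cpu_count * 3              # 24 (I/O-bound heuristic)
files_based = math.ceil(100 / 5)       # 20 (one worker per ~5 files)
workers = max(4, min(min(suggested, files_based), 32))
print(workers)                         # 20 - inside the 16-24 rule of thumb for 100 files
```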
 def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False, force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP, verbose = None):
     #doc for this function
     """
@@ -155,8 +203,8 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
     by using force_remote - forces using remote data always and thus refreshing cache for these dates
     Attributes:
     :param symbol: The stock symbol to fetch trades for.
-    :param start: The start time for the trade data.
-    :param end: The end time for the trade data.
+    :param start: The start time for the trade data, in market timezone.
+    :param end: The end time for the trade data, in market timezone.
     :exclude_conditions: list of string conditions to exclude from the data
     :minsize: minimum size of trade to be included in the data
     :no_return: If True, do not return the DataFrame. Used to prepare cached files.
@@ -184,24 +232,34 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
     #exists in cache?
     daily_file = f"{symbol}-{str(start.date())}.parquet"
     file_path = TRADE_CACHE / daily_file
-    if file_path.exists() and (not force_remote or not no_return):
+    if file_path.exists() and (not force_remote and not no_return):
         with trade_cache_lock:
             df = pd.read_parquet(file_path)
         print("Loaded from CACHE", file_path)
         df = filter_trade_df(df, start, end, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only)
         return df

-    day_next = start.date() + timedelta(days=1)
+    #lets create borders of the day in UTC as Alpaca accepts only UTC dates
+    start_date = start.date()
+
+    # Create min/max times in NY timezone
+    ny_day_min = zoneNY.localize(datetime.combine(start_date, time.min))
+    ny_day_max = zoneNY.localize(datetime.combine(start_date, time.max))
+
+    # Convert both to UTC
+    utc_day_min = ny_day_min.astimezone(zoneUTC)
+    utc_day_max = ny_day_max.astimezone(zoneUTC)
+
     print("Fetching from remote.")
     client = StockHistoricalDataClient(ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY, raw_data=True)
-    stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=start.date(), end=day_next, feed=data_feed)
+    stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=utc_day_min, end=utc_day_max, feed=data_feed)
     last_exception = None

     for attempt in range(max_retries):
         try:
             tradesResponse = client.get_stock_trades(stockTradeRequest)
-            print(f"Remote fetch completed.", start.date(), day_next)
+            print(f"Remote fetch completed for whole day", start.date())
+            print(f"Exact UTC range fetched: {utc_day_min} - {utc_day_max}")
             if not tradesResponse[symbol]:
                 print(f"EMPTY")
                 return pd.DataFrame()
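The day borders are built as midnight-to-midnight in New York time and only then converted to UTC, so the request covers the whole exchange day regardless of daylight saving. A small sketch of the same conversion, assuming pytz timezones like the library's `zoneNY`/`zoneUTC`:

```python
from datetime import datetime, time, date
import pytz

zoneNY = pytz.timezone("America/New_York")
zoneUTC = pytz.utc

d = date(2024, 10, 14)
ny_day_min = zoneNY.localize(datetime.combine(d, time.min))
ny_day_max = zoneNY.localize(datetime.combine(d, time.max))
print(ny_day_min.astimezone(zoneUTC))  # 2024-10-14 04:00:00+00:00 (EDT is UTC-4)
print(ny_day_max.astimezone(zoneUTC))  # 2024-10-15 03:59:59.999999+00:00
```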
@@ -209,7 +267,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
             df = convert_dict_to_multiindex_df(tradesResponse, rename_labels=rename_labels, keep_symbols=keep_symbols)

             #if market is still open today, dont cache - also dont cache for IEX feed
-            if datetime.now().astimezone(zoneNY).date() < day_next or data_feed == DataFeed.IEX:
+            if datetime.now().astimezone(zoneNY).date() < start_date + timedelta(days=1) or data_feed == DataFeed.IEX:
                 print("not saving trade cache, market still open today or IEX datapoint")
                 #ic(datetime.now().astimezone(zoneNY))
                 #ic(day.open, day.close)
@@ -230,7 +288,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
     print("All attempts to fetch data failed.")
     raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}")

-def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False, verbose = None):
+def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = None, main_session_only = True, force_remote = False, max_workers=None, no_return = False, verbose = None):
     """
     Fetch trades between ranges.

@@ -284,7 +342,12 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
     #speed it up, locals first and then fetches
     s_time = timetime()
     with trade_cache_lock:
-        local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache])
+        file_paths = [f for _, f in days_from_cache]
+        dataset = ds.dataset(file_paths, format='parquet')
+        local_df = dataset.to_table().to_pandas()
+        del dataset
+        #original version
+        #local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache])
     final_time = timetime() - s_time
     print(f"{symbol} All {len(days_from_cache)} split files loaded in", final_time, "seconds")
     #the filter is required
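Loading all cached daily parquet files as one `pyarrow.dataset` replaces the per-file `pd.read_parquet` + `pd.concat` loop, letting Arrow read and concatenate the fragments in a single pass. A minimal sketch of the same pattern on hypothetical file paths:

```python
import pyarrow.dataset as ds

# hypothetical cached daily trade files for one symbol
file_paths = ["cache/BAC-2024-10-14.parquet", "cache/BAC-2024-10-15.parquet"]

dataset = ds.dataset(file_paths, format="parquet")  # one logical dataset over many files
local_df = dataset.to_table().to_pandas()            # single concatenated DataFrame
del dataset
```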
@@ -294,6 +357,7 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
     #do this only for remotes
     if len(days_from_remote) > 0:
         with ThreadPoolExecutor(max_workers=max_workers) as executor:
+            futures_with_date = []
             #for single_date in (start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)):
             for market_day in tqdm(days_from_remote, desc=f"{symbol} Remote fetching"):
                 #start = datetime.combine(single_date, time(9, 30)) # Market opens at 9:30 AM
@@ -316,14 +380,19 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC
                 end = min(end_date, max_day_time)

                 future = executor.submit(fetch_daily_stock_trades, symbol, start, end, exclude_conditions, minsize, main_session_only, no_return, force_remote)
-                futures.append(future)
+                futures_with_date.append((future, start))

-            for future in tqdm(futures, desc=f"{symbol} Receiving trades"):
+            results_with_dates = []
+            for future, date in tqdm(futures_with_date, desc=f"{symbol} Receiving trades"):
                 try:
                     result = future.result()
-                    results.append(result)
+                    if result is not None:
+                        results_with_dates.append((result, date))
                 except Exception as e:
                     print(f"Error fetching data for a day: {e}")
+            # Sort by date before concatenating
+            results_with_dates.sort(key=lambda x: x[1])
+            results = [r for r, _ in results_with_dates]

     if not no_return:
         # Batch concatenation to improve speed
@@ -359,8 +428,8 @@ def load_data(symbol: Union[str, List[str]],
         symbol (Union[str, list]): Symbol
         agg_type (AggType): Type of aggregation
         resolution (Union[str, int]): Resolution of aggregation based on agg_type
-        start_date (datetime):
-        end_date (datetime):
+        start_date (datetime): Start period, timezone aware
+        end_date (datetime): End period, timezone aware
         exclude_conditions (list, optional): Trade conditions to exclude. Defaults to None.
         minsize (_type_, optional): Minimum trade size to include. Defaults to None.
         main_session_only (bool, optional): Main or ext. hours. Defaults to True.
@@ -379,6 +448,12 @@ def load_data(symbol: Union[str, List[str]],
     if verbose is not None:
         set_verbose(verbose)  # Change global verbose if specified

+    if start_date.tzinfo is None:
+        start_date = zoneNY.localize(start_date)
+
+    if end_date.tzinfo is None:
+        end_date = zoneNY.localize(end_date)
+
     if exclude_conditions is None:
         exclude_conditions = EXCLUDE_CONDITIONS

@@ -387,14 +462,30 @@ def load_data(symbol: Union[str, List[str]],
     excludes_str = ''.join(map(str, exclude_conditions))
     file_ohlcv = AGG_CACHE / f"{symbol}-{str(agg_type)}-{str(resolution)}-{start_date.strftime('%Y-%m-%dT%H-%M-%S')}-{end_date.strftime('%Y-%m-%dT%H-%M-%S')}-{str(excludes_str)}-{minsize}-{main_session_only}.parquet"

-    if not force_remote and file_ohlcv.exists():
-        ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
-        print("Loaded from agg_cache", file_ohlcv)
+    #look for cached aggregations with the same conditions and the same or wider date span
+    matched_files = list_matching_files(
+        symbol=symbol,
+        agg_type=str(agg_type),
+        resolution=str(resolution),
+        start_date=start_date,
+        end_date=end_date,
+        excludes_str=str(excludes_str),
+        minsize=minsize,
+        main_session_only=main_session_only
+    )
+    print("matched agg files", len(matched_files))
+    print_matching_files_info(matched_files)
+
+    if not force_remote and len(matched_files) > 0:
+        ohlcv_df = pd.read_parquet(matched_files[0],
+                                   engine='pyarrow',
+                                   filters=[('time', '>=', start_date), ('time', '<=', end_date)])
+        print("Loaded from agg_cache", matched_files[0])
         return ohlcv_df
     else:
         #could this be sped up? "Searching cache" shows up slowly - is there a bottleneck?
         df = fetch_trades_parallel(symbol, start_date, end_date, minsize=minsize, exclude_conditions=exclude_conditions, main_session_only=main_session_only, force_remote=force_remote)  #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F'])
-        ohlcv_df = aggregate_trades(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type)
+        ohlcv_df = aggregate_trades_optimized(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type, clear_input = True)

         ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow')
         print(f"{symbol} Saved to agg_cache", file_ohlcv)
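Instead of requiring an exact filename match, the aggregation cache lookup now accepts any cached file whose parameters match and whose date span covers the request, then trims it back down with parquet row filters on the `time` column (assumption: the cached frames expose their timestamps under that column name, as the filter implies). A short sketch of that read, with a made-up cache filename following the naming scheme:

```python
import pandas as pd
from datetime import datetime
import pytz

zoneNY = pytz.timezone("America/New_York")
start = zoneNY.localize(datetime(2024, 10, 14, 9, 45))
end = zoneNY.localize(datetime(2024, 10, 16, 15, 1))

# hypothetical cached file covering a wider span than requested
cached = "agg_cache/BAC-AggType.OHLCV-12-2024-10-01T09-30-00-2024-10-31T16-00-00-46BCOPUVWZ-100-True.parquet"
ohlcv_df = pd.read_parquet(cached, engine="pyarrow",
                           filters=[("time", ">=", start), ("time", "<=", end)])
```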
@@ -405,6 +496,11 @@ def load_data(symbol: Union[str, List[str]],
         ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote)

     if return_vbt:
+        try:
+            import vectorbtpro as vbt  # Import only when needed
+        except ImportError:
+            raise RuntimeError("vectorbtpro is required for return_vbt. Please install it.")
+
         return vbt.Data.from_data(vbt.symbol_dict(ret_dict_df), tz_convert=zoneNY)

     return ret_dict_df

ttools/models.py (new file, 2226 lines) — diff suppressed because it is too large
ttools/utils.py (315 changed lines)
@@ -2,8 +2,10 @@ from pathlib import Path
 from enum import Enum
 from datetime import datetime, timedelta
 from typing import List, Tuple
+import re
 import pytz
 import calendar
+from ttools.config import AGG_CACHE
 import os
 from alpaca.trading.models import Order, TradeUpdate, Calendar
 import pandas_market_calendars as mcal
@@ -26,6 +28,147 @@ def set_verbose(value):
     global verbose
     verbose = value

+def parse_filename(filename: str) -> dict:
+    """Parse filename of AGG_CACHE files into its components using regex.
+    https://claude.ai/chat/b869644b-f542-4812-ad58-d4439c15fa78
+    """
+    pattern = r"""
+        ^
+        ([A-Z]+)-                                # Symbol
+        ([^-]+)-                                 # Agg type
+        (\d+)-                                   # Resolution
+        (\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})-   # Start date
+        (\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2})-   # End date
+        ([A-Z0-9]+)-                             # Excludes string
+        (\d+)-                                   # Minsize
+        (True|False)                             # Main session flag
+        \.parquet$                               # File extension
+    """
+    match = re.match(pattern, filename, re.VERBOSE)
+    if not match:
+        return None
+
+    try:
+        symbol, agg_type, resolution, start_str, end_str, excludes, minsize, main_session = match.groups()
+
+        return {
+            'symbol': symbol,
+            'agg_type': agg_type,
+            'resolution': resolution,
+            'start_date': datetime.strptime(start_str, '%Y-%m-%dT%H-%M-%S'),
+            'end_date': datetime.strptime(end_str, '%Y-%m-%dT%H-%M-%S'),
+            'excludes_str': excludes,
+            'minsize': int(minsize),
+            'main_session_only': main_session == 'True'
+        }
+    except (ValueError, AttributeError):
+        return None
+
+def list_matching_files(
+    symbol: str = None,
+    agg_type: str = None,
+    resolution: str = None,
+    start_date: datetime = None,
+    end_date: datetime = None,
+    excludes_str: str = None,
+    minsize: int = None,
+    main_session_only: bool = None
+) -> list[Path]:
+    """
+    List all aggregated files in the cache directory matching the specified criteria.
+    If start_date and end_date are provided, returns files that cover this interval
+    (meaning their date range encompasses the requested interval).
+    If a parameter is None, it matches any value for that component.
+
+    Example:
+    ```python
+    # Example with all parameters specified
+    specific_files = list_matching_files(
+        symbol="SPY",
+        agg_type="AggType.OHLCV",
+        resolution="12",
+        start_date=datetime(2024, 1, 15, 9, 30),
+        end_date=datetime(2024, 1, 15, 16, 0),
+        excludes_str="4679BCFMOPUVWZ",
+        minsize=100,
+        main_session_only=True
+    )
+
+    print_matching_files_info(specific_files)
+    ```
+    """
+    #make dates naive
+    if start_date is not None:
+        start_date = start_date.replace(tzinfo=None)
+    if end_date is not None:
+        end_date = end_date.replace(tzinfo=None)
+
+    agg_cache_dir = AGG_CACHE
+
+    def matches_criteria(file_info: dict) -> bool:
+        """Check if file matches all specified criteria."""
+        if not file_info:
+            return False
+
+        # Check non-date criteria first
+        if symbol is not None and file_info['symbol'] != symbol:
+            return False
+        if agg_type is not None and file_info['agg_type'] != agg_type:
+            return False
+        if resolution is not None and file_info['resolution'] != resolution:
+            return False
+        if excludes_str is not None and file_info['excludes_str'] != excludes_str:
+            return False
+        if minsize is not None and file_info['minsize'] != minsize:
+            return False
+        if main_session_only is not None and file_info['main_session_only'] != main_session_only:
+            return False
+
+        # Check date range coverage if both dates are provided
+        if start_date is not None and end_date is not None:
+            return (file_info['start_date'] <= start_date and
+                    file_info['end_date'] >= end_date)
+
+        # If only start_date is provided
+        if start_date is not None:
+            return file_info['end_date'] >= start_date
+
+        # If only end_date is provided
+        if end_date is not None:
+            return file_info['start_date'] <= end_date
+
+        return True
+
+    # Process all files
+    matching_files = []
+    for file_path in agg_cache_dir.iterdir():
+        if not file_path.is_file() or not file_path.name.endswith('.parquet'):
+            continue
+
+        file_info = parse_filename(file_path.name)
+        if matches_criteria(file_info):
+            matching_files.append((file_path, file_info))
+
+    # Sort files by start date and then end date
+    matching_files.sort(key=lambda x: (x[1]['start_date'], x[1]['end_date']))
+
+    # Return just the file paths
+    return [f[0] for f in matching_files]
+
+def print_matching_files_info(files: list[Path]):
+    """Helper function to print detailed information about matching files."""
+    for file_path in files:
+        file_info = parse_filename(file_path.name)
+        if file_info:
+            print(f"\nFile: {file_path.name}")
+            print(f"Coverage: {file_info['start_date']} to {file_info['end_date']}")
+            print(f"Symbol: {file_info['symbol']}")
+            print(f"Agg Type: {file_info['agg_type']}")
+            print(f"Resolution: {file_info['resolution']}")
+            print(f"Excludes: {file_info['excludes_str']}")
+            print(f"Minsize: {file_info['minsize']}")
+            print(f"Main Session Only: {file_info['main_session_only']}")
+            print("-" * 80)
+
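`parse_filename` and `list_matching_files` are what `load_data` uses to find a cached aggregation whose span covers a request. A quick check of the parser against a filename built the way `load_data` builds them (the concrete name below is made up for illustration):

```python
info = parse_filename("BAC-AggType.OHLCV-12-2024-10-14T09-45-00-2024-10-16T15-01-00-46BCOPUVWZ-100-True.parquet")
print(info['symbol'], info['resolution'], info['main_session_only'])
# BAC 12 True
print(info['start_date'], "->", info['end_date'])
# 2024-10-14 09:45:00 -> 2024-10-16 15:01:00
```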
 def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]:
     """
     Fetches the trading schedule for the NYSE (New York Stock Exchange) between the specified start and end dates.
@@ -109,33 +252,6 @@ def split_range(start: datetime, stop: datetime, period: str = "Y") -> List[Tupl

     return ranges

-def find_dotenv():
-    """
-    Searches for a .env file in the given directory or its parents and returns the path.
-
-    Args:
-        start_path: The directory to start searching from.
-
-    Returns:
-        Path to the .env file if found, otherwise None.
-    """
-    try:
-        start_path = __file__
-    except NameError:
-        #print("Notebook probably")
-        start_path = os.getcwd()
-    #print(start_path)
-
-    current_path = Path(start_path)
-    for _ in range(10):  # Limit search depth to 5 levels
-        dotenv_path = current_path / '.env'
-        if dotenv_path.exists():
-            return dotenv_path
-        current_path = current_path.parent
-    return None
-
 #create enum AGG_TYPE
 class AggType(str, Enum):
     """
@@ -157,4 +273,147 @@ class StartBarAlign(str, Enum):
     RANDOM = first bar starts when first trade occurs
     """
     ROUND = "round"
     RANDOM = "random"
+
+def compare_dataframes(df1, df2, name1="DataFrame 1", name2="DataFrame 2", check_dtype=True):
+    """
+    Compare two DataFrames and provide detailed analysis of their differences.
+
+    Parameters:
+    -----------
+    df1, df2 : pandas.DataFrame
+        The DataFrames to compare
+    name1, name2 : str
+        Names to identify the DataFrames in the output
+    check_dtype : bool
+        Whether to check if dtypes match for columns
+
+    Returns:
+    --------
+    bool
+        True if DataFrames are identical (based on check_dtype parameter)
+    dict
+        Detailed comparison results
+    """
+    results = {
+        'are_equal': False,
+        'shape_match': False,
+        'column_match': False,
+        'index_match': False,
+        'dtype_match': False,
+        'content_match': False,
+        'differences': {}
+    }
+
+    # Shape comparison
+    if df1.shape != df2.shape:
+        results['differences']['shape'] = {
+            name1: df1.shape,
+            name2: df2.shape
+        }
+    else:
+        results['shape_match'] = True
+
+    # Column comparison
+    cols1 = set(df1.columns)
+    cols2 = set(df2.columns)
+    if cols1 != cols2:
+        results['differences']['columns'] = {
+            f'unique_to_{name1}': list(cols1 - cols2),
+            f'unique_to_{name2}': list(cols2 - cols1),
+            'common': list(cols1 & cols2)
+        }
+    else:
+        results['column_match'] = True
+
+    # Index comparison
+    idx1 = set(df1.index)
+    idx2 = set(df2.index)
+    if idx1 != idx2:
+        results['differences']['index'] = {
+            f'unique_to_{name1}': list(idx1 - idx2),
+            f'unique_to_{name2}': list(idx2 - idx1),
+            'common': list(idx1 & idx2)
+        }
+    else:
+        results['index_match'] = True
+
+    # dtype comparison
+    if check_dtype and results['column_match']:
+        dtype_diff = {}
+        for col in cols1:
+            if df1[col].dtype != df2[col].dtype:
+                dtype_diff[col] = {
+                    name1: str(df1[col].dtype),
+                    name2: str(df2[col].dtype)
+                }
+        if dtype_diff:
+            results['differences']['dtypes'] = dtype_diff
+        else:
+            results['dtype_match'] = True
+
+    # Content comparison (only for matching columns and indices)
+    if results['column_match'] and results['index_match']:
+        common_cols = list(cols1)
+        common_idx = list(idx1)
+
+        value_diff = {}
+        for col in common_cols:
+            # Compare values
+            if not df1[col].equals(df2[col]):
+                # Find specific differences
+                mask = df1[col] != df2[col]
+                if any(mask):
+                    diff_indices = df1.index[mask]
+                    value_diff[col] = {
+                        'different_at_indices': list(diff_indices),
+                        'sample_differences': {
+                            str(idx): {
+                                name1: df1.loc[idx, col],
+                                name2: df2.loc[idx, col]
+                            } for idx in list(diff_indices)[:5]  # Show first 5 differences
+                        }
+                    }
+
+        if value_diff:
+            results['differences']['values'] = value_diff
+        else:
+            results['content_match'] = True
+
+    # Overall equality
+    results['are_equal'] = all([
+        results['shape_match'],
+        results['column_match'],
+        results['index_match'],
+        results['content_match'],
+        (results['dtype_match'] if check_dtype else True)
+    ])
+
+    # Print summary
+    print(f"\nComparison Summary of {name1} vs {name2}:")
+    print(f"Shape Match: {results['shape_match']} ({df1.shape} vs {df2.shape})")
+    print(f"Column Match: {results['column_match']}")
+    print(f"Index Match: {results['index_match']}")
+    print(f"Dtype Match: {results['dtype_match']}" if check_dtype else "Dtype Check: Skipped")
+    print(f"Content Match: {results['content_match']}")
+    print(f"\nOverall Equal: {results['are_equal']}")
+
+    # Print detailed differences if any
+    if not results['are_equal']:
+        print("\nDetailed Differences:")
+        for diff_type, diff_content in results['differences'].items():
+            print(f"\n{diff_type.upper()}:")
+            if diff_type == 'values':
+                print(f"Number of columns with differences: {len(diff_content)}")
+                for col, details in diff_content.items():
+                    print(f"\nColumn '{col}':")
+                    print(f"Number of different values: {len(details['different_at_indices'])}")
+                    print("First few differences:")
+                    for idx, vals in details['sample_differences'].items():
+                        print(f"  At index {idx}:")
+                        print(f"    {name1}: {vals[name1]}")
+                        print(f"    {name2}: {vals[name2]}")
+            else:
+                print(diff_content)
+
+    return results['are_equal'], results
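`compare_dataframes` is handy for verifying that the new `aggregate_trades_optimized` path produces the same bars as the original aggregator. A minimal usage sketch with toy frames; the frame names and values below are illustrative, not from the diff:

```python
import pandas as pd

bars_old = pd.DataFrame({"close": [10.0, 10.2], "volume": [100, 150]})
bars_new = pd.DataFrame({"close": [10.0, 10.3], "volume": [100, 150]})

equal, report = compare_dataframes(bars_old, bars_new,
                                   name1="aggregate_trades",
                                   name2="aggregate_trades_optimized")
# prints a summary and, since 'close' differs at index 1, lists that difference
print(equal)  # False
```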