loader/aggregator added

This commit is contained in:
David Brazda
2024-10-30 08:35:15 +01:00
parent eee8de727c
commit 08f06456b7
10 changed files with 2094 additions and 96 deletions

View File

@ -7,6 +7,86 @@ A Python library for tools, utilities, and helpers for my trading research workf
pip install git+https://github.com/drew2323/ttools.git
```
Modules:
# loaders
Loads trades and aggregates them into the requested bar type (time based, volume, dollar bars, renko...). Also manages the trade cache (daily trade files per symbol) and the aggregation cache (per symbol and requested period).
Detailed examples in `tests/data_loader_tryme.ipynb`
## load_data
Returns a vectorized aggregation of the given type. If trades for the given period are not cached, they are fetched remotely from Alpaca first.
Example:
```python
#How to call the load_data function
symbol = ["BAC"]
#datetime in zoneNY
day_start = datetime(2024, 10, 14, 9, 45, 0)
day_stop = datetime(2024, 10, 16, 15, 1, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)
#requested AGG
resolution = 1
agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO
exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None means use the defaults
minsize = 100
main_session_only = True
force_remote = False #True refetches trades remotely and re-aggregates even when cached
ohlcv_df = load_data(symbol = symbol,
agg_type = agg_type,
resolution = resolution,
start_date = day_start,
end_date = day_stop,
exclude_conditions = exclude_conditions,
minsize = minsize,
main_session_only = main_session_only,
force_remote = force_remote
)
bac_df = ohlcv_df["BAC"]
basic_data = vbt.Data.from_data(vbt.symbol_dict(ohlcv_df), tz_convert=zoneNY)
vbt.settings['plotting']['auto_rangebreaks'] = True
basic_data.ohlcv.plot()
```
## prepare_trade_cache
Prepares daily trade cache files for the given period.
Days not present in the cache are fetched remotely.
`force_remote` refetches from remote even when the data already exists in the cache.
```python
from ttools.loaders import prepare_trade_cache
symbols = ["BAC", "AAPL"]
#datetime in zoneNY
day_start = datetime(2024, 10, 1, 9, 45, 0)
day_stop = datetime(2024, 10, 14, 15, 1, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)
force_remote = False
prepare_trade_cache(symbols, day_start, day_stop, force_remote)
```
### Prepare daily trade cache - CLI script
Daily trade cache data can be fetched for a given period by a CLI script that can run in the background.
Note: fetching one day takes about 35 s. Data is stored in the `tradecache` directory as one daily file per symbol.
To run this script in the background with specific arguments:
```bash
# Running without forcing remote fetch
python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 2024-10-18 &
# Running with force_remote set to True
python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 2024-10-18 --force_remote &
```
# vbtutils
Contains helpers for vbtpro

View File

@ -1,31 +1,95 @@
alpaca-py==0.31.0
annotated-types==0.7.0
appdirs==1.4.4
appnope==0.1.4
asttokens==2.4.1
attrs==24.2.0
bottle==0.13.2
certifi==2024.8.30
charset-normalizer==3.4.0
comm==0.2.2
contourpy==1.3.0
cycler==0.12.1
dateparser==1.2.0
debugpy==1.8.7
decorator==5.1.1
exceptiongroup==1.2.2
exchange_calendars==4.5.6
executing==2.1.0
fonttools==4.54.1
humanize==4.11.0
idna==3.10
imageio==2.35.1
ipykernel==6.29.5
ipython==8.28.0
ipywidgets==8.1.5
jedi==0.19.1
joblib==1.4.2
jupyter_client==8.6.3
jupyter_core==5.7.2
jupyterlab_widgets==3.0.13
kiwisolver==1.4.7
korean-lunar-calendar==0.3.1
lightweight_charts @ git+https://github.com/drew2323/lightweight-charts-python.git@b3368578320b811f25df3d664ae1bb53069c31cd
llvmlite==0.39.1
matplotlib==3.9.2
matplotlib-inline==0.1.7
msgpack==1.1.0
mypy-extensions==1.0.0
nest-asyncio==1.6.0
numba==0.56.4
numpy==1.23.5
orjson==3.10.10
packaging==24.1
pandas==2.2.3
pandas_market_calendars==4.4.1
parso==0.8.4
pexpect==4.9.0
pillow==10.4.0
platformdirs==4.3.6
plotly==5.24.1
prompt_toolkit==3.0.48
proxy-tools==0.1.0
psutil==6.1.0
ptyprocess==0.7.0
pure_eval==0.2.3
pyarrow==18.0.0
pydantic==2.9.2
pydantic_core==2.23.4
Pygments==2.18.0
pyluach==2.2.0
pyobjc-core==10.3.1
pyobjc-framework-Cocoa==10.3.1
pyobjc-framework-Quartz==10.3.1
pyobjc-framework-Security==10.3.1
pyobjc-framework-WebKit==10.3.1
pyparsing==3.2.0
python-dateutil==2.9.0.post0
python-dotenv==1.0.1
pytz==2024.2
pywebview==5.3.2
pyzmq==26.2.0
regex==2024.9.11
requests==2.32.3
schedule==1.2.2
scikit-learn==1.5.2
scipy==1.14.1
six==1.16.0
sseclient-py==1.8.0
stack-data==0.6.3
tenacity==9.0.0
threadpoolctl==3.5.0
tomli==2.0.2
toolz==1.0.0
tornado==6.4.1
tqdm==4.66.5
traitlets==5.14.3
typing_extensions==4.12.2
tzdata==2024.2
tzlocal==5.2
urllib3==2.2.3
vectorbtpro
wcwidth==0.2.13
websocket-client==1.8.0
websockets==13.1
widgetsnbextension==4.0.13

View File

@ -0,0 +1,47 @@
from ttools.utils import zoneNY
from datetime import timedelta, datetime
from ttools.loaders import prepare_trade_cache
import argparse
"""
This script prepares the trade cache for the specified symbols and date range.
Fetching one day usually takes about 35 s; data is stored in the tradecache directory as one daily file per symbol.
To run this script in the background with specific arguments:
```bash
# Running without forcing remote fetch
python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 2024-10-18 &
# Running with force_remote set to True
python3 prepare_cache.py --symbols BAC AAPL --day_start 2024-10-14 --day_stop 2024-10-18 --force_remote &
```
"""
def main(symbols, day_start, day_stop, force_remote=False):
# Convert day_start and day_stop to datetime objects in zoneNY
day_start = zoneNY.localize(datetime.strptime(day_start, "%Y-%m-%d") + timedelta(hours=9, minutes=45))
day_stop = zoneNY.localize(datetime.strptime(day_stop, "%Y-%m-%d") + timedelta(hours=15, minutes=1))
prepare_trade_cache(symbols, day_start, day_stop, force_remote=force_remote)
if __name__ == "__main__":
# Argument parser
parser = argparse.ArgumentParser(description="Prepare cache with trades for specified symbols and date range.")
parser.add_argument(
"-s", "--symbols", nargs="+", required=True, help="List of symbols to prepare cache for (e.g., BAC AAPL MSFT)"
)
parser.add_argument(
"-start", "--day_start", type=str, required=True, help="Start date in format YYYY-MM-DD"
)
parser.add_argument(
"-end", "--day_stop", type=str, required=True, help="End date in format YYYY-MM-DD"
)
parser.add_argument(
"-f", "--force_remote", action="store_true", help="Set this flag to force remote data fetch (default: False)"
)
args = parser.parse_args()
# Run main function with parsed arguments
main(args.symbols, args.day_start, args.day_stop, force_remote=args.force_remote)

View File

@ -2,10 +2,9 @@ from setuptools import setup, find_packages
setup(
name='ttools',
version='0.5.0',
packages=find_packages(),
install_requires=[
# list your dependencies here
],
)

File diff suppressed because one or more lines are too long

View File

@ -1,3 +1,4 @@
from .vbtutils import AnchoredIndicator, create_mask_from_window, isrising, isfalling, isrisingc, isfallingc, trades2entries_exits, figs2cell
from .vbtindicators import register_custom_inds
from .utils import find_dotenv, AggType, zoneNY, zonePRG, zoneUTC
from .loaders import load_data, prepare_trade_cache

View File

@ -0,0 +1,351 @@
import pandas as pd
import numpy as np
from numba import jit
from ttools.utils import zoneNY, AggType
""""
Module used for vectorized aggregation of trades.
Includes fetch (remote/cached) methods and numba aggregator function for TIME BASED, VOLUME BASED and DOLLAR BARS
"""""
def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type: AggType = AggType.OHLCV):
""""
Accepts dataframe with trades keyed by symbol. Preparess dataframe to
numpy and calls Numba optimized aggregator for given bar type. (time/volume/dollar)
"""""
#trades_df = trades_df.loc[symbol] #the input df is no longer keyed by symbol
trades_df= trades_df.reset_index()
ticks = trades_df[['t', 'p', 's']].to_numpy()
# Extract the timestamps column (assuming it's the first column)
timestamps = ticks[:, 0]
# Convert the timestamps to Unix timestamps in seconds with microsecond precision
unix_timestamps_s = np.array([ts.timestamp() for ts in timestamps], dtype='float64')
# Replace the original timestamps in the NumPy array with the converted Unix timestamps
ticks[:, 0] = unix_timestamps_s
ticks = ticks.astype(np.float64)
#based on type, specific aggregator function is called
match type:
case AggType.OHLCV:
ohlcv_bars = generate_time_bars_nb(ticks, resolution)
case AggType.OHLCV_VOL:
ohlcv_bars = generate_volume_bars_nb(ticks, resolution)
case AggType.OHLCV_DOL:
ohlcv_bars = generate_dollar_bars_nb(ticks, resolution)
case _:
raise ValueError("Invalid AggType. Supported types are 'time', 'volume' and 'dollar'.")
# Convert the resulting array back to a DataFrame
columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades']
if type == AggType.OHLCV_DOL:
columns.append('amount')
columns.append('updated')
if type == AggType.OHLCV:
columns.append('vwap')
columns.append('buyvolume')
columns.append('sellvolume')
if type == AggType.OHLCV_VOL:
columns.append('buyvolume')
columns.append('sellvolume')
ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns)
ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY)
#print(ohlcv_df['updated'])
ohlcv_df['updated'] = pd.to_datetime(ohlcv_df['updated'], unit="s").dt.tz_localize('UTC').dt.tz_convert(zoneNY)
# Round to microseconds to maintain six decimal places
ohlcv_df['updated'] = ohlcv_df['updated'].dt.round('us')
ohlcv_df.set_index('time', inplace=True)
#ohlcv_df.index = ohlcv_df.index.tz_localize('UTC').tz_convert(zoneNY)
return ohlcv_df
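# Illustrative usage sketch, not part of the original module; toy values assumed.
# aggregate_trades() expects trades indexed by timestamp with columns 'p' (price)
# and 's' (size); for AggType.OHLCV the resolution is in seconds.
def _example_aggregate_trades():
    """Aggregate three hypothetical BAC trades into 5-second time bars."""
    trades = pd.DataFrame(
        {"p": [39.10, 39.12, 39.08], "s": [100.0, 250.0, 80.0]},
        index=pd.DatetimeIndex(
            ["2024-10-14 09:30:00.1", "2024-10-14 09:30:02.5", "2024-10-14 09:30:07.0"],
            name="t",
        ).tz_localize(zoneNY),
    )
    # returns an OHLCV frame indexed by bar time, with vwap/buyvolume/sellvolume columns
    return aggregate_trades("BAC", trades, resolution=5, type=AggType.OHLCV)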
@jit(nopython=True)
def generate_dollar_bars_nb(ticks, amount_per_bar):
""""
Generates Dollar based bars from ticks.
There is also simple prevention of aggregation from different days
as described here https://chatgpt.com/c/17804fc1-a7bc-495d-8686-b8392f3640a2
Downside: split days by UTC (which is ok for main session, but when extended hours it should be reworked by preprocessing new column identifying session)
When trade is split into multiple bars it is counted as trade in each of the bars.
Other option: trade count can be proportionally distributed by weight (0.2 to 1st bar, 0.8 to 2nd bar) - but this is not implemented yet
https://chatgpt.com/c/ff4802d9-22a2-4b72-8ab7-97a91e7a515f
"""""
ohlcv_bars = []
remaining_amount = amount_per_bar
# Initialize bar values based on the first tick to avoid uninitialized values
open_price = ticks[0, 1]
high_price = ticks[0, 1]
low_price = ticks[0, 1]
close_price = ticks[0, 1]
volume = 0
trades_count = 0
current_day = np.floor(ticks[0, 0] / 86400) # Calculate the initial day from the first tick timestamp
bar_time = ticks[0, 0] # Initialize bar time with the time of the first tick
for tick in ticks:
tick_time = tick[0]
price = tick[1]
tick_volume = tick[2]
tick_amount = price * tick_volume
tick_day = np.floor(tick_time / 86400) # Calculate the day of the current tick
# Check if the new tick is from a different day, then close the current bar
if tick_day != current_day:
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, amount_per_bar, tick_time])
# Reset for the new day using the current tick data
open_price = price
high_price = price
low_price = price
close_price = price
volume = 0
trades_count = 0
remaining_amount = amount_per_bar
current_day = tick_day
bar_time = tick_time
# Start new bar if needed because of the dollar value
while tick_amount > 0:
if tick_amount < remaining_amount:
# Add the entire tick to the current bar
high_price = max(high_price, price)
low_price = min(low_price, price)
close_price = price
volume += tick_volume
remaining_amount -= tick_amount
trades_count += 1
tick_amount = 0
else:
# Calculate the amount of volume that fits within the remaining dollar amount
volume_to_add = remaining_amount / price
volume += volume_to_add # Update the volume here before appending and resetting
# Append the partially filled bar to the list
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count + 1, amount_per_bar, tick_time])
# Fill the current bar and continue with a new bar
tick_volume -= volume_to_add
tick_amount -= remaining_amount
# Reset bar values for the new bar using the current tick data
open_price = price
high_price = price
low_price = price
close_price = price
volume = 0 # Reset volume for the new bar
trades_count = 0
remaining_amount = amount_per_bar
# Increment bar time if splitting a trade
if tick_volume > 0: # If there's remaining volume in the trade, set bar time a microsecond later
bar_time = tick_time + 1e-6
else:
bar_time = tick_time # Otherwise, set bar time to the tick time
#bar_time = tick_time
# Add the last bar if it contains any trades
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, amount_per_bar, tick_time])
return np.array(ohlcv_bars)
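# Illustrative sketch, not part of the original module; toy values assumed.
# One oversized trade is split across several dollar bars: a 300-share trade at
# $10 carries $3000 of notional, so with amount_per_bar=1000 it closes three bars
# of 100 shares each, and the trade is counted once in every bar it touches.
def _example_dollar_bars():
    """Split a single hypothetical $3000 trade into three $1000 dollar bars."""
    ticks = np.array([
        [1728912600.0, 10.0, 300.0],  # [unix seconds, price, size]; arbitrary in-day timestamp
    ], dtype=np.float64)
    return generate_dollar_bars_nb(ticks, 1000.0)  # -> 3 bars, volume 100 each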
@jit(nopython=True)
def generate_volume_bars_nb(ticks, volume_per_bar):
""""
Generates Volume based bars from ticks.
NOTE: UTC day split here (doesnt aggregate trades from different days)
but realized from UTC (ok for main session) - but needs rework for extension by preprocessing ticks_df and introduction sesssion column
When trade is split into multiple bars it is counted as trade in each of the bars.
Other option: trade count can be proportionally distributed by weight (0.2 to 1st bar, 0.8 to 2nd bar) - but this is not implemented yet
https://chatgpt.com/c/ff4802d9-22a2-4b72-8ab7-97a91e7a515f
"""""
ohlcv_bars = []
remaining_volume = volume_per_bar
# Initialize bar values based on the first tick to avoid uninitialized values
open_price = ticks[0, 1]
high_price = ticks[0, 1]
low_price = ticks[0, 1]
close_price = ticks[0, 1]
volume = 0
trades_count = 0
current_day = np.floor(ticks[0, 0] / 86400) # Calculate the initial day from the first tick timestamp
bar_time = ticks[0, 0] # Initialize bar time with the time of the first tick
buy_volume = 0 # Volume of buy trades
sell_volume = 0 # Volume of sell trades
prev_price = ticks[0, 1] # Initialize previous price for the first tick
for tick in ticks:
tick_time = tick[0]
price = tick[1]
tick_volume = tick[2]
tick_day = np.floor(tick_time / 86400) # Calculate the day of the current tick
# Check if the new tick is from a different day, then close the current bar
if tick_day != current_day:
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
# Reset for the new day using the current tick data
open_price = price
high_price = price
low_price = price
close_price = price
volume = 0
trades_count = 0
remaining_volume = volume_per_bar
current_day = tick_day
bar_time = tick_time # Update bar time to the current tick time
buy_volume = 0
sell_volume = 0
# Reset previous tick price (calculating imbalance for each day from the start)
prev_price = price
# Start new bar if needed because of the volume
while tick_volume > 0:
if tick_volume < remaining_volume:
# Add the entire tick to the current bar
high_price = max(high_price, price)
low_price = min(low_price, price)
close_price = price
volume += tick_volume
remaining_volume -= tick_volume
trades_count += 1
# Update buy and sell volumes
if price > prev_price:
buy_volume += tick_volume
elif price < prev_price:
sell_volume += tick_volume
tick_volume = 0
else:
# Fill the current bar and continue with a new bar
volume_to_add = remaining_volume
volume += volume_to_add
tick_volume -= volume_to_add
trades_count += 1
# Update buy and sell volumes
if price > prev_price:
buy_volume += volume_to_add
elif price < prev_price:
sell_volume += volume_to_add
# Append the completed bar to the list
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
# Reset bar values for the new bar using the current tick data
open_price = price
high_price = price
low_price = price
close_price = price
volume = 0
trades_count = 0
remaining_volume = volume_per_bar
buy_volume = 0
sell_volume = 0
# Increment bar time if splitting a trade
if tick_volume > 0: # If there's remaining volume in the trade, set bar time slightly later
bar_time = tick_time + 1e-6
else:
bar_time = tick_time # Otherwise, set bar time to the tick time
prev_price = price
# Add the last bar if it contains any trades
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
return np.array(ohlcv_bars)
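# Illustrative sketch, not part of the original module; toy values assumed.
# Shows the tick rule used above: an uptick adds to buy volume, a downtick to
# sell volume, and an unchanged price adds to neither.
def _example_volume_bars():
    """Classify three hypothetical ticks inside a single (unfilled) 1000-share volume bar."""
    ticks = np.array([
        [1728912600.0, 10.00, 100.0],  # price equals the previous price -> unclassified
        [1728912601.0, 10.01, 200.0],  # uptick -> buy volume
        [1728912602.0,  9.99, 150.0],  # downtick -> sell volume
    ], dtype=np.float64)
    return generate_volume_bars_nb(ticks, 1000.0)  # -> one bar: volume 450, buy 200, sell 150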
@jit(nopython=True)
def generate_time_bars_nb(ticks, resolution):
# Initialize the start and end time
start_time = np.floor(ticks[0, 0] / resolution) * resolution
end_time = np.floor(ticks[-1, 0] / resolution) * resolution
# # Calculate number of bars
# num_bars = int((end_time - start_time) // resolution + 1)
# Using a list to append data only when trades exist
ohlcv_bars = []
# Variables to track the current bar
current_bar_index = -1
open_price = 0
high_price = -np.inf
low_price = np.inf
close_price = 0
volume = 0
trades_count = 0
vwap_cum_volume_price = 0 # Cumulative volume * price
cum_volume = 0 # Cumulative volume for VWAP
buy_volume = 0 # Volume of buy trades
sell_volume = 0 # Volume of sell trades
prev_price = ticks[0, 1] # Initialize previous price for the first tick
prev_day = np.floor(ticks[0, 0] / 86400) # Calculate the initial day from the first tick timestamp
for tick in ticks:
curr_time = tick[0] #updated time
tick_time = np.floor(tick[0] / resolution) * resolution
price = tick[1]
tick_volume = tick[2]
tick_day = np.floor(tick_time / 86400) # Calculate the day of the current tick
#if the new tick is from a new day, reset previous tick price (calculating imbalance starts over)
if tick_day != prev_day:
prev_price = price
prev_day = tick_day
# Check if the tick belongs to a new bar
if tick_time != start_time + current_bar_index * resolution:
if current_bar_index >= 0 and trades_count > 0: # Save the previous bar if trades happened
vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
# Reset bar values
current_bar_index = int((tick_time - start_time) / resolution)
open_price = price
high_price = price
low_price = price
volume = 0
trades_count = 0
vwap_cum_volume_price = 0
cum_volume = 0
buy_volume = 0
sell_volume = 0
# Update the OHLCV values for the current bar
high_price = max(high_price, price)
low_price = min(low_price, price)
close_price = price
volume += tick_volume
trades_count += 1
vwap_cum_volume_price += price * tick_volume
cum_volume += tick_volume
# Update buy and sell volumes
if price > prev_price:
buy_volume += tick_volume
elif price < prev_price:
sell_volume += tick_volume
prev_price = price
# Save the last processed bar
if trades_count > 0:
vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
return np.array(ohlcv_bars)
# Example usage
if __name__ == '__main__':
pass
#example in agg_vect.ipynb

View File

@ -1,30 +1,33 @@
from dotenv import load_dotenv
from appdirs import user_data_dir
from ttools.utils import find_dotenv
import os
import pytz
from pathlib import Path
#Load env variables
ENV_FILE = find_dotenv()
if load_dotenv(ENV_FILE, verbose=True) is False:
print(f"Error loading .env file {ENV_FILE}. Now depending on ENV VARIABLES set externally.")
else:
print(f"Loaded env variables from file {ENV_FILE}")
#Alpaca accounts
ACCOUNT1_LIVE_API_KEY = os.environ.get('ACCOUNT1_LIVE_API_KEY')
ACCOUNT1_LIVE_SECRET_KEY = os.environ.get('ACCOUNT1_LIVE_SECRET_KEY')
DATA_DIR_NAME = os.environ.get('DATA_DIR_NAME', "ttools") #folder in datadir
DATA_DIR = user_data_dir(DATA_DIR_NAME, False)
TRADE_CACHE = Path(DATA_DIR)/"tradecache"
AGG_CACHE = Path(DATA_DIR)/"aggcache"
zoneNY = pytz.timezone('US/Eastern')
#AGG conditions - defaults
EXCLUDE_CONDITIONS = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6']
#added 9 - correction, M - Official Close, T - extended hours, 6 - Cancel Trade
MINSIZE = 100
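# Illustrative sketch, not part of the original module; shows where the two
# caches resolve on disk (the exact path depends on the OS conventions appdirs uses).
def _example_show_cache_paths():
    """Print the resolved cache directories (hypothetical helper)."""
    print("trade cache:", TRADE_CACHE)  # e.g. <user data dir>/ttools/tradecache
    print("agg cache:", AGG_CACHE)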

View File

@ -4,53 +4,456 @@ from dotenv import load_dotenv
from appdirs import user_data_dir
from ttools.utils import find_dotenv
from ttools.config import *
import os
from datetime import datetime
from alpaca.data.historical import StockHistoricalDataClient
import pandas as pd
from ttools.config import TRADE_CACHE, AGG_CACHE, EXCLUDE_CONDITIONS, ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY
from alpaca.data.requests import StockTradesRequest
import time as time_module
from traceback import format_exc
from datetime import timedelta, datetime, time
from time import time as timetime
from concurrent.futures import ThreadPoolExecutor
from alpaca.data.enums import DataFeed
import random
from ttools.utils import AggType, fetch_calendar_data
from tqdm import tqdm
import threading
from typing import List, Union
from ttools.aggregator_vectorized import aggregate_trades
"""
Module for fetching stock data. Supports
1) cache management
- Trade Cache - daily files for all trades of that day
- Agg Cache - cache of the whole requested period identified by agg type, resolution etc.
2) custom vectorized aggregation of trades
- time based OHLCV
- volume OHLCV
- dollar OHLCV
- renko OHLCV
"""
trade_cache_lock = threading.Lock()
# Function to ensure fractional seconds are present
def ensure_fractional_seconds(timestamp):
if '.' not in timestamp:
# Inserting .000000 before the timezone indicator 'Z'
return timestamp.replace('Z', '.000000Z')
else:
return timestamp
def convert_dict_to_multiindex_df(tradesResponse, rename_labels = True, keep_symbols=True):
""""
Converts dictionary from cache or from remote (raw input) to multiindex dataframe.
with microsecond precision (from nanoseconds in the raw data)
keep_symbols - if true, then output is multiindex indexed by symbol. Otherwise, symbol is removed and output is simple df
"""""
# Create a DataFrame for each key and add the key as part of the MultiIndex
dfs = []
for key, values in tradesResponse.items():
df = pd.DataFrame(values)
# Rename columns
# Select and order columns explicitly
#print(df)
df = df[['t', 'x', 'p', 's', 'i', 'c','z']]
if rename_labels:
df.rename(columns={'t': 'timestamp', 'c': 'conditions', 'p': 'price', 's': 'size', 'x': 'exchange', 'z':'tape', 'i':'id'}, inplace=True)
timestamp_col = 'timestamp'
else:
timestamp_col = 't'
df['symbol'] = key # Add ticker as a column
# Apply the function to ensure all timestamps have fractional seconds
#consider whether to keep this or apply it only on a specific error from to_datetime
#possibly add a more efficient approach later, i.e. replacing NaT - https://chatgpt.com/c/d2be6f87-b38f-4050-a1c6-541d100b1474
df[timestamp_col] = df[timestamp_col].apply(ensure_fractional_seconds)
df[timestamp_col] = pd.to_datetime(df[timestamp_col], errors='coerce') # Convert 't' from string to datetime before setting it as an index
#Adjust to microsecond precision
df.loc[df[timestamp_col].notna(), timestamp_col] = df[timestamp_col].dt.floor('us')
df.set_index(['symbol', timestamp_col], inplace=True) # Set the multi-level index using both 'ticker' and 't'
df = df.tz_convert(zoneNY, level=timestamp_col)
dfs.append(df)
# Concatenate all DataFrames into a single DataFrame with MultiIndex
final_df = pd.concat(dfs)
if keep_symbols is False:
final_df.reset_index(inplace=True) # Reset index to remove MultiIndex levels, making them columns
final_df.drop(columns=['symbol'], inplace=True) #remove symbol column
final_df.set_index(timestamp_col, inplace=True) #reindex by timestamp
return final_df
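# Illustrative sketch, not part of the original module; field values are made up
# but follow the raw Alpaca trade schema this function expects:
# t=timestamp, x=exchange, p=price, s=size, i=trade id, c=conditions, z=tape.
def _example_convert_dict():
    """Convert a tiny hypothetical raw trades dict to a MultiIndex DataFrame."""
    tradesResponse = {
        "BAC": [
            {"t": "2024-10-14T13:30:00.123456789Z", "x": "V", "p": 39.10, "s": 100, "i": 1, "c": ["@"], "z": "A"},
            {"t": "2024-10-14T13:30:01Z", "x": "V", "p": 39.11, "s": 50, "i": 2, "c": ["@"], "z": "A"},
        ]
    }
    # note the second timestamp has no fractional seconds; ensure_fractional_seconds() pads it
    return convert_dict_to_multiindex_df(tradesResponse, rename_labels=True, keep_symbols=True)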
def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = None, exclude_conditions = None, minsize = None, main_session_only = True, symbol_included=True):
""" """
Returns requested aggregated data for give symbol(s) Filters trade dataframe based on start and end, main_session and also applies exclude_conditions and minsize filtering if required.
- if requested agg data already exists in cache, returns it
- otherwise get the trades (from trade cache or Alpaca) and aggregate them and store to cache Parameters:
df: pd.DataFrame
start: datetime
end: datetime
exclude_conditions: list of string conditions to exclude from the data
minsize: minimum size of trade to be included in the data
main_session_only: boolean, if True, only trades between 9:30 and 15:40 are included
symbol_included: boolean, if True, DataFrame contains symbol (tbd dynamic)
Returns:
df: pd.DataFrame
"""
# 9:30 to 16:00
if main_session_only:
if symbol_included:
# Create a mask to filter rows within the specified time range
mask = (df.index.get_level_values('t') >= time(9, 30)) & \
(df.index.get_level_values('t') < time(16, 0))
df = df[mask]
else:
df = df.between_time("9:30","16:00")#TODO adapt to market type
#REQUIRED FILTERING
# Create a mask to filter rows within the specified time range
if start is not None and end is not None:
print(f"filtering {start.time()} {end.time()}")
if symbol_included:
mask = (df.index.get_level_values('t') >= start) & \
(df.index.get_level_values('t') <= end)
else:
mask = (df.index >= start) & (df.index <= end)
# Apply the mask to the DataFrame
df = df[mask]
if exclude_conditions is not None:
print(f"excluding {exclude_conditions}")
# Create a mask to exclude rows with any of the specified conditions
mask = df['c'].apply(lambda x: any(cond in exclude_conditions for cond in x))
# Filter out the rows with specified conditions
df = df[~mask]
if minsize is not None:
print(f"minsize {minsize}")
#exclude conditions
df = df[df['s'] >= minsize]
return df
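# Illustrative sketch, not part of the original module; toy values assumed.
# With symbol_included=False the frame is indexed by timestamp only, as in the
# daily cache files: here the condition filter drops the trade flagged 'F' and
# the minsize filter drops the 50-share odd lot.
def _example_filter_trades():
    """Filter a tiny hypothetical trades frame by condition and size."""
    idx = pd.DatetimeIndex(
        ["2024-10-14 09:31:00", "2024-10-14 12:00:00", "2024-10-14 14:00:00"], name="t"
    ).tz_localize(zoneNY)
    df = pd.DataFrame(
        {"p": [39.10, 39.12, 39.05], "s": [200, 300, 50], "c": [["@"], ["@", "F"], ["@"]]},
        index=idx,
    )
    return filter_trade_df(df, exclude_conditions=["F"], minsize=100, symbol_included=False)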
def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False,force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP):
"""
Attempts to fetch stock trades either from cache or remote. Remote fetches use a retry mechanism with exponential backoff.
The data is also stored to the cache if it is not already there.
force_remote - forces using remote data always, thus refreshing the cache for these dates.
Attributes:
:param symbol: The stock symbol to fetch trades for.
:param start: The start time for the trade data.
:param end: The end time for the trade data.
:param exclude_conditions: list of string conditions to exclude from the data
:param minsize: minimum size of trade to be included in the data
:param no_return: If True, do not return the DataFrame. Used to prepare cached files.
:param force_remote: will always use remote data and refresh the cache
:param max_retries: Maximum number of retries.
:param backoff_factor: Factor to determine the next sleep time.
:param rename_labels: Rename t to timestamp, c to condition etc.
:param keep_symbols: Whether to keep symbols in the DataFrame (as hierarchical index)
:return: TradesResponse object.
:raises: ConnectionError if all retries fail.
In the parquet trade cache there are daily files including all trades (incl. ext hours), e.g. BAC-20240203.parquet. For the location of both caches, see config.py.
"""
is_same_day = start.date() == end.date()
# Determine if the requested times fall within the main session
#in_main_session = (time(9, 30) <= start.time() < time(16, 0)) and (time(9, 30) <= end.time() <= time(16, 0))
if not is_same_day:
raise ValueError("fetch_daily_stock_trades is not implemented for multiple days!")
#exists in cache?
daily_file = f"{symbol}-{str(start.date())}.parquet"
file_path = TRADE_CACHE / daily_file
if file_path.exists() and (not force_remote or not no_return):
with trade_cache_lock:
df = pd.read_parquet(file_path)
print("Loaded from CACHE", file_path)
df = filter_trade_df(df, start, end, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only)
return df
day_next = start.date() + timedelta(days=1)
print("Fetching from remote.")
client = StockHistoricalDataClient(ACCOUNT1_LIVE_API_KEY, ACCOUNT1_LIVE_SECRET_KEY, raw_data=True)
stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=start.date(), end=day_next, feed=data_feed)
last_exception = None
for attempt in range(max_retries):
try:
tradesResponse = client.get_stock_trades(stockTradeRequest)
print(f"Remote fetched completed.", start.date(), day_next)
if not tradesResponse[symbol]:
print(f"EMPTY")
return pd.DataFrame()
df = convert_dict_to_multiindex_df(tradesResponse, rename_labels=rename_labels, keep_symbols=keep_symbols)
#if the market is still open today, don't cache - also don't cache for the IEX feed
if datetime.now().astimezone(zoneNY).date() < day_next or data_feed == DataFeed.IEX:
print("not saving trade cache, market still open today or IEX data feed")
#ic(datetime.now().astimezone(zoneNY))
#ic(day.open, day.close)
else:
with trade_cache_lock:
df.to_parquet(file_path, engine='pyarrow')
print("Saved to CACHE", file_path)
if no_return:
return pd.DataFrame()
df = filter_trade_df(df, start, end, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only)
return df
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
last_exception = e
time_module.sleep(backoff_factor * (2 ** attempt) + random.uniform(0, 1)) # Adding random jitter
print("All attempts to fetch data failed.")
raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}")
def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXCLUDE_CONDITIONS, minsize = 100, main_session_only = True, force_remote = False, max_workers=None, no_return = False):
"""
Fetch trades between ranges.
Fetches trades for each day between start_date and end_date during market hours (9:30-16:00) in parallel and concatenates them into a single DataFrame.
If fetched remotely, the data is stored in the trade cache.
Also filters by conditions, minsize and main session if required.
Can also be used just to prepare cached trade files (no_return=True).
:param symbol: Stock symbol.
:param start_date: Start date as datetime.
:param end_date: End date as datetime.
:param exclude_conditions: List of conditions to exclude from the data. None means default.
:param minsize: Minimum size of trade to be included in the data.
:param main_session_only: Only include trades during market hours.
:param no_return: If True, do not return the DataFrame. Used to prepare cached files.
:return: DataFrame containing all trades from start_date to end_date.
"""
futures = []
results = []
market_open_days = fetch_calendar_data(start_date, end_date)
day_count = len(market_open_days)
print("Contains", day_count, " market days")
max_workers = min(10, max(2, day_count // 2)) if max_workers is None else max_workers # Heuristic: half the days to process, but at least 2 and no more than 10
#which days to fetch?
days_from_remote = []
days_from_cache = []
if not force_remote:
for market_day in tqdm(market_open_days, desc="Processing market days"):
daily_file_new = str(symbol) + '-' + str(market_day.date) + '.parquet'
file_path_new = TRADE_CACHE / daily_file_new
if file_path_new.exists():
days_from_cache.append((market_day,file_path_new))
else:
days_from_remote.append(market_day)
else:
days_from_remote = market_open_days
remote_df = pd.DataFrame()
local_df = pd.DataFrame()
if len(days_from_cache) > 0 and not no_return:
#speed it up: load local files first, then fetch the rest
s_time = timetime()
with trade_cache_lock:
local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache])
final_time = timetime() - s_time
print(f"All {len(days_from_cache)} split files loaded in", final_time, "seconds")
#the filter is required
local_df = filter_trade_df(local_df, start_date, end_date, exclude_conditions, minsize, symbol_included=False, main_session_only=main_session_only)
print("local_df filtered")
#do this only for remotes
if len(days_from_remote) > 0:
with ThreadPoolExecutor(max_workers=max_workers) as executor:
#for single_date in (start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)):
for market_day in tqdm(days_from_remote, desc="Processing market days to fetch"):
#start = datetime.combine(single_date, time(9, 30)) # Market opens at 9:30 AM
#end = datetime.combine(single_date, time(16, 0)) # Market closes at 4:00 PM
#day interval (min day time to max day time)
day_date = zoneNY.localize(market_day.open).date()
min_day_time = zoneNY.localize(datetime.combine(day_date, datetime.min.time()))
max_day_time = zoneNY.localize(datetime.combine(day_date, datetime.max.time()))
#print(min_day_time, max_day_time) #start of the day
#STOPPED HERE
#let's first keep it via market open days as before
# and just work out how to handle an earlier start and end
# and test whether the parquet files load fast
# finish up later if needed
#optionally trim if a later start or an earlier end is requested
start = max(start_date, min_day_time)
end = min(end_date, max_day_time)
future = executor.submit(fetch_daily_stock_trades, symbol, start, end, exclude_conditions, minsize, main_session_only, no_return, force_remote)
futures.append(future)
for future in tqdm(futures, desc="Fetching data"):
try:
result = future.result()
results.append(result)
except Exception as e:
print(f"Error fetching data for a day: {e}")
if not no_return:
# Batch concatenation to improve speed
batch_size = 10
batches = [results[i:i + batch_size] for i in range(0, len(results), batch_size)]
remote_df = pd.concat([pd.concat(batch, ignore_index=False) for batch in batches], ignore_index=False)
#merge local and remote
if not remote_df.empty and not no_return:
return pd.concat([local_df, remote_df], ignore_index=False)
return local_df
def load_data(symbol: Union[str, List[str]],
agg_type: AggType,
resolution: str,
start_date: datetime,
end_date: datetime,
exclude_conditions: list = EXCLUDE_CONDITIONS,
minsize = None,
main_session_only = True,
force_remote=False):
"""Main function to fetch data.
Args:
symbol (str): Symbol
start_date (datetime):
end_date (datetime):
exclude_conditions (list, optional): Trade conditions to exclude. Defaults to None.
minsize (_type_, optional): Minimum trade size to include. Defaults to None.
main_session_only (bool, optional): Main or ext. hours.. Defaults to True.
force_remote (bool, optional): Force remote fetch and reagreggation. Defaults to False.
Returns:
pd.Dataframe(): Aggregated data
"""
symbols = [symbol] if isinstance(symbol, str) else symbol
if exclude_conditions is None:
exclude_conditions = EXCLUDE_CONDITIONS #fall back to defaults from config
def load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote):
exclude_conditions.sort()
excludes_str = ''.join(map(str, exclude_conditions))
file_ohlcv = AGG_CACHE / f"{symbol}-{str(agg_type)}-{str(resolution)}-{start_date.strftime('%Y-%m-%dT%H-%M-%S')}-{end_date.strftime('%Y-%m-%dT%H-%M-%S')}-{str(excludes_str)}-{minsize}-{main_session_only}.parquet"
if not force_remote and file_ohlcv.exists():
ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
print("Loaded from agg_cache", file_ohlcv)
return ohlcv_df
else:
#could this be sped up? "Searching cache" shows slowly - some bottleneck?
df = fetch_trades_parallel(symbol, start_date, end_date, minsize=minsize, exclude_conditions=exclude_conditions, main_session_only=main_session_only, force_remote=force_remote) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F'])
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type)
ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow')
print("Saved to agg_cache", file_ohlcv)
return ohlcv_df
ret_dict_df = {}
for symbol in symbols:
ret_dict_df[symbol] = load_data_single(symbol, agg_type, resolution, start_date, end_date, exclude_conditions, minsize, main_session_only, force_remote)
return ret_dict_df
def prepare_trade_cache(symbol: Union[str, List[str]],
start_date: datetime,
end_date: datetime,
force_remote=False):
"""
Fetches trade cache daily files for specified symbols and date range and stores
them to trade cache location.
Only the dates not in cache are fetched, unless force_remote is set to True.
Note: daily trade cache files contain all trades (main+ext hours) for that day
Args:
symbol (Union[str, List[str]]): Symbol(s) to fetch
start_date (datetime):
end_date (datetime):
force_remote (bool, optional): Force remote fetch. Defaults to False.
Returns:
None
"""
symbols = [symbol] if isinstance(symbol, str) else symbol
for symbol in symbols:
#just cache update
print(f"Started for {symbol}")
df = fetch_trades_parallel(symbol, start_date, end_date, force_remote=force_remote, no_return=True)
print(f"Finished for {symbol}")
#this would become new loader in the future
# def load_data_tbd(symbol: Union[str, list], day_start: datetime, day_end: datetime, agg_type: AGG_TYPE, resolution: Union[str, int], excludes: list = EXCLUDE_CONDITIONS, minsize: int = MINSIZE, ext_hours: bool = False, align: StartBarAlign =StartBarAlign.ROUND, as_df: bool = False, force_reload: bool = False) -> None:
# """
# Returns requested aggregated data for given symbol(s)
# - if requested agg data already exists in cache, returns it
# - otherwise get the trades (from trade cache or Alpaca) and aggregate them and store to cache
# For location of both caches, see config.py
# Note both trades and agg cache are daily_files
# LOCAL_TRADE_CACHE
# LOCAL_AGG_CACHE
# Parameters
# ----------
# symbol : Union[str, list]
# Symbol or list of symbols
# day_start : datetime
# Start date, zone aware
# day_end : datetime
# End date, zone aware
# agg_type : AGG_TYPE
# Type of aggregation
# resolution : Union[str, int]
# Resolution of aggregation based on agg_type
# excludes : list
# List of trade conditions to exclude
# minsize : int
# Minimum size of trade to be included
# ext_hours : bool
# If True, requests extended hours data
# align : StartBarAlign
# How to align first bar RANDOM vs ROUND
# as_df : bool
# If True, returns dataframe, otherwise returns vbt Data object
# force_reload : bool
# If True, forces cache reload (doesn't use the cache for either trade or agg data)
# Returns
# -------
# DF or vbt.Data object
# """
# pass

View File

@ -4,12 +4,38 @@ from datetime import datetime, timedelta
from typing import List, Tuple
import pytz
import calendar
import os
from alpaca.trading.models import Order, TradeUpdate, Calendar
import pandas_market_calendars as mcal
#Zones
zoneNY = pytz.timezone('US/Eastern')
zoneUTC = pytz.utc
zonePRG = pytz.timezone('Europe/Amsterdam')
def fetch_calendar_data(start: datetime, end: datetime) -> List[Calendar]:
"""
Fetches the trading schedule for the NYSE (New York Stock Exchange) between the specified start and end dates.
Args:
start (datetime): The start date for the trading schedule.
end (datetime): The end date for the trading schedule.
Returns:
List[Calendar]: A list of Calendar objects containing the trading dates and market open/close times.
Returns an empty list if no trading days are found within the specified range.
"""
nyse = mcal.get_calendar('NYSE')
schedule = nyse.schedule(start_date=start, end_date=end, tz='America/New_York')
if not schedule.empty:
schedule = (schedule.reset_index()
.rename(columns={"index": "date", "market_open": "open", "market_close": "close"})
.assign(date=lambda day: day['date'].dt.date.astype(str),
open=lambda day: day['open'].dt.strftime('%H:%M'),
close=lambda day: day['close'].dt.strftime('%H:%M'))
.to_dict(orient="records"))
cal_dates = [Calendar(**record) for record in schedule]
return cal_dates
else:
return []
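# Illustrative sketch, not part of the original module; toy date range assumed.
# fetch_calendar_data() returns alpaca Calendar objects with date, open and close.
def _example_fetch_calendar():
    """Print hypothetical NYSE sessions for one trading week."""
    for day in fetch_calendar_data(datetime(2024, 10, 14), datetime(2024, 10, 18)):
        print(day.date, day.open, day.close)  # trading date with session open/close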
def split_range(start: datetime, stop: datetime, period: str = "Y") -> List[Tuple[datetime, datetime]]:
"""
Splits a range of dates into a list of (start, end) tuples, where end is exclusive (start of next range)
@ -70,8 +96,7 @@ def split_range(start: datetime, stop: datetime, period: str = "Y") -> List[Tupl
return ranges
def find_dotenv():
""" """
Searches for a .env file in the given directory or its parents and returns the path. Searches for a .env file in the given directory or its parents and returns the path.
@ -81,6 +106,13 @@ def find_dotenv(start_path):
Returns:
Path to the .env file if found, otherwise None.
"""
try:
start_path = __file__
except NameError:
#print("Notebook probably")
start_path = os.getcwd()
#print(start_path)
current_path = Path(start_path)
for _ in range(6): # Limit search depth to 5 levels
dotenv_path = current_path / '.env'
@ -89,22 +121,9 @@ def find_dotenv(start_path):
current_path = current_path.parent
return None
# def get_daily_tradecache_file():
# return Path(DATA_DIR) / "tradecache"
# def get_daily_aggcache_file():
# #the name also contains the child class
# #and also the excludes: result = ''.join(self.excludes.sort())
# self.excludes.sort() # Sorts the list in place
# excludes_str = ''.join(map(str, self.excludes)) # Joins the sorted elements after converting them to strings
# cache_file = self.__class__.__name__ + '-' + self.symbol + '-' + str(int(date_from.timestamp())) + '-' + str(int(date_to.timestamp())) + '-' + str(self.rectype) + "-" + str(self.resolution) + "-" + str(self.minsize) + "-" + str(self.align) + '-' + str(self.mintick) + str(self.exthours) + excludes_str + '.cache.gz'
# file_path = DATA_DIR + "/aggcache/" + cache_file
# #print(file_path)
# return file_path
#create enum AGG_TYPE
class AggType(str, Enum):
"""
Enum class for aggregation types.
ohlcv - time based ohlcv (time as resolution)
@ -117,19 +136,6 @@ class AGG_TYPE(str, Enum):
OHLCV_VOL = 'ohlcv_vol'
OHLCV_DOL = 'ohlcv_dol'
OHLCV_RENKO = 'ohlcv_renko'
class RecordType(str, Enum):
"""
Represents output of aggregator
"""
BAR = "bar"
CBAR = "cbar"
CBARVOLUME = "cbarvolume"
CBARDOLLAR = "cbardollar"
CBARRENKO = "cbarrenko"
TRADE = "trade"
class StartBarAlign(str, Enum):
"""
Represents first bar start time alignment according to timeframe