Create aggregated data from trades
This notebook shows how new aggregated data are created and stored to the cache, from which they can later be loaded. Aggregation is done for a given symbol, interval, and aggregation type/resolution, for example OHLCV_1m (time bars, 1-minute resolution) or OHLCV_VOLUME_2000 (volume bars with resolution 2000).
Possible aggregation types (a minimal sketch of the volume/dollar logic follows this list):
- time-based OHLCV, time resolution
- volume-based OHLCV, volume resolution
- dollar-based OHLCV, dollar-amount resolution
- renko bars, brick size as resolution
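For intuition, here is a minimal pandas sketch of how volume (and dollar) bars can be formed from raw trades. The `volume_bars` helper and its column names are illustrative only; the repo's actual implementation is the vectorized/numba code in `v2realbot.loader.aggregator_vectorized`.

```python
import pandas as pd

def volume_bars(trades: pd.DataFrame, resolution: float) -> pd.DataFrame:
    """Illustrative volume-bar aggregation over a trades frame with
    'price' and 'size' columns and a timestamp index. A bar closes whenever
    cumulative volume crosses the next multiple of `resolution`; a
    boundary-crossing trade is not split, it simply opens the next bar."""
    df = trades.rename_axis('time').reset_index()
    bar_id = (df['size'].cumsum() // resolution).astype('int64')
    # For dollar bars, use (df['price'] * df['size']).cumsum() instead.
    bars = df.groupby(bar_id).agg(
        time=('time', 'first'),    # bar opens at its first trade
        open=('price', 'first'),
        high=('price', 'max'),
        low=('price', 'min'),
        close=('price', 'last'),
        volume=('size', 'sum'),
        trades=('price', 'count'),
    )
    return bars.set_index('time')

# Tiny usage example: cumulative volume crosses 2000 once -> two bars
trades = pd.DataFrame(
    {'price': [10.00, 10.10, 10.05, 10.20], 'size': [900, 600, 800, 700]},
    index=pd.date_range('2024-10-03 09:30:00', periods=4, freq='s'),
)
print(volume_bars(trades, resolution=2000))
```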
Steps include:
- fetch trades (remote or cached)
- use the new vectorized aggregation to build bars of the given type (time, volume, dollar) and resolution
- store the result to the aggregated-bar cache
Methods:
- `fetch_trades_parallel` fetches trades of a given symbol and interval; it can also filter by trade conditions and minimum size. Returns `trades_df`.
- `aggregate_trades` accepts `trades_df`, a resolution, and a bar type (TIME, VOLUME, DOLLAR) and returns the aggregated OHLCV dataframe `ohlcv_df`.
TBD: this will soon be moved into a separate package responsible for fetching the data (cache management, remote fetching, and vectorized aggregation) - see [issue](https://github.com/drew2323/v2trading/issues/250).
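For reference, the two methods are invoked later in this notebook as shown below; the keyword arguments are copied from those cells, while the inline comments interpreting them are a reading of the calls, not documented API.

```python
# Fetch trades for the interval (parallel variant, used for longer spans)
trades_df = fetch_trades_parallel(
    symbol, day_start, day_stop,
    exclude_conditions=exclude_conditions,  # trade condition codes to drop
    minsize=minsize,                        # minimum trade size to keep
    force_remote=True,                      # bypass the trade cache and refetch
    max_workers=None,                       # thread count; None lets the loader decide
)

# Aggregate trades into bars of the requested type and resolution
ohlcv_df = aggregate_trades(
    symbol=symbol, trades_df=trades_df,
    resolution=1,          # 1 second for TIME bars (as produced below); shares/dollars for VOLUME/DOLLAR
    type=BarType.TIME,
)
```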
In [1]:
```python
import pandas as pd
import numpy as np
from numba import jit
from alpaca.data.historical import StockHistoricalDataClient
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR
from alpaca.data.requests import StockTradesRequest
from v2realbot.enums.enums import BarType
import time
from datetime import datetime
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data
import pyarrow
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades
import vectorbtpro as vbt
import v2realbot.utils.config_handler as cfh

vbt.settings.set_theme("dark")
vbt.settings['plotting']['layout']['width'] = 1280
vbt.settings.plotting.auto_rangebreaks = True

# Set the option to display with pagination
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', 20)  # Number of rows per page
# pd.set_option('display.float_format', '{:.9f}'.format)

# Trade filtering
exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES')  # standard ['C','O','4','B','7','V','P','W','U','Z','F']
minsize = 100
symbol = "BAC"

# datetime in zoneNY
day_start = datetime(2024, 10, 3, 9, 30, 0)
day_stop = datetime(2024, 10, 16, 16, 0, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)

# Filename of the trades_df parquet; dates are in ISO format but without the time zone part
dir = DATA_DIR + "/notebooks/"
# The parquet interval cache name contains exclude conditions and minsize filtering
file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{exclude_conditions}-{minsize}.parquet"
#file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}.parquet"
file_ohlcv = dir + f"ohlcv_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{str(exclude_conditions)}-{minsize}.parquet"

# Print all parquet files in the directory
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
for f in files:
    print(f)
exclude_conditions
```
None Loaded env variables from file None
Activating profile profile1
trades_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet
trades_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet
trades_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet
trades_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet
trades_df-BAC-2023-01-01T09:30:00-2024-10-02T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet
trades_df-BAC-2024-05-15T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-BAC-2023-01-01T09:30:00-2024-10-02T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet
ohlcv_df-BAC-2024-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet
ohlcv_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet
ohlcv_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-BAC-2023-01-01T09_30_00-2024-05-25T15_30_00-47BCFOPUVWZ-100.parquet
Out[1]:
['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
In [3]:
```python
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades, fetch_trades_parallel_optimized

# Fetch trades in one go
#trades_df = fetch_daily_stock_trades(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=False, max_retries=5, backoff_factor=1)

# Fetch trades in parallel - for longer intervals
trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=True, max_workers=None)
#trades_df.info()
```
Contains 10 market days
Processing market days: 100%|██████████| 10/10 [00:00<00:00, 267.74it/s]
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
Fetching data: 0%| | 0/10 [00:00<?, ?it/s]
Remote fetched: is_empty=False 2024-10-03 09:30:00-04:00 2024-10-03 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1727962200-1727985600.cache.gz
Fetching data: 10%|█ | 1/10 [00:21<03:12, 21.41s/it]
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
NOT FOUND. Fetching from remote
Remote fetched: is_empty=False 2024-10-08 09:30:00-04:00 2024-10-08 16:00:00-04:00
Remote fetched: is_empty=False 2024-10-09 09:30:00-04:00 2024-10-09 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728394200-1728417600.cache.gz
Remote fetched: is_empty=False 2024-10-07 09:30:00-04:00 2024-10-07 16:00:00-04:00
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
NOT FOUND. Fetching from remote
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728480600-1728504000.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
NOT FOUND. Fetching from remote
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728307800-1728331200.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
NOT FOUND. Fetching from remote
Remote fetched: is_empty=False 2024-10-04 09:30:00-04:00 2024-10-04 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728048600-1728072000.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
Fetching data: 20%|██ | 2/10 [00:32<02:01, 15.24s/it]
NOT FOUND. Fetching from remote
Remote fetched: is_empty=False 2024-10-10 09:30:00-04:00 2024-10-10 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728567000-1728590400.cache.gz
Fetching data: 60%|██████ | 6/10 [00:47<00:25, 6.40s/it]
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
Remote fetched: is_empty=False 2024-10-14 09:30:00-04:00 2024-10-14 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728912600-1728936000.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
Remote fetched: is_empty=False 2024-10-16 09:30:00-04:00 2024-10-16 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1729085400-1729108800.cache.gz
Remote fetched: is_empty=False 2024-10-11 09:30:00-04:00 2024-10-11 16:00:00-04:00
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728653400-1728676800.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
Fetching data: 70%|███████ | 7/10 [01:13<00:31, 10.55s/it]
Remote fetched: is_empty=False 2024-10-15 09:30:00-04:00 2024-10-15 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728999000-1729022400.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z'] minsize 100
Fetching data: 100%|██████████| 10/10 [01:25<00:00, 8.53s/it]
In [4]:
```python
trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
```
In [5]:
```python
# Either load trades or ohlcv from parquet if they exist (see the sketch below)
#trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=50, max_workers=20)  # exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F']
#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
trades_df = pd.read_parquet(file_trades, engine='pyarrow')
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)
ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')
# ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
# trades_df = pd.read_parquet(file_trades, engine='pyarrow')
```
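The cell's leading comment describes a load-if-exists pattern, while the code as written always re-reads and re-aggregates. A possible sketch of that pattern, reusing the variables defined above (the `load_or_build` helper is hypothetical, not part of v2realbot):

```python
import os

def load_or_build(file_trades: str, file_ohlcv: str) -> pd.DataFrame:
    # Cheapest path: aggregated bars already cached as parquet
    if os.path.exists(file_ohlcv):
        return pd.read_parquet(file_ohlcv, engine='pyarrow')
    # Next: cached trades exist, only the aggregation has to run
    if os.path.exists(file_trades):
        trades = pd.read_parquet(file_trades, engine='pyarrow')
    else:
        trades = fetch_trades_parallel(symbol, day_start, day_stop,
                                       exclude_conditions=exclude_conditions,
                                       minsize=minsize)
        trades.to_parquet(file_trades, engine='pyarrow', compression='gzip')
    ohlcv = aggregate_trades(symbol=symbol, trades_df=trades, resolution=1, type=BarType.TIME)
    ohlcv.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')
    return ohlcv
```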
In [6]:
```python
# List all files in the dir directory with a parquet extension, then load the aggregated bars
dir = DATA_DIR + "/notebooks/"
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
file_name = ""
ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
```
In [8]:
```python
file_ohlcv
```
Out[8]:
"/Users/davidbrazda/Library/Application Support/v2realbot/notebooks/ohlcv_df-BAC-2024-10-03T09:30:00-2024-10-16T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet"
In [7]:
```python
ohlcv_df
```
Out[7]:
| time | open | high | low | close | volume | trades | updated | vwap | buyvolume | sellvolume |
|---|---|---|---|---|---|---|---|---|---|---|
| 2024-10-03 09:30:00-04:00 | 38.9800 | 39.0000 | 38.940 | 38.970 | 249774.0 | 6.0 | 2024-10-03 09:30:01.061997-04:00 | 38.960055 | 500.0 | 249088.0 |
| 2024-10-03 09:30:01-04:00 | 38.9500 | 39.0001 | 38.950 | 39.000 | 13553.0 | 44.0 | 2024-10-03 09:30:02.171691-04:00 | 38.985179 | 2133.0 | 1894.0 |
| 2024-10-03 09:30:02-04:00 | 38.9992 | 39.0100 | 38.990 | 39.010 | 4600.0 | 20.0 | 2024-10-03 09:30:03.091339-04:00 | 39.000123 | 1031.0 | 797.0 |
| 2024-10-03 09:30:03-04:00 | 38.9900 | 39.0400 | 38.990 | 39.030 | 7533.0 | 36.0 | 2024-10-03 09:30:04.193646-04:00 | 39.030827 | 1733.0 | 713.0 |
| 2024-10-03 09:30:04-04:00 | 39.0320 | 39.0350 | 39.032 | 39.035 | 9142.0 | 2.0 | 2024-10-03 09:30:07.260896-04:00 | 39.032033 | 9142.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 2024-10-16 15:59:55-04:00 | 42.8100 | 42.8100 | 42.810 | 42.810 | 8681.0 | 22.0 | 2024-10-16 15:59:56.000104-04:00 | 42.810000 | 0.0 | 0.0 |
| 2024-10-16 15:59:56-04:00 | 42.8150 | 42.8150 | 42.810 | 42.810 | 4128.0 | 9.0 | 2024-10-16 15:59:57.010896-04:00 | 42.811550 | 1100.0 | 603.0 |
| 2024-10-16 15:59:57-04:00 | 42.8150 | 42.8150 | 42.810 | 42.810 | 5301.0 | 20.0 | 2024-10-16 15:59:58.006387-04:00 | 42.812493 | 789.0 | 1708.0 |
| 2024-10-16 15:59:58-04:00 | 42.8160 | 42.8200 | 42.800 | 42.800 | 21469.0 | 33.0 | 2024-10-16 15:59:59.088188-04:00 | 42.809572 | 542.0 | 632.0 |
| 2024-10-16 15:59:59-04:00 | 42.8087 | 42.8100 | 42.800 | 42.810 | 26899.0 | 16.0 | 2024-10-16 15:59:59.997799-04:00 | 42.801563 | 4757.0 | 16482.0 |
114097 rows × 10 columns
In [ ]:
```python
import matplotlib.pyplot as plt
import seaborn as sns

# Calculate bar-to-bar returns
ohlcv_df['returns'] = ohlcv_df['close'].pct_change()
# TODO: same as above, but pct_change from 3 datapoints back, and only within the same date, else NaN

# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Returns')
plt.xlabel('Returns')
plt.ylabel('Probability')
plt.show()
```
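The TODO above (returns taken a few bars back, but only within the same session) could be sketched with a groupby on the calendar date; `n = 3` is an assumed lookback and `returns_intraday` a hypothetical column name:

```python
# Percentage change over n bars, computed within each trading day, so the
# first n bars of a day are NaN instead of mixing in overnight moves.
n = 3
ohlcv_df['returns_intraday'] = (
    ohlcv_df.groupby(ohlcv_df.index.date)['close'].pct_change(n)
)
```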
In [ ]:
```python
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Define the intervals from 5 to 20 bars; compute returns for each interval
# (maybe use a rolling window?)
intervals = range(5, 21, 5)

# Create columns with percentage returns over each interval
for N in intervals:
    ohlcv_df[f'returns_{N}'] = ohlcv_df['close'].pct_change(N)

# Normalize the returns using rolling mean and std (rolling z-score)
rolling_window = 50
for N in intervals:
    column_name = f'returns_{N}'
    rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()
    rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()
    ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std

# Display the dataframe with normalized return columns
ohlcv_df
```
In [ ]:
```python
# Calculate the sum of the normalized return columns for each row
ohlcv_df['sum_norm_returns'] = ohlcv_df[[f'norm_returns_{N}' for N in intervals]].sum(axis=1)

# Sort the DataFrame by the sum of normalized returns in descending order
df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)

# Display the top rows with the highest sum of normalized returns
df_sorted
```
In [ ]:
```python
# Drop initial rows with NaN values caused by pct_change
ohlcv_df.dropna(inplace=True)

# Plot the probability distribution curves
plt.figure(figsize=(14, 8))
for N in intervals:
    sns.kdeplot(ohlcv_df[f'returns_{N}'].dropna(), label=f'Returns {N}', fill=True)
plt.title('Probability Distribution of Percentage Returns')
plt.xlabel('Percentage Return')
plt.ylabel('Density')
plt.legend()
plt.show()
```
In [ ]:
```python
import matplotlib.pyplot as plt
import seaborn as sns

# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Returns')
plt.xlabel('Returns')
plt.ylabel('Probability')
plt.show()
```
In [ ]:
```python
# Show only rows from ohlcv_df where returns > 0.0005
ohlcv_df[ohlcv_df['returns'] > 0.0005]
#ohlcv_df[ohlcv_df['returns'] < -0.005]
```
In [ ]:
```python
# OHLCV rows on date 2024-03-13 between 12:00 and 13:00
a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00']
a
```
In [ ]:
```python
ohlcv_df
```
In [ ]:
```python
trades_df
```
In [ ]:
```python
ohlcv_df.info()
```
In [ ]:
trades_df.to_parquet("trades_df-spy-0111-0111.parquett", engine='pyarrow', compression='gzip')
In [ ]:
trades_df.to_parquet("trades_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)
In [ ]:
```python
ohlcv_df.to_parquet("ohlcv_df-spy-111-0516.parquet", engine='pyarrow', compression='gzip')
```
In [ ]:
```python
basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)
vbt.settings['plotting']['auto_rangebreaks'] = True
basic_data.ohlcv.plot()
```
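Plotting all 114,097 one-second bars makes for a heavy chart; one option is to downsample first. A plain pandas resample to one-minute bars (a simple sketch, not the repo's aggregator, and `ohlcv_1m` is a hypothetical name):

```python
# Downsample 1-second bars to 1-minute OHLCV; drop empty minutes
ohlcv_1m = ohlcv_df.resample('1min').agg(
    {'open': 'first', 'high': 'max', 'low': 'min', 'close': 'last', 'volume': 'sum'}
).dropna(subset=['open'])
vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_1m}), tz_convert=zoneNY).ohlcv.plot()
```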
In [ ]:
```python
# Access just BAC
#df_filtered = df.loc["BAC"]
```