strategy-lab/research/data/prepare_aggregated_data.ipynb

Create aggregated data from trades

This notebook shows how new aggregated data are created and stored to a cache, from which they can later be loaded. Data are created for a given symbol, interval and aggregation type/resolution, for example OHLCV_1m, or OHLCV_VOLUME_2000 (volume bars with a resolution of 2000).
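
A minimal illustration of that naming scheme (the agg_key helper below is hypothetical, not part of the codebase):

def agg_key(kind: str, resolution) -> str:
    # agg_key("OHLCV", "1m")        -> "OHLCV_1m"
    # agg_key("OHLCV_VOLUME", 2000) -> "OHLCV_VOLUME_2000"
    return f"{kind}_{resolution}"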

Possible aggregation types

  • time-based OHLCV, resolution is a time interval
  • volume-based OHLCV, resolution is a volume amount
  • dollar-based OHLCV, resolution is a dollar amount
  • renko bars, resolution is the brick size

Steps include

  • fetch trades (remote/cached)
  • use the new vectorized aggregation to build bars of the given type (time, volume, dollar) and resolution
  • store the result to the aggregation cache

Methods:

  • fetch_trades_parallel fetches trades for a given symbol and interval; it can also filter by trade conditions and minimum trade size. Returns trades_df.
  • aggregate_trades accepts trades_df, a resolution and a bar type (VOLUME, TIME, DOLLAR) and returns an aggregated OHLCV dataframe ohlcv_df (see the usage sketch below).
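
A minimal usage sketch based on the calls made later in this notebook (argument defaults and exact signatures may differ):

trades_df = fetch_trades_parallel(symbol, day_start, day_stop,
                                  exclude_conditions=exclude_conditions,
                                  minsize=minsize)
# 1-second time bars; pass BarType.VOLUME or BarType.DOLLAR with a matching resolution for other bar types
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)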

TBD: this will soon be moved to a separate package responsible for fetching the data (cache management, remote fetching and vectorized aggregation) - see [issue](https://github.com/drew2323/v2trading/issues/250).

In [1]:
import pandas as pd
import numpy as np
from numba import jit
from alpaca.data.historical import StockHistoricalDataClient
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR
from alpaca.data.requests import StockTradesRequest
from v2realbot.enums.enums import BarType
import time
from datetime import datetime
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data
import pyarrow
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades
import vectorbtpro as vbt
import v2realbot.utils.config_handler as cfh

vbt.settings.set_theme("dark")
vbt.settings['plotting']['layout']['width'] = 1280
vbt.settings.plotting.auto_rangebreaks = True
# Set the option to display with pagination
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', 20)  # Number of rows per page
# pd.set_option('display.float_format', '{:.9f}'.format)


#trade filtering
exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES') #standard ['C','O','4','B','7','V','P','W','U','Z','F']
minsize = 100

symbol = "BAC"
#datetime in zoneNY 
day_start = datetime(2024, 10, 3, 9, 30, 0)
day_stop = datetime(2024, 10, 16, 16, 00, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)
#filename of trades_df parquet; dates are in ISO format but without the time zone part
dir = DATA_DIR + "/notebooks/"
#parquet interval cache contains exclude conditions and minsize filtering
file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{exclude_conditions}-{minsize}.parquet"
#file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}.parquet"
file_ohlcv = dir + f"ohlcv_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{str(exclude_conditions)}-{minsize}.parquet"

#print all parquet files in the directory
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
for f in files:
    print(f)

exclude_conditions
None
Loaded env variables from file None
Activating profile profile1
trades_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet
trades_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet
trades_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet
trades_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet
trades_df-BAC-2023-01-01T09:30:00-2024-10-02T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet
trades_df-BAC-2024-05-15T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-BAC-2023-01-01T09:30:00-2024-10-02T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet
ohlcv_df-BAC-2024-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet
ohlcv_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet
ohlcv_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet
ohlcv_df-BAC-2023-01-01T09_30_00-2024-05-25T15_30_00-47BCFOPUVWZ-100.parquet
Out[1]:
['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
In [3]:
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades, fetch_trades_parallel_optimized
#fetch trades in one go
#trades_df = fetch_daily_stock_trades(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=False, max_retries=5, backoff_factor=1)
#fetch trades in parallel - for longer intervals
trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=True, max_workers=None)
 
##trades_df.info()
Contains 10  market days
Processing market days: 100%|██████████| 10/10 [00:00<00:00, 267.74it/s]
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
NOT FOUND. Fetching from remote
Fetching data:   0%|          | 0/10 [00:00<?, ?it/s]
Remote fetched: is_empty=False 2024-10-03 09:30:00-04:00 2024-10-03 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1727962200-1727985600.cache.gz
Fetching data:  10%|█         | 1/10 [00:21<03:12, 21.41s/it]
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
minsize 100
NOT FOUND. Fetching from remote
Remote fetched: is_empty=False 2024-10-08 09:30:00-04:00 2024-10-08 16:00:00-04:00
Remote fetched: is_empty=False 2024-10-09 09:30:00-04:00 2024-10-09 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728394200-1728417600.cache.gz
Remote fetched: is_empty=False 2024-10-07 09:30:00-04:00 2024-10-07 16:00:00-04:00
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
minsize 100
NOT FOUND. Fetching from remote
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728480600-1728504000.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
minsize 100
NOT FOUND. Fetching from remote
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728307800-1728331200.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
minsize 100
NOT FOUND. Fetching from remote
Remote fetched: is_empty=False 2024-10-04 09:30:00-04:00 2024-10-04 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728048600-1728072000.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
Fetching data:  20%|██        | 2/10 [00:32<02:01, 15.24s/it]
minsize 100
NOT FOUND. Fetching from remote
Remote fetched: is_empty=False 2024-10-10 09:30:00-04:00 2024-10-10 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728567000-1728590400.cache.gz
Fetching data:  60%|██████    | 6/10 [00:47<00:25,  6.40s/it]
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
minsize 100
Remote fetched: is_empty=False 2024-10-14 09:30:00-04:00 2024-10-14 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728912600-1728936000.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
minsize 100
Remote fetched: is_empty=False 2024-10-16 09:30:00-04:00 2024-10-16 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1729085400-1729108800.cache.gz
Remote fetched: is_empty=False 2024-10-11 09:30:00-04:00 2024-10-11 16:00:00-04:00
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
minsize 100
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728653400-1728676800.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
Fetching data:  70%|███████   | 7/10 [01:13<00:31, 10.55s/it]
minsize 100
Remote fetched: is_empty=False 2024-10-15 09:30:00-04:00 2024-10-15 16:00:00-04:00
Saving to Trade CACHE /Users/davidbrazda/Library/Application Support/v2realbot/tradecache/BAC-1728999000-1729022400.cache.gz
excluding conditions ['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']
Fetching data: 100%|██████████| 10/10 [01:25<00:00,  8.53s/it]
minsize 100

In [4]:
trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
In [5]:
#Either load trades or OHLCV from parquet if they exist

#trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=50, max_workers=20) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F'])
# trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')

trades_df = pd.read_parquet(file_trades,engine='pyarrow')
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)
ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')

# ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')
# trades_df = pd.read_parquet(file_trades,engine='pyarrow')
In [6]:
#list all files in the dir directory with the parquet extension
dir = DATA_DIR + "/notebooks/"
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
file_name = ""
ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')
In [8]:
file_ohlcv
Out[8]:
"/Users/davidbrazda/Library/Application Support/v2realbot/notebooks/ohlcv_df-BAC-2024-10-03T09:30:00-2024-10-16T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet"
In [7]:
ohlcv_df
Out[7]:
open high low close volume trades updated vwap buyvolume sellvolume
time
2024-10-03 09:30:00-04:00 38.9800 39.0000 38.940 38.970 249774.0 6.0 2024-10-03 09:30:01.061997-04:00 38.960055 500.0 249088.0
2024-10-03 09:30:01-04:00 38.9500 39.0001 38.950 39.000 13553.0 44.0 2024-10-03 09:30:02.171691-04:00 38.985179 2133.0 1894.0
2024-10-03 09:30:02-04:00 38.9992 39.0100 38.990 39.010 4600.0 20.0 2024-10-03 09:30:03.091339-04:00 39.000123 1031.0 797.0
2024-10-03 09:30:03-04:00 38.9900 39.0400 38.990 39.030 7533.0 36.0 2024-10-03 09:30:04.193646-04:00 39.030827 1733.0 713.0
2024-10-03 09:30:04-04:00 39.0320 39.0350 39.032 39.035 9142.0 2.0 2024-10-03 09:30:07.260896-04:00 39.032033 9142.0 0.0
... ... ... ... ... ... ... ... ... ... ...
2024-10-16 15:59:55-04:00 42.8100 42.8100 42.810 42.810 8681.0 22.0 2024-10-16 15:59:56.000104-04:00 42.810000 0.0 0.0
2024-10-16 15:59:56-04:00 42.8150 42.8150 42.810 42.810 4128.0 9.0 2024-10-16 15:59:57.010896-04:00 42.811550 1100.0 603.0
2024-10-16 15:59:57-04:00 42.8150 42.8150 42.810 42.810 5301.0 20.0 2024-10-16 15:59:58.006387-04:00 42.812493 789.0 1708.0
2024-10-16 15:59:58-04:00 42.8160 42.8200 42.800 42.800 21469.0 33.0 2024-10-16 15:59:59.088188-04:00 42.809572 542.0 632.0
2024-10-16 15:59:59-04:00 42.8087 42.8100 42.800 42.810 26899.0 16.0 2024-10-16 15:59:59.997799-04:00 42.801563 4757.0 16482.0

114097 rows × 10 columns

In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns
# Calculate per-bar returns (bars are 1-second here)
ohlcv_df['returns'] = ohlcv_df['close'].pct_change()
#same as above but with pct_change taken from 3 datapoints back, only within the same date, else NaN
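# A minimal sketch of that idea (assumption: the index is a DatetimeIndex): group by calendar
# date so the 3-bar lookback never crosses into the previous session; elsewhere the value is NaN.
ohlcv_df['returns_3bars'] = (
    ohlcv_df.groupby(ohlcv_df.index.date)['close']
    .transform(lambda s: s.pct_change(3))
)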


# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Per-Bar Returns')
plt.xlabel('Per-Bar Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Define the intervals from 5 to 20 s, returns for each interval
#maybe use rolling window?
intervals = range(5, 21, 5)

# Create columns for percentage returns over each interval (in bars)
for N in intervals:
    ohlcv_df[f'returns_{N}'] = ohlcv_df['close'].pct_change(N)

rolling_window = 50

# Normalize the returns using rolling mean and std
for N in intervals:
    column_name = f'returns_{N}'
    rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()
    rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()
    ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std

# Display the dataframe with normalized return columns
ohlcv_df
In [ ]:
# Calculate the sum of the normalized return columns for each row
ohlcv_df['sum_norm_returns'] = ohlcv_df[[f'norm_returns_{N}' for N in intervals]].sum(axis=1)

# Sort the DataFrame based on the sum of normalized returns in descending order
df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)

# Display the top rows with the highest sum of normalized returns
df_sorted
In [ ]:
# Drop initial rows with NaN values due to pct_change
ohlcv_df.dropna(inplace=True)

# Plotting the probability distribution curves
plt.figure(figsize=(14, 8))
for N in intervals:
    sns.kdeplot(ohlcv_df[f'returns_{N}'].dropna(), label=f'Returns {N}', fill=True)

plt.title('Probability Distribution of Percentage Returns')
plt.xlabel('Percentage Return')
plt.ylabel('Density')
plt.legend()
plt.show()
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns
# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Per-Bar Returns')
plt.xlabel('Per-Bar Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
#show only rows from ohlcv_df where returns > 0.0005
ohlcv_df[ohlcv_df['returns'] > 0.0005]

#ohlcv_df[ohlcv_df['returns'] < -0.005]
In [ ]:
#ohlcv rows where the index date is 2024-03-13, between 12:00 and 13:00

a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00']
a
In [ ]:
ohlcv_df
In [ ]:
trades_df
In [ ]:
ohlcv_df.info()
In [ ]:
trades_df.to_parquet("trades_df-spy-0111-0111.parquett", engine='pyarrow', compression='gzip')
In [ ]:
trades_df.to_parquet("trades_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)
In [ ]:
ohlcv_df.to_parquet("ohlcv_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip')
In [ ]:
basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)
vbt.settings['plotting']['auto_rangebreaks'] = True
basic_data.ohlcv.plot()
In [ ]:
#access just BAC
#df_filtered = df.loc["BAC"]