Files
strategy-lab/research/prepare_aggregatied_data.ipynb

14 KiB

Loading trades and vectorized aggregation

This notebook fetches the trades from remote or local cache and aggregates them to bars of given type (time, volume, dollar) and resolution

fetch_trades_parallel enables to fetch trades of given symbol and interval, also can filter conditions and minimum size. return trades_df aggregate_trades acceptss trades_df and ressolution and type of bars (VOLUME, TIME, DOLLAR) and return aggregated ohlcv dataframe ohlcv_df

In [ ]:
from dotenv import load_dotenv

#as V2realbot is client , load env variables here
env_file = "/Users/davidbrazda/Documents/Development/python/.env"
# Load the .env file
load_dotenv(env_file)

import pandas as pd
import numpy as np
from numba import jit
from alpaca.data.historical import StockHistoricalDataClient
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR
from alpaca.data.requests import StockTradesRequest
from v2realbot.enums.enums import BarType
import time
from datetime import datetime
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data
import pyarrow
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades
import vectorbtpro as vbt
import v2realbot.utils.config_handler as cfh
from appdirs import user_data_dir
from pathlib import Path

vbt.settings.set_theme("dark")
vbt.settings['plotting']['layout']['width'] = 1280
vbt.settings.plotting.auto_rangebreaks = True
# Set the option to display with pagination
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', 20)  # Number of rows per page
# pd.set_option('display.float_format', '{:.9f}'.format)


#trade filtering
exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES') #standard ['C','O','4','B','7','V','P','W','U','Z','F']
minsize = 100

symbol = "BAC"
#datetime in zoneNY 
day_start = datetime(2023, 1, 1, 9, 30, 0)
day_stop = datetime(2024, 5, 25, 15, 30, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)
#filename of trades_df parquet, date are in isoformat but without time zone part
dir = DATA_DIR + "/notebooks/"
#parquet interval cache contains exclude conditions and minsize filtering
file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H_%M_%S')}-{day_stop.strftime('%Y-%m-%dT%H_%M_%S')}-{''.join(exclude_conditions)}-{minsize}.parquet"
#file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}.parquet"
file_ohlcv = dir + f"ohlcv_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H_%M_%S')}-{day_stop.strftime('%Y-%m-%dT%H_%M_%S')}-{''.join(exclude_conditions)}-{minsize}.parquet"
print(file_trades)
print(file_ohlcv)
#PRINT all parquet in directory
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
for f in files:
    print(f)
In [ ]:
#Either load trades or ohlcv from parquet if exists
#trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, max_workers=30) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F'])
#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
#filenames = [dir+"trades_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet",dir+"trades_df-BAC-2024-05-15T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet"]
trades_df = pd.read_parquet(dir+"trades_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet",engine='pyarrow')
#focused = trades_df.loc["2024-02-16 11:23:11":"2024-02-16 11:24:26"]
#focused
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)
ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')

#ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')
# trades_df = pd.read_parquet(file_trades,engine='pyarrow')
In [ ]:
trades_df = None
ohlcv_df = None
In [ ]:
#ohlcv_df.info()
#trades_df.info()
In [ ]:
a = trades_df.loc[("BAC", "2024-02-16 09:30"):("BAC","2024-02-16 09:32:11")]
a
In [ ]:
ohlcv_df
In [ ]:
#trades_df.info()
focused = trades_df.loc[("BAC", "2024-02-16 09:30:00"):("BAC", "2024-02-16 10:24:26")]
focused
In [ ]:
trades_df.loc["2024-02-16 09:30:00":"2024-02-16 10:24:26"]
In [ ]:
focohlc = ohlcv_df.loc["2024-02-16 09:30:00":"2024-02-16 10:24:26"]
focohlc
In [ ]:
focohlc.info()
In [ ]:
#trades_df.to_parquet(dir + "trades_df-BAC-2024-01-01T09:30:00-2024-05-14T16:00:00-CO4B7VPWUZF-100.parquet", engine='pyarrow', compression='gzip')
#trades_df = pd.read_parquet(dir + "trades_df-BAC-2024-01-01T09:30:00-2024-05-14T16:00:00-CO4B7VPWUZF-100.parquet",engine='pyarrow')

#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
In [ ]:
trades_df
In [ ]:
file_trades
In [ ]:
#list all files is dir directory with parquet extension
dir = DATA_DIR + "/notebooks/"
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
file_name = ""
ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')
In [ ]:
ohlcv_df
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns
# Calculate daily returns
ohlcv_df['returns'] = ohlcv_df['close'].pct_change().dropna()
#same as above but pct_change is from 3 datapoints back, but only if it is the same date, else na


# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Daily Returns')
plt.xlabel('Daily Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Define the intervals from 5 to 20 s, returns for each interval
#maybe use rolling window?
intervals = range(5, 21, 5)

# Create columns for percentage returns
rolling_window = 50

# Normalize the returns using rolling mean and std
for N in intervals:
    column_name = f'returns_{N}'
    rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()
    rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()
    ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std

# Display the dataframe with normalized return columns
ohlcv_df
In [ ]:
# Calculate the sum of the normalized return columns for each row
ohlcv_df['sum_norm_returns'] = ohlcv_df[[f'norm_returns_{N}' for N in intervals]].sum(axis=1)

# Sort the DataFrame based on the sum of normalized returns in descending order
df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)

# Display the top rows with the highest sum of normalized returns
df_sorted
In [ ]:
# Drop initial rows with NaN values due to pct_change
ohlcv_df.dropna(inplace=True)

# Plotting the probability distribution curves
plt.figure(figsize=(14, 8))
for N in intervals:
    sns.kdeplot(ohlcv_df[f'returns_{N}'].dropna(), label=f'Returns {N}', fill=True)

plt.title('Probability Distribution of Percentage Returns')
plt.xlabel('Percentage Return')
plt.ylabel('Density')
plt.legend()
plt.show()
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns
# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Daily Returns')
plt.xlabel('Daily Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
#show only rows from ohlcv_df where returns > 0.005
ohlcv_df[ohlcv_df['returns'] > 0.0005]

#ohlcv_df[ohlcv_df['returns'] < -0.005]
In [ ]:
#ohlcv where index = date 2024-03-13 and between hour 12

a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00']
a
In [ ]:
ohlcv_df
In [ ]:
trades_df
In [ ]:
ohlcv_df.info()
In [ ]:
trades_df.to_parquet("trades_df-spy-0111-0111.parquett", engine='pyarrow', compression='gzip')
In [ ]:
trades_df.to_parquet("trades_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)
In [ ]:
ohlcv_df.to_parquet("ohlcv_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip')
In [ ]:
basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)
vbt.settings['plotting']['auto_rangebreaks'] = True
basic_data.ohlcv.plot()
In [ ]:
#access just BCA
#df_filtered = df.loc["BAC"]