Loading trades and vectorized aggregation
Describes how to fetch trades (remote or cached) and use the new vectorized aggregation to build bars of a given type (time, volume, dollar) and resolution.

fetch_trades_parallel fetches the trades of a given symbol and interval, optionally filtering by trade conditions and minimum size; it returns trades_df.

aggregate_trades accepts trades_df, a resolution, and a bar type (VOLUME, TIME, DOLLAR) and returns the aggregated OHLCV dataframe ohlcv_df.
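The whole flow is two calls; a minimal sketch mirroring the cells below (parameter values are illustrative, and the variable names follow the setup cell that comes next):

In [ ]:
# Sketch: fetch filtered trades, then aggregate them into time bars.
# exclude_conditions/minsize/max_workers values are illustrative assumptions.
trades_df = fetch_trades_parallel(symbol, day_start, day_stop,
                                  exclude_conditions=exclude_conditions,
                                  minsize=100, max_workers=20)
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df,
                            resolution=1, type=BarType.TIME)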
In [ ]:
import pandas as pd
import numpy as np
from numba import jit
from alpaca.data.historical import StockHistoricalDataClient
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR
from alpaca.data.requests import StockTradesRequest
from v2realbot.enums.enums import BarType
import time
from datetime import datetime
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data
import pyarrow
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades
import vectorbtpro as vbt
import v2realbot.utils.config_handler as cfh

vbt.settings.set_theme("dark")
vbt.settings['plotting']['layout']['width'] = 1280
vbt.settings.plotting.auto_rangebreaks = True

# Set the option to display with pagination
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', 20)  # Number of rows per page
# pd.set_option('display.float_format', '{:.9f}'.format)

# Trade filtering
exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES')  # standard ['C','O','4','B','7','V','P','W','U','Z','F']
minsize = 100
symbol = "SPY"

# Datetimes in zoneNY
day_start = datetime(2024, 1, 1, 9, 30, 0)
day_stop = datetime(2024, 1, 14, 16, 0, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)

# Filename of the trades_df parquet; dates are in ISO format but without the time zone part
dir = DATA_DIR + "/notebooks/"
# The parquet interval cache name encodes the exclude-conditions and minsize filtering
file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{exclude_conditions}-{minsize}.parquet"
# file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}.parquet"
file_ohlcv = dir + f"ohlcv_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{exclude_conditions}-{minsize}.parquet"

# Print all parquet files in the directory
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
for f in files:
    print(f)
In [ ]:
trades_df = fetch_daily_stock_trades(
    symbol, day_start, day_stop,
    exclude_conditions=exclude_conditions,
    minsize=minsize,
    force_remote=False,
    max_retries=5,
    backoff_factor=1,
)
trades_df
In [ ]:
# Either fetch trades remotely or load trades/ohlcv from parquet if it exists
# trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=50, max_workers=20)  # exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F']
# trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
trades_df = pd.read_parquet(file_trades, engine='pyarrow')
ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)
ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')
# ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
# trades_df = pd.read_parquet(file_trades, engine='pyarrow')
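Per the bar types listed above, the same trades_df can also be aggregated into volume or dollar bars by switching type; a sketch, assuming resolution counts shares per bar (VOLUME) or dollars traded per bar (DOLLAR), with illustrative untuned values:

In [ ]:
# Sketch: volume and dollar bars from the same trades.
# The resolution values are illustrative assumptions, not recommendations.
volume_bars = aggregate_trades(symbol=symbol, trades_df=trades_df,
                               resolution=100_000, type=BarType.VOLUME)    # shares per bar (assumed)
dollar_bars = aggregate_trades(symbol=symbol, trades_df=trades_df,
                               resolution=50_000_000, type=BarType.DOLLAR)  # dollars per bar (assumed)
volume_bars.head()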
In [ ]:
# List all parquet files in the dir directory
dir = DATA_DIR + "/notebooks/"
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
file_name = ""
ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
In [ ]:
ohlcv_df
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns

# Calculate bar-to-bar returns
ohlcv_df['returns'] = ohlcv_df['close'].pct_change()
# TODO: same as above, but pct_change from 3 datapoints back, and only within the same date, else NaN

# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Bar Returns')
plt.xlabel('Bar Returns')
plt.ylabel('Probability')
plt.show()
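The commented idea above (a return taken 3 datapoints back, but only within the same date) could look like this sketch; returns_3 is a hypothetical column name, and the frame is assumed to keep its DatetimeIndex:

In [ ]:
# Sketch: 3-bar returns computed within each calendar date only, so the first
# bars of a session get NaN instead of a return spanning the overnight gap.
ohlcv_df['returns_3'] = ohlcv_df.groupby(ohlcv_df.index.date)['close'].pct_change(3)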
In [ ]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Define the intervals from 5 to 20 s and compute returns for each interval
# (maybe use a rolling window?)
intervals = range(5, 21, 5)

# Create columns with percentage returns over each interval
# (returns_{N} assumed as close-to-close pct_change over N bars)
for N in intervals:
    ohlcv_df[f'returns_{N}'] = ohlcv_df['close'].pct_change(N)

# Normalize the returns using rolling mean and std
rolling_window = 50
for N in intervals:
    column_name = f'returns_{N}'
    rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()
    rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()
    ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std

# Display the dataframe with normalized return columns
ohlcv_df
In [ ]:
# Calculate the sum of the normalized return columns for each row
ohlcv_df['sum_norm_returns'] = ohlcv_df[[f'norm_returns_{N}' for N in intervals]].sum(axis=1)

# Sort the DataFrame by the sum of normalized returns in descending order
df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)

# Display the top rows with the highest sum of normalized returns
df_sorted
In [ ]:
# Drop initial rows with NaN values due to pct_change
ohlcv_df.dropna(inplace=True)

# Plot the probability distribution curves
plt.figure(figsize=(14, 8))
for N in intervals:
    sns.kdeplot(ohlcv_df[f'returns_{N}'].dropna(), label=f'Returns {N}', fill=True)
plt.title('Probability Distribution of Percentage Returns')
plt.xlabel('Percentage Return')
plt.ylabel('Density')
plt.legend()
plt.show()
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns

# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Bar Returns')
plt.xlabel('Bar Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
# Show only rows from ohlcv_df where returns > 0.0005
ohlcv_df[ohlcv_df['returns'] > 0.0005]
# ohlcv_df[ohlcv_df['returns'] < -0.005]
In [ ]:
# ohlcv_df rows for 2024-03-13 between 12:00 and 13:00
a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00']
a
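To take the same clock window across every session rather than a single date, pandas' between_time works on the DatetimeIndex used above; a minimal sketch:

In [ ]:
# Sketch: the 12:00-13:00 window across all days present in ohlcv_df.
midday = ohlcv_df.between_time('12:00', '13:00')
midday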
In [ ]:
ohlcv_df
In [ ]:
trades_df
In [ ]:
ohlcv_df.info()
In [ ]:
trades_df.to_parquet("trades_df-spy-0111-0111.parquett", engine='pyarrow', compression='gzip')
In [ ]:
trades_df.to_parquet("trades_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)
In [ ]:
ohlcv_df.to_parquet("ohlcv_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip')
In [ ]:
basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)
vbt.settings['plotting']['auto_rangebreaks'] = True
basic_data.ohlcv.plot()
In [ ]:
# Access just BAC
# df_filtered = df.loc["BAC"]