Loading trades and vectorized aggregation
This notebook fetches trades from a remote source or a local cache and aggregates them into bars of a given type (time, volume, dollar) and resolution.

fetch_trades_parallel fetches the trades for a given symbol and interval, optionally filtering by trade conditions and minimum size. It returns trades_df.

aggregate_trades accepts trades_df, a resolution, and a bar type (VOLUME, TIME, DOLLAR) and returns the aggregated OHLCV dataframe ohlcv_df.
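Below is a minimal usage sketch of the two functions, based on the calls made later in this notebook (the argument names mirror those calls; the semantics noted in the comments are assumptions, not documented API):

trades_df = fetch_trades_parallel(
    symbol, day_start, day_stop,
    exclude_conditions=exclude_conditions,  # drop trades carrying these condition codes
    minsize=minsize,                        # drop trades smaller than this size
    max_workers=30,                         # number of parallel fetch workers
)
ohlcv_df = aggregate_trades(
    symbol=symbol,
    trades_df=trades_df,
    resolution=1,        # assumed semantics: seconds for TIME, shares for VOLUME, dollars for DOLLAR
    type=BarType.TIME,   # or BarType.VOLUME / BarType.DOLLAR
)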
In [ ]:
from dotenv import load_dotenv

# V2realbot is a client, so load env variables here
env_file = "/Users/davidbrazda/Documents/Development/python/.env"
load_dotenv(env_file)

import pandas as pd
import numpy as np
from numba import jit
from alpaca.data.historical import StockHistoricalDataClient
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR
from alpaca.data.requests import StockTradesRequest
from v2realbot.enums.enums import BarType
import time
from datetime import datetime
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data
import pyarrow
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades
import vectorbtpro as vbt
import v2realbot.utils.config_handler as cfh
from appdirs import user_data_dir
from pathlib import Path

vbt.settings.set_theme("dark")
vbt.settings['plotting']['layout']['width'] = 1280
vbt.settings.plotting.auto_rangebreaks = True

# Set the option to display with pagination
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', 20)  # Number of rows per page
# pd.set_option('display.float_format', '{:.9f}'.format)

# Trade filtering
exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES')  # standard ['C','O','4','B','7','V','P','W','U','Z','F']
minsize = 100
symbol = "BAC"

# Datetimes localized to zoneNY
day_start = datetime(2023, 1, 1, 9, 30, 0)
day_stop = datetime(2024, 5, 25, 15, 30, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)

# Filenames of the trades_df/ohlcv_df parquet caches; dates are in ISO format without the time zone part.
# The interval cache name also encodes the exclude conditions and the minsize filtering.
dir = DATA_DIR + "/notebooks/"
file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H_%M_%S')}-{day_stop.strftime('%Y-%m-%dT%H_%M_%S')}-{''.join(exclude_conditions)}-{minsize}.parquet"
#file_trades = dir + f"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}.parquet"
file_ohlcv = dir + f"ohlcv_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H_%M_%S')}-{day_stop.strftime('%Y-%m-%dT%H_%M_%S')}-{''.join(exclude_conditions)}-{minsize}.parquet"
print(file_trades)
print(file_ohlcv)

# Print all parquet files in the directory
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
for f in files:
    print(f)
In [ ]:
# Either fetch trades remotely (and cache them) or load trades/ohlcv from an existing parquet
#trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, max_workers=30)  #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F']
#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
#filenames = [dir+"trades_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet", dir+"trades_df-BAC-2024-05-15T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet"]
trades_df = pd.read_parquet(dir + "trades_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet", engine='pyarrow')

#focused = trades_df.loc["2024-02-16 11:23:11":"2024-02-16 11:24:26"]
#focused

ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)
ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')
#ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
#trades_df = pd.read_parquet(file_trades, engine='pyarrow')
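As a quick sanity check, the 1-second TIME bars can be compared against a plain pandas resample. This is a hypothetical sketch: it assumes trades_df carries a (symbol, timestamp) MultiIndex with 'price' and 'size' columns, which may differ from the actual schema.

# Hypothetical cross-check of 1-second TIME bars via pandas resample.
# Column and index names are assumptions; adjust to the real trades_df schema.
t = trades_df.loc[symbol]                     # select one symbol level
check = t['price'].resample('1s').ohlc()      # open/high/low/close per second
check['volume'] = t['size'].resample('1s').sum()
check = check.dropna(subset=['open'])         # drop seconds with no trades
check.head()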
In [ ]:
# Release memory before reloading
trades_df = None
ohlcv_df = None
In [ ]:
#ohlcv_df.info()
#trades_df.info()
In [ ]:
a = trades_df.loc[("BAC", "2024-02-16 09:30"):("BAC", "2024-02-16 09:32:11")]
a
In [ ]:
ohlcv_df
In [ ]:
#trades_df.info()
focused = trades_df.loc[("BAC", "2024-02-16 09:30:00"):("BAC", "2024-02-16 10:24:26")]
focused
In [ ]:
trades_df.loc["2024-02-16 09:30:00":"2024-02-16 10:24:26"]
In [ ]:
focohlc = ohlcv_df.loc["2024-02-16 09:30:00":"2024-02-16 10:24:26"]
focohlc
In [ ]:
focohlc.info()
In [ ]:
#trades_df.to_parquet(dir + "trades_df-BAC-2024-01-01T09:30:00-2024-05-14T16:00:00-CO4B7VPWUZF-100.parquet", engine='pyarrow', compression='gzip')
#trades_df = pd.read_parquet(dir + "trades_df-BAC-2024-01-01T09:30:00-2024-05-14T16:00:00-CO4B7VPWUZF-100.parquet", engine='pyarrow')
#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')
In [ ]:
trades_df
In [ ]:
file_trades
In [ ]:
# List all parquet files in the dir directory, then load the cached OHLCV
dir = DATA_DIR + "/notebooks/"
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
In [ ]:
ohlcv_df
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns

# Calculate per-bar returns
ohlcv_df['returns'] = ohlcv_df['close'].pct_change()
# Variant (not implemented here): pct_change from 3 datapoints back, but only
# within the same date, else NaN -- see the sketch after this cell

# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Returns')
plt.xlabel('Returns')
plt.ylabel('Probability')
plt.show()
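The comment above mentions a variant that the cell does not implement: a 3-bar pct_change that resets at date boundaries. A hedged sketch, assuming ohlcv_df has a DatetimeIndex (the column name is illustrative):

# Hypothetical implementation of the commented-out idea: 3-bar returns that
# never cross a date boundary (the first 3 bars of each date become NaN).
ohlcv_df['returns_3_sameday'] = (
    ohlcv_df.groupby(ohlcv_df.index.date)['close']
            .transform(lambda s: s.pct_change(periods=3))
)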
In [ ]:
import pandas as pd
import numpy as np

# Define the intervals from 5 to 20 bars; compute returns for each interval
intervals = range(5, 21, 5)

# Create columns with percentage returns over each interval
# NOTE: assumed definition -- the original cell referenced returns_{N} columns without creating them
for N in intervals:
    ohlcv_df[f'returns_{N}'] = ohlcv_df['close'].pct_change(periods=N)

# Normalize the returns using a rolling mean and std
rolling_window = 50
for N in intervals:
    column_name = f'returns_{N}'
    rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()
    rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()
    ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std

# Display the dataframe with normalized return columns
ohlcv_df
In [ ]:
# Calculate the sum of the normalized return columns for each row
ohlcv_df['sum_norm_returns'] = ohlcv_df[[f'norm_returns_{N}' for N in intervals]].sum(axis=1)

# Sort the DataFrame by the sum of normalized returns in descending order
df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)

# Display the rows with the highest sum of normalized returns
df_sorted
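To eyeball what a high sum_norm_returns event looks like, one can pull a couple of minutes of context around the top-ranked timestamp. A sketch assuming ohlcv_df has a plain DatetimeIndex (as the slicing cells above suggest); names and the window size are illustrative:

# Hypothetical context view around the strongest event
top_ts = df_sorted.index[0]
window = pd.Timedelta(minutes=2)
context = ohlcv_df.loc[top_ts - window : top_ts + window]
context[['close', 'sum_norm_returns']]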
In [ ]:
# Drop initial rows with NaN values due to pct_change
ohlcv_df.dropna(inplace=True)

# Plot the probability distribution curves
plt.figure(figsize=(14, 8))
for N in intervals:
    sns.kdeplot(ohlcv_df[f'returns_{N}'].dropna(), label=f'Returns {N}', fill=True)
plt.title('Probability Distribution of Percentage Returns')
plt.xlabel('Percentage Return')
plt.ylabel('Density')
plt.legend()
plt.show()
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns

# Plot the probability distribution curve
plt.figure(figsize=(10, 6))
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Returns')
plt.xlabel('Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
# Show only rows from ohlcv_df where returns > 0.0005
ohlcv_df[ohlcv_df['returns'] > 0.0005]
#ohlcv_df[ohlcv_df['returns'] < -0.005]
In [ ]:
# OHLCV rows on 2024-03-13 between 12:00 and 13:00
a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00']
a
In [ ]:
ohlcv_df
In [ ]:
trades_df
In [ ]:
ohlcv_df.info()
In [ ]:
trades_df.to_parquet("trades_df-spy-0111-0111.parquett", engine='pyarrow', compression='gzip')
In [ ]:
trades_df.to_parquet("trades_df-spy-111-0516.parquett", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)
In [ ]:
ohlcv_df.to_parquet("ohlcv_df-spy-111-0516.parquet", engine='pyarrow', compression='gzip')
In [ ]:
# Wrap the OHLCV frame in a vectorbtpro Data object and plot it
basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)
vbt.settings['plotting']['auto_rangebreaks'] = True
basic_data.ohlcv.plot()
In [ ]:
# Access just BAC
#df_filtered = df.loc["BAC"]