daily update

This commit is contained in:
David Brazda
2024-10-04 08:15:16 +02:00
parent 48db2bc9de
commit 6b93e53ab1
12 changed files with 1344 additions and 1923 deletions

View File

@ -0,0 +1,252 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Experiment with file persistence structure\n",
"\n",
"ohlcv and trades persistence with bar type and trade filtering and minsize support\n",
"\n",
"```\n",
"/OHLCV/\n",
" ├── {bar_type}/ (1s)\n",
" │ ├── {resolution}/\n",
" │ │ ├── {filtered_trades}-{min_trade_size}/\n",
" │ │ │ ├── {day}/\n",
" │ │ │ │ └── hashedname.parquet\n",
"```"
]
},
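{
"cell_type": "markdown",
"metadata": {},
"source": [
"A minimal sketch of how the partition path above is composed for concrete values (`build_ohlcv_path` is a hypothetical helper for illustration, not part of v2realbot):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"\n",
"def build_ohlcv_path(base: str, bar_type: str, resolution: str,\n",
"                     filtered_trades: str, min_trade_size: int) -> Path:\n",
"    # hypothetical helper: one day-partitioned parquet set lives under this directory\n",
"    return Path(base) / \"OHLCV\" / bar_type / resolution / f\"{filtered_trades}-{min_trade_size}\"\n",
"\n",
"build_ohlcv_path(\"/data\", \"time\", \"1s\", \"47BCFOPUVWZ\", 100)\n",
"# -> PosixPath('/data/OHLCV/time/1s/47BCFOPUVWZ-100')"
]
},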
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from v2realbot.tools.loadbatch import load_batch\n",
"from v2realbot.utils.utils import zoneNY\n",
"import pandas as pd\n",
"import numpy as np\n",
"import vectorbtpro as vbt\n",
"from itables import init_notebook_mode, show\n",
"import datetime\n",
"from itertools import product\n",
"from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR\n",
"import v2realbot.utils.config_handler as cfh\n",
"init_notebook_mode(all_interactive=True)\n",
"from v2realbot.enums.enums import BarType\n",
"\n",
"vbt.settings.set_theme(\"dark\")\n",
"vbt.settings['plotting']['layout']['width'] = 1280\n",
"vbt.settings.plotting.auto_rangebreaks = True\n",
"# Set the option to display with pagination\n",
"pd.set_option('display.notebook_repr_html', True)\n",
"pd.set_option('display.max_rows', 10) # Number of rows per page\n",
"\n",
"# Define the market open and close times\n",
"market_open = datetime.time(9, 30)\n",
"market_close = datetime.time(16, 0)\n",
"entry_window_opens = 1\n",
"entry_window_closes = 370\n",
"\n",
"forced_exit_start = 380\n",
"forced_exit_end = 390\n",
"\n",
"#LOAD FROM BATCH\n",
"# res, df = load_batch(batch_id=\"f1ac6651\", #138170bc 0fb5043a bde6d0be f1ac6651\n",
"# space_resolution_evenly=False,\n",
"# indicators_columns=[\"Rsi14\"],\n",
"# main_session_only=True,\n",
"# verbose = False)\n",
"# if res < 0:\n",
"# print(\"Error\" + str(res) + str(df))\n",
"# df = df[\"bars\"]\n",
"\n",
"# basic_data = vbt.Data.from_data(vbt.symbol_dict({\"BAC\": df}), tz_convert=zoneNY)\n",
"# #m1_data = basic_data[['Open', 'High', 'Low', 'Close', 'Volume']]\n",
"# basic_data = basic_data.transform(lambda df: df.between_time('09:30', '16:00'))\n",
"# #basic_data.info()\n",
"\n",
"#LOAD FROM PARQUET\n",
"#list all files is dir directory with parquet extension\n",
"dir = DATA_DIR + \"/notebooks/\"\n",
"import os\n",
"files = [f for f in os.listdir(dir) if f.endswith(\".parquet\")]\n",
"print('\\n'.join(map(str, files)))\n",
"file_name = \"ohlcv_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet\"\n",
"ohlcv_df = pd.read_parquet(dir+file_name,engine='pyarrow')\n",
"\n",
"#filter ohlcv_df to certain date range (assuming datetime index)\n",
"#ohlcv_df = ohlcv_df.loc[\"2024-05-14 09:30\":\"2024-05-15 09:35\"]\n",
"\n",
"#add vwap column to ohlcv_df\n",
"#ohlcv_df[\"hlcc4\"] = (ohlcv_df[\"close\"] + ohlcv_df[\"high\"] + ohlcv_df[\"low\"] + ohlcv_df[\"close\"]) / 4\n",
"\n",
"basic_data = vbt.Data.from_data(vbt.symbol_dict({\"BAC\": ohlcv_df}), tz_convert=zoneNY)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#basic_data.data[\"BAC\"].info()\n",
"#ohlcv_df group by week number of rows\n",
"# ohlcv_df['close'].groupby(pd.Grouper(freq='ME')).mean()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#trade filtering\n",
"exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES') #standard ['C','O','4','B','7','V','P','W','U','Z','F']\n",
"minsize = 100\n",
"exclude_conditions_str = ''.join(exclude_conditions)\n",
"exclude_conditions_str"
]
},
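{
"cell_type": "markdown",
"metadata": {},
"source": [
"A hedged sketch of the filtering these settings encode (illustrative only; the real filtering happens inside the trade loader). It assumes a `conditions` column holding a list of condition codes per trade; the loader's actual schema may differ:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"trades = pd.DataFrame({\n",
"    \"price\": [33.10, 33.20, 33.15],\n",
"    \"size\": [50, 200, 300],\n",
"    \"conditions\": [[\"@\", \"I\"], [\"@\"], [\"@\", \"4\"]],\n",
"})\n",
"excluded = set(exclude_conditions)\n",
"# keep trades at or above minsize whose condition codes avoid the excluded set\n",
"mask = trades[\"size\"].ge(minsize) & ~trades[\"conditions\"].apply(\n",
"    lambda conds: any(c in excluded for c in conds))\n",
"trades[mask]"
]
},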
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"basic_data.data[\"BAC\"].info()"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Aim is to store\n",
"OHLCV grouped by symbol, day, resolution\n",
"and \n",
"bar type\n",
"excluded_conditions\n",
"minsize\n",
"main session"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bartype= BarType.TIME\n",
"resolution = \"1s\"\n",
"trade_filter = exclude_conditions_str+\"-\"+str(minsize)\n",
"dir = \"/OHLCV/\"+bartype+\"/\"+resolution+\"/\"+trade_filter+\"/\"\n",
"#dir = DATA_DIR + dir\n",
"basic_data.to_parquet(partition_by=\"day\", keep_groupby_names=False, path_or_buf=dir, mkdir_kwargs=dict(mkdir=True)) \n",
"#partition_by=\"day\",\n",
"\n",
"#naloaduje partitionvana 1s data skrz 90 dni za 2s\n",
"#day_data = vbt.ParquetData.pull(\"BAC\", paths=dir, filters=[(\"group\", \">\", \"2024-01-02\"),(\"group\", \"<=\", \"2024-01-09\")]) #, \n",
"# day_data[\"2024-05-01\":\"2024-05-14\"].get()\n",
"\n",
"# day_data.data[\"BAC\"].info()\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#naloaduje partitionvana 1s data skrz 90 dni za 2s\n",
"day_data = vbt.ParquetData.pull(\"BAC\", paths=dir, filters=[(\"group\", \">=\", \"2024-01-02\"),(\"group\", \"<=\", \"2024-01-09\")]) #, \n",
"# day_data[\"2024-05-01\":\"2024-05-14\"].get()\n",
"\n",
"day_data.data[\"BAC\"].info()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"close = basic_data.close\n",
"#group by close by day, using pandas grouper\n",
"#close.groupby(pd.Grouper(freq='ME')).mean()\n",
"\n",
"#using Grouper of vectorbtpro\n",
"#close.vbt.group_by(pd.Grouper(freq='ME')).mean()\n",
"\n",
"#basic_data.wrapper.get_columns()\n",
"basic_data.wrapper.get_freq()\n",
"# vbt.pdir(basic_data.wrapper)\n",
"# basic_data.wrapper\n",
"basic_data.wrapper.grouper.is_grouped()\n",
"\n",
"vbt.pdir(basic_data.wrapper.grouper)\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"grouper = basic_data.wrapper.index.vbt.get_grouper(\"ME\")\n",
"\n",
"for group, group_idx in grouper:\n",
" print(group, group_idx)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#prevede 1milion dat (6mes 1s) na dict za 10ss\n",
"df = day_data.data[\"BAC\"]\n",
"df_dict = df.to_dict(orient='list')\n",
"\n",
"# Convert the index (which is the time) to a list of float timestamps\n",
"df_dict['time'] = [timestamp.timestamp() for timestamp in df.index]\n",
"\n",
"df_dict"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@ -0,0 +1,74 @@
"""Saves data from Kraken Futures, combining BTC and USD settled markets"""
import logging
from logging.config import fileConfig
from vectorbtpro import pd, vbt
from ext_lib.db import db_connect
from ext_lib.util import find_earliest_date
EXCHANGE = "1s_OHLCV"
SYMBOLS = ("BTC/USD:BTC", "BTC/USD:USD")
RESOLUTION = "1s"
DB_ENGINE = db_connect("ohlcv_1m")
DB_SYMBOL = "BTC/USD"
#fileConfig("logging.ini", disable_existing_loggers=False)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    handlers=[logging.StreamHandler()],
)
log = logging.getLogger("main")
vbt.CCXTData.set_custom_settings(exchange=EXCHANGE, timeframe=RESOLUTION, limit=6000000)
vbt.SQLData.set_engine_settings(engine_name="pg", engine=DB_ENGINE, schema=EXCHANGE, populate_=True, chunksize=1000)
vbt.SQLData.set_custom_settings(engine_name="pg", schema=EXCHANGE)
def main():
    vbt.SQLData.create_schema(EXCHANGE)
    db_last_tstamp = None
    # TODO: figure out if it's possible to avoid using tables directly, but rather symbols
    if vbt.SQLData.has_table(DB_SYMBOL, schema=EXCHANGE):
        db_last_tstamp = vbt.SQLData.get_last_row_number(DB_SYMBOL, row_number_column="Open time")
    dfs = []
    for symbol in SYMBOLS:
        if db_last_tstamp is None:
            start = find_earliest_date(symbol, EXCHANGE)
        else:
            start = db_last_tstamp + pd.Timedelta(RESOLUTION)
        log.info("Start date for %s is %s", symbol, start)
        # Fetch OHLCV data for this settlement market
        df = vbt.CCXTData.pull(symbol, exchange=EXCHANGE, timeframe=RESOLUTION, start=start).get()
        # For the BTC-settled market, convert volume from USD to BTC using the close price
        if symbol == "BTC/USD:BTC":
            df["Volume"] = df["Volume"].div(df["Close"])
        dfs.append(df)
    # Combine data from the two symbols (the last row is incomplete, so drop it)
    concatenated_data = pd.concat(dfs, axis=0)
    final_data = (
        concatenated_data.groupby(concatenated_data.index)
        .agg({"Open": "mean", "High": "mean", "Low": "mean", "Close": "mean", "Volume": "sum"})
        .iloc[:-1]
    )
    data = vbt.SQLData.from_data({DB_SYMBOL: final_data})
    # TODO: use custom method to prevent duplicate timestamps
    log.info("Saving to DB")
    vbt.SQLDataSaver(data).save_data(method="multi")
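
# Illustrative sketch only (toy data, not Kraken output): shows how the
# groupby/agg in main() merges two same-timestamp rows, one per settlement
# market, into a single combined bar (OHLC averaged, volume summed).
def _demo_merge() -> pd.DataFrame:
    idx = pd.to_datetime(["2024-01-01 00:00:00"] * 2)
    rows = pd.DataFrame(
        {"Open": [1.0, 1.2], "High": [2.0, 2.2], "Low": [0.5, 0.7],
         "Close": [1.5, 1.6], "Volume": [10.0, 4.0]},
        index=idx,
    )
    return rows.groupby(rows.index).agg(
        {"Open": "mean", "High": "mean", "Low": "mean",
         "Close": "mean", "Volume": "sum"}
    )
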
if __name__ == "__main__":
main()

View File

@ -0,0 +1,458 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Loading trades and vectorized aggregation\n",
"This notebook fetches the trades from remote or local cache and aggregates them to bars of given type (time, volume, dollar) and resolution\n",
"\n",
"`fetch_trades_parallel` enables to fetch trades of given symbol and interval, also can filter conditions and minimum size. return `trades_df`\n",
"`aggregate_trades` acceptss `trades_df` and ressolution and type of bars (VOLUME, TIME, DOLLAR) and return aggregated ohlcv dataframe `ohlcv_df`"
]
},
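{
"cell_type": "markdown",
"metadata": {},
"source": [
"A conceptual sketch of what TIME-bar aggregation does, using plain pandas `resample` on toy trades (the real `aggregate_trades` is a vectorized implementation of the same idea):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"# toy trades: price/size indexed by timestamp\n",
"toy = pd.DataFrame(\n",
"    {\"price\": [33.10, 33.12, 33.08, 33.11],\n",
"     \"size\": [100, 200, 150, 300]},\n",
"    index=pd.to_datetime(\n",
"        [\"2024-02-16 09:30:00.1\", \"2024-02-16 09:30:00.7\",\n",
"         \"2024-02-16 09:30:01.2\", \"2024-02-16 09:30:01.9\"]),\n",
")\n",
"bars = toy[\"price\"].resample(\"1s\").ohlc()\n",
"bars[\"volume\"] = toy[\"size\"].resample(\"1s\").sum()\n",
"bars"
]
},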
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from dotenv import load_dotenv\n",
"\n",
"#as V2realbot is client , load env variables here\n",
"env_file = \"/Users/davidbrazda/Documents/Development/python/.env\"\n",
"# Load the .env file\n",
"load_dotenv(env_file)\n",
"\n",
"import pandas as pd\n",
"import numpy as np\n",
"from numba import jit\n",
"from alpaca.data.historical import StockHistoricalDataClient\n",
"from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR\n",
"from alpaca.data.requests import StockTradesRequest\n",
"from v2realbot.enums.enums import BarType\n",
"import time\n",
"from datetime import datetime\n",
"from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data\n",
"import pyarrow\n",
"from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades\n",
"import vectorbtpro as vbt\n",
"import v2realbot.utils.config_handler as cfh\n",
"from appdirs import user_data_dir\n",
"from pathlib import Path\n",
"\n",
"vbt.settings.set_theme(\"dark\")\n",
"vbt.settings['plotting']['layout']['width'] = 1280\n",
"vbt.settings.plotting.auto_rangebreaks = True\n",
"# Set the option to display with pagination\n",
"pd.set_option('display.notebook_repr_html', True)\n",
"pd.set_option('display.max_rows', 20) # Number of rows per page\n",
"# pd.set_option('display.float_format', '{:.9f}'.format)\n",
"\n",
"\n",
"#trade filtering\n",
"exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES') #standard ['C','O','4','B','7','V','P','W','U','Z','F']\n",
"minsize = 100\n",
"\n",
"symbol = \"BAC\"\n",
"#datetime in zoneNY \n",
"day_start = datetime(2023, 1, 1, 9, 30, 0)\n",
"day_stop = datetime(2024, 5, 25, 15, 30, 0)\n",
"day_start = zoneNY.localize(day_start)\n",
"day_stop = zoneNY.localize(day_stop)\n",
"#filename of trades_df parquet, date are in isoformat but without time zone part\n",
"dir = DATA_DIR + \"/notebooks/\"\n",
"#parquet interval cache contains exclude conditions and minsize filtering\n",
"file_trades = dir + f\"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H_%M_%S')}-{day_stop.strftime('%Y-%m-%dT%H_%M_%S')}-{''.join(exclude_conditions)}-{minsize}.parquet\"\n",
"#file_trades = dir + f\"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}.parquet\"\n",
"file_ohlcv = dir + f\"ohlcv_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H_%M_%S')}-{day_stop.strftime('%Y-%m-%dT%H_%M_%S')}-{''.join(exclude_conditions)}-{minsize}.parquet\"\n",
"print(file_trades)\n",
"print(file_ohlcv)\n",
"#PRINT all parquet in directory\n",
"import os\n",
"files = [f for f in os.listdir(dir) if f.endswith(\".parquet\")]\n",
"for f in files:\n",
" print(f)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#Either load trades or ohlcv from parquet if exists\n",
"#trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, max_workers=30) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F'])\n",
"#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')\n",
"#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')\n",
"#filenames = [dir+\"trades_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet\",dir+\"trades_df-BAC-2024-05-15T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet\"]\n",
"trades_df = pd.read_parquet(dir+\"trades_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet\",engine='pyarrow')\n",
"#focused = trades_df.loc[\"2024-02-16 11:23:11\":\"2024-02-16 11:24:26\"]\n",
"#focused\n",
"ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)\n",
"ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')\n",
"\n",
"#ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')\n",
"# trades_df = pd.read_parquet(file_trades,engine='pyarrow')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df = None\n",
"ohlcv_df = None"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#ohlcv_df.info()\n",
"#trades_df.info()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a = trades_df.loc[(\"BAC\", \"2024-02-16 09:30\"):(\"BAC\",\"2024-02-16 09:32:11\")]\n",
"a"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#trades_df.info()\n",
"focused = trades_df.loc[(\"BAC\", \"2024-02-16 09:30:00\"):(\"BAC\", \"2024-02-16 10:24:26\")]\n",
"focused"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df.loc[\"2024-02-16 09:30:00\":\"2024-02-16 10:24:26\"]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"focohlc = ohlcv_df.loc[\"2024-02-16 09:30:00\":\"2024-02-16 10:24:26\"]\n",
"focohlc\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"focohlc.info()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#trades_df.to_parquet(dir + \"trades_df-BAC-2024-01-01T09:30:00-2024-05-14T16:00:00-CO4B7VPWUZF-100.parquet\", engine='pyarrow', compression='gzip')\n",
"#trades_df = pd.read_parquet(dir + \"trades_df-BAC-2024-01-01T09:30:00-2024-05-14T16:00:00-CO4B7VPWUZF-100.parquet\",engine='pyarrow')\n",
"\n",
"#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"file_trades"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#list all files is dir directory with parquet extension\n",
"dir = DATA_DIR + \"/notebooks/\"\n",
"import os\n",
"files = [f for f in os.listdir(dir) if f.endswith(\".parquet\")]\n",
"file_name = \"\"\n",
"ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"# Calculate daily returns\n",
"ohlcv_df['returns'] = ohlcv_df['close'].pct_change().dropna()\n",
"#same as above but pct_change is from 3 datapoints back, but only if it is the same date, else na\n",
"\n",
"\n",
"# Plot the probability distribution curve\n",
"plt.figure(figsize=(10, 6))\n",
"sns.histplot(df['returns'].dropna(), kde=True, stat='probability', bins=30)\n",
"plt.title('Probability Distribution of Daily Returns')\n",
"plt.xlabel('Daily Returns')\n",
"plt.ylabel('Probability')\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.preprocessing import StandardScaler\n",
"from sklearn.linear_model import LogisticRegression\n",
"from sklearn.metrics import accuracy_score\n",
"\n",
"# Define the intervals from 5 to 20 s, returns for each interval\n",
"#maybe use rolling window?\n",
"intervals = range(5, 21, 5)\n",
"\n",
"# Create columns for percentage returns\n",
"rolling_window = 50\n",
"\n",
"# Normalize the returns using rolling mean and std\n",
"for N in intervals:\n",
" column_name = f'returns_{N}'\n",
" rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()\n",
" rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()\n",
" ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std\n",
"\n",
"# Display the dataframe with normalized return columns\n",
"ohlcv_df\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Calculate the sum of the normalized return columns for each row\n",
"ohlcv_df['sum_norm_returns'] = ohlcv_df[[f'norm_returns_{N}' for N in intervals]].sum(axis=1)\n",
"\n",
"# Sort the DataFrame based on the sum of normalized returns in descending order\n",
"df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)\n",
"\n",
"# Display the top rows with the highest sum of normalized returns\n",
"df_sorted\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Drop initial rows with NaN values due to pct_change\n",
"ohlcv_df.dropna(inplace=True)\n",
"\n",
"# Plotting the probability distribution curves\n",
"plt.figure(figsize=(14, 8))\n",
"for N in intervals:\n",
" sns.kdeplot(ohlcv_df[f'returns_{N}'].dropna(), label=f'Returns {N}', fill=True)\n",
"\n",
"plt.title('Probability Distribution of Percentage Returns')\n",
"plt.xlabel('Percentage Return')\n",
"plt.ylabel('Density')\n",
"plt.legend()\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"# Plot the probability distribution curve\n",
"plt.figure(figsize=(10, 6))\n",
"sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)\n",
"plt.title('Probability Distribution of Daily Returns')\n",
"plt.xlabel('Daily Returns')\n",
"plt.ylabel('Probability')\n",
"plt.show()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#show only rows from ohlcv_df where returns > 0.005\n",
"ohlcv_df[ohlcv_df['returns'] > 0.0005]\n",
"\n",
"#ohlcv_df[ohlcv_df['returns'] < -0.005]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#ohlcv where index = date 2024-03-13 and between hour 12\n",
"\n",
"a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00']\n",
"a"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df.info()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df.to_parquet(\"trades_df-spy-0111-0111.parquett\", engine='pyarrow', compression='gzip')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df.to_parquet(\"trades_df-spy-111-0516.parquett\", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df.to_parquet(\"ohlcv_df-spy-111-0516.parquett\", engine='pyarrow', compression='gzip')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)\n",
"vbt.settings['plotting']['auto_rangebreaks'] = True\n",
"basic_data.ohlcv.plot()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#access just BCA\n",
"#df_filtered = df.loc[\"BAC\"]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,392 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Loading trades and vectorized aggregation\n",
"Describes how to fetch trades (remote/cached) and use new vectorized aggregation to aggregate bars of given type (time, volume, dollar) and resolution\n",
"\n",
"`fetch_trades_parallel` enables to fetch trades of given symbol and interval, also can filter conditions and minimum size. return `trades_df`\n",
"`aggregate_trades` acceptss `trades_df` and ressolution and type of bars (VOLUME, TIME, DOLLAR) and return aggregated ohlcv dataframe `ohlcv_df`"
]
},
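{
"cell_type": "markdown",
"metadata": {},
"source": [
"A conceptual sketch of DOLLAR-bar aggregation: bucket trades by cumulative dollar value traded (price * size), then take OHLCV per bucket (toy data; the real `aggregate_trades` is a vectorized implementation):"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"\n",
"toy = pd.DataFrame(\n",
"    {\"price\": [33.10, 33.12, 33.08, 33.11, 33.15],\n",
"     \"size\": [100, 200, 150, 300, 250]},\n",
"    index=pd.date_range(\"2024-02-16 09:30\", periods=5, freq=\"s\"),\n",
")\n",
"bar_value = 10_000  # dollars of traded value per bar\n",
"bucket = (toy[\"price\"] * toy[\"size\"]).cumsum() // bar_value\n",
"bars = toy.groupby(bucket).agg(\n",
"    open=(\"price\", \"first\"), high=(\"price\", \"max\"),\n",
"    low=(\"price\", \"min\"), close=(\"price\", \"last\"),\n",
"    volume=(\"size\", \"sum\"))\n",
"bars"
]
},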
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"trades_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet\n",
"trades_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet\n",
"trades_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet\n",
"trades_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet\n",
"ohlcv_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet\n",
"trades_df-BAC-2023-01-01T09:30:00-2024-10-02T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet\n",
"trades_df-BAC-2024-05-15T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet\n",
"ohlcv_df-BAC-2024-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet\n",
"ohlcv_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet\n",
"ohlcv_df-BAC-2024-01-01T09_30_00-2024-05-14T16_00_00-CO4B7VPWUZF-100.parquet\n",
"ohlcv_df-BAC-2023-01-01T09_30_00-2024-05-25T16_00_00-47BCFOPUVWZ-100.parquet\n",
"ohlcv_df-BAC-2023-01-01T09_30_00-2024-05-25T15_30_00-47BCFOPUVWZ-100.parquet\n"
]
},
{
"data": {
"text/plain": [
"['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']"
]
},
"execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from numba import jit\n",
"from alpaca.data.historical import StockHistoricalDataClient\n",
"from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR\n",
"from alpaca.data.requests import StockTradesRequest\n",
"from v2realbot.enums.enums import BarType\n",
"import time\n",
"from datetime import datetime\n",
"from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data\n",
"import pyarrow\n",
"from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades\n",
"import vectorbtpro as vbt\n",
"import v2realbot.utils.config_handler as cfh\n",
"\n",
"vbt.settings.set_theme(\"dark\")\n",
"vbt.settings['plotting']['layout']['width'] = 1280\n",
"vbt.settings.plotting.auto_rangebreaks = True\n",
"# Set the option to display with pagination\n",
"pd.set_option('display.notebook_repr_html', True)\n",
"pd.set_option('display.max_rows', 20) # Number of rows per page\n",
"# pd.set_option('display.float_format', '{:.9f}'.format)\n",
"\n",
"\n",
"#trade filtering\n",
"exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES') #standard ['C','O','4','B','7','V','P','W','U','Z','F']\n",
"minsize = 100\n",
"\n",
"symbol = \"BAC\"\n",
"#datetime in zoneNY \n",
"day_start = datetime(2023, 1, 1, 9, 30, 0)\n",
"day_stop = datetime(2024, 10, 2, 16, 00, 0)\n",
"day_start = zoneNY.localize(day_start)\n",
"day_stop = zoneNY.localize(day_stop)\n",
"#filename of trades_df parquet, date are in isoformat but without time zone part\n",
"dir = DATA_DIR + \"/notebooks/\"\n",
"#parquet interval cache contains exclude conditions and minsize filtering\n",
"file_trades = dir + f\"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{exclude_conditions}-{minsize}.parquet\"\n",
"#file_trades = dir + f\"trades_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}.parquet\"\n",
"file_ohlcv = dir + f\"ohlcv_df-{symbol}-{day_start.strftime('%Y-%m-%dT%H:%M:%S')}-{day_stop.strftime('%Y-%m-%dT%H:%M:%S')}-{str(exclude_conditions)}-{minsize}.parquet\"\n",
"\n",
"#PRINT all parquet in directory\n",
"import os\n",
"files = [f for f in os.listdir(dir) if f.endswith(\".parquet\")]\n",
"for f in files:\n",
" print(f)\n",
"\n",
"exclude_conditions"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#fetch trades in one go\n",
"#trades_df = fetch_daily_stock_trades(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=False, max_retries=5, backoff_factor=1)\n",
"#fetch trades in parallel - for longer intervals\n",
"#trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=False, max_workers=None)\n",
" \n",
"##trades_df.info()"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"#trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"#Either load trades or ohlcv from parquet if exists\n",
"\n",
"#trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=50, max_workers=20) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F'])\n",
"# trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip')\n",
"\n",
"trades_df = pd.read_parquet(file_trades,engine='pyarrow')\n",
"ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME)\n",
"ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip')\n",
"\n",
"# ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')\n",
"# trades_df = pd.read_parquet(file_trades,engine='pyarrow')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#list all files is dir directory with parquet extension\n",
"dir = DATA_DIR + \"/notebooks/\"\n",
"import os\n",
"files = [f for f in os.listdir(dir) if f.endswith(\".parquet\")]\n",
"file_name = \"\"\n",
"ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"# Calculate daily returns\n",
"ohlcv_df['returns'] = ohlcv_df['close'].pct_change().dropna()\n",
"#same as above but pct_change is from 3 datapoints back, but only if it is the same date, else na\n",
"\n",
"\n",
"# Plot the probability distribution curve\n",
"plt.figure(figsize=(10, 6))\n",
"sns.histplot(df['returns'].dropna(), kde=True, stat='probability', bins=30)\n",
"plt.title('Probability Distribution of Daily Returns')\n",
"plt.xlabel('Daily Returns')\n",
"plt.ylabel('Probability')\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import pandas as pd\n",
"import numpy as np\n",
"from sklearn.model_selection import train_test_split\n",
"from sklearn.preprocessing import StandardScaler\n",
"from sklearn.linear_model import LogisticRegression\n",
"from sklearn.metrics import accuracy_score\n",
"\n",
"# Define the intervals from 5 to 20 s, returns for each interval\n",
"#maybe use rolling window?\n",
"intervals = range(5, 21, 5)\n",
"\n",
"# Create columns for percentage returns\n",
"rolling_window = 50\n",
"\n",
"# Normalize the returns using rolling mean and std\n",
"for N in intervals:\n",
" column_name = f'returns_{N}'\n",
" rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()\n",
" rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()\n",
" ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std\n",
"\n",
"# Display the dataframe with normalized return columns\n",
"ohlcv_df\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Calculate the sum of the normalized return columns for each row\n",
"ohlcv_df['sum_norm_returns'] = ohlcv_df[[f'norm_returns_{N}' for N in intervals]].sum(axis=1)\n",
"\n",
"# Sort the DataFrame based on the sum of normalized returns in descending order\n",
"df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)\n",
"\n",
"# Display the top rows with the highest sum of normalized returns\n",
"df_sorted\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Drop initial rows with NaN values due to pct_change\n",
"ohlcv_df.dropna(inplace=True)\n",
"\n",
"# Plotting the probability distribution curves\n",
"plt.figure(figsize=(14, 8))\n",
"for N in intervals:\n",
" sns.kdeplot(ohlcv_df[f'returns_{N}'].dropna(), label=f'Returns {N}', fill=True)\n",
"\n",
"plt.title('Probability Distribution of Percentage Returns')\n",
"plt.xlabel('Percentage Return')\n",
"plt.ylabel('Density')\n",
"plt.legend()\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import matplotlib.pyplot as plt\n",
"import seaborn as sns\n",
"# Plot the probability distribution curve\n",
"plt.figure(figsize=(10, 6))\n",
"sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)\n",
"plt.title('Probability Distribution of Daily Returns')\n",
"plt.xlabel('Daily Returns')\n",
"plt.ylabel('Probability')\n",
"plt.show()\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#show only rows from ohlcv_df where returns > 0.005\n",
"ohlcv_df[ohlcv_df['returns'] > 0.0005]\n",
"\n",
"#ohlcv_df[ohlcv_df['returns'] < -0.005]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#ohlcv where index = date 2024-03-13 and between hour 12\n",
"\n",
"a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00']\n",
"a"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df.info()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df.to_parquet(\"trades_df-spy-0111-0111.parquett\", engine='pyarrow', compression='gzip')\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"trades_df.to_parquet(\"trades_df-spy-111-0516.parquett\", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ohlcv_df.to_parquet(\"ohlcv_df-spy-111-0516.parquett\", engine='pyarrow', compression='gzip')"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)\n",
"vbt.settings['plotting']['auto_rangebreaks'] = True\n",
"basic_data.ohlcv.plot()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#access just BCA\n",
"#df_filtered = df.loc[\"BAC\"]"
]
}
],
"metadata": {
"kernelspec": {
"display_name": ".venv",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.11"
}
},
"nbformat": 4,
"nbformat_minor": 2
}