static site pwd protected, load dotenv moved to config, aggregator vecotrized chng (#203)

This commit is contained in:
David Brazda
2024-06-04 12:49:32 +02:00
committed by GitHub
parent 05b7725a25
commit 1659cc7a6e
15 changed files with 49538 additions and 851 deletions
+120 -85
View File
@@ -18,6 +18,7 @@ import random
from alpaca.data.models import BarSet, QuoteSet, TradeSet
import v2realbot.utils.config_handler as cfh
from v2realbot.enums.enums import BarType
from tqdm import tqdm
""""
Module used for vectorized aggregation of trades.
@@ -28,7 +29,7 @@ Includes fetch (remote/cached) methods and numba aggregator function for TIME BA
def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type: BarType = BarType.TIME):
""""
Accepts dataframe with trades keyed by symbol. Preparess dataframe to
numpy and call nNumba optimized aggregator for given bar type. (time/volume/dollar)
numpy and calls Numba optimized aggregator for given bar type. (time/volume/dollar)
"""""
trades_df = trades_df.loc[symbol]
trades_df= trades_df.reset_index()
@@ -54,15 +55,37 @@ def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type
columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades']
if type == BarType.DOLLAR:
columns.append('amount')
columns.append('updated')
if type == BarType.TIME:
columns.append('vwap')
columns.append('buyvolume')
columns.append('sellvolume')
if type == BarType.VOLUME:
columns.append('buyvolume')
columns.append('sellvolume')
ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns)
ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s')
ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY)
#print(ohlcv_df['updated'])
ohlcv_df['updated'] = pd.to_datetime(ohlcv_df['updated'], unit="s").dt.tz_localize('UTC').dt.tz_convert(zoneNY)
# Round to microseconds to maintain six decimal places
ohlcv_df['updated'] = ohlcv_df['updated'].dt.round('us')
ohlcv_df.set_index('time', inplace=True)
ohlcv_df.index = ohlcv_df.index.tz_localize('UTC').tz_convert(zoneNY)
#ohlcv_df.index = ohlcv_df.index.tz_localize('UTC').tz_convert(zoneNY)
return ohlcv_df
# Function to ensure fractional seconds are present
def ensure_fractional_seconds(timestamp):
if '.' not in timestamp:
# Inserting .000000 before the timezone indicator 'Z'
return timestamp.replace('Z', '.000000Z')
else:
return timestamp
def convert_dict_to_multiindex_df(tradesResponse):
""""
Converts dictionary from cache or from remote (raw input) to multiindex dataframe.
with microsecond precision (from nanoseconds in the raw data)
"""""
# Create a DataFrame for each key and add the key as part of the MultiIndex
dfs = []
@@ -74,7 +97,17 @@ def convert_dict_to_multiindex_df(tradesResponse):
df = df[['t', 'x', 'p', 's', 'i', 'c','z']]
df.rename(columns={'t': 'timestamp', 'c': 'conditions', 'p': 'price', 's': 'size', 'x': 'exchange', 'z':'tape', 'i':'id'}, inplace=True)
df['symbol'] = key # Add ticker as a column
df['timestamp'] = pd.to_datetime(df['timestamp']) # Convert 't' from string to datetime before setting it as an index
# Apply the function to ensure all timestamps have fractional seconds
#zvazit zda toto ponechat a nebo dat jen pri urcitem erroru pri to_datetime
#pripadne pak pridelat efektivnejsi pristup, aneb nahrazeni NaT - https://chatgpt.com/c/d2be6f87-b38f-4050-a1c6-541d100b1474
df['timestamp'] = df['timestamp'].apply(ensure_fractional_seconds)
df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce') # Convert 't' from string to datetime before setting it as an index
#Adjust to microsecond precision
df.loc[df['timestamp'].notna(), 'timestamp'] = df['timestamp'].dt.floor('us')
df.set_index(['symbol', 'timestamp'], inplace=True) # Set the multi-level index using both 'ticker' and 't'
df = df.tz_convert(zoneNY, level='timestamp')
dfs.append(df)
@@ -123,66 +156,6 @@ def dict_to_df(tradesResponse, start, end, exclude_conditions = None, minsize =
df = df[df['size'] >= minsize]
return df
#fetches daily stock tradess - currently only main session is supported
def fetch_daily_stock_trades_old(symbol, start, end, exclude_conditions = None, minsize = None, force_remote = False, max_retries=5, backoff_factor=1):
"""
Attempts to fetch stock trades with exponential backoff. Raises an exception if all retries fail.
:param symbol: The stock symbol to fetch trades for.
:param start: The start time for the trade data.
:param end: The end time for the trade data.
:param max_retries: Maximum number of retries.
:param backoff_factor: Factor to determine the next sleep time.
:return: TradesResponse object.
:raises: ConnectionError if all retries fail.
We use tradecache only for main sessison request = 9:30 to 16:00
"""
use_daily_tradecache = False
if (start.time() >= time(9, 30) and end.time() <= time(16, 0)):
use_daily_tradecache = True
filename_start = zoneNY.localize(datetime.combine(start.date(), time(9, 30)))
filename_end= zoneNY.localize(datetime.combine(end.date(), time(16, 0)))
daily_file = "TS" + str(symbol) + '-' + str(int(filename_start.timestamp())) + '-' + str(int(filename_end.timestamp())) + '.cache.gz'
file_path = DATA_DIR + "/tradecache/"+daily_file
if use_daily_tradecache and not force_remote and os.path.exists(file_path):
print("Searching cache: " + daily_file)
with gzip.open (file_path, 'rb') as fp:
tradesResponse = pickle.load(fp)
print("FOUND in CACHE", daily_file)
#response je vzdy ulozena jako raw(dict), davame zpet do TradeSetu, ktery umi i df
return dict_to_df(tradesResponse, start, end, exclude_conditions, minsize)
#daily file doesnt exist
else:
print("NOT FOUND. Fetching from remote")
client = StockHistoricalDataClient(ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, raw_data=False)
stockTradeRequest = StockTradesRequest(symbol_or_symbols=symbol, start=start, end=end)
last_exception = None
for attempt in range(max_retries):
try:
tradesResponse = client.get_stock_trades(stockTradeRequest)
is_empty = not tradesResponse[symbol]
print(f"Remote fetched: {is_empty=}", start, end)
#pokud jde o dnešní den a nebyl konec trhu tak cache neukládáme, pripadne pri iex datapointu necachujeme
if use_daily_tradecache and not is_empty:
if (start < datetime.now().astimezone(zoneNY) < end):
print("not saving trade cache, market still open today")
else:
with gzip.open(file_path, 'wb') as fp:
pickle.dump(tradesResponse, fp)
print("Saving to Trade CACHE", file_path)
return pd.DataFrame() if is_empty else dict_to_df(tradesResponse, start, end)
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
last_exception = e
time_module.sleep(backoff_factor * (2 ** attempt))
print("All attempts to fetch data failed.")
raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}")
def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, force_remote=False, max_retries=5, backoff_factor=1):
#doc for this function
"""
@@ -205,6 +178,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
Do budoucna ukládat celý den BAC-20240203.cache.gz a z toho si pak filtrovat bud main sesssionu a extended
Ale zatim je uloženo jen main session v BAC-timestampopenu-timestampclose.cache.gz
"""
is_same_day = start.date() == end.date()
# Determine if the requested times fall within the main session
in_main_session = (time(9, 30) <= start.time() < time(16, 0)) and (time(9, 30) <= end.time() <= time(16, 0))
file_path = ''
@@ -215,7 +189,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
daily_file = f"{symbol}-{int(filename_start.timestamp())}-{int(filename_end.timestamp())}.cache.gz"
file_path = f"{DATA_DIR}/tradecache/{daily_file}"
if not force_remote and os.path.exists(file_path):
print("Searching cache: " + daily_file)
print(f"Searching {str(start.date())} cache: " + daily_file)
with gzip.open(file_path, 'rb') as fp:
tradesResponse = pickle.load(fp)
print("FOUND in CACHE", daily_file)
@@ -240,7 +214,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
else: # Don't save the cache if the market is still open
print("Not saving trade cache, market still open today")
return pd.DataFrame() if is_empty else dict_to_df(tradesResponse, start, end)
return pd.DataFrame() if is_empty else dict_to_df(tradesResponse, start, end, exclude_conditions, minsize)
except Exception as e:
print(f"Attempt {attempt + 1} failed: {e}")
last_exception = e
@@ -250,7 +224,7 @@ def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsiz
raise ConnectionError(f"Failed to fetch stock trades after {max_retries} retries. Last exception: {str(last_exception)} and {format_exc()}")
def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES'), minsize = 100, force_remote = False):
def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES'), minsize = 100, force_remote = False, max_workers=None):
"""
Fetches trades for each day between start_date and end_date during market hours (9:30-16:00) in parallel and concatenates them into a single DataFrame.
@@ -265,12 +239,11 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = cfh
market_open_days = fetch_calendar_data(start_date, end_date)
day_count = len(market_open_days)
print("Contains", day_count, " market days")
max_workers = min(10, max(5, day_count // 2)) # Heuristic: half the days to process, but at least 1 and no more than 10
max_workers = min(10, max(2, day_count // 2)) if max_workers is None else max_workers # Heuristic: half the days to process, but at least 1 and no more than 10
with ThreadPoolExecutor(max_workers=max_workers) as executor:
#for single_date in (start_date + timedelta(days=i) for i in range((end_date - start_date).days + 1)):
for market_day in market_open_days:
for market_day in tqdm(market_open_days, desc="Processing market days"):
#start = datetime.combine(single_date, time(9, 30)) # Market opens at 9:30 AM
#end = datetime.combine(single_date, time(16, 0)) # Market closes at 4:00 PM
@@ -286,14 +259,22 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = cfh
future = executor.submit(fetch_daily_stock_trades, symbol, start, end, exclude_conditions, minsize, force_remote)
futures.append(future)
for future in futures:
for future in tqdm(futures, desc="Fetching data"):
try:
result = future.result()
results.append(result)
except Exception as e:
print(f"Error fetching data for a day: {e}")
return pd.concat(results, ignore_index=False)
# Batch concatenation to improve speed
batch_size = 10
batches = [results[i:i + batch_size] for i in range(0, len(results), batch_size)]
final_df = pd.concat([pd.concat(batch, ignore_index=False) for batch in batches], ignore_index=False)
return final_df
#original version
#return pd.concat(results, ignore_index=False)
@jit(nopython=True)
def generate_dollar_bars_nb(ticks, amount_per_bar):
@@ -332,7 +313,7 @@ def generate_dollar_bars_nb(ticks, amount_per_bar):
# Check if the new tick is from a different day, then close the current bar
if tick_day != current_day:
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, amount_per_bar])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, amount_per_bar, tick_time])
# Reset for the new day using the current tick data
open_price = price
high_price = price
@@ -361,7 +342,7 @@ def generate_dollar_bars_nb(ticks, amount_per_bar):
volume += volume_to_add # Update the volume here before appending and resetting
# Append the partially filled bar to the list
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count + 1, amount_per_bar])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count + 1, amount_per_bar, tick_time])
# Fill the current bar and continue with a new bar
tick_volume -= volume_to_add
@@ -385,7 +366,7 @@ def generate_dollar_bars_nb(ticks, amount_per_bar):
# Add the last bar if it contains any trades
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, amount_per_bar])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, amount_per_bar, tick_time])
return np.array(ohlcv_bars)
@@ -414,6 +395,9 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
trades_count = 0
current_day = np.floor(ticks[0, 0] / 86400) # Calculate the initial day from the first tick timestamp
bar_time = ticks[0, 0] # Initialize bar time with the time of the first tick
buy_volume = 0 # Volume of buy trades
sell_volume = 0 # Volume of sell trades
prev_price = ticks[0, 1] # Initialize previous price for the first tick
for tick in ticks:
tick_time = tick[0]
@@ -424,7 +408,7 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
# Check if the new tick is from a different day, then close the current bar
if tick_day != current_day:
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
# Reset for the new day using the current tick data
open_price = price
high_price = price
@@ -435,6 +419,10 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
remaining_volume = volume_per_bar
current_day = tick_day
bar_time = tick_time # Update bar time to the current tick time
buy_volume = 0
sell_volume = 0
# Reset previous tick price (calulating imbalance for each day from the start)
prev_price = price
# Start new bar if needed because of the volume
while tick_volume > 0:
@@ -446,6 +434,13 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
volume += tick_volume
remaining_volume -= tick_volume
trades_count += 1
# Update buy and sell volumes
if price > prev_price:
buy_volume += tick_volume
elif price < prev_price:
sell_volume += tick_volume
tick_volume = 0
else:
# Fill the current bar and continue with a new bar
@@ -453,8 +448,15 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
volume += volume_to_add
tick_volume -= volume_to_add
trades_count += 1
# Update buy and sell volumes
if price > prev_price:
buy_volume += volume_to_add
elif price < prev_price:
sell_volume += volume_to_add
# Append the completed bar to the list
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
# Reset bar values for the new bar using the current tick data
open_price = price
@@ -464,16 +466,20 @@ def generate_volume_bars_nb(ticks, volume_per_bar):
volume = 0
trades_count = 0
remaining_volume = volume_per_bar
# Increment bar time if splitting a trade
if tick_volume > 0: #pokud v tradu je jeste zbytek nastavujeme cas o nanosekundu vetsi
buy_volume = 0
sell_volume = 0
# Increment bar time if splitting a trade
if tick_volume > 0: # If there's remaining volume in the trade, set bar time slightly later
bar_time = tick_time + 1e-6
else:
bar_time = tick_time #jinak nastavujeme cas ticku
bar_time = tick_time # Otherwise, set bar time to the tick time
prev_price = price
# Add the last bar if it contains any trades
if trades_count > 0:
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count])
ohlcv_bars.append([bar_time, open_price, high_price, low_price, close_price, volume, trades_count, tick_time, buy_volume, sell_volume])
return np.array(ohlcv_bars)
@@ -497,16 +503,30 @@ def generate_time_bars_nb(ticks, resolution):
close_price = 0
volume = 0
trades_count = 0
vwap_cum_volume_price = 0 # Cumulative volume * price
cum_volume = 0 # Cumulative volume for VWAP
buy_volume = 0 # Volume of buy trades
sell_volume = 0 # Volume of sell trades
prev_price = ticks[0, 1] # Initialize previous price for the first tick
prev_day = np.floor(ticks[0, 0] / 86400) # Calculate the initial day from the first tick timestamp
for tick in ticks:
curr_time = tick[0] #updated time
tick_time = np.floor(tick[0] / resolution) * resolution
price = tick[1]
tick_volume = tick[2]
tick_day = np.floor(tick_time / 86400) # Calculate the day of the current tick
#if the new tick is from a new day, reset previous tick price (calculating imbalance starts over)
if tick_day != prev_day:
prev_price = price
prev_day = tick_day
# Check if the tick belongs to a new bar
if tick_time != start_time + current_bar_index * resolution:
if current_bar_index >= 0 and trades_count > 0: # Save the previous bar if trades happened
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count])
vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
# Reset bar values
current_bar_index = int((tick_time - start_time) / resolution)
@@ -515,6 +535,10 @@ def generate_time_bars_nb(ticks, resolution):
low_price = price
volume = 0
trades_count = 0
vwap_cum_volume_price = 0
cum_volume = 0
buy_volume = 0
sell_volume = 0
# Update the OHLCV values for the current bar
high_price = max(high_price, price)
@@ -522,10 +546,21 @@ def generate_time_bars_nb(ticks, resolution):
close_price = price
volume += tick_volume
trades_count += 1
vwap_cum_volume_price += price * tick_volume
cum_volume += tick_volume
# Update buy and sell volumes
if price > prev_price:
buy_volume += tick_volume
elif price < prev_price:
sell_volume += tick_volume
prev_price = price
# Save the last processed bar
if trades_count > 0:
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count])
vwap = vwap_cum_volume_price / cum_volume if cum_volume > 0 else 0
ohlcv_bars.append([start_time + current_bar_index * resolution, open_price, high_price, low_price, close_price, volume, trades_count, curr_time, vwap, buy_volume, sell_volume])
return np.array(ohlcv_bars)