39 KiB
39 KiB
Loading trades and vectorized aggregation¶
Describes how to fetch trades (remote/cached) and use the new vectorized aggregation to build bars of a given type (time, volume, dollar) and resolution
fetch_trades_parallel fetches trades of a given symbol and interval; it can also filter by conditions and minimum size. Returns trades_df.
aggregate_trades accepts trades_df, a resolution, and a bar type (VOLUME, TIME, DOLLAR) and returns an aggregated OHLCV dataframe ohlcv_df.
In [1]:
# Setup: imports, plotting/display config, trade-filter parameters, and
# the parquet cache-file paths used by the rest of the notebook.
import os
import time
from datetime import datetime

import pandas as pd
import numpy as np
from numba import jit
import pyarrow
import vectorbtpro as vbt

from alpaca.data.historical import StockHistoricalDataClient
from alpaca.data.requests import StockTradesRequest
from v2realbot.config import ACCOUNT1_PAPER_API_KEY, ACCOUNT1_PAPER_SECRET_KEY, DATA_DIR
from v2realbot.enums.enums import BarType
from v2realbot.utils.utils import parse_alpaca_timestamp, ltp, zoneNY, send_to_telegram, fetch_calendar_data
from v2realbot.loader.aggregator_vectorized import fetch_daily_stock_trades, fetch_trades_parallel, generate_time_bars_nb, aggregate_trades
import v2realbot.utils.config_handler as cfh

# vectorbt plotting setup
vbt.settings.set_theme("dark")
vbt.settings['plotting']['layout']['width'] = 1280
vbt.settings.plotting.auto_rangebreaks = True

# pandas display options (paginated HTML repr, 20 rows per page)
pd.set_option('display.notebook_repr_html', True)
pd.set_option('display.max_rows', 20)
# pd.set_option('display.float_format', '{:.9f}'.format)

# Trade filtering: excluded condition codes and minimum trade size.
# Standard exclusions: ['C','O','4','B','7','V','P','W','U','Z','F']
exclude_conditions = cfh.config_handler.get_val('AGG_EXCLUDED_TRADES')
minsize = 100

symbol = "SPY"

# Interval boundaries, localized to the NY trading timezone.
day_start = zoneNY.localize(datetime(2024, 1, 1, 9, 30, 0))
day_stop = zoneNY.localize(datetime(2024, 1, 14, 16, 0, 0))

# Cache-file paths: dates are ISO-like (no timezone part). The cache file
# names also encode the condition/size filters applied when they were written.
# NOTE(review): `dir` shadows the builtin of the same name — kept because
# later cells in this notebook reuse the variable.
dir = DATA_DIR + "/notebooks/"
start_str = day_start.strftime('%Y-%m-%dT%H:%M:%S')
stop_str = day_stop.strftime('%Y-%m-%dT%H:%M:%S')
file_trades = dir + f"trades_df-{symbol}-{start_str}-{stop_str}-{exclude_conditions}-{minsize}.parquet"
file_ohlcv = dir + f"ohlcv_df-{symbol}-{start_str}-{stop_str}-{exclude_conditions}-{minsize}.parquet"

# Show all cached parquet files in the notebook data directory.
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
for f in files:
    print(f)
Activating profile profile1
trades_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet trades_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet ohlcv_df-BAC-2024-01-11T09:30:00-2024-01-12T16:00:00.parquet ohlcv_df-SPY-2024-01-01T09:30:00-2024-05-14T16:00:00.parquet
In [2]:
# Fetch trades for the configured symbol and interval, applying the
# condition-code exclusions and minimum-size filter. With force_remote=False
# the loader may serve a cached result and only hits the remote API on a
# miss (see the "NOT FOUND. Fetching from remote" output below); failed
# remote calls are retried up to max_retries times with backoff.
trades_df = fetch_daily_stock_trades(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=False, max_retries=5, backoff_factor=1)
trades_df
NOT FOUND. Fetching from remote
--------------------------------------------------------------------------- KeyboardInterrupt Traceback (most recent call last) Cell In[2], line 1 ----> 1 trades_df = fetch_daily_stock_trades(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=minsize, force_remote=False, max_retries=5, backoff_factor=1) 2 trades_df File ~/Documents/Development/python/v2trading/v2realbot/loader/aggregator_vectorized.py:200, in fetch_daily_stock_trades(symbol, start, end, exclude_conditions, minsize, force_remote, max_retries, backoff_factor) 198 for attempt in range(max_retries): 199 try: --> 200 tradesResponse = client.get_stock_trades(stockTradeRequest) 201 is_empty = not tradesResponse[symbol] 202 print(f"Remote fetched: {is_empty=}", start, end) File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/alpaca/data/historical/stock.py:144, in StockHistoricalDataClient.get_stock_trades(self, request_params) 141 params = request_params.to_request_fields() 143 # paginated get request for market data api --> 144 raw_trades = self._data_get( 145 endpoint_data_type="trades", 146 endpoint_asset_class="stocks", 147 api_version="v2", 148 **params, 149 ) 151 if self._use_raw_data: 152 return raw_trades File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/alpaca/data/historical/stock.py:338, in StockHistoricalDataClient._data_get(self, endpoint_asset_class, endpoint_data_type, api_version, symbol_or_symbols, limit, page_limit, extension, **kwargs) 335 params["limit"] = actual_limit 336 params["page_token"] = page_token --> 338 response = self.get(path=path, data=params, api_version=api_version) 340 # TODO: Merge parsing if possible 341 if extension == DataExtensionType.SNAPSHOT: File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/alpaca/common/rest.py:221, in RESTClient.get(self, path, data, **kwargs) 210 def get(self, path: str, data: Union[dict, str] = None, **kwargs) -> HTTPResult: 
211 """Performs a single GET request 212 213 Args: (...) 219 dict: The response 220 """ --> 221 return self._request("GET", path, data, **kwargs) File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/alpaca/common/rest.py:129, in RESTClient._request(self, method, path, data, base_url, api_version) 127 while retry >= 0: 128 try: --> 129 return self._one_request(method, url, opts, retry) 130 except RetryException: 131 time.sleep(self._retry_wait) File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/alpaca/common/rest.py:193, in RESTClient._one_request(self, method, url, opts, retry) 174 def _one_request(self, method: str, url: str, opts: dict, retry: int) -> dict: 175 """Perform one request, possibly raising RetryException in the case 176 the response is 429. Otherwise, if error text contain "code" string, 177 then it decodes to json object and returns APIError. (...) 191 dict: The response data 192 """ --> 193 response = self._session.request(method, url, **opts) 195 try: 196 response.raise_for_status() File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/requests/sessions.py:589, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json) 584 send_kwargs = { 585 "timeout": timeout, 586 "allow_redirects": allow_redirects, 587 } 588 send_kwargs.update(settings) --> 589 resp = self.send(prep, **send_kwargs) 591 return resp File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/requests/sessions.py:703, in Session.send(self, request, **kwargs) 700 start = preferred_clock() 702 # Send the request --> 703 r = adapter.send(request, **kwargs) 705 # Total elapsed time of the request (approximately) 706 elapsed = preferred_clock() - start File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/requests/adapters.py:486, in HTTPAdapter.send(self, 
request, stream, timeout, verify, cert, proxies) 483 timeout = TimeoutSauce(connect=timeout, read=timeout) 485 try: --> 486 resp = conn.urlopen( 487 method=request.method, 488 url=url, 489 body=request.body, 490 headers=request.headers, 491 redirect=False, 492 assert_same_host=False, 493 preload_content=False, 494 decode_content=False, 495 retries=self.max_retries, 496 timeout=timeout, 497 chunked=chunked, 498 ) 500 except (ProtocolError, OSError) as err: 501 raise ConnectionError(err, request=request) File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/urllib3/connectionpool.py:703, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, **response_kw) 700 self._prepare_proxy(conn) 702 # Make the request on the httplib connection object. --> 703 httplib_response = self._make_request( 704 conn, 705 method, 706 url, 707 timeout=timeout_obj, 708 body=body, 709 headers=headers, 710 chunked=chunked, 711 ) 713 # If we're going to release the connection in ``finally:``, then 714 # the response doesn't need to know about the connection. Otherwise 715 # it will also try to release it and we'll have a double-release 716 # mess. 717 response_conn = conn if not release_conn else None File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/urllib3/connectionpool.py:449, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw) 444 httplib_response = conn.getresponse() 445 except BaseException as e: 446 # Remove the TypeError from the exception chain in 447 # Python 3 (including for exceptions like SystemExit). 448 # Otherwise it looks like a bug in the code. 
--> 449 six.raise_from(e, None) 450 except (SocketTimeout, BaseSSLError, SocketError) as e: 451 self._raise_timeout(err=e, url=url, timeout_value=read_timeout) File <string>:3, in raise_from(value, from_value) File ~/Documents/Development/python/v2trading/.venv/lib/python3.10/site-packages/urllib3/connectionpool.py:444, in HTTPConnectionPool._make_request(self, conn, method, url, timeout, chunked, **httplib_request_kw) 441 except TypeError: 442 # Python 3 443 try: --> 444 httplib_response = conn.getresponse() 445 except BaseException as e: 446 # Remove the TypeError from the exception chain in 447 # Python 3 (including for exceptions like SystemExit). 448 # Otherwise it looks like a bug in the code. 449 six.raise_from(e, None) File /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:1375, in HTTPConnection.getresponse(self) 1373 try: 1374 try: -> 1375 response.begin() 1376 except ConnectionError: 1377 self.close() File /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:318, in HTTPResponse.begin(self) 316 # read until we get a non-100 response 317 while True: --> 318 version, status, reason = self._read_status() 319 if status != CONTINUE: 320 break File /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/http/client.py:279, in HTTPResponse._read_status(self) 278 def _read_status(self): --> 279 line = str(self.fp.readline(_MAXLINE + 1), "iso-8859-1") 280 if len(line) > _MAXLINE: 281 raise LineTooLong("status line") File /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/socket.py:705, in SocketIO.readinto(self, b) 703 while True: 704 try: --> 705 return self._sock.recv_into(b) 706 except timeout: 707 self._timeout_occurred = True File /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/ssl.py:1274, in SSLSocket.recv_into(self, buffer, nbytes, flags) 1270 if flags != 0: 1271 raise ValueError( 1272 "non-zero flags not allowed in calls to recv_into() on %s" % 1273 
self.__class__) -> 1274 return self.read(nbytes, buffer) 1275 else: 1276 return super().recv_into(buffer, nbytes, flags) File /Library/Frameworks/Python.framework/Versions/3.10/lib/python3.10/ssl.py:1130, in SSLSocket.read(self, len, buffer) 1128 try: 1129 if buffer is not None: -> 1130 return self._sslobj.read(len, buffer) 1131 else: 1132 return self._sslobj.read(len) KeyboardInterrupt:
In [2]:
#Either load trades or ohlcv from parquet if exists #trades_df = fetch_trades_parallel(symbol, day_start, day_stop, exclude_conditions=exclude_conditions, minsize=50, max_workers=20) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F']) # trades_df.to_parquet(file_trades, engine='pyarrow', compression='gzip') trades_df = pd.read_parquet(file_trades,engine='pyarrow') ohlcv_df = aggregate_trades(symbol=symbol, trades_df=trades_df, resolution=1, type=BarType.TIME) ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow', compression='gzip') # ohlcv_df = pd.read_parquet(file_ohlcv,engine='pyarrow') # trades_df = pd.read_parquet(file_trades,engine='pyarrow')
In [ ]:
# List all files in the notebook data directory with the parquet extension,
# then load the cached OHLCV frame.
# FIX: the file list was built but never displayed, and an unused
# `file_name = ""` variable was removed.
import os

dir = DATA_DIR + "/notebooks/"
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
for f in files:
    print(f)

ohlcv_df = pd.read_parquet(file_ohlcv, engine='pyarrow')
In [ ]:
ohlcv_df
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns

# Per-bar returns from close-to-close changes (first row stays NaN after
# index alignment).
ohlcv_df['returns'] = ohlcv_df['close'].pct_change().dropna()
# TODO: variant where pct_change looks 3 datapoints back, but only within
# the same date (NaN across date boundaries).

# Plot the probability distribution of the returns.
plt.figure(figsize=(10, 6))
# BUGFIX: was sns.histplot(df['returns']...) — `df` is undefined here;
# the frame in scope is ohlcv_df.
sns.histplot(ohlcv_df['returns'].dropna(), kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Daily Returns')
plt.xlabel('Daily Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Lookback intervals (in bars): 5, 10, 15, 20.
intervals = range(5, 21, 5)

# Create columns for percentage returns.
# BUGFIX: these columns were referenced by the normalization loop below but
# never created. Assumes an N-bar lookback on close — confirm intent.
for N in intervals:
    ohlcv_df[f'returns_{N}'] = ohlcv_df['close'].pct_change(N)

# Normalize each return column with a rolling z-score.
rolling_window = 50
for N in intervals:
    column_name = f'returns_{N}'
    rolling_mean = ohlcv_df[column_name].rolling(window=rolling_window).mean()
    rolling_std = ohlcv_df[column_name].rolling(window=rolling_window).std()
    ohlcv_df[f'norm_{column_name}'] = (ohlcv_df[column_name] - rolling_mean) / rolling_std

# Display the dataframe with normalized return columns.
ohlcv_df
In [ ]:
# Combine the per-interval normalized returns into a single score per bar,
# then rank bars by that score (largest first).
norm_cols = [f'norm_returns_{N}' for N in intervals]
ohlcv_df['sum_norm_returns'] = ohlcv_df[norm_cols].sum(axis=1)

df_sorted = ohlcv_df.sort_values(by='sum_norm_returns', ascending=False)
df_sorted
In [ ]:
# Discard warm-up rows (NaNs introduced by pct_change / rolling windows),
# then overlay the return distributions for every lookback interval.
ohlcv_df.dropna(inplace=True)

plt.figure(figsize=(14, 8))
for lookback in intervals:
    series = ohlcv_df[f'returns_{lookback}'].dropna()
    sns.kdeplot(series, label=f'Returns {lookback}', fill=True)
plt.title('Probability Distribution of Percentage Returns')
plt.xlabel('Percentage Return')
plt.ylabel('Density')
plt.legend()
plt.show()
In [ ]:
import matplotlib.pyplot as plt
import seaborn as sns

# Histogram + KDE of the per-bar returns computed earlier.
returns = ohlcv_df['returns'].dropna()
plt.figure(figsize=(10, 6))
sns.histplot(returns, kde=True, stat='probability', bins=30)
plt.title('Probability Distribution of Daily Returns')
plt.xlabel('Daily Returns')
plt.ylabel('Probability')
plt.show()
In [ ]:
#show only rows from ohlcv_df where returns > 0.005 ohlcv_df[ohlcv_df['returns'] > 0.0005] #ohlcv_df[ohlcv_df['returns'] < -0.005]
In [ ]:
#ohlcv where index = date 2024-03-13 and between hour 12 a = ohlcv_df.loc['2024-03-13 12:00:00':'2024-03-13 13:00:00'] a
In [ ]:
# Display the aggregated OHLCV bars.
ohlcv_df
In [ ]:
# Display the raw trades frame.
trades_df
In [ ]:
# Column dtypes and memory footprint of the aggregated bars.
ohlcv_df.info()
In [ ]:
# Snapshot the trades frame for 01-11.
# BUGFIX: extension was ".parquett" — files with the typo'd suffix do not
# match the .endswith(".parquet") listing filters used in this notebook.
trades_df.to_parquet("trades_df-spy-0111-0111.parquet", engine='pyarrow', compression='gzip')
In [ ]:
# Snapshot the full trades frame.
# BUGFIX: extension was ".parquett" — invisible to the ".parquet" listing
# filters used elsewhere in this notebook.
# NOTE(review): pyarrow's allow_truncated_timestamps only takes effect
# together with coerce_timestamps — confirm it is needed here.
trades_df.to_parquet("trades_df-spy-111-0516.parquet", engine='pyarrow', compression='gzip', allow_truncated_timestamps=True)
In [ ]:
# Snapshot the aggregated bars.
# BUGFIX: extension was ".parquett" — invisible to the ".parquet" listing
# filters used elsewhere in this notebook.
ohlcv_df.to_parquet("ohlcv_df-spy-111-0516.parquet", engine='pyarrow', compression='gzip')
In [ ]:
# Wrap the aggregated bars in a vectorbt Data object (converted to the NY
# timezone) and draw the OHLCV chart.
basic_data = vbt.Data.from_data(vbt.symbol_dict({symbol: ohlcv_df}), tz_convert=zoneNY)
# Skip non-trading gaps on the x-axis (attribute-style access, as in the
# setup cell).
vbt.settings.plotting.auto_rangebreaks = True
basic_data.ohlcv.plot()
In [ ]:
#access just BAC #df_filtered = df.loc["BAC"]