From 2116679dba20dfba844f878b0fb6aa5663f9b6cb Mon Sep 17 00:00:00 2001 From: David Brazda Date: Fri, 1 Nov 2024 11:18:10 +0100 Subject: [PATCH] optimalizations --- setup.py | 2 +- tests/data_loader_tryme.ipynb | 710 +++++++++----------------------- ttools/aggregator_vectorized.py | 72 ++++ ttools/loaders.py | 83 +++- ttools/utils.py | 145 ++++++- 5 files changed, 491 insertions(+), 521 deletions(-) diff --git a/setup.py b/setup.py index a4f9078..26b8fe9 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup, find_packages setup( name='ttools', - version='0.6.4', + version='0.7.0', packages=find_packages(), install_requires=[ # list your dependencies here diff --git a/tests/data_loader_tryme.ipynb b/tests/data_loader_tryme.ipynb index a834587..8ad48be 100644 --- a/tests/data_loader_tryme.ipynb +++ b/tests/data_loader_tryme.ipynb @@ -40,7 +40,7 @@ "from ttools.utils import AggType\n", "from datetime import datetime\n", "from ttools.aggregator_vectorized import generate_time_bars_nb, aggregate_trades\n", - "from ttools.loaders import load_data, prepare_trade_cache\n", + "from ttools.loaders import load_data, prepare_trade_cache, fetch_daily_stock_trades\n", "from ttools.utils import zoneNY\n", "import vectorbtpro as vbt\n", "from lightweight_charts import PlotDFAccessor, PlotSRAccessor\n", @@ -69,7 +69,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 7, "metadata": {}, "outputs": [ { @@ -110,44 +110,44 @@ " \n", " \n", " \n", - " 2024-02-15 09:30:00-05:00\n", - " 499.29\n", - " 499.41\n", - " 499.2900\n", - " 499.3200\n", - " 161900.0\n", + " 2024-09-16 04:01:24-04:00\n", + " 562.22\n", + " 562.22\n", + " 562.22\n", + " 562.22\n", + " 200.0\n", " \n", " \n", - " 2024-02-15 09:30:01-05:00\n", - " 499.32\n", - " 499.41\n", - " 499.3000\n", - " 499.4000\n", - " 10900.0\n", + " 2024-09-16 04:02:24-04:00\n", + " 562.17\n", + " 562.17\n", + " 562.17\n", + " 562.17\n", + " 293.0\n", " \n", " \n", - " 2024-02-15 09:30:02-05:00\n", - " 499.36\n", - " 499.40\n", - " 499.3550\n", - " 499.3800\n", - " 7040.0\n", + " 2024-09-16 04:04:36-04:00\n", + " 562.54\n", + " 562.54\n", + " 562.54\n", + " 562.54\n", + " 100.0\n", " \n", " \n", - " 2024-02-15 09:30:03-05:00\n", - " 499.39\n", - " 499.42\n", - " 499.3800\n", - " 499.4000\n", - " 8717.0\n", + " 2024-09-16 04:10:00-04:00\n", + " 562.39\n", + " 562.39\n", + " 562.39\n", + " 562.39\n", + " 102.0\n", " \n", " \n", - " 2024-02-15 09:30:04-05:00\n", - " 499.40\n", - " 499.40\n", - " 499.3500\n", - " 499.3500\n", - " 3265.0\n", + " 2024-09-16 04:10:24-04:00\n", + " 562.44\n", + " 562.44\n", + " 562.44\n", + " 562.44\n", + " 371.0\n", " \n", " \n", " ...\n", @@ -158,69 +158,69 @@ " ...\n", " \n", " \n", - " 2024-03-18 15:59:55-04:00\n", - " 512.94\n", - " 512.94\n", - " 512.8600\n", - " 512.8900\n", - " 7345.0\n", + " 2024-10-18 19:57:24-04:00\n", + " 584.80\n", + " 584.80\n", + " 584.80\n", + " 584.80\n", + " 100.0\n", " \n", " \n", - " 2024-03-18 15:59:56-04:00\n", - " 512.90\n", - " 512.90\n", - " 512.8700\n", - " 512.8800\n", - " 2551.0\n", + " 2024-10-18 19:57:48-04:00\n", + " 584.84\n", + " 584.84\n", + " 584.84\n", + " 584.84\n", + " 622.0\n", " \n", " \n", - " 2024-03-18 15:59:57-04:00\n", - " 512.89\n", - " 512.91\n", - " 512.8500\n", - " 512.8701\n", - " 18063.0\n", + " 2024-10-18 19:58:48-04:00\n", + " 584.77\n", + " 584.79\n", + " 584.77\n", + " 584.79\n", + " 4158.0\n", " \n", " \n", - " 2024-03-18 15:59:58-04:00\n", - " 512.87\n", - " 512.90\n", - " 512.8496\n", - " 512.9000\n", - " 7734.0\n", 
+ " 2024-10-18 19:59:36-04:00\n", + " 584.80\n", + " 584.82\n", + " 584.80\n", + " 584.82\n", + " 298.0\n", " \n", " \n", - " 2024-03-18 15:59:59-04:00\n", - " 512.92\n", - " 512.92\n", - " 512.8200\n", - " 512.8700\n", - " 37159.0\n", + " 2024-10-18 19:59:48-04:00\n", + " 584.76\n", + " 584.76\n", + " 584.72\n", + " 584.72\n", + " 258.0\n", " \n", " \n", "\n", - "

- "417345 rows × 5 columns\n",
+ "64218 rows × 5 columns\n",
\n", "" ], "text/plain": [ - " open high low close volume\n", - "time \n", - "2024-02-15 09:30:00-05:00 499.29 499.41 499.2900 499.3200 161900.0\n", - "2024-02-15 09:30:01-05:00 499.32 499.41 499.3000 499.4000 10900.0\n", - "2024-02-15 09:30:02-05:00 499.36 499.40 499.3550 499.3800 7040.0\n", - "2024-02-15 09:30:03-05:00 499.39 499.42 499.3800 499.4000 8717.0\n", - "2024-02-15 09:30:04-05:00 499.40 499.40 499.3500 499.3500 3265.0\n", - "... ... ... ... ... ...\n", - "2024-03-18 15:59:55-04:00 512.94 512.94 512.8600 512.8900 7345.0\n", - "2024-03-18 15:59:56-04:00 512.90 512.90 512.8700 512.8800 2551.0\n", - "2024-03-18 15:59:57-04:00 512.89 512.91 512.8500 512.8701 18063.0\n", - "2024-03-18 15:59:58-04:00 512.87 512.90 512.8496 512.9000 7734.0\n", - "2024-03-18 15:59:59-04:00 512.92 512.92 512.8200 512.8700 37159.0\n", + " open high low close volume\n", + "time \n", + "2024-09-16 04:01:24-04:00 562.22 562.22 562.22 562.22 200.0\n", + "2024-09-16 04:02:24-04:00 562.17 562.17 562.17 562.17 293.0\n", + "2024-09-16 04:04:36-04:00 562.54 562.54 562.54 562.54 100.0\n", + "2024-09-16 04:10:00-04:00 562.39 562.39 562.39 562.39 102.0\n", + "2024-09-16 04:10:24-04:00 562.44 562.44 562.44 562.44 371.0\n", + "... ... ... ... ... ...\n", + "2024-10-18 19:57:24-04:00 584.80 584.80 584.80 584.80 100.0\n", + "2024-10-18 19:57:48-04:00 584.84 584.84 584.84 584.84 622.0\n", + "2024-10-18 19:58:48-04:00 584.77 584.79 584.77 584.79 4158.0\n", + "2024-10-18 19:59:36-04:00 584.80 584.82 584.80 584.82 298.0\n", + "2024-10-18 19:59:48-04:00 584.76 584.76 584.72 584.72 258.0\n", "\n", - "[417345 rows x 5 columns]" + "[64218 rows x 5 columns]" ] }, - "execution_count": 5, + "execution_count": 7, "metadata": {}, "output_type": "execute_result" } @@ -229,17 +229,17 @@ "#This is how to call LOAD function\n", "symbol = [\"SPY\"]\n", "#datetime in zoneNY \n", - "day_start = datetime(2024, 2, 15, 9, 30, 0)\n", - "day_stop = datetime(2024, 3, 18, 16, 0, 0)\n", + "day_start = datetime(2024, 9, 15, 9, 30, 0)\n", + "day_stop = datetime(2024, 10, 20, 16, 0, 0)\n", "day_start = zoneNY.localize(day_start)\n", "day_stop = zoneNY.localize(day_stop)\n", "\n", "#requested AGG\n", - "resolution = 1 #12s bars\n", + "resolution = 12 #12s bars\n", "agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO\n", "exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None to defaults\n", "minsize = 100 #min trade size to include\n", - "main_session_only = True\n", + "main_session_only = False\n", "force_remote = False\n", "\n", "data = load_data(symbol = symbol,\n", @@ -260,162 +260,9 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
" - ], - "text/plain": [ - " open high low close volume\n", - "time \n", - "2024-10-14 09:45:00-04:00 41.9650 41.970 41.950 41.9500 17895.0\n", - "2024-10-14 09:45:12-04:00 41.9589 41.965 41.950 41.9650 6281.0\n", - "2024-10-14 09:45:24-04:00 41.9650 42.005 41.965 41.9975 3522.0\n", - "2024-10-14 09:45:36-04:00 41.9900 42.005 41.990 42.0000 5960.0\n", - "2024-10-14 09:45:48-04:00 42.0050 42.040 42.005 42.0300 9113.0\n", - "... ... ... ... ... ...\n", - "2024-10-16 15:00:00-04:00 42.9150 42.915 42.910 42.9100 12872.0\n", - "2024-10-16 15:00:12-04:00 42.9150 42.920 42.910 42.9200 7574.0\n", - "2024-10-16 15:00:24-04:00 42.9200 42.920 42.910 42.9200 1769.0\n", - "2024-10-16 15:00:36-04:00 42.9200 42.920 42.905 42.9050 26599.0\n", - "2024-10-16 15:00:48-04:00 42.9050 42.905 42.880 42.8800 9216.0\n", - "\n", - "[5480 rows x 5 columns]" - ] - }, - "execution_count": 5, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ "data.ohlcv.data[symbol[0]]" ] @@ -478,26 +325,9 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "\n", - "File: SPY-AggType.OHLCV-12-2024-01-15T09-30-00-2024-10-20T16-00-00-4679BCFMOPUVWZ-100-True.parquet\n", - "Coverage: 2024-01-15 09:30:00 to 2024-10-20 16:00:00\n", - "Symbol: SPY\n", - "Agg Type: AggType.OHLCV\n", - "Resolution: 12\n", - "Excludes: 4679BCFMOPUVWZ\n", - "Minsize: 100\n", - "Main Session Only: True\n", - "--------------------------------------------------------------------------------\n" - ] - } - ], + "outputs": [], "source": [ "from ttools.utils import list_matching_files, print_matching_files_info, zoneNY\n", "from datetime import datetime\n", @@ -533,261 +363,16 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "And date subset loaded from parquet. Usually this is all done yb `load_data` in loader." + "From this file the subset of dates are loaded. Usually this is all done automatically by `load_data` in loader." ] }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "
" - ], - "text/plain": [ - " open high low close volume \\\n", - "time \n", - "2024-01-16 09:30:00-05:00 475.250 475.3600 475.20 475.285 255386.0 \n", - "2024-01-16 09:30:01-05:00 475.335 475.3350 475.23 475.260 15161.0 \n", - "2024-01-16 09:30:02-05:00 475.250 475.3000 475.24 475.300 6993.0 \n", - "2024-01-16 09:30:03-05:00 475.290 475.3200 475.24 475.270 8497.0 \n", - "2024-01-16 09:30:04-05:00 475.250 475.2700 475.22 475.270 5367.0 \n", - "... ... ... ... ... ... \n", - "2024-10-18 15:59:55-04:00 584.520 584.5800 584.51 584.580 10357.0 \n", - "2024-10-18 15:59:56-04:00 584.570 584.6091 584.55 584.550 6527.0 \n", - "2024-10-18 15:59:57-04:00 584.560 584.6100 584.56 584.600 5068.0 \n", - "2024-10-18 15:59:58-04:00 584.590 584.6200 584.56 584.560 8786.0 \n", - "2024-10-18 15:59:59-04:00 584.560 584.6100 584.56 584.570 12583.0 \n", - "\n", - " trades updated \\\n", - "time \n", - "2024-01-16 09:30:00-05:00 93.0 2024-01-16 09:30:01.002183-05:00 \n", - "2024-01-16 09:30:01-05:00 100.0 2024-01-16 09:30:02.007313-05:00 \n", - "2024-01-16 09:30:02-05:00 39.0 2024-01-16 09:30:03.008912-05:00 \n", - "2024-01-16 09:30:03-05:00 47.0 2024-01-16 09:30:04.201093-05:00 \n", - "2024-01-16 09:30:04-05:00 37.0 2024-01-16 09:30:05.004980-05:00 \n", - "... ... ... \n", - "2024-10-18 15:59:55-04:00 47.0 2024-10-18 15:59:56.008928-04:00 \n", - "2024-10-18 15:59:56-04:00 32.0 2024-10-18 15:59:57.007658-04:00 \n", - "2024-10-18 15:59:57-04:00 23.0 2024-10-18 15:59:58.000435-04:00 \n", - "2024-10-18 15:59:58-04:00 23.0 2024-10-18 15:59:59.041984-04:00 \n", - "2024-10-18 15:59:59-04:00 69.0 2024-10-18 15:59:59.982132-04:00 \n", - "\n", - " vwap buyvolume sellvolume \n", - "time \n", - "2024-01-16 09:30:00-05:00 475.251725 3692.0 242756.0 \n", - "2024-01-16 09:30:01-05:00 475.283390 4386.0 4944.0 \n", - "2024-01-16 09:30:02-05:00 475.262507 1900.0 2256.0 \n", - "2024-01-16 09:30:03-05:00 475.275280 1300.0 3200.0 \n", - "2024-01-16 09:30:04-05:00 475.234353 1613.0 1247.0 \n", - "... ... ... ... 
\n", - "2024-10-18 15:59:55-04:00 584.543870 1600.0 1100.0 \n", - "2024-10-18 15:59:56-04:00 584.566643 1525.0 1002.0 \n", - "2024-10-18 15:59:57-04:00 584.596249 1960.0 900.0 \n", - "2024-10-18 15:59:58-04:00 584.592217 2859.0 3921.0 \n", - "2024-10-18 15:59:59-04:00 584.583131 5303.0 1980.0 \n", - "\n", - "[3384529 rows x 10 columns]" - ] - }, - "execution_count": 7, - "metadata": {}, - "output_type": "execute_result" - } - ], + "outputs": [], "source": [ + "#loading manually range subset from existing files\n", "start = zoneNY.localize(datetime(2024, 1, 15, 9, 30))\n", "end = zoneNY.localize(datetime(2024, 10, 20, 16, 00))\n", "\n", @@ -800,6 +385,121 @@ "\n", "ohlcv_df" ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "TTOOLS: Loaded env variables from file /Users/davidbrazda/Documents/Development/python/.env\n" + ] + } + ], + "source": [ + "\n", + "from ttools.loaders import fetch_daily_stock_trades, fetch_trades_parallel\n", + "from ttools.utils import zoneNY\n", + "from datetime import datetime" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Fetching trades for whole range" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "SPY Contains 46 market days\n", + "SPY All 46 split files loaded in 10.521624088287354 seconds\n", + "Trimming 2024-01-16 09:30:00-05:00 2024-03-20 16:00:00-04:00\n", + "excluding ['C', 'O', '4', 'B', '7', 'V', 'P', 'W', 'U', 'Z', 'F', '9', 'M', '6']\n", + "exclude done\n", + "minsize 100\n", + "minsize done\n", + "SPY filtered\n", + "\n", + "DatetimeIndex: 6513606 entries, 2024-01-16 09:30:00.001443-05:00 to 2024-03-20 15:59:59.992808-04:00\n", + "Data columns (total 6 columns):\n", + " # Column Dtype \n", + "--- ------ ----- \n", + " 0 x object \n", + " 1 p float64\n", + " 2 s int64 \n", + " 3 i int64 \n", + " 4 c object \n", + " 5 z object \n", + "dtypes: float64(1), int64(2), object(3)\n", + "memory usage: 347.9+ MB\n" + ] + } + ], + "source": [ + "\n", + "\n", + "#fethcing one day\n", + "# df = fetch_daily_stock_trades(symbol=\"SPY\",\n", + "# start=zoneNY.localize(datetime(2024, 1, 16, 9, 30)),\n", + "# end=zoneNY.localize(datetime(2024, 1, 16, 16, 00)))\n", + "# df.info()\n", + "\n", + "#fetching multiple days with parallel\n", + "df = fetch_trades_parallel(symbol=\"SPY\",\n", + " start_date=zoneNY.localize(datetime(2024, 1, 16, 9, 30)),\n", + " end_date=zoneNY.localize(datetime(2024, 3, 20, 16, 00)))\n", + "\n", + "df.info()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "df.info()" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 3, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "#comparing dataframes\n", + "from ttools.utils import AGG_CACHE, compare_dataframes\n", + "import pandas as pd\n", + "file1 = AGG_CACHE / \"SPY-AggType.OHLCV-1-2024-02-15T09-30-00-2024-10-20T16-00-00-4679BCFMOPUVWZ-100-False.parquet\"\n", + "file2 = AGG_CACHE / \"SPY-AggType.OHLCV-1-2024-02-15T09-30-00-2024-10-20T16-00-00-4679BCFMOPUVWZ-100-False_older2.parquet\"\n", + "df1 = pd.read_parquet(file1)\n", + "df2 = pd.read_parquet(file2)\n", + "df1.equals(df2)\n", + "\n", + "#compare_dataframes(df1, df2)" + ] } ], "metadata": { 
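
Note on verifying the optimization: since this commit swaps `aggregate_trades` for `aggregate_trades_optimized` inside `load_data` (see the loaders.py hunk below), a side-by-side timing of the two aggregators on the same trades frame is a useful sanity check. A minimal sketch, not part of the patch, assuming `df` is the trades DataFrame returned by `fetch_trades_parallel` in the notebook above:

    import time
    from ttools.utils import AggType
    from ttools.aggregator_vectorized import aggregate_trades, aggregate_trades_optimized

    def timed(fn, **kwargs):
        # time one aggregation run and report the bar count
        t0 = time.perf_counter()
        bars = fn(**kwargs)
        print(f"{fn.__name__}: {time.perf_counter() - t0:.2f}s, {len(bars)} bars")
        return bars

    bars_old = timed(aggregate_trades, symbol="SPY", trades_df=df,
                     resolution=12, type=AggType.OHLCV)
    # clear_input=True only drops the callee's reference to the trades frame,
    # so pass a copy if df is still needed afterwards
    bars_new = timed(aggregate_trades_optimized, symbol="SPY", trades_df=df.copy(),
                     resolution=12, type=AggType.OHLCV, clear_input=True)
    print("outputs identical:", bars_old.equals(bars_new))

The `compare_dataframes` cell in the notebook performs the same kind of check against aggregates cached by the previous version.
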
diff --git a/ttools/aggregator_vectorized.py b/ttools/aggregator_vectorized.py index dd73f56..2920e4f 100644 --- a/ttools/aggregator_vectorized.py +++ b/ttools/aggregator_vectorized.py @@ -10,8 +10,80 @@ Includes fetch (remote/cached) methods and numba aggregator function for TIME BA """"" +def aggregate_trades_optimized(symbol: str, trades_df: pd.DataFrame, resolution: int, type: AggType = AggType.OHLCV, clear_input: bool = False): + """ + Optimized version of trade aggregation function with reduced memory footprint. + """ + # 1. Get timestamps from index if 't' is not in columns + if 't' not in trades_df.columns: + timestamps = trades_df.index.values + else: + timestamps = trades_df['t'].values + + # 2. Select only needed columns for prices and sizes + prices = trades_df['p'].values + sizes = trades_df['s'].values + + #Clears input to freeup memory + if clear_input: + del trades_df + + # 3. Convert timestamps maintaining exact precision + # Convert directly to int64 nanoseconds, then to float seconds + unix_timestamps_s = timestamps.view('int64').astype(np.float64) / 1e6 + #original not optimized, in case of issues (5x slower) + #unix_timestamps_s = timestamps.astype('datetime64[ns]').astype(np.float64) / 1e9 + + # 4. Create ticks array efficiently + # 3. Pre-allocate array for better memory efficiency + ticks = np.empty((len(timestamps), 3), dtype=np.float64) + ticks[:, 0] = unix_timestamps_s + ticks[:, 1] = prices + ticks[:, 2] = sizes + + # 5. Clear memory of intermediate objects + del timestamps, prices, sizes, unix_timestamps_s + + # 6. Process based on type using existing pattern + try: + match type: + case AggType.OHLCV: + ohlcv_bars = generate_time_bars_nb(ticks, resolution) + columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades', + 'updated', 'vwap', 'buyvolume', 'sellvolume'] + case AggType.OHLCV_VOL: + ohlcv_bars = generate_volume_bars_nb(ticks, resolution) + columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades', + 'updated', 'buyvolume', 'sellvolume'] + case AggType.OHLCV_DOL: + ohlcv_bars = generate_dollar_bars_nb(ticks, resolution) + columns = ['time', 'open', 'high', 'low', 'close', 'volume', 'trades', + 'amount', 'updated'] + case _: + raise ValueError("Invalid AggType type. Supported types are 'time', 'volume' and 'dollar'.") + finally: + # 7. Clear large numpy array as soon as possible + del ticks + + # 8. Create DataFrame and handle timestamps - keeping original working approach + ohlcv_df = pd.DataFrame(ohlcv_bars, columns=columns) + del ohlcv_bars + + # 9. Use the original timestamp handling that we know works + ohlcv_df['time'] = pd.to_datetime(ohlcv_df['time'], unit='s').dt.tz_localize('UTC').dt.tz_convert(zoneNY) + ohlcv_df['updated'] = pd.to_datetime(ohlcv_df['updated'], unit="s").dt.tz_localize('UTC').dt.tz_convert(zoneNY) + + # 10. Round microseconds as in original + ohlcv_df['updated'] = ohlcv_df['updated'].dt.round('us') + + # 11. Set index last, as in original + ohlcv_df.set_index('time', inplace=True) + + return ohlcv_df + def aggregate_trades(symbol: str, trades_df: pd.DataFrame, resolution: int, type: AggType = AggType.OHLCV): """" + Original replaced by optimized version Accepts dataframe with trades keyed by symbol. Preparess dataframe to numpy and calls Numba optimized aggregator for given bar type. 
(time/volume/dollar) """"" diff --git a/ttools/loaders.py b/ttools/loaders.py index 84e1bee..0863f83 100644 --- a/ttools/loaders.py +++ b/ttools/loaders.py @@ -17,8 +17,14 @@ from ttools.utils import AggType, fetch_calendar_data, print, print_matching_fil from tqdm import tqdm import threading from typing import List, Union -from ttools.aggregator_vectorized import aggregate_trades - +from ttools.aggregator_vectorized import aggregate_trades, aggregate_trades_optimized +import numpy as np +import pandas as pd +import pyarrow.dataset as ds +import pandas as pd +from concurrent.futures import ThreadPoolExecutor +import math +import os """ Module for fetching stock data. Supports 1) cache management @@ -87,6 +93,8 @@ def convert_dict_to_multiindex_df(tradesResponse, rename_labels = True, keep_sym final_df.reset_index(inplace=True) # Reset index to remove MultiIndex levels, making them columns final_df.drop(columns=['symbol'], inplace=True) #remove symbol column final_df.set_index(timestamp_col, inplace=True) #reindex by timestamp + #print index datetime resolution + #print(final_df.index.dtype) return final_df @@ -106,6 +114,28 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No Returns: df: pd.DataFrame """ + def fast_filter(df, exclude_conditions): + # Convert arrays to strings once + str_series = df['c'].apply(lambda x: ','.join(x)) + + # Create mask using vectorized string operations + mask = np.zeros(len(df), dtype=bool) + for cond in exclude_conditions: + mask |= str_series.str.contains(cond, regex=False) + + # Apply filter + return df[~mask] + + def vectorized_string_sets(df, exclude_conditions): + # Convert exclude_conditions to set for O(1) lookup + exclude_set = set(exclude_conditions) + + # Vectorized operation using sets intersection + arrays = df['c'].values + mask = np.array([bool(set(arr) & exclude_set) for arr in arrays]) + + return df[~mask] + # 9:30 to 16:00 if main_session_only: @@ -120,30 +150,50 @@ def filter_trade_df(df: pd.DataFrame, start: datetime = None, end: datetime = No #REQUIRED FILTERING # Create a mask to filter rows within the specified time range if start is not None and end is not None: - print(f"filtering {start.time()} {end.time()}") + print(f"Trimming {start} {end}") if symbol_included: mask = (df.index.get_level_values('t') >= start) & \ (df.index.get_level_values('t') <= end) + df = df[mask] else: - mask = (df.index >= start) & (df.index <= end) - - # Apply the mask to the DataFrame - df = df[mask] + df = df.loc[start:end] if exclude_conditions is not None: print(f"excluding {exclude_conditions}") - # Create a mask to exclude rows with any of the specified conditions - mask = df['c'].apply(lambda x: any(cond in exclude_conditions for cond in x)) - - # Filter out the rows with specified conditions - df = df[~mask] + df = vectorized_string_sets(df, exclude_conditions) + print("exclude done") if minsize is not None: print(f"minsize {minsize}") #exclude conditions df = df[df['s'] >= minsize] + print("minsize done") return df +def calculate_optimal_workers(file_count, min_workers=4, max_workers=32): + """ + Calculate optimal number of workers based on file count and system resources + + Rules of thumb: + - Minimum of 4 workers to ensure parallelization + - Maximum of 32 workers to avoid thread overhead + - For 100 files, aim for around 16-24 workers + - Scale with CPU count but don't exceed max_workers + """ + cpu_count = os.cpu_count() or 4 + + # Base calculation: 2-4x CPU count for I/O bound tasks + suggested_workers = 
cpu_count * 3 + + # Scale based on file count (1 worker per 4-6 files is a good ratio) + files_based_workers = math.ceil(file_count / 5) + + # Take the smaller of the two suggestions + optimal_workers = min(suggested_workers, files_based_workers) + + # Clamp between min and max workers + return max(min_workers, min(optimal_workers, max_workers)) + def fetch_daily_stock_trades(symbol, start, end, exclude_conditions=None, minsize=None, main_session_only=True, no_return=False,force_remote=False, rename_labels = False, keep_symbols=False, max_retries=5, backoff_factor=1, data_feed: DataFeed = DataFeed.SIP, verbose = None): #doc for this function """ @@ -281,7 +331,12 @@ def fetch_trades_parallel(symbol, start_date, end_date, exclude_conditions = EXC #speed it up , locals first and then fetches s_time = timetime() with trade_cache_lock: - local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache]) + file_paths = [f for _, f in days_from_cache] + dataset = ds.dataset(file_paths, format='parquet') + local_df = dataset.to_table().to_pandas() + del dataset + #original version + #local_df = pd.concat([pd.read_parquet(f) for _,f in days_from_cache]) final_time = timetime() - s_time print(f"{symbol} All {len(days_from_cache)} split files loaded in", final_time, "seconds") #the filter is required @@ -413,7 +468,7 @@ def load_data(symbol: Union[str, List[str]], else: #neslo by zrychlit, kdyz se zobrazuje pomalu Searching cache - nejaky bottle neck? df = fetch_trades_parallel(symbol, start_date, end_date, minsize=minsize, exclude_conditions=exclude_conditions, main_session_only=main_session_only, force_remote=force_remote) #exclude_conditions=['C','O','4','B','7','V','P','W','U','Z','F']) - ohlcv_df = aggregate_trades(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type) + ohlcv_df = aggregate_trades_optimized(symbol=symbol, trades_df=df, resolution=resolution, type=agg_type, clear_input = True) ohlcv_df.to_parquet(file_ohlcv, engine='pyarrow') print(f"{symbol} Saved to agg_cache", file_ohlcv) diff --git a/ttools/utils.py b/ttools/utils.py index f40ad4e..f80be77 100644 --- a/ttools/utils.py +++ b/ttools/utils.py @@ -273,4 +273,147 @@ class StartBarAlign(str, Enum): RANDOM = first bar starts when first trade occurs """ ROUND = "round" - RANDOM = "random" \ No newline at end of file + RANDOM = "random" + +def compare_dataframes(df1, df2, name1="DataFrame 1", name2="DataFrame 2", check_dtype=True): + """ + Compare two DataFrames and provide detailed analysis of their differences. 
+ + Parameters: + ----------- + df1, df2 : pandas.DataFrame + The DataFrames to compare + name1, name2 : str + Names to identify the DataFrames in the output + check_dtype : bool + Whether to check if dtypes match for columns + + Returns: + -------- + bool + True if DataFrames are identical (based on check_dtype parameter) + dict + Detailed comparison results + """ + results = { + 'are_equal': False, + 'shape_match': False, + 'column_match': False, + 'index_match': False, + 'dtype_match': False, + 'content_match': False, + 'differences': {} + } + + # Shape comparison + if df1.shape != df2.shape: + results['differences']['shape'] = { + name1: df1.shape, + name2: df2.shape + } + else: + results['shape_match'] = True + + # Column comparison + cols1 = set(df1.columns) + cols2 = set(df2.columns) + if cols1 != cols2: + results['differences']['columns'] = { + f'unique_to_{name1}': list(cols1 - cols2), + f'unique_to_{name2}': list(cols2 - cols1), + 'common': list(cols1 & cols2) + } + else: + results['column_match'] = True + + # Index comparison + idx1 = set(df1.index) + idx2 = set(df2.index) + if idx1 != idx2: + results['differences']['index'] = { + f'unique_to_{name1}': list(idx1 - idx2), + f'unique_to_{name2}': list(idx2 - idx1), + 'common': list(idx1 & idx2) + } + else: + results['index_match'] = True + + # dtype comparison + if check_dtype and results['column_match']: + dtype_diff = {} + for col in cols1: + if df1[col].dtype != df2[col].dtype: + dtype_diff[col] = { + name1: str(df1[col].dtype), + name2: str(df2[col].dtype) + } + if dtype_diff: + results['differences']['dtypes'] = dtype_diff + else: + results['dtype_match'] = True + + # Content comparison (only for matching columns and indices) + if results['column_match'] and results['index_match']: + common_cols = list(cols1) + common_idx = list(idx1) + + value_diff = {} + for col in common_cols: + # Compare values + if not df1[col].equals(df2[col]): + # Find specific differences + mask = df1[col] != df2[col] + if any(mask): + diff_indices = df1.index[mask] + value_diff[col] = { + 'different_at_indices': list(diff_indices), + 'sample_differences': { + str(idx): { + name1: df1.loc[idx, col], + name2: df2.loc[idx, col] + } for idx in list(diff_indices)[:5] # Show first 5 differences + } + } + + if value_diff: + results['differences']['values'] = value_diff + else: + results['content_match'] = True + + # Overall equality + results['are_equal'] = all([ + results['shape_match'], + results['column_match'], + results['index_match'], + results['content_match'], + (results['dtype_match'] if check_dtype else True) + ]) + + # Print summary + print(f"\nComparison Summary of {name1} vs {name2}:") + print(f"Shape Match: {results['shape_match']} ({df1.shape} vs {df2.shape})") + print(f"Column Match: {results['column_match']}") + print(f"Index Match: {results['index_match']}") + print(f"Dtype Match: {results['dtype_match']}" if check_dtype else "Dtype Check: Skipped") + print(f"Content Match: {results['content_match']}") + print(f"\nOverall Equal: {results['are_equal']}") + + # Print detailed differences if any + if not results['are_equal']: + print("\nDetailed Differences:") + for diff_type, diff_content in results['differences'].items(): + print(f"\n{diff_type.upper()}:") + if diff_type == 'values': + print(f"Number of columns with differences: {len(diff_content)}") + for col, details in diff_content.items(): + print(f"\nColumn '{col}':") + print(f"Number of different values: {len(details['different_at_indices'])}") + print("First few differences:") + for 
idx, vals in details['sample_differences'].items(): + print(f" At index {idx}:") + print(f" {name1}: {vals[name1]}") + print(f" {name2}: {vals[name2]}") + else: + print(diff_content) + + return results['are_equal'], results
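
A note on the timestamp conversion in `aggregate_trades_optimized`: `timestamps.view('int64').astype(np.float64) / 1e6` yields seconds only when the index is stored at microsecond resolution (datetime64[us], which is what the new pyarrow.dataset load path typically produces under pandas >= 2.0). The adjacent comment says nanoseconds; on an ns-resolution index the result would be off by a factor of 1000. A resolution-agnostic variant is sketched below; `to_unix_seconds` is a hypothetical helper, not part of the patch, and assumes pandas >= 2.0 for `DatetimeIndex.as_unit`:

    import numpy as np
    import pandas as pd

    def to_unix_seconds(index: pd.DatetimeIndex) -> np.ndarray:
        # Normalize to nanoseconds regardless of the stored unit
        # ('s', 'ms', 'us' or 'ns'), then scale once. asi8 returns UTC
        # epoch integers, so tz-aware indexes are handled as well.
        return index.as_unit("ns").asi8.astype(np.float64) / 1e9

Called as `to_unix_seconds(trades_df.index)` in place of the `view('int64') / 1e6` line, this keeps the optimized aggregator correct regardless of how the cached trades parquet was materialized.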