In [1]:
import ray
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import statsmodels.api as sm
from sklearn.compose import make_column_transformer
from sklearn.preprocessing import RobustScaler
from vectorbtpro import *
In [ ]:
ray.init()
In [3]:
vbt.settings.set_theme("dark")
Ingestion¶
In [4]:
symbol = "^GSPC" # the S&P 500 ticker
In [ ]:
data = vbt.YFData.pull(
    symbol, start="50 years ago", end="today", timeframe="daily", tz="UTC"
)  # 50 years of data
data.stats()
In [ ]:
data.data[symbol]
In [ ]:
data.data[symbol].index
In [ ]:
# The opens are corrupt...
data.plot(yaxis=dict(type="log")).show()
In [ ]:
# ...but the closes are fine.
data.data[symbol]["Close"].vbt.plot(yaxis=dict(type="log")).show()
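As a quick sanity check on the corrupt opens (a minimal sketch; the assumption that the corruption shows up as opens copying the prior close, or as zero/missing opens, is mine, not the notebook's):

In [ ]:
# Fraction of bars where the open merely copies the previous close,
# a common artifact in old Yahoo Finance index data.
sr_open_check = data.get("Open")
frac_copied = (sr_open_check == data.get("Close").shift()).mean()

# Fraction of bars with a zero or missing open.
frac_zero = ((sr_open_check == 0) | sr_open_check.isna()).mean()

print(f"Opens equal to previous close: {frac_copied:.1%}")
print(f"Zero/missing opens: {frac_zero:.1%}")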
Cleaning¶
In [ ]:
data.data[symbol]["Dividends"].any()
In [ ]:
data.data[symbol]["Stock Splits"].any()
In [12]:
data = data.remove_features(["Dividends", "Stock Splits"])
In [13]:
# data = data.transform(lambda df: df.loc["April 19th 1982" < df.index])
In [14]:
# data.plot(yaxis=dict(type="log")).show()
In [15]:
# len(data.index) / 365.25 # 30 years of data remaining
Modelling¶
In [16]:
# sr_open = data.get("Open")
# sr_high = data.get("High")
# sr_low = data.get("Low")
sr_close = data.get("Close")
In [17]:
# sr_log_open = np.log(sr_open)
# sr_log_high = np.log(sr_high)
# sr_log_low = np.log(sr_low)
sr_log_close = np.log(sr_close)
In [18]:
sr_log_returns = data.log_returns
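For reference, `data.log_returns` should equal the first difference of log closes, $r_t = \ln C_t - \ln C_{t-1}$. A minimal check (assuming the two series can only differ on the first bar, which has no prior close):

In [ ]:
# Log returns are the first difference of log prices.
sr_log_returns_manual = np.log(sr_close).diff()

# Compare everywhere except the first bar.
idx = sr_log_returns_manual.index[1:]
assert np.allclose(sr_log_returns.loc[idx], sr_log_returns_manual.loc[idx])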
In [ ]:
sr_log_returns.vbt.plot().show()
In [20]:
column_transformer = make_column_transformer(
    (RobustScaler(), [symbol]),
)
sr_log_returns_scaled = pd.Series(
    data=column_transformer.fit_transform(pd.DataFrame(sr_log_returns)).ravel(),
    index=sr_log_returns.index,
)
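`RobustScaler` centres by the median and scales by the interquartile range, so fat-tailed return outliers don't dominate the scaling. A minimal sketch of the equivalent manual computation (using scikit-learn's default 25-75% quantile range):

In [ ]:
# RobustScaler with default settings computes:
# x_scaled = (x - median(x)) / (Q3(x) - Q1(x))
median = sr_log_returns.median()
iqr = sr_log_returns.quantile(0.75) - sr_log_returns.quantile(0.25)
sr_manual_scaled = (sr_log_returns - median) / iqr

assert np.allclose(sr_manual_scaled, sr_log_returns_scaled, equal_nan=True)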
In [ ]:
sr_log_returns_scaled.vbt.plot().show()
In [22]:
k_regimes_kns = 3
In [ ]:
kns = sm.tsa.MarkovRegression(
    sr_log_returns_scaled, k_regimes=k_regimes_kns, trend="n", switching_variance=True
)
results_kns = kns.fit()
results_kns.summary()
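For reference, with `trend="n"` and `switching_variance=True` this fits a pure regime-switching variance model: the scaled log return at time $t$ is zero-mean noise whose variance depends on a hidden state $S_t$ that follows a first-order Markov chain,

$$r_t = \varepsilon_t, \qquad \varepsilon_t \sim \mathcal{N}\!\left(0, \sigma^2_{S_t}\right), \qquad P(S_t = j \mid S_{t-1} = i) = p_{ij}, \qquad S_t \in \{0, 1, 2\}.$$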
In [ ]:
results_kns.filtered_marginal_probabilities # using data until time t (excluding time t+1, ..., T)
In [ ]:
results_kns.smoothed_marginal_probabilities # using data until time T
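One useful property (a minimal check, not part of the original analysis): at the final timestep there is no future data left to smooth with, so the smoothed and filtered probabilities coincide there. This is what justifies taking only the last smoothed value in the daily re-fit below.

In [ ]:
# At t = T the smoother has no future information, so the two agree.
assert np.allclose(
    results_kns.filtered_marginal_probabilities.iloc[-1],
    results_kns.smoothed_marginal_probabilities.iloc[-1],
)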
In [ ]:
fig = vbt.make_subplots(
    rows=k_regimes_kns,
    cols=1,
    y_title="Smoothed Marginal Variance Regime Probabilities",
    shared_xaxes=True,
    subplot_titles=[
        "Medium-variance",
        "Low-variance",
        "High-variance",
    ],  # the regime order depends on the fit
)
for i in range(k_regimes_kns):
    fig = results_kns.smoothed_marginal_probabilities[i].vbt.plot(
        add_trace_kwargs=dict(row=i + 1, col=1), fig=fig
    )
fig.update_layout(showlegend=False)
fig.show()
In [27]:
def plot_annotated_line(
    fig: go.Figure,
    x: pd.Series,
    y: pd.Series,
    classes: pd.Series,
    dict_class_colours: dict,
    dict_class_labels: dict,
) -> go.Figure:
    """Plot a line chart where each segment is coloured based on its class.

    Yes, plotly really doesn't support this out of the box.

    Args:
        fig: Figure.
        x: Indices.
        y: Close prices.
        classes: Regimes.
        dict_class_colours: In the format {class: colour}.
        dict_class_labels: In the format {class: label}.

    Returns:
        fig: The figure.
    """
    # Plot each segment in its corresponding colour.
    for i in range(len(x) - 1):
        fig.add_trace(
            go.Scatter(
                x=x[i : i + 2],
                y=y.iloc[i : i + 2],
                mode="lines",
                line=dict(color=dict_class_colours[classes.iloc[i]], width=2),
                showlegend=False,  # legend entries are added manually below
            )
        )
    # Add one legend entry per regime.
    for regime, colour in dict_class_colours.items():
        fig.add_trace(
            go.Scatter(
                x=[None],
                y=[None],
                mode="lines",
                line=dict(color=colour, width=2),
                name=dict_class_labels[regime],
            )
        )
    return fig
In [28]:
sr_variance_regime_forecasts = results_kns.filtered_marginal_probabilities.idxmax(
    axis=1
)
sr_variance_regime_predictions = results_kns.smoothed_marginal_probabilities.idxmax(
    axis=1
)
In [ ]:
# sr_variance_regime_forecasts.vbt.plot().show()
sr_variance_regime_predictions.vbt.plot().show()
In [30]:
# The regime order depends on the fit.
dict_variance_regime_labels = {
    0: "Medium",
    1: "Low",
    2: "High",
}
dict_variance_regime_colours = {
    0: "orange",
    1: "green",
    2: "red",
}
In [ ]:
fig = vbt.make_figure()
fig = plot_annotated_line(
    fig,
    data.index,
    sr_log_close,
    # sr_variance_regime_forecasts.rolling(5).mean().fillna(0).round(0),
    sr_variance_regime_forecasts,
    # sr_variance_regime_predictions,
    dict_variance_regime_colours,
    dict_variance_regime_labels,
)
fig.update_layout(
    title="Filtered Variance Regime Labels",
    # title="Smoothed Variance Regime Labels",
    xaxis_title="Date",
    yaxis_title="Log Close",
    showlegend=True,
)
fig.show()
In [32]:
# TODO - Double check me!
# Assuming that you sell today if yesterday was in the high-variance regime:
# entries = (sr_variance_regime_forecasts != 2).vbt.signals.fshift()
# exits = (sr_variance_regime_forecasts == 2).vbt.signals.fshift()

# Assuming that you sell today (at the close) if today was in the high-variance regime:
entries = sr_variance_regime_forecasts != 2
exits = sr_variance_regime_forecasts == 2

# I haven't tested any additional logic.
# entries = (sr_variance_regime_forecasts.rolling(5).mean().fillna(0).round(0) != 2)

clean_entries, clean_exits = entries.vbt.signals.clean(exits)
In [ ]:
fig = sr_variance_regime_forecasts.vbt.plot()
clean_entries.vbt.signals.plot_as_entries(sr_variance_regime_forecasts, fig=fig)
clean_exits.vbt.signals.plot_as_exits(sr_variance_regime_forecasts, fig=fig)
fig.show()
In [ ]:
pf = vbt.Portfolio.from_signals(
    close=sr_close,
    entries=clean_entries,
    exits=clean_exits,
    direction="both",  # exits flip the position short rather than going to cash
    fees=0.001,
    size=1.0,
    size_type=vbt.pf_enums.SizeType.ValuePercent,  # trade 100% of portfolio value
)
pf.stats()
In [ ]:
pf.plot(yaxis=dict(type="log")).show()
In [ ]:
pf.drawdowns.plot(yaxis=dict(type="log")).show()
In [ ]:
pf.plot_underwater(pct_scale=True).show()
Re-fitting Every Day¶
A backtest with a single training/validation split, where the model is re-fit every day of the validation set
In [38]:
n_days_validation = int(365.25 * 2)  # 730 bars held back for the validation set (~2 years of calendar days; the daily index holds trading days, so this spans closer to 3 calendar years)
In [39]:
# slices_sr = vbt.Splitter.split_range(slice(None), new_split=-n_days_validation, index=data.index)
splitter_fr = vbt.Splitter.from_rolling(
    data.index, length=len(data.index), split=len(data.index) - n_days_validation
)
In [ ]:
splitter_fr.plot().show()
In [41]:
# Define a Ray remote function for parallelisation.
@ray.remote
def compute_smoothed_marginal_probabilities(sr):
    kns = sm.tsa.MarkovRegression(
        sr,
        k_regimes=k_regimes_kns,
        trend="n",
        switching_variance=True,
    )
    results_kns = kns.fit()
    # The smoothing might not work properly out of sample, so only the last
    # value is kept (which equals the filtered value at the final timestep).
    return results_kns.smoothed_marginal_probabilities.iloc[-1]
In [42]:
# The predict method doesn't support out-of-sample forecasts...
# kns = sm.tsa.MarkovRegression(
#     sr_log_returns[splitter_fr.get_mask()["set_0"]],
#     k_regimes=k_regimes_kns,
#     trend="n",
#     switching_variance=True,
# )
# results_kns = kns.fit()
# results_kns.summary()
# results_kns.predict(sr_log_returns[splitter_fr.get_mask()["set_1"]])
In [43]:
sr_train = sr_log_returns[splitter_fr.get_mask()["set_0"]]
sr_validate = sr_log_returns[splitter_fr.get_mask()["set_1"]]
In [ ]:
# ...so re-fit the model every timestep.
futures = []
sr_log_returns_to_date = sr_train.copy()

# Launch parallel tasks.
for i in range(len(sr_validate)):
    sr_log_returns_to_date = pd.concat(
        [
            sr_log_returns_to_date,
            pd.Series(sr_validate.iloc[i], index=[sr_validate.index[i]]),
        ]
    )
    futures.append(
        compute_smoothed_marginal_probabilities.remote(sr_log_returns_to_date)
    )

# Collect results.
smoothed_marginal_probabilities = ray.get(futures)
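Each of these fits restarts the optimizer from scratch. A possible speed-up (a hedged sketch, not tested here; `compute_probs_warm_start` is a hypothetical helper) is to fit once on the training set and pass its parameters as `start_params`, which `MarkovRegression.fit` accepts:

In [ ]:
@ray.remote
def compute_probs_warm_start(sr, start_params):
    """Hypothetical re-fit, warm-started from previously estimated parameters."""
    kns = sm.tsa.MarkovRegression(
        sr, k_regimes=k_regimes_kns, trend="n", switching_variance=True
    )
    # Starting near the converged parameters typically cuts the number
    # of optimizer iterations needed per daily re-fit.
    results = kns.fit(start_params=start_params)
    return results.smoothed_marginal_probabilities.iloc[-1]


# Fit once on the training set to get warm-start parameters.
base_params = sm.tsa.MarkovRegression(
    sr_train, k_regimes=k_regimes_kns, trend="n", switching_variance=True
).fit().params
# Then, in the loop above:
# futures.append(compute_probs_warm_start.remote(sr_log_returns_to_date, base_params))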
In [ ]:
smoothed_marginal_probabilities = pd.concat(smoothed_marginal_probabilities, axis=1).T
smoothed_marginal_probabilities.index.name = "Date"
In [38]:
sr_variance_regime_predictions = smoothed_marginal_probabilities.idxmax(axis=1)
In [ ]:
sr_close_validate = sr_close[splitter_fr.get_mask()["set_1"]]
sr_log_close_validate = sr_log_close[splitter_fr.get_mask()["set_1"]]
In [ ]:
fig = vbt.make_figure()
fig = plot_annotated_line(
    fig,
    data.index[splitter_fr.get_mask()["set_1"]],
    sr_log_close_validate,
    sr_variance_regime_predictions,
    dict_variance_regime_colours,
    dict_variance_regime_labels,
)
fig.update_layout(
    title="Variance Regime Forecasts",
    xaxis_title="Date",
    yaxis_title="Log Close",
    showlegend=True,
)
fig.show()
In [49]:
# TODO - Double check me!
# Assuming that you sell today if yesterday was in the high-variance regime:
# entries = (sr_variance_regime_predictions != 2).vbt.signals.fshift()
# exits = (sr_variance_regime_predictions == 2).vbt.signals.fshift()

# Assuming that you sell today (at the close) if today was in the high-variance regime.
# Note: the signals here come from the walk-forward predictions, not the
# full-sample forecasts used above.
entries = sr_variance_regime_predictions != 2
exits = sr_variance_regime_predictions == 2

# I haven't tested any additional logic.
# entries = (sr_variance_regime_predictions.rolling(5).mean().fillna(0).round(0) != 2)

clean_entries, clean_exits = entries.vbt.signals.clean(exits)
In [ ]:
fig = sr_variance_regime_predictions.vbt.plot()
clean_entries.vbt.signals.plot_as_entries(sr_variance_regime_predictions, fig=fig)
clean_exits.vbt.signals.plot_as_exits(sr_variance_regime_predictions, fig=fig)
fig.show()
In [51]:
pf = vbt.Portfolio.from_signals(
    close=sr_close_validate,
    entries=clean_entries,
    exits=clean_exits,
    direction="both",
    fees=0.001,
    size=1.0,
    size_type=vbt.pf_enums.SizeType.ValuePercent,
)
In [ ]:
pf.stats()
In [ ]:
pf.plot(yaxis=dict(type="log")).show()
In [ ]:
pf.drawdowns.plot(yaxis=dict(type="log")).show()
In [ ]:
pf.plot_underwater(pct_scale=True).show()
Observations¶
- For this implementation you have to use the filtered probabilities to avoid introducing look-ahead bias.
- If you backtest the smoothed probabilities (which smooth using all of the data), the strategy only performs well after the Great Financial Crisis. Why? No clue.
- It looks like the model is better suited to labelling than to trading in its current state.
- After slow recessions like the dot-com bubble, there can be a medium-variance decline that this simple strategy doesn't capture.
- After fast recessions like COVID-19, there can be a high-variance rebound that this simple strategy doesn't capture.
- It looks like you can safely leverage up during low-variance regimes (see the sketch after this list).
- Maybe you could combine this strategy with other recession-leading indicators (e.g. manufacturing/services PMI, the federal funds rate, the 10y-2y yield curve, the 1y-3mo yield curve) to help time the tops?
- Maybe you could combine this strategy with a trend-following strategy (e.g. VWAP, EMA, BBANDS, ADX) to help time the bottoms?
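A minimal sketch of the leverage idea from the list above (assuming your vectorbtpro version supports the `leverage` argument of `Portfolio.from_signals`; the regime-to-size mapping is purely illustrative and untested):

In [ ]:
# Hypothetical sizing: 200% exposure in the low-variance regime (label 1 here),
# 100% otherwise. Remember that the regime order depends on the fit.
sr_size = pd.Series(
    np.where(sr_variance_regime_predictions == 1, 2.0, 1.0),
    index=sr_variance_regime_predictions.index,
)

pf_lev = vbt.Portfolio.from_signals(
    close=sr_close_validate,
    entries=clean_entries,
    exits=clean_exits,
    direction="both",
    fees=0.001,
    size=sr_size,
    size_type=vbt.pf_enums.SizeType.ValuePercent,
    leverage=2.0,  # needed to hold more than 100% of portfolio value
)
pf_lev.stats()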
In [46]:
ray.shutdown()