Principal Component Analysis¶
In trading you make money when you know something that no one else knows. But if you know that something will happen with certainty, and it will only happen once, how do you show that you weren't just lucky? You have to show that the circumstances are abnormal, and that your model is confident under those circumstances.
This video touches on this idea. First, he selects the parameters of an indicator that convey the most unique (uncorrelated) information using principal component analysis. He then creates a target variable for the indicator to predict, and fits a model to it.
The key finding is that whilst the model has almost no predictive power in normal cases (when it predicts small changes), in extreme cases (when it predicts large changes) it appears to be predictive.
Again, lots of indicators might only be predictive when they predict something extreme. He's filtering out the regions of the model where you can't be sure it works. When it predicts something extreme, it has room to be wrong (if it predicts a large move up and the actual move is only a small one, you're probably still winning).
So...¶
- Dimensionality reduction is very useful.
- Selective prediction is very useful (see the sketch after this list).
- You can quantify a model's success not by its accuracy, but by its ability to know when it's right.
- Money is made based on the uniqueness of the information you hold, the certainty of its truth, and the magnitude of its impact.
- If you know something highly abnormal will happen with high certainty, then you can make a lot of money.
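A minimal sketch of the selective-prediction idea referenced above (my own illustration, not from the video): score the model only where its prediction is extreme.

import numpy as np
import pandas as pd

def selective_hit_rate(y_hat: pd.Series, y: pd.Series, quantile: float = 0.99) -> float:
    """Directional hit rate, counted only where the prediction is extreme."""
    long_threshold = y_hat.quantile(quantile)
    short_threshold = y_hat.quantile(1 - quantile)
    extreme = (y_hat > long_threshold) | (y_hat < short_threshold)
    # The model is "right" when the sign of its prediction matches the sign of the outcome.
    return (np.sign(y_hat[extreme]) == np.sign(y[extreme])).mean()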
Future Ideas¶
- Run a similar process on the 101 World Quant Alphas.
- Switch out the model to something robust to noise and non-linear (e.g. a random forest; sketched after this list).
- Switch out the target to something that better represents trend following / reversal.
- Label tops and bottoms with directional change events.
- Accuracy to top/bottom (time units e.g. days)
- Accuracy to top/bottom (price units e.g. log returns)
- Calculate expected length of trend from
- Label variance with Markov variance regimes.
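A minimal sketch of the model swap floated in the second idea above (hyperparameters are illustrative, not tuned):

from sklearn.ensemble import RandomForestRegressor

# Drop-in replacement for LinearRegression in the walk-forward loop later in the notebook.
clf = RandomForestRegressor(n_estimators=200, max_depth=3, min_samples_leaf=50, random_state=42)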
Author: shittles
Created: 2024-09-26
Modified: 2024-10-01
Sources¶
Changelog¶
- Watched video, crafted theory, and wrote opening paragraph (2h - 2024-09-26)
- Followed along with video (4h - 2024-09-27)
- Scikit-learn pipeline (3h - 2024-10-01)
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import linalg as la
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.linear_model import LinearRegression
# from sklearn.model_selection import cross_val_score
from vectorbtpro import *
plt.style.use("dark_background")
vbt.settings.set_theme("dark")
data = vbt.YFData.pull(
    ["^GSPC"],
    start="50 years ago",
    end="today",
    timeframe="daily",
    tz="UTC",
    missing_columns="drop",
)
data.stats()
data.data["^GSPC"]
data.index
# The opens are corrupt...
data.plot(symbol="^GSPC", yaxis=dict(type="log")).show()
# but the closes are fine.
data.data["^GSPC"]["Close"].vbt.plot(yaxis=dict(type="log")).show()
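A quick check of what "corrupt" means here, assuming the damage shows up as zero opens or opens copied from the prior close (my addition):

sr_open_check = data.get_feature("Open")["^GSPC"]
sr_close_check = data.get_feature("Close")["^GSPC"]
print("Zero opens:", (sr_open_check == 0).sum())
print("Opens equal to the previous close:", (sr_open_check == sr_close_check.shift(1)).sum())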
Cleaning¶
data.features
data.get_feature("Dividends").any(axis=1).all()
data.get_feature("Stock Splits").any(axis=1).all()
data = data.remove_features(["Dividends", "Stock Splits"])
# data = data.transform(lambda df: df.loc['April 19th 1982' < df.index])
# data = data.resample("daily")
sr_open_gspc = data.get_feature("Open")["^GSPC"]
sr_high_gspc = data.get_feature("High")["^GSPC"]
sr_low_gspc = data.get_feature("Low")["^GSPC"]
sr_close_gspc = data.get_feature("Close")["^GSPC"]
vbt.IF.list_indicators("RSI")
vbt.phelp(vbt.RSI.run)
rsi_periods = np.arange(2, 25)
rsi = vbt.RSI.run(sr_close_gspc, rsi_periods)
rsi.rsi
rsi.rsi.hist(bins=100)
plt.tight_layout()
plt.show()
sns.heatmap(rsi.rsi.corr())
plt.show()
Compute the Principal Components¶
Using principal component analysis (eigenvalues and vectors) as in the video.
df_rsi_means = rsi.rsi.mean()

# Centre the RSI about zero.
df_rsi_values = rsi.rsi - df_rsi_means
df_rsi_values = df_rsi_values.dropna()
# Standardise instead (this overwrites the centred values above).
df_rsi_values = pd.DataFrame(
    data=StandardScaler().fit_transform(rsi.rsi.dropna()),
    index=rsi.rsi.dropna().index,
    columns=rsi.rsi.columns,
)
df_rsi_values
# Compute the covariance and eigenvectors.
arr_covariances = np.cov(df_rsi_values, rowvar=False)
arr_eigenvals, arr_eigenvecs = la.eigh(arr_covariances)

# Sort the eigenvectors (columns) by the eigenvalues, largest first.
idx = np.argsort(arr_eigenvals)[::-1]
arr_eigenvecs = arr_eigenvecs[:, idx]
arr_eigenvals = arr_eigenvals[idx]
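Each eigenvalue is the variance its component captures, so the explained-variance ratio falls straight out of the sort above; useful for choosing n_components (my addition):

sr_explained_variance = pd.Series(arr_eigenvals / arr_eigenvals.sum(), index=range(1, len(arr_eigenvals) + 1))
sr_explained_variance.cumsum().plot(marker="o", linestyle="--")
plt.xlabel("Number of Components")
plt.ylabel("Cumulative Explained Variance")
plt.show()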
n_components = 4
df_X = pd.DataFrame()

for j in range(n_components):
    # la.eigh returns eigenvectors as columns, so project onto column j.
    df_X[j] = pd.Series(np.dot(df_rsi_values, arr_eigenvecs[:, j]), index=df_rsi_values.index)

df_X.columns.name = "PC"
df_X
# The principal components pay little-to-no attention to the higher periods.
# This is unsurprising because they are highly correlated, and so don't convey unique information.
for j in range(n_components):
    pd.Series(arr_eigenvecs[:, j], index=rsi_periods).plot(label="PC" + str(j + 1))

plt.xlabel("RSI Period")
plt.ylabel("Eigenvector Value")
plt.legend()
plt.show()
Using singular value decomposition, as scikit-learn does internally.
U, S, Vt = la.svd(df_rsi_values, full_matrices=False)

# df_X = pd.DataFrame(U @ np.diag(S), index=rsi.rsi.dropna().index)
df_X = pd.DataFrame(U[:, :n_components] * S[:n_components], index=rsi.rsi.dropna().index)
df_X
# Plot the right singular vectors (rows of Vt) for each component against the RSI periods.
for j in range(n_components):
    pd.Series(Vt[j], index=rsi_periods).plot(label="PC" + str(j + 1))

plt.xlabel("RSI Period")
plt.ylabel("Right Singular Vector Value (Component Loadings)")
plt.legend()
plt.show()
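The two routes should agree up to sign, because the right singular vectors of the standardised data are the eigenvectors of its covariance matrix. A quick sanity check (my addition):

# Signs are arbitrary, so compare absolute values.
for j in range(n_components):
    assert np.allclose(np.abs(Vt[j]), np.abs(arr_eigenvecs[:, j]))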
Create the Model¶
# The targets are the returns in 5 days' time.
sr_y = data.returns.shift(-5)["^GSPC"]
sr_y = sr_y.dropna()

# Ensure the features and targets have the same index.
common_index = df_X.index.intersection(sr_y.index)
df_X = df_X.loc[common_index]
sr_y = sr_y.loc[common_index]
# Fit the model.
model_coefs = la.lstsq(df_X, sr_y)[0]
sr_y_hat = pd.Series(np.dot(df_X, model_coefs), index=sr_y.index)

long_threshold = sr_y_hat.quantile(0.99)
short_threshold = sr_y_hat.quantile(0.01)
# The plot's small, so make the marker size smaller.
plt.scatter(sr_y_hat, sr_y, s=1)
plt.axhline(0.0, color="white")
plt.axvline(long_threshold, color="green")
plt.axvline(short_threshold, color="red")
plt.xlabel("Predictions")
plt.ylabel("Targets")
plt.show()
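The same conditional summary the original code prints: the mean realised target when the prediction sits beyond each threshold.

print("Mean target above long threshold:", sr_y[sr_y_hat > long_threshold].mean())
print("Mean target below short threshold:", sr_y[sr_y_hat < short_threshold].mean())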
Modifying Code¶
From here on out, I'm significantly modifying the source code. I'm using scikit-learn's decomposition and prediction models, and vectorbtpro's indicators and backtesting.
# Use scikit-learn to easily swap out decomposition models.
column_transformer = make_column_transformer(
    (StandardScaler(), np.arange(0, len(rsi.rsi.columns))),
    remainder="passthrough",
)

pd.DataFrame(
    data=column_transformer.fit_transform(rsi.rsi.dropna()),
    index=rsi.rsi.dropna().index,
    # In this case the column order hasn't been transformed.
    # columns=column_transformer.get_feature_names_out(),
    columns=rsi.rsi.columns,
)
# There's a lot of room here to do more.
pipeline = make_pipeline(
    column_transformer,
    # PCA(n_components),
    FactorAnalysis(n_components),
)

df_X = pd.DataFrame(
    data=pipeline.fit_transform(rsi.rsi.dropna()),
    index=rsi.rsi.dropna().index,
)

# The columns are now the latent components (factors here, PCs if you swap PCA back in).
df_X.columns.name = "PC"
df_X
# Show the explained variance ratio of each component.
# model_pca = pipeline.named_steps["pca"]
model_fa = pipeline.named_steps["factoranalysis"]
# explained_variance = model_pca.explained_variance_ratio_
# plt.plot(range(1, len(explained_variance) + 1), explained_variance.cumsum(), marker="o", linestyle="--")
# plt.title("Explained Variance Ratio")
# plt.xlabel("Number of Components")
# plt.ylabel("Cumulative Explained Variance")
# plt.show()
# Verify the results from before.
# Plot the loadings for each component.
for j in range(n_components):
    # pd.Series(model_pca.components_[j], index=rsi_periods).plot(label="PC" + str(j + 1))
    pd.Series(model_fa.components_[j], index=rsi_periods).plot(label="C" + str(j + 1))

plt.xlabel("RSI Period")
# plt.ylabel("Eigenvector Value")
plt.ylabel("Latent Vector Value")
plt.legend()
plt.show()
# The targets are the returns in 5 days' time.
sr_y = data.returns.shift(-5)["^GSPC"]
sr_y = sr_y.dropna()

# Ensure the features and targets have the same index.
common_index = df_X.index.intersection(sr_y.index)
df_X = df_X.loc[common_index]
sr_y = sr_y.loc[common_index]
# Fit the model. (These are in-sample predictions, so name them sr_y_hat;
# the old sr_y_hat_test name clashed with the thresholds and plot below.)
clf = LinearRegression()
clf.fit(df_X, sr_y)
sr_y_hat = pd.Series(clf.predict(df_X), index=sr_y.index)

long_threshold = sr_y_hat.quantile(0.99)
short_threshold = sr_y_hat.quantile(0.01)
# The plot's small, so make the marker size smaller.
plt.scatter(sr_y_hat, sr_y, s=1)
plt.axhline(0.0, color="white")
plt.axvline(long_threshold, color="green")
plt.axvline(short_threshold, color="red")
plt.xlabel("Predictions")
plt.ylabel("Targets")
plt.show()
If the model were better you'd expect a clear relationship between the targets and the predictions, rather than a flat blob. There are a million different metrics you can use to check this.
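One example: the Spearman rank correlation (an information coefficient) between predictions and targets, overall and on the extreme subset only (my addition):

from scipy.stats import spearmanr

ic_all, _ = spearmanr(sr_y_hat, sr_y)
mask_extreme = (sr_y_hat > long_threshold) | (sr_y_hat < short_threshold)
ic_extreme, _ = spearmanr(sr_y_hat[mask_extreme], sr_y[mask_extreme])
print(f"Rank IC (all): {ic_all:.3f}, Rank IC (extremes): {ic_extreme:.3f}")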
Backtest¶
Honestly, I'm pretty surprised at how not shit this is given that it's only using the RSI and a linear model looking at super simple targets.
df_rsi = rsi.rsi.dropna()

# The targets are the log returns in 5 days' time.
sr_y = np.log(sr_close_gspc).diff(5).shift(-5).dropna()

# Ensure the features and targets have the same index.
common_index = df_rsi.index.intersection(sr_y.index)
df_rsi = df_rsi.loc[common_index]
sr_y = sr_y.loc[common_index]
cv = vbt.SplitterCV(
    "from_rolling",
    length=3 * 250,
    offset=-250,
    split=2 * 250,
    set_labels=["train", "test"],
)
cv_splitter = cv.get_splitter(df_rsi)
cv_splitter.plot().show()
sr_rsi_slices = cv_splitter.take(df_rsi)
sr_y_slices = cv_splitter.take(sr_y)

clf = LinearRegression()
list_result_test = []

for split in sr_rsi_slices.index.unique(level="split"):
    df_rsi_train = sr_rsi_slices[(split, "train")]
    df_rsi_test = sr_rsi_slices[(split, "test")]

    df_X_train = pd.DataFrame(
        data=pipeline.fit_transform(df_rsi_train),
        index=df_rsi_train.index,
    )
    # Transform (not fit) the test slice so the decomposition is learned on training data only.
    df_X_test = pd.DataFrame(
        data=pipeline.transform(df_rsi_test),
        index=df_rsi_test.index,
    )
    df_X_train.columns.name = "PC"
    df_X_test.columns.name = "PC"

    sr_y_train = sr_y_slices[(split, "train")]
    sr_y_test = sr_y_slices[(split, "test")]

    clf.fit(df_X_train, sr_y_train)
    sr_y_hat_test = pd.Series(clf.predict(df_X_test), index=sr_y_test.index)

    # Note: quantile thresholds are computed on the test predictions themselves,
    # a mild look-ahead; the original code derives them from the training window.
    long_threshold = sr_y_hat_test.quantile(0.99)
    short_threshold = sr_y_hat_test.quantile(0.01)

    list_result_test.append(pd.DataFrame({
        "y": sr_y_test,
        "y_hat": sr_y_hat_test,
        "Long Threshold": long_threshold,
        "Short Threshold": short_threshold,
    }))

df_result_test = pd.concat(list_result_test)
df_result_test
df_result_test.loc[df_result_test["y_hat"] > df_result_test["Long Threshold"], "Entries"] = True
df_result_test.loc[df_result_test["y_hat"] < df_result_test["Short Threshold"], "Exits"] = True
df_result_test = df_result_test.fillna(False)
df_result_test
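Hit rates on the extreme predictions, making the selective-prediction claim from the top of the page concrete (my addition):

hit_rate_long = (df_result_test.loc[df_result_test["Entries"], "y"] > 0).mean()
hit_rate_short = (df_result_test.loc[df_result_test["Exits"], "y"] < 0).mean()
print(f"Long hit rate: {hit_rate_long:.2%}, Short hit rate: {hit_rate_short:.2%}")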
df_result_test["y_hat"].plot() df_result_test["Long Threshold"].plot(color="green") df_result_test["Short Threshold"].plot(color="red") plt.show()
# sr_clean_entries_test, sr_clean_exits_test = sr_entries_test.vbt.signals.clean(sr_exits_test)
pf = vbt.Portfolio.from_signals(
    close=sr_close_gspc,
    entries=df_result_test["Entries"],
    exits=df_result_test["Exits"],
    direction="longonly",
    fees=0.001,
    size=1.0,
    size_type=vbt.pf_enums.SizeType.ValuePercent,
)
pf.stats()
pf.plot(yaxis=dict(type="log")).show()
pf.drawdowns.plot(yaxis=dict(type="log")).show()
pf.plot_underwater(pct_scale=True).show()
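For context, a buy-and-hold benchmark over the same closes (my addition; from_holding is vectorbt's plain buy-and-hold helper, so double-check it against your vectorbtpro version):

pf_benchmark = vbt.Portfolio.from_holding(sr_close_gspc)
pf_benchmark.stats()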
Original Code¶
For reference.
import pandas as pd
import pandas_ta as ta
import numpy as np
from typing import List
import seaborn as sns
from scipy import linalg as la
import matplotlib.pyplot as plt
def pca_linear_model(x: pd.DataFrame, y: pd.Series, n_components: int, thresh: float = 0.01):
    # Center data at 0
    means = x.mean()
    x -= means
    x = x.dropna()

    # Find covariance and compute eigen vectors
    cov = np.cov(x, rowvar=False)
    evals, evecs = la.eigh(cov)

    # Sort eigenvectors by size of eigenvalue
    idx = np.argsort(evals)[::-1]
    evecs = evecs[:, idx]
    evals = evals[idx]

    # Create data set for model
    # (Note: evecs[j] indexes a row, but la.eigh returns eigenvectors as columns;
    # kept as in the video, whereas the cells above project onto columns.)
    model_data = pd.DataFrame()
    for j in range(n_components):
        model_data['PC' + str(j)] = pd.Series(np.dot(x, evecs[j]), index=x.index)

    cols = list(model_data.columns)
    model_data['target'] = y

    model_coefs = la.lstsq(model_data[cols], y)[0]
    model_data['pred'] = np.dot(model_data[cols], model_coefs)

    l_thresh = model_data['pred'].quantile(0.99)
    s_thresh = model_data['pred'].quantile(0.01)

    return model_coefs, evecs, means, l_thresh, s_thresh, model_data
def pca_rsi_model(
    ohlc: pd.DataFrame,
    rsi_lbs,
    train_size: int,
    step_size: int,
    n_components: int = 2,
    lookahead: int = 6,
):
    rsis = pd.DataFrame()
    for lb in rsi_lbs:
        rsis[lb] = ta.rsi(ohlc['Close'], lb)

    warm_up = max(rsi_lbs) * 2
    next_train = warm_up + train_size
    tar = np.log(ohlc['Close']).diff(lookahead).shift(-lookahead)

    # Outputs
    model_pred = np.zeros(len(ohlc))
    long_thresh = np.zeros(len(ohlc))
    short_thresh = np.zeros(len(ohlc))
    signal = np.zeros(len(ohlc))
    model_pred[:] = np.nan
    long_thresh[:] = np.nan
    short_thresh[:] = np.nan

    rsi_means = None
    evecs = None
    model_coefs = None
    l_thresh = None
    s_thresh = None

    for i in range(next_train, len(ohlc)):
        if i == next_train:
            # Get RSI values in window, prevent future leak
            train_data = rsis.iloc[i - train_size: i + 1 - lookahead].copy()
            y = tar.reindex(train_data.index)
            model_coefs, evecs, rsi_means, l_thresh, s_thresh, _ = pca_linear_model(train_data, y, n_components)
            next_train += step_size

        curr_row = rsis.iloc[i] - rsi_means
        vec = np.zeros(n_components)
        for j in range(n_components):
            vec[j] = np.dot(curr_row, evecs[j])
        curr_pred = np.dot(vec, model_coefs)

        model_pred[i] = curr_pred
        long_thresh[i] = l_thresh
        short_thresh[i] = s_thresh
        if curr_pred > l_thresh:
            signal[i] = 1
        elif curr_pred < s_thresh:
            signal[i] = -1

    # Output dataframe
    output_df = pd.DataFrame(index=ohlc.index)
    output_df['pred'] = model_pred
    output_df['long_thresh'] = long_thresh
    output_df['short_thresh'] = short_thresh
    output_df['signal'] = signal

    # Keep signals normalized to -1 1
    output_df['signal'] = output_df['signal'].rolling(lookahead).mean()

    return output_df
lookahead = 6
df_gspc = data.data["^GSPC"]

output = pca_rsi_model(df_gspc, list(range(2, 25)), 250 * 2, 250, n_components=3, lookahead=lookahead)
output['t'] = np.log(df_gspc['Close']).diff(lookahead).shift(-lookahead)

print("Mean Target Above Long Threshold", output[output['pred'] > output['long_thresh']]['t'].mean())
print("Mean Target Below Short Threshold", output[output['pred'] < output['short_thresh']]['t'].mean())

next_r = np.log(df_gspc['Close']).diff().shift(-1)
df_gspc['strat_ret'] = next_r * output['signal']

# Profit fac
pf = df_gspc[df_gspc['strat_ret'] > 0]['strat_ret'].sum() / df_gspc[df_gspc['strat_ret'] < 0]['strat_ret'].abs().sum()
print("Profit Factor", pf)

plt.style.use("dark_background")
df_gspc['r'] = next_r

# for his btcusdt data
# df_gspc = df_gspc[df_gspc.index > '2020-01-01']
# output = output[output.index > '2020-01-01']

fig, axs = plt.subplots(2, 1, sharex=True)
df_gspc['strat_ret'].cumsum().plot(label='RSI-PSA 3-6 Model', ax=axs[0])
output['pred'].plot(ax=axs[1])
output['long_thresh'].plot(ax=axs[1], color='green')
output['short_thresh'].plot(ax=axs[1], color='red')
# Heatmap code
next_r = np.log(df_gspc['Close']).diff().shift(-1)
pf_df = pd.DataFrame()

for lookahead in list(range(1, 25)):
    for n_components in [1, 2, 3, 4, 5, 6]:
        output = pca_rsi_model(df_gspc, list(range(2, 25)), 250 * 2, 250, n_components=n_components, lookahead=lookahead)
        output['t'] = np.log(df_gspc['Close']).diff(lookahead).shift(-lookahead)

        df_gspc['strat_ret'] = next_r * output['signal']

        # Profit fac
        pf = df_gspc[df_gspc['strat_ret'] > 0]['strat_ret'].sum() / df_gspc[df_gspc['strat_ret'] < 0]['strat_ret'].abs().sum()
        print(pf)
        pf_df.loc[lookahead, n_components] = pf

plt.style.use("dark_background")
sns.heatmap(pf_df, annot=True)
plt.xlabel("N Components")
plt.ylabel("Look Ahead")