
Principal Component Analysis

In trading you make money when you know something that no-one else knows. But if you know a thing will happen with certainty, and it will only happen once, how do you show that you're not just lucky? You have to show that the circumstances are abnormal and that your model is confident under those circumstances.

This video touches on this idea. First, he selects the parameters of an indicator that convey the most unique (uncorrelated) information using principal component analysis. He then creates a target variable for the indicator to predict, and fits a model to it.

The key finding is that whilst the model has almost no predictive power in normal cases (when predicting small changes), in extreme cases (when predicting large changes) the model appears to be predictive.

Again, lots of indicators might only be predictive when they predict something extreme. He's filtering out the regions of the model's output where you can't be sure it works. And when it predicts something extreme, it has room to be wrong: if it predicts a large move up and the move is only small, you're probably still winning. (A toy sketch of this selective-prediction idea follows the list below.)

So...

  • Dimensionality reduction is very useful.
  • Selective prediction is very useful.
  • You can quantify a model's success not by its accuracy, but by its ability to know when it's right.
  • Money is made based on the uniqueness of the information you hold, the certainty of its truth, and the magnitude of its impact.
    • If you know something highly abnormal will happen with high certainty, then you can make a lot of money.
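
A toy sketch of that selective-prediction idea on synthetic data (hypothetical y and y_hat, not this notebook's variables): a weak predictor can have a noticeably better hit rate once you only act on its most extreme predictions.

In [ ]:
# Hedged sketch: selective prediction on synthetic data.
import numpy as np

rng = np.random.default_rng(0)
y = rng.normal(size=10_000)                 # "true" future returns
y_hat = 0.05 * y + rng.normal(size=10_000)  # weak, noisy predictor

hits = np.sign(y_hat) == np.sign(y)
extreme = (y_hat > np.quantile(y_hat, 0.99)) | (y_hat < np.quantile(y_hat, 0.01))

print(f"Hit rate, all predictions:     {hits.mean():.3f}")
print(f"Hit rate, extreme predictions: {hits[extreme].mean():.3f}")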

Future Ideas

  • Run a similar process on the 101 WorldQuant Alphas.
  • Switch out the model to something robust to noise and non-linear (e.g. a random forest).
  • Switch out the target to something that better represents trend following / reversal.
    • Label tops and bottoms with directional change events (see the sketch after this list).
      • Accuracy to top/bottom (time units e.g. days)
      • Accuracy to top/bottom (price units e.g. log returns)
      • Calculate expected length of trend from
    • Label variance with Markov variance regimes.
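
A minimal sketch of directional-change labelling, one common way to mark confirmed tops and bottoms (the retracement threshold sigma and this exact formulation are my assumptions, not from the video):

In [ ]:
# Hedged sketch: directional-change labelling of swing tops (+1) and bottoms (-1).
# A top is confirmed once price retraces sigma below the running high, and vice versa.
import pandas as pd

def directional_change_extremes(close: pd.Series, sigma: float = 0.05) -> pd.Series:
    labels = pd.Series(0, index=close.index)
    up = True   # currently tracking a running high?
    ext_i = 0   # positional index of the running extreme
    for i in range(1, len(close)):
        if up:
            if close.iloc[i] > close.iloc[ext_i]:
                ext_i = i
            elif close.iloc[i] < close.iloc[ext_i] * (1 - sigma):
                labels.iloc[ext_i] = 1   # confirmed top
                up, ext_i = False, i
        else:
            if close.iloc[i] < close.iloc[ext_i]:
                ext_i = i
            elif close.iloc[i] > close.iloc[ext_i] * (1 + sigma):
                labels.iloc[ext_i] = -1  # confirmed bottom
                up, ext_i = True, i
    return labels

E.g. directional_change_extremes(sr_close_gspc, 0.05) would mark 5% swing highs and lows, from which time/price accuracy to tops and bottoms could be measured.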

Author: shittles

Created: 2024-09-26

Modified: 2024-10-01

Sources

Changelog

  • Watched video, crafted theory, and wrote opening paragraph (2h - 2024/09/26)
  • Followed along with video (4h - 2024/09/27)
  • Scikit-learn pipeline (3h - 2024/10/01)
In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from scipy import linalg as la
from sklearn.compose import make_column_transformer
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, RobustScaler
from sklearn.decomposition import PCA, FactorAnalysis
from sklearn.linear_model import LinearRegression
# from sklearn.model_selection import cross_val_score
from vectorbtpro import *
In [2]:
plt.style.use("dark_background")

vbt.settings.set_theme("dark")

US Stock Market

Ingestion

In [ ]:
data = vbt.YFData.pull(
    ["^GSPC"],
    start="50 years ago",
    end="today",
    timeframe="daily",
    tz="UTC",
    missing_columns="drop",
)

data.stats()
In [ ]:
data.data["^GSPC"]
In [ ]:
data.index
In [ ]:
# The opens are corrupt...
data.plot(symbol="^GSPC", yaxis=dict(type="log")).show()
In [ ]:
# but the closes are fine.
data.data["^GSPC"]["Close"].vbt.plot(yaxis=dict(type="log")).show()

Cleaning

In [ ]:
data.features
In [ ]:
data.get_feature("Dividends").any(axis=1).all()
In [ ]:
data.get_feature("Stock Splits").any(axis=1).all()
In [11]:
data = data.remove_features(["Dividends", "Stock Splits"])
In [12]:
# data = data.transform(lambda df: df.loc["April 19th 1982" < df.index])
In [13]:
# data = data.resample("daily")
In [14]:
sr_open_gspc = data.get_feature("Open")["^GSPC"]
sr_high_gspc = data.get_feature("High")["^GSPC"]
sr_low_gspc = data.get_feature("Low")["^GSPC"]
sr_close_gspc = data.get_feature("Close")["^GSPC"]

Following Along

Calculate and Analyse the RSI

In [ ]:
vbt.IF.list_indicators("RSI")
In [ ]:
vbt.phelp(vbt.RSI.run)
In [ ]:
rsi_periods = np.arange(2, 25)

rsi = vbt.RSI.run(sr_close_gspc, rsi_periods)

rsi.rsi
In [ ]:
rsi.rsi.hist(bins=100)

plt.tight_layout()
plt.show()
In [ ]:
sns.heatmap(rsi.rsi.corr())

plt.show()

Compute the Principal Components

Using principal component analysis (eigenvalues and vectors) as in the video.

In [20]:
sr_rsi_means = rsi.rsi.mean()

# Centre the RSI about zero.
df_rsi_values = rsi.rsi - sr_rsi_means

df_rsi_values = df_rsi_values.dropna()
In [ ]:
# Standardise instead of just centring (this overwrites the centred values above).
df_rsi_values = pd.DataFrame(
    data=StandardScaler().fit_transform(rsi.rsi.dropna()),
    index=rsi.rsi.dropna().index,
    columns=rsi.rsi.columns,
)

df_rsi_values
In [22]:
# Compute the covariance and eigenvectors.
arr_covariances = np.cov(df_rsi_values, rowvar=False)
arr_eigenvals, arr_eigenvecs = la.eigh(arr_covariances)

# Sort the eigenvectors by the eigenvalues.
idx = np.argsort(arr_eigenvals)[::-1]

arr_eigenvecs = arr_eigenvecs[:, idx]
arr_eigenvals = arr_eigenvals[idx]
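
As a quick check (not in the video), each eigenvalue's share of the trace is the fraction of total variance its component explains:

In [ ]:
# Fraction of total variance explained by each component.
sr_explained = pd.Series(arr_eigenvals / arr_eigenvals.sum())

sr_explained.head(10)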
In [ ]:
n_components = 4

df_X = pd.DataFrame()

# la.eigh returns eigenvectors as columns, so project onto column j
# (not row j, as in the source video's code).
for j in range(n_components):
    df_X[j] = pd.Series(np.dot(df_rsi_values, arr_eigenvecs[:, j]), index=df_rsi_values.index)

df_X.columns.name = "PC"

df_X
In [ ]:
# The principal components pay little-to-no attention to the higher periods.
# This is unsurprising because they are highly correlated, and so don't convey unique information.
for j in range(n_components):
    pd.Series(arr_eigenvecs[:, j], index=rsi_periods).plot(label="PC" + str(j + 1))

plt.xlabel("RSI Period")
plt.ylabel("Eigenvector Value")
plt.legend()
plt.show()

Using singular value decomposition, as scikit-learn's PCA does. For centred data X = U S Vt, the component scores are U S and the loadings are the rows of Vt, so this should reproduce the eigen-decomposition above (up to sign).

In [ ]:
U, S, Vt = la.svd(df_rsi_values, full_matrices=False)

# df_X = pd.DataFrame(U @ np.diag(S), index=rsi.rsi.dropna().index)
df_X = pd.DataFrame(U[:, :n_components] * S[:n_components], index=rsi.rsi.dropna().index)

df_X
In [ ]:
# Plot the right singular vectors (Vt) for each component against the RSI periods
for j in range(n_components):
    pd.Series(Vt[j], index=rsi_periods).plot(label="PC" + str(j + 1))

plt.xlabel("RSI Period")
plt.ylabel("Right Singular Vector Value (Component Loadings)")
plt.legend()
plt.show()

Create the Model

In [27]:
# The targets are the returns in 5 days time.
sr_y = data.returns.shift(-5)["^GSPC"]

sr_y = sr_y.dropna()

# Ensure the features and targets have the same index.
common_index = df_X.index.intersection(sr_y.index)

df_X = df_X.loc[common_index]
sr_y = sr_y.loc[common_index]
In [28]:
# Fit the model.
model_coefs = la.lstsq(df_X, sr_y)[0]
sr_y_hat = pd.Series(np.dot(df_X, model_coefs), index=sr_y.index)

long_threshold = sr_y_hat.quantile(0.99)
short_threshold = sr_y_hat.quantile(0.01)
In [ ]:
# The plot's small, so make the marker size smaller.
plt.scatter(sr_y_hat, sr_y, s=1)
plt.axhline(0.0, color="white")
plt.axvline(long_threshold, color="green")
plt.axvline(short_threshold, color="red")

plt.xlabel("Predictions")
plt.ylabel("Targets")
plt.show()

Modifying Code

From here on out, I'm significantly modifying the source code. I'm using scikit-learn's decomposition and prediction models, and vectorbtpro's indicators and backtesting.

In [ ]:
# Use scikit-learn to easily swap out decomposition models.
column_transformer = make_column_transformer(
    (StandardScaler(), np.arange(0, len(rsi.rsi.columns))),
    remainder="passthrough",
)

pd.DataFrame(
    data=column_transformer.fit_transform(rsi.rsi.dropna()),
    index=rsi.rsi.dropna().index,
    # In this case the column order hasn't been transformed.
    # columns=column_transformer.get_feature_names_out(),
    columns=rsi.rsi.columns,
)
In [ ]:
# There's a lot of room here to do more.
pipeline = make_pipeline(
    column_transformer,
    # PCA(n_components),
    FactorAnalysis(n_components),
)

df_X = pd.DataFrame(
    data=pipeline.fit_transform(rsi.rsi.dropna()),
    index=rsi.rsi.dropna().index,
)

# The columns are now the extracted components (factors here; principal components if PCA is used).
df_X.columns.name = "PC"

df_X
In [35]:
# # Show the explained variance ratio of each component.
# model_pca = pipeline.named_steps["pca"]
model_fa = pipeline.named_steps["factoranalysis"]

# explained_variance = model_pca.explained_variance_ratio_
In [36]:
# plt.plot(range(1, len(explained_variance) + 1), explained_variance.cumsum(), marker="o", linestyle="--")

# plt.title("Explained Variance Ratio")
# plt.xlabel("Number of Components")
# plt.ylabel("Cumulative Explained Variance")
# plt.show()
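
FactorAnalysis doesn't expose explained_variance_ratio_. As a rough analogue (my approximation, not a scikit-learn API), take each factor's sum of squared loadings over the total modelled variance:

In [ ]:
# Rough analogue of explained variance for factor analysis:
# squared loadings per factor over total (common + noise) variance.
arr_factor_var = (model_fa.components_ ** 2).sum(axis=1)
arr_total_var = arr_factor_var.sum() + model_fa.noise_variance_.sum()

pd.Series(arr_factor_var / arr_total_var, name="Approx. Explained Variance Ratio")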
In [ ]:
# Verify the results from before.
# Plot the eigenvectors for each principal component
for j in range(n_components):
    # pd.Series(model_pca.components_[j], index=rsi_periods).plot(label="PC" + str(j + 1))
    pd.Series(model_fa.components_[j], index=rsi_periods).plot(label="C" + str(j + 1))

plt.xlabel("RSI Period")
# plt.ylabel("Eigenvector Value")
plt.ylabel("Latent Vector Value")
plt.legend()
plt.show()
In [38]:
# The targets are the returns in 5 days time.
sr_y = data.returns.shift(-5)["^GSPC"]

sr_y = sr_y.dropna()

# Ensure the features and targets have the same index.
common_index = df_X.index.intersection(sr_y.index)

df_X = df_X.loc[common_index]
sr_y = sr_y.loc[common_index]
In [39]:
# Fit the model.
# Note: LinearRegression fits an intercept by default, unlike the raw lstsq fit above.
clf = LinearRegression()

clf.fit(df_X, sr_y)

sr_y_hat = pd.Series(clf.predict(df_X), index=sr_y.index)

long_threshold = sr_y_hat.quantile(0.99)
short_threshold = sr_y_hat.quantile(0.01)
In [ ]:
# The plot's small, so make the marker size smaller.
plt.scatter(sr_y_hat, sr_y, s=1)
plt.axhline(0.0, color="white")
plt.axvline(long_threshold, color="green")
plt.axvline(short_threshold, color="red")

plt.xlabel("Predictions")
plt.ylabel("Targets")
plt.show()

If the model were better you'd expect the targets to vary systematically with the predictions rather than sitting in a flat cloud. There are a million different metrics you could check this with, e.g. R², Spearman rank correlation, or the hit rate beyond the thresholds; a sketch of a few follows.
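
A quick sketch of those checks, assuming sr_y, sr_y_hat, and the thresholds from the cells above:

In [ ]:
# Hedged sketch: sanity metrics on the in-sample fit.
from scipy.stats import spearmanr
from sklearn.metrics import r2_score

print("R^2:", r2_score(sr_y, sr_y_hat))
print("Spearman rank correlation:", spearmanr(sr_y_hat, sr_y)[0])

# Hit rate conditioned on extreme predictions (the selective-prediction idea).
extreme = (sr_y_hat > long_threshold) | (sr_y_hat < short_threshold)
hits = np.sign(sr_y_hat) == np.sign(sr_y)

print("Hit rate (all):", hits.mean())
print("Hit rate (extreme):", hits[extreme].mean())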

Backtest

Honestly, I'm pretty surprised at how not shit this is given that it's only using the RSI and a linear model looking at super simple targets.

In [41]:
df_rsi = rsi.rsi.dropna()

# The targets are the returns in 5 days time.
sr_y = np.log(sr_close_gspc).diff(5).shift(-5).dropna()

# Ensure the features and targets have the same index.
common_index = df_rsi.index.intersection(sr_y.index)

df_rsi = df_rsi.loc[common_index]
sr_y = sr_y.loc[common_index]
In [42]:
cv = vbt.SplitterCV(
    "from_rolling",
    length=3 * 250,
    offset=-250,
    split=2 * 250,
    set_labels=["train", "test"],
)

cv_splitter = cv.get_splitter(df_rsi)
In [ ]:
cv_splitter.plot().show()
In [ ]:
sr_rsi_slices = cv_splitter.take(df_rsi)
sr_y_slices = cv_splitter.take(sr_y)

clf = LinearRegression() 

list_result_test = []

for split in sr_rsi_slices.index.unique(level="split"):
    df_rsi_train = sr_rsi_slices[(split, "train")]
    df_rsi_test = sr_rsi_slices[(split, "test")]

    df_X_train = pd.DataFrame(
        data=pipeline.fit_transform(df_rsi_train),
        index=df_rsi_train.index,
    )
    df_X_test = pd.DataFrame(
        # Transform (don't refit) the test slice, to avoid fitting on test data.
        data=pipeline.transform(df_rsi_test),
        index=df_rsi_test.index,
    )
    df_X_train.columns.name = "PC"
    df_X_test.columns.name = "PC"

    sr_y_train = sr_y_slices[(split, "train")]
    sr_y_test = sr_y_slices[(split, "test")]

    clf.fit(df_X_train, sr_y_train)

    sr_y_hat_test = pd.Series(clf.predict(df_X_test), index=sr_y_test.index)

    # Note: taking quantiles of the test-window predictions peeks at the whole
    # window; thresholds from the training predictions would be leak-free.
    long_threshold = sr_y_hat_test.quantile(0.99)
    short_threshold = sr_y_hat_test.quantile(0.01)
    
    list_result_test.append(pd.DataFrame({
        "y": sr_y_test,
        "y_hat": sr_y_hat_test,
        "Long Threshold": long_threshold,
        "Short Threshold": short_threshold,
    }))

df_result_test = pd.concat(list_result_test)

df_result_test
In [ ]:
df_result_test.loc[df_result_test["y_hat"] > df_result_test["Long Threshold"], "Entries"] = True
df_result_test.loc[df_result_test["y_hat"] < df_result_test["Short Threshold"], "Exits"] = True

df_result_test = df_result_test.fillna(False)

df_result_test
In [ ]:
df_result_test["y_hat"].plot()
df_result_test["Long Threshold"].plot(color="green")
df_result_test["Short Threshold"].plot(color="red")

plt.show()
In [47]:
# sr_clean_entries_test, sr_clean_exits_test = sr_entries_test.vbt.signals.clean(sr_exits_test)
In [ ]:
pf = vbt.Portfolio.from_signals(
    close=sr_close_gspc,
    entries=df_result_test["Entries"],
    exits=df_result_test["Exits"],
    direction="longonly",
    fees=0.001,
    size=1.0,
    size_type=vbt.pf_enums.SizeType.ValuePercent,
)

pf.stats()
In [ ]:
pf.plot(yaxis=dict(type="log")).show()
In [ ]:
pf.drawdowns.plot(yaxis=dict(type="log")).show()
In [ ]:
pf.plot_underwater(pct_scale=True).show()

Original Code

For reference. Note that pca_linear_model projects onto evecs[j] (a row of the eigenvector matrix), whereas la.eigh returns eigenvectors as columns (evecs[:, j]); the corrected projection is used in the cells above.

In [56]:
import pandas as pd
import pandas_ta as ta
import numpy as np
from typing import List
import seaborn as sns
from scipy import linalg as la
import matplotlib.pyplot as plt
In [57]:
def pca_linear_model(x: pd.DataFrame, y: pd.Series, n_components: int, thresh: float= 0.01):
    # Center data at 0
    means = x.mean()
    x -= means
    x = x.dropna()

    # Find covariance and compute eigen vectors
    cov = np.cov(x, rowvar=False)
    evals , evecs = la.eigh(cov)
    # Sort eigenvectors by size of eigenvalue
    idx = np.argsort(evals)[::-1]
    evecs = evecs[:,idx]
    evals = evals[idx]

    # Create data set for model
    model_data = pd.DataFrame()
    for j in range(n_components):
         model_data['PC' + str(j)] = pd.Series( np.dot(x, evecs[j]) , index=x.index)
    
    cols = list(model_data.columns)
    model_data['target'] = y
    model_coefs = la.lstsq(model_data[cols], y)[0]
    model_data['pred'] = np.dot( model_data[cols], model_coefs)

    l_thresh = model_data['pred'].quantile(0.99)
    s_thresh = model_data['pred'].quantile(0.01)

    return model_coefs, evecs, means, l_thresh, s_thresh, model_data
In [58]:
def pca_rsi_model(
        ohlc: pd.DataFrame, rsi_lbs, train_size: int, step_size: int,  
        n_components: int = 2, lookahead: int = 6
):    
    rsis = pd.DataFrame()
    for lb in rsi_lbs:
        rsis[lb] = ta.rsi(ohlc['Close'], lb)

    warm_up = max(rsi_lbs) * 2
    next_train = warm_up + train_size
    tar = np.log(ohlc['Close']).diff(lookahead).shift(-lookahead)

    # Outputs
    model_pred = np.zeros(len(ohlc))
    long_thresh = np.zeros(len(ohlc))
    short_thresh = np.zeros(len(ohlc))
    signal = np.zeros(len(ohlc))

    model_pred[:] = np.nan
    long_thresh[:] = np.nan
    short_thresh[:] = np.nan

    rsi_means = None
    evecs = None
    model_coefs = None
    l_thresh = None
    s_thresh = None
    for i in range(next_train, len(ohlc)):
        if i == next_train:
            # Get RSI values in window, prevent future leak
            train_data = rsis.iloc[i - train_size: i + 1 - lookahead].copy()
            y = tar.reindex(train_data.index)
            
            model_coefs, evecs, rsi_means, l_thresh, s_thresh, _ =  pca_linear_model(train_data, y, n_components)
            next_train += step_size
        
        curr_row = rsis.iloc[i] - rsi_means
        vec = np.zeros(n_components)
        for j in range(n_components):
            vec[j] = np.dot(curr_row, evecs[j])
        curr_pred = np.dot(vec, model_coefs)

        model_pred[i] = curr_pred
        long_thresh[i] = l_thresh 
        short_thresh[i] = s_thresh
        if curr_pred > l_thresh:
            signal[i] = 1
        elif curr_pred < s_thresh:
            signal[i] = -1

    # Output dataframe
    output_df = pd.DataFrame(index=ohlc.index)
    output_df['pred'] = model_pred
    output_df['long_thresh'] = long_thresh
    output_df['short_thresh'] = short_thresh
    output_df['signal'] = signal
    # Keep signals normalized to -1 1
    output_df['signal'] = output_df['signal'].rolling(lookahead).mean()     
    return output_df 
In [ ]:
lookahead = 6

df_gspc = data.data["^GSPC"]

output = pca_rsi_model(df_gspc, list(range(2, 25)), 250 * 2, 250, n_components=3, lookahead=lookahead)
output['t'] = np.log(df_gspc['Close']).diff(lookahead).shift(-lookahead)

print("Mean Target Above Long Threshold", output[output['pred'] > output['long_thresh']]['t'].mean())
print("Mean Target Below Short Threshold", output[output['pred'] < output['short_thresh']]['t'].mean())

next_r = np.log(df_gspc['Close']).diff().shift(-1)
df_gspc['strat_ret'] = next_r * output['signal']

# Profit fac
pf = df_gspc[df_gspc['strat_ret'] > 0]['strat_ret'].sum() / df_gspc[df_gspc['strat_ret'] < 0]['strat_ret'].abs().sum()
print("Profit Factor",pf)

plt.style.use("dark_background")

df_gspc['r'] = next_r

# for his btcusdt data
# df_gspc = df_gspc[df_gspc.index > '2020-01-01']
# output = output[output.index > '2020-01-01']

fig, axs = plt.subplots(2, 1, sharex=True)
df_gspc['strat_ret'].cumsum().plot(label='RSI-PCA 3-6 Model', ax=axs[0])

output['pred'].plot(ax=axs[1])
output['long_thresh'].plot(ax=axs[1], color='green')
output['short_thresh'].plot(ax=axs[1], color='red')
In [ ]:
# Heatmap code
next_r = np.log(df_gspc['Close']).diff().shift(-1)
pf_df = pd.DataFrame()
for lookahead in list(range(1, 25)):
    for n_components in [1,2,3,4,5,6]:
        output = pca_rsi_model(df_gspc, list(range(2, 25)), 250 * 2, 250, n_components=n_components, lookahead=lookahead)
        output['t'] = np.log(df_gspc['Close']).diff(lookahead).shift(-lookahead)
        df_gspc['strat_ret'] = next_r * output['signal']

        # Profit fac
        pf = df_gspc[df_gspc['strat_ret'] > 0]['strat_ret'].sum() / df_gspc[df_gspc['strat_ret'] < 0]['strat_ret'].abs().sum()
        print(pf)

        pf_df.loc[lookahead, n_components] = pf

plt.style.use("dark_background")
sns.heatmap(pf_df, annot=True)
plt.xlabel("N Components")
plt.ylabel("Look Ahead")