David Brazda
2024-11-26 18:06:41 +01:00
parent 191b58d11d
commit c5e4c03af7
2 changed files with 553 additions and 165 deletions


@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name='ttools',
version='0.7.9',
version='0.7.91',
packages=find_packages(),
install_requires=[
# list your dependencies here


@@ -25,6 +25,8 @@ import pandas as pd
import numpy as np
from traceback import format_exc
from scipy.stats import entropy
import pickle
from itertools import zip_longest
#https://claude.ai/chat/dc62f18b-f293-4c7e-890d-1e591ce78763
#skew of return prediction
@@ -418,12 +420,36 @@ def granger_causality_test(file_path = None, features_df = None):
fig.show()
class Bars:
"""
Class to represent a number of bars for walk-forward window calculation.
Parameters:
count (int): Number of bars
"""
def __init__(self, count: int):
if not isinstance(count, int) or count <= 0:
raise ValueError("Bars count must be a positive integer")
self.count = count
def __repr__(self):
return f"Bars({self.count})"
@dataclass
class ModelConfig:
"""Configuration for the trading model"""
train_days: int = 10
test_days: int = 1
train_period: int | pd.Timedelta | Bars = 10
test_period: int | pd.Timedelta | Bars = 1
reoptimize_frequency: Optional[int] = 5 # hyperparameter tuning every Nth iteration
test_on_train: bool = False
forward_bars: int = 5
target_threshold: float = 1.005 # upper pct threshold for target (1.005 = up by 0.5%)
target_direction: str = "rise"
target_reversal_threshold: float = 0.3 # maximum retracement to allow (0.3 = 30% retracement terminates the window)
target_min_bars: int = 2 # minimum bars to consider a valid movement
target_min_profit_threshold: float = 0.0015 # 0.15% minimum profit threshold to maintain
target_max_drawdown: float = 0.002 # 0.002 = 0.2% maximum drawdown to allow
volatility_window: int = 100
model_type: str = 'classifier'
n_classes: int = 3
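# Illustrative configuration sketch: train_period/test_period accept market days (int),
# a pandas Timedelta, or a Bars count; values below are arbitrary examples:
#   config = ModelConfig(train_period=Bars(2000), test_period=Bars(400), n_classes=3)
#   config = ModelConfig(train_period=pd.Timedelta(days=10), test_period=pd.Timedelta(days=1))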
@@ -507,13 +533,35 @@ class BaseFeatureBuilder(ABC):
class LibraryTradingModel:
"""Main trading model implementation with configuration-based setup"""
def __init__(self, config: Optional[ModelConfig] = None, feature_builder: Optional[BaseFeatureBuilder] = None):
def __init__(self, config: Optional[ModelConfig] = None, feature_builder: Optional[BaseFeatureBuilder] = None, load_model: Optional[str] = None, save_model: bool = False):
self.config = config or ModelConfig()
self.feature_builder = feature_builder
self.scaler = StandardScaler()
self.best_params = None
self.study = None
self.def_params = {'n_estimators': 192, 'max_depth': 4, 'learning_rate': 0.11955492653616603, 'min_child_weight': 6, 'subsample': 0.7593587243666793, 'colsample_bytree': 0.841538282739158, 'gamma': 9.747926761292942e-06, 'lambda': 2.389116689874295, 'alpha': 0.4036592961514103}
self.use_model = None
if load_model is not None:
self.use_model, self.scaler = self.load_pickled(load_model)
self.save_model = save_model
def load_pickled(self, file):
# Load a pickled {"model": ..., "scaler": ...} combo saved by save_pickled
with open(file, "rb") as f:
data = pickle.load(f)
print("Model LOADED from " + file)
return data["model"], data["scaler"]
def save_pickled(self, iteration, model):
file = "model_scaler_" + str(iteration) + ".pkl"
# Save both scaler and model
with open(file, "wb") as f:
pickle.dump({"model": model, "scaler": self.scaler}, f)
print("Model SAVED as " + file)
def get_date_windows(self, data: pd.DataFrame) -> List[Tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp]]:
"""
Calculate date windows for training and testing using market days.
@@ -541,8 +589,8 @@ class LibraryTradingModel:
while True:
# Calculate indices for train and test windows
train_end_idx = current_idx + self.config.train_days
test_end_idx = train_end_idx + self.config.test_days
train_end_idx = current_idx + self.config.train_period
test_end_idx = train_end_idx + self.config.test_period
# Break if we've reached the end of data
if test_end_idx >= end_idx:
@@ -556,16 +604,125 @@ class LibraryTradingModel:
windows.append((current_start, train_end, train_end, test_end))
# Move forward by test period in market days
current_idx += self.config.test_days
current_idx += self.config.test_period
return windows
def get_time_windows(self, data: pd.DataFrame) -> List[Tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp]]:
"""
Calculate time windows for training and testing using the data's index.
Supports three types of periods:
- Integer market days
- Time periods (pandas.Timedelta)
- Number of bars (Bars class)
"""
from pandas import Timedelta
def get_bars_offset(current_idx: int, bars: Bars) -> Optional[pd.Timestamp]:
if current_idx + bars.count >= len(data.index):
return None
return data.index[current_idx + bars.count]
def get_timedelta_offset(start_date: pd.Timestamp, offset: Timedelta) -> Optional[pd.Timestamp]:
"""Handle timedelta offsets using direct datetime operations"""
target_time = start_date + offset
next_idx = data.index.searchsorted(target_time)
# If we're beyond data range
if next_idx >= len(data.index):
return None
# Get the actual available timestamp
actual_time = data.index[next_idx]
# Check if the found timestamp is within acceptable range
# (not more than one original frequency away from target)
# if actual_time - target_time > offset * 0.1: # 10% tolerance
# return None
return actual_time
def get_market_day_offset(start_date: pd.Timestamp, offset_days: int) -> Optional[pd.Timestamp]:
start_idx = market_days.searchsorted(start_date)
end_idx = start_idx + offset_days
if end_idx >= len(market_days):
return None
target_date = market_days[end_idx]
# Check if we have enough data points after the target date
target_idx = data.index.searchsorted(target_date)
if target_idx >= len(data.index):
return None
return target_date
# Select the offset strategy matching the configured period type
if isinstance(self.config.train_period, Bars):
def get_offset(start_date, period):
idx = data.index.searchsorted(start_date)
return get_bars_offset(idx, period)
training_period = self.config.train_period
testing_period = self.config.test_period
elif isinstance(self.config.train_period, Timedelta):
get_offset = get_timedelta_offset
training_period = self.config.train_period
testing_period = self.config.test_period
elif isinstance(self.config.train_period, (int, float)):
import pandas_market_calendars as mcal
nyse = mcal.get_calendar('NYSE')
schedule = nyse.schedule(
start_date=data.index[0].tz_convert('America/New_York'),
end_date=data.index[-1].tz_convert('America/New_York')
)
market_days = pd.DatetimeIndex(schedule.index).tz_localize('US/Eastern')
get_offset = get_market_day_offset
training_period = self.config.train_period
testing_period = self.config.test_period
else:
raise ValueError(
"train_period and test_period must be either:\n"
"- Integer (market days)\n"
"- pandas.Timedelta (time period)\n"
"- Bars (number of bars)"
)
windows = []
if isinstance(self.config.train_period, (int, float)):
current_start = market_days[0]
else:
current_start = data.index[0]
while True:
train_end = get_offset(current_start, training_period)
if train_end is None:
break
test_end = get_offset(train_end, testing_period)
if test_end is None:
break
windows.append((current_start, train_end, train_end, test_end))
next_start = get_offset(current_start, testing_period)
if next_start is None:
break
current_start = next_start
return windows
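# Sketch of the returned structure (assumes df has a DatetimeIndex; values illustrative):
#   windows = model.get_time_windows(df)
#   train_start, train_end, test_start, test_end = windows[0]
#   # test_start == train_end; each subsequent window starts one test period later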
def create_model(self, trial=None):
"""Create XGBoost model with either default or Optuna-suggested parameters"""
if self.config.model_type == 'classifier':
num_class = self.config.n_classes if self.config.n_classes > 2 else None # binary objective takes no num_class
from xgboost import XGBClassifier
if trial is None:
return XGBClassifier(n_estimators=100, random_state=42, num_class=self.config.n_classes)
return XGBClassifier(n_estimators=100, random_state=42, num_class=num_class)
else:
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 300),
@@ -575,9 +732,11 @@ class LibraryTradingModel:
'subsample': trial.suggest_float('subsample', 0.6, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
'gamma': trial.suggest_float('gamma', 1e-8, 1.0, log=True),
"lambda": trial.suggest_float("lambda", 1e-3, 10, log=True),
"alpha": trial.suggest_float("alpha", 1e-3, 10, log=True),
'random_state': 42
}
return XGBClassifier(**params, num_class=self.config.n_classes)
return XGBClassifier(**params, num_class=num_class)
else:
from xgboost import XGBRegressor
if trial is None:
@@ -595,55 +754,176 @@ class LibraryTradingModel:
}
return XGBRegressor(**params)
def objective(self, trial, X_train, y_train, X_val, y_val):
"""Optuna objective function for hyperparameter optimization"""
model = self.create_model(trial)
def run_rolling_window(self, data: pd.DataFrame, num_iterations: Optional[int] = None) -> Dict:
"""Run the model using a rolling window approach"""
windows = self.get_time_windows(data)
if num_iterations:
windows = windows[:num_iterations]
# Train the model
model.fit(
X_train,
y_train,
eval_set=[(X_val, y_val)],
early_stopping_rounds=50,
verbose=False
)
# Evaluate based on model type
if self.config.model_type == 'classifier':
from sklearn.metrics import accuracy_score
pred = model.predict(X_val)
score = accuracy_score(y_val, pred)
else:
from sklearn.metrics import mean_squared_error
pred = model.predict(X_val)
score = -mean_squared_error(y_val, pred, squared=False) # Negative RMSE for maximization
all_results = {}
# Reset best_params for each rolling window run
self.best_params = None
# Add hyperparameter reoptimization frequency
reoptimize_every_n_iterations = self.config.reoptimize_frequency or 5
# number of warm-up bars for each iteration
warm_period = self.config.warm_up_period if self.config.warm_up_period is not None else 0
print("Warmup period:", warm_period)
for i, (train_start, train_end, test_start, test_end) in enumerate(windows):
# If warm_period is 0, use original timestamps, otherwise add warm-up period
if warm_period > 0:
train_warmup_data = data[data.index < train_start].tail(warm_period)
train_start_with_warmup = train_warmup_data.index[0] if not train_warmup_data.empty else train_start
test_warmup_data = data[data.index < test_start].tail(warm_period)
test_start_with_warmup = test_warmup_data.index[0] if not test_warmup_data.empty else test_start
else:
train_start_with_warmup = train_start
test_start_with_warmup = test_start
return score
train_mask = (data.index >= train_start_with_warmup) & (data.index < train_end)
test_mask = (data.index >= test_start_with_warmup) & (data.index < test_end)
train_data = data[train_mask]
test_data = data[test_mask]
min_required_bars = max(20, self.config.forward_bars + 1)
if len(train_data) < min_required_bars or len(test_data) < 1:
print(f"Skipping iteration {i}: Insufficient data")
continue
# Reoptimize hyperparameters periodically
if i % reoptimize_every_n_iterations == 0:
self.best_params = None # Force reoptimization
results, model = self.run_iteration(train_data, test_data, i)
if results is not None:
if self.save_model: #save model if required
self.save_pickled(i, model)
all_results[i] = {
'train_period': (train_start, train_end),
'test_period': (test_start, test_end),
'results': results,
'model': model,
'hyperparameters': self.best_params.copy() if self.best_params else None
}
return all_results
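# Usage sketch (assumes df is a bar DataFrame with a DatetimeIndex):
#   all_results = model.run_rolling_window(df, num_iterations=10)
#   for i, r in all_results.items():
#       print(i, r['test_period'], r['hyperparameters'])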
def optimize_hyperparameters(self, X_train, y_train, n_trials=2):
"""Run Optuna hyperparameter optimization"""
"""Run Optuna hyperparameter optimization using time series cross-validation"""
import optuna
from sklearn.model_selection import train_test_split
from sklearn.model_selection import TimeSeriesSplit
print("\nStarting hyperparameter optimization...")
# Split training data into train and validation sets
X_train_opt, X_val, y_train_opt, y_val = train_test_split(
X_train, y_train, test_size=0.2, random_state=42
# Calculate appropriate number of splits based on data size
total_samples = len(X_train)
test_size = int(len(X_train) * 0.2) # 20% validation size
gap = self.config.forward_bars
# Calculate maximum possible splits
available_samples = total_samples
max_splits = 0
while available_samples >= 2 * test_size + gap: # Need at least 2x test_size (for train and test) plus gap
available_samples -= (test_size + gap)
max_splits += 1
n_splits = min(3, max_splits) # Use at most 3 splits, or fewer if data doesn't allow more
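# Worked example: total_samples=1000, test_size=200, gap=5 ->
# 1000 >= 405 (795 left), 795 >= 405 (590 left), 590 >= 405 (385 left), stop:
# max_splits = 3, so n_splits = min(3, 3) = 3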
print(f"Using {n_splits} splits for time series cross-validation")
print(f"Total samples: {total_samples}, Test size: {test_size}, Gap: {gap}")
# Create time series cross-validation splits
tscv = TimeSeriesSplit(
n_splits=n_splits,
test_size=test_size,
gap=gap
)
# Create Optuna study
if self.config.model_type == 'classifier':
study = optuna.create_study(direction='maximize') # Maximize accuracy
else:
study = optuna.create_study(direction='maximize') # Maximize negative RMSE
def objective(trial):
params = {
'n_estimators': trial.suggest_int('n_estimators', 50, 300),
'max_depth': trial.suggest_int('max_depth', 3, 10),
'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
'min_child_weight': trial.suggest_int('min_child_weight', 1, 7),
'subsample': trial.suggest_float('subsample', 0.6, 1.0),
'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
'gamma': trial.suggest_float('gamma', 1e-8, 1.0, log=True),
"lambda": trial.suggest_float("lambda", 1e-3, 10, log=True),
"alpha": trial.suggest_float("alpha", 1e-3, 10, log=True),
'random_state': 42
}
# Run optimization
study.optimize(
lambda trial: self.objective(trial, X_train_opt, y_train_opt, X_val, y_val),
n_trials=n_trials
# Store scores from each fold
fold_scores = []
# Time series cross-validation
for fold, (train_idx, val_idx) in enumerate(tscv.split(X_train)):
X_fold_train = X_train.iloc[train_idx]
y_fold_train = y_train.iloc[train_idx]
X_fold_val = X_train.iloc[val_idx]
y_fold_val = y_train.iloc[val_idx]
# Create and train model
if self.config.model_type == 'classifier':
model = XGBClassifier(**params)
if self.config.n_classes > 2:
model.set_params(num_class=self.config.n_classes)
else:
model = XGBRegressor(**params)
# Handle class imbalance for binary classification
if self.config.n_classes == 2:
n_0 = sum(y_fold_train == 0)
n_1 = sum(y_fold_train == 1)
scale_pos_weight = n_0 / n_1
model.set_params(scale_pos_weight=scale_pos_weight)
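# e.g. 900 samples of class 0 and 100 of class 1 -> scale_pos_weight = 9.0,
# so positive-class errors weigh 9x during training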
# Train with early stopping
model.fit(
X_fold_train,
y_fold_train,
eval_set=[(X_fold_val, y_fold_val)],
early_stopping_rounds=50,
verbose=False
)
# Calculate score
if self.config.model_type == 'classifier':
pred = model.predict(X_fold_val)
score = accuracy_score(y_fold_val, pred)
else:
pred = model.predict(X_fold_val)
score = -mean_squared_error(y_fold_val, pred, squared=False)
fold_scores.append(score)
# Return mean score across folds
return np.mean(fold_scores)
print("init optuna study")
# Create Optuna study
study = optuna.create_study(
direction='maximize',
pruner=optuna.pruners.MedianPruner(
n_startup_trials=5,
n_warmup_steps=20,
interval_steps=10
)
)
print("starting optimization")
# Run optimization
study.optimize(objective, n_trials=n_trials)
self.study = study
self.best_params = study.best_params
@@ -659,6 +939,10 @@ class LibraryTradingModel:
iteration_num: int) -> Tuple[Optional[pd.DataFrame], Optional[object]]:
"""Run a single iteration of training and testing with optional hyperparameter optimization"""
try:
train_features = None
train_target = None
train_cols = []
test_cols = []
print(f"\nProcessing iteration {iteration_num}")
print(f"Training: {train_data.index[0]} to {train_data.index[-1]} : {train_data.shape}")
print(f"Testing: {test_data.index[0]} to {test_data.index[-1]} : {test_data.shape}")
@@ -669,50 +953,72 @@ class LibraryTradingModel:
print("Features created. Target starting")
train_target = self.feature_builder.create_target(train_features)
print("Target created")
if self.use_model is None: # prepare train data only when no model was provided; otherwise only test data is built
X_train = train_features
y_train = train_target
print("TRAIN-----")
print(f"X_train shape: {X_train.shape}", X_train.index[[0,-1]])
print(f"y_train shape: {y_train.shape}", y_train.index[[0,-1]])
print("Removing NaNs")
# Remove NaN values or infinite values
y_train = y_train.replace([np.inf, -np.inf], np.nan)
mask_train = ~y_train.isna()
X_train = X_train[mask_train]
y_train = y_train[mask_train]
print(f"X_train shape after cleaning: {X_train.shape}", X_train.index[[0,-1]])
print(f"y_train shape after cleaning: {y_train.shape}", y_train.index[[0,-1]])
print(f"X_train columns: {X_train.columns}")
train_cols = set(X_train.columns)
train_columns = X_train.columns
if len(X_train) < self.config.forward_bars + 1:
print(f"Warning: Iteration {iteration_num} - Insufficient training data")
return None, None
# Scale features
print("Scaling features...")
X_train_scaled = self.scaler.fit_transform(X_train)
X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns, index=X_train.index)
X_train = train_features
y_train = train_target
print("TRAIN-----")
print(f"X_train shape: {X_train.shape}", X_train.index[[0,-1]])
print(f"y_train shape: {y_train.shape}", y_train.index[[0,-1]])
print("Removing NaNs")
# Remove NaN values or infinite values
y_train = y_train.replace([np.inf, -np.inf], np.nan)
mask_train = ~y_train.isna()
X_train = X_train[mask_train]
y_train = y_train[mask_train]
print(f"X_train shape after cleaning: {X_train.shape}", X_train.index[[0,-1]])
print(f"y_train shape after cleaning: {y_train.shape}", y_train.index[[0,-1]])
print(f"X_train columns: {X_train.columns}")
train_cols = set(X_train.columns)
train_columns = X_train.columns
if len(X_train) < self.config.forward_bars + 1:
print(f"Warning: Iteration {iteration_num} - Insufficient training data")
return None, None
# Scale features
X_train_scaled = self.scaler.fit_transform(X_train)
X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns, index=X_train.index)
# Run hyperparameter optimization if not done yet
if self.best_params is None:
self.optimize_hyperparameters(X_train_scaled, y_train, self.config.optuna_trials)
# Create and train model with best parameters
model = self.create_model()
if self.best_params:
model.set_params(**self.best_params)
model.fit(X_train_scaled, y_train)
# Run hyperparameter optimization if not done yet
if self.best_params is None and self.config.optuna_trials is not None and self.config.optuna_trials > 0:
print("optimization started...")
self.optimize_hyperparameters(X_train_scaled, y_train, self.config.optuna_trials)
# Create and train model with best parameters
model = self.create_model()
if self.best_params:
model.set_params(**self.best_params)
else:
print("Using default hyperparameters",self.def_params)
model.set_params(**self.def_params)
# balance imbalanced classes; works for the binary:logistic objective
if self.config.n_classes == 2:
n_0 = sum(y_train == 0) # 900
n_1 = sum(y_train == 1) # 100
scale_pos_weight = n_0 / n_1 # 900/100 = 9
model.set_params(scale_pos_weight=scale_pos_weight)
print("Model Training...")
model.fit(X_train_scaled, y_train)
else:
print("Using PROVIDED MODEL and SCALER")
model = self.use_model
print("TEST-----")
test_features, features_cols = self.feature_builder.prepare_features(test_data)
X_test = test_features
y_test = self.feature_builder.create_target(test_features, train_data=train_features)
if self.config.test_on_train:
print("TESTED ON TRAIN DATA!")
X_test = train_features
y_test = train_target
else:
test_features, features_cols = self.feature_builder.prepare_features(test_data)
X_test = test_features
y_test = self.feature_builder.create_target(test_features, train_data=train_features)
print(f"X_test shape: {X_test.shape}", X_test.index[[0,-1]])
print(f"y_test shape: {y_test.shape}", y_test.index[[0,-1]])
@@ -738,16 +1044,21 @@ class LibraryTradingModel:
print(f"X_test shape after trimming: {X_test.shape}", X_test.index[[0,-1]])
print(f"y_test shape after trimming: {y_test.shape}", y_test.index[[0,-1]])
# Find columns in test but not in train
extra_in_test = test_cols - train_cols
print("Extra columns in X_test:", extra_in_test)
if self.use_model is None:
# Find columns in test but not in train
extra_in_test = test_cols - train_cols
print("Extra columns in X_test:", extra_in_test)
# Find columns in train but not in test
extra_in_train = train_cols - test_cols
print("Extra columns in X_train:", extra_in_train)
# Find columns in train but not in test
extra_in_train = train_cols - test_cols
print("Extra columns in X_train:", extra_in_train)
# Reorder X_test columns to match training, except when a model was provided
X_test = X_test[train_columns]
else:
# Assuming X_test is a DataFrame, ensure ordering
X_test = X_test[self.scaler.feature_names_in_]
# Reorder X_test columns to match
X_test = X_test[train_columns]
X_test_scaled = self.scaler.transform(X_test)
X_test_scaled = pd.DataFrame(X_test_scaled, columns=X_test.columns, index=X_test.index)
@@ -782,6 +1093,83 @@ class LibraryTradingModel:
print(f"Error in iteration {iteration_num}: {str(e)} - {format_exc()}")
return None, None
def calculate_probability_skew(self, predict_proba_output, weights=None):
"""
Calculate probability skew metrics for each individual prediction.
"""
num_class = self.config.n_classes
# Validate input dimensions
probs = np.array(predict_proba_output)
if probs.shape[1] != num_class:
raise ValueError(f"predict_proba_output shape {probs.shape} doesn't match num_class {num_class}")
# Create linear weights if not provided
if weights is None:
weights = np.linspace(0, num_class - 1, num_class)
elif len(weights) != num_class:
raise ValueError(f"weights length {len(weights)} doesn't match num_class {num_class}")
# Calculate weighted score for each prediction
weighted_probs = np.sum(probs * weights, axis=1)
# Calculate basic metrics for each prediction
max_class_prob = np.max(probs, axis=1)
positive_class_prob = probs[:, -1]
predicted_class = np.argmax(probs, axis=1)
# Calculate skewness
center = (num_class - 1) / 2
distances = np.arange(num_class) - center
skewness = np.sum(probs * distances, axis=1) / center
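# e.g. for 5 classes: distances = [-2, -1, 0, 1, 2], center = 2;
# probs [0, 0, 0, 0, 1] -> skewness = 2/2 = +1 (fully positive),
# probs [1, 0, 0, 0, 0] -> skewness = -2/2 = -1 (fully negative)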
# Calculate high/low ratio
mid_point = num_class // 2
higher_probs = np.sum(probs[:, mid_point:], axis=1)
lower_probs = np.sum(probs[:, :mid_point], axis=1)
high_low_ratio = np.where(lower_probs > 0, higher_probs / lower_probs, higher_probs)
# Calculate Progressive Growth Score (excluding class 0)
def calculate_progress_score(prob_row):
# Extract probabilities excluding class 0
probs_excl_0 = prob_row[1:]
n = len(probs_excl_0)
# Compare each probability with all previous ones
total_comparisons = 0
positive_growths = 0
for i in range(1, n):
# Compare with all previous probabilities
comparisons = probs_excl_0[i] > probs_excl_0[:i]
positive_growths += np.sum(comparisons)
total_comparisons += i
# Normalize score between -1 and 1
# -1: perfect reverse stairs, 0: random, 1: perfect stairs
return (2 * positive_growths / total_comparisons - 1) if total_comparisons > 0 else 0
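# e.g. probs excl. class 0 = [0.1, 0.2, 0.3, 0.4]: all 6 pairwise comparisons
# show growth -> score = 2*6/6 - 1 = 1 (perfect stairs); reversed order -> -1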
progress_scores = np.array([calculate_progress_score(p) for p in probs])
# Combine metrics into a single array
metrics = np.column_stack([
weighted_probs, # Weighted probability score
max_class_prob, # Maximum probability
positive_class_prob, # Probability of highest class
predicted_class, # Predicted class
(predicted_class == (num_class-1)), # Binary indicator for highest class
skewness, # Skewness towards higher classes
high_low_ratio, # Ratio of higher to lower class probabilities
progress_scores # Progressive Growth Score
])
# Create column names for reference
metric_names = ['weighted_score', 'max_prob', 'positive_prob',
'predicted_class', 'is_positive', 'skewness',
'high_low_ratio', 'progress_score']
return metrics, metric_names
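# Usage sketch (assumes a fitted classifier `model` and scaled test features):
#   proba = model.predict_proba(X_test_scaled)
#   metrics, names = self.calculate_probability_skew(proba)
#   skew_df = pd.DataFrame(metrics, columns=names, index=X_test_scaled.index)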
def iteration_summary_classifier(self,results, model, predictions_proba, iteration_num):
"""
Analyze classifier results with focus on directional confidence and class probabilities
@@ -954,6 +1342,10 @@ class LibraryTradingModel:
prob_df = pd.DataFrame(predictions_proba, columns=class_names)
prob_df.index = results.index
# calculate positivity skew (probability skew towards positive classes; classes run 0..N-1, with N-1 the most positive)
pos_skew_metrics, names = self.calculate_probability_skew(predictions_proba)
pos_skew_df = pd.DataFrame(pos_skew_metrics, columns=names, index=results.index)
# # Verification step
# def verify_xgb_predictions(model, predictions, predictions_proba):
# # Get predicted class from probabilities
@@ -992,17 +1384,17 @@ class LibraryTradingModel:
# Add directional analysis
dir_metrics = evaluate_directional_accuracy(
results,
predictions_proba,
confidence_thresholds={'high': 0.6, 'medium': 0.3}
)
# dir_metrics = evaluate_directional_accuracy(
# results,
# predictions_proba,
# confidence_thresholds={'high': 0.6, 'medium': 0.3}
# )
print(dir_metrics)
# print(dir_metrics)
plot_directional_analysis(
metrics=dir_metrics,
iteration_num=iteration_num)
# plot_directional_analysis(
# metrics=dir_metrics,
# iteration_num=iteration_num)
analysis_df = analyze_return_distribution(prob_df, results["actual"])
fig = plot_distribution_analysis(prob_df, analysis_df, results["actual"])
@@ -1020,6 +1412,7 @@ class LibraryTradingModel:
right=[(results["close"], "close") if "close" in results.columns else ()],
left=[],
middle1=[(results["predicted"],"predicted"),(results["actual"],"actual"),(prob_df,)],
middle2=[(pos_skew_df,)]
).chart(size="s", precision=6, title=f"Iteration {iteration_num} classes:{self.config.n_classes} forward_bars:{self.config.forward_bars}")
num_classes = self.config.n_classes
@@ -1028,23 +1421,45 @@ class LibraryTradingModel:
for i in range(num_classes):
results[f'prob_class_{i}'] = predictions_proba[:, i]
# Calculate directional probabilities (assuming 5 classes)
results['prob_negative'] = results['prob_class_0'] + results['prob_class_1']
results['prob_neutral'] = results['prob_class_2']
results['prob_positive'] = results['prob_class_3'] + results['prob_class_4']
# Calculate class groupings
if num_classes < 2:
print("Must have at least 2 classes")
raise ValueError("Must have at least 2 classes")
# Calculate directional accuracy
# For 2 classes: negative=[0], positive=[1], no neutral
# For 3 classes: negative=[0], neutral=[1], positive=[2]
# For 4 classes: negative=[0,1], positive=[2,3], no neutral
# For 5 classes: negative=[0,1], neutral=[2], positive=[3,4]
# And so on...
negative_classes = list(range(num_classes // 2 if num_classes > 2 else 1))
positive_classes = list(range(num_classes - (num_classes // 2 if num_classes > 2 else 1), num_classes))
# Calculate if there should be a neutral class
has_neutral = num_classes > 2 and num_classes % 2 == 1
neutral_class = [num_classes // 2] if has_neutral else []
# Calculate directional probabilities
results['prob_negative'] = results[[f'prob_class_{i}' for i in negative_classes]].sum(axis=1)
if has_neutral:
results['prob_neutral'] = results[f'prob_class_{neutral_class[0]}']
else:
results['prob_neutral'] = 0.0
results['prob_positive'] = results[[f'prob_class_{i}' for i in positive_classes]].sum(axis=1)
# Define direction mapping function
def get_direction(x):
if x <= 1: # Classes 0,1
if x in negative_classes:
return 'negative'
elif x >= 3: # Classes 3,4
elif x in positive_classes:
return 'positive'
return 'neutral'
# Calculate directional predictions
results['predicted_direction'] = results['predicted'].map(get_direction)
results['actual_direction'] = results['actual'].map(get_direction)
results['direction_correct'] = results['predicted_direction'] == results['actual_direction']
# 1. Print Summary Statistics
print(f"\n=== Iteration {iteration_num} Summary ===")
print("\nClass Distribution:")
@@ -1065,7 +1480,8 @@ class LibraryTradingModel:
print("New confusion matrix")
def plot_threshold_confusion_matrices(results, predictions_proba, thresholds=[0.3, 0.5, 0.8], num_classes=5):
def plot_threshold_confusion_matrices(results, predictions_proba, thresholds=[0.3, 0.5, 0.8], num_classes=8):
"""
Plot confusion matrices for different probability thresholds
"""
@@ -1089,8 +1505,23 @@ class LibraryTradingModel:
max_probs = np.max(predictions_proba, axis=1)
confident_mask = max_probs >= threshold
# Only assign predictions where confidence meets threshold
predicted_classes[confident_mask] = np.argmax(predictions_proba[confident_mask], axis=1)
# Debug - print first few samples
# print(f"\nThreshold: {threshold}")
# print("First 5 probabilities arrays:")
# for i in range(5):
# if confident_mask[i]:
# prob_array = predictions_proba[i]
# pred_idx = np.argmax(prob_array)
# print(f"\nSample {i}:")
# print(f"Probabilities: {prob_array}")
# print(f"Max prob: {max_probs[i]:.3f} at index {pred_idx}")
# print(f"Actual class: {results['actual'][i]}")
# Get the raw predictions where confidence meets threshold
raw_predictions = np.argmax(predictions_proba[confident_mask], axis=1)
# Map predictions to correct classes
predicted_classes[confident_mask] = raw_predictions
# Filter results to only include confident predictions
valid_indices = predicted_classes != -1
@@ -1099,8 +1530,12 @@ class LibraryTradingModel:
if len(filtered_actual) > 0:
# Calculate confusion matrix for confident predictions
conf_matrix = confusion_matrix(filtered_actual, filtered_predicted)
conf_matrix = confusion_matrix(filtered_actual, filtered_predicted,
labels=range(num_classes))
# Calculate percentage matrix
conf_matrix_pct = conf_matrix / conf_matrix.sum(axis=1)[:, np.newaxis]
conf_matrix_pct = np.nan_to_num(conf_matrix_pct) # Handle division by zero
# Plot heatmap
sns.heatmap(conf_matrix_pct, annot=conf_matrix, fmt='d', cmap='YlOrRd',
@@ -1121,10 +1556,9 @@ class LibraryTradingModel:
ax.text(-0.2, (positive_start + negative_end)/2, 'Neutral', rotation=90, verticalalignment='center')
ax.text(-0.2, (positive_start + num_classes)/2, 'Positive', rotation=90, verticalalignment='center')
# Calculate accuracy metrics
# Calculate metrics
accuracy = (filtered_predicted == filtered_actual).mean()
# Calculate directional metrics
def get_direction(x):
negative_end = num_classes // 3
positive_start = num_classes - (num_classes // 3)
@@ -1595,52 +2029,6 @@ class LibraryTradingModel:
plt.tight_layout()
plt.show()
def run_rolling_window(self, data: pd.DataFrame, num_iterations: Optional[int] = None) -> Dict:
"""Run the model using a rolling window approach"""
windows = self.get_date_windows(data)
if num_iterations:
windows = windows[:num_iterations]
all_results = {}
#number of warm up bars for each iteration
warm_period = self.config.warm_up_period if self.config.warm_up_period is not None else 0
print("Warmup period:", warm_period)
for i, (train_start, train_end, test_start, test_end) in enumerate(windows):
# If warm_period is 0, use original timestamps, otherwise add warm-up period
if warm_period > 0:
train_warmup_data = data[data.index < train_start].tail(warm_period)
train_start_with_warmup = train_warmup_data.index[0] if not train_warmup_data.empty else train_start
test_warmup_data = data[data.index < test_start].tail(warm_period)
test_start_with_warmup = test_warmup_data.index[0] if not test_warmup_data.empty else test_start
else:
train_start_with_warmup = train_start
test_start_with_warmup = test_start
train_mask = (data.index >= train_start_with_warmup) & (data.index < train_end)
test_mask = (data.index >= test_start_with_warmup) & (data.index < test_end)
train_data = data[train_mask]
test_data = data[test_mask]
min_required_bars = max(20, self.config.forward_bars + 1)
if len(train_data) < min_required_bars or len(test_data) < 1:
print(f"Skipping iteration {i}: Insufficient data")
continue
results, model = self.run_iteration(train_data, test_data, i)
if results is not None:
all_results[i] = {
'train_period': (train_start, train_end),
'test_period': (test_start, test_end),
'results': results,
'model': model
}
return all_results
def generate_feature_dataset(
self,