diff --git a/setup.py b/setup.py
index 682d5a3..65e6edc 100644
--- a/setup.py
+++ b/setup.py
@@ -2,7 +2,7 @@ from setuptools import setup, find_packages
 setup(
     name='ttools',
-    version='0.7.9',
+    version='0.7.91',
     packages=find_packages(),
     install_requires=[
         # list your dependencies here
diff --git a/ttools/models.py b/ttools/models.py
index 45fa312..e5d5fd5 100644
--- a/ttools/models.py
+++ b/ttools/models.py
@@ -25,6 +25,8 @@ import pandas as pd
 import numpy as np
 from traceback import format_exc
 from scipy.stats import entropy
+import pickle
+from itertools import zip_longest
 
 #https://claude.ai/chat/dc62f18b-f293-4c7e-890d-1e591ce78763
 #skew of return prediction
@@ -418,12 +420,36 @@ def granger_causality_test(file_path = None, features_df = None):
 
     fig.show()
 
+class Bars:
+    """
+    Represents a number of bars for walk-forward window calculation.
+
+    Parameters:
+        count (int): Number of bars
+    """
+    def __init__(self, count: int):
+        if not isinstance(count, int) or count <= 0:
+            raise ValueError("Bars count must be a positive integer")
+        self.count = count
+
+    def __repr__(self):
+        return f"Bars({self.count})"
+
 @dataclass
 class ModelConfig:
     """Configuration for the trading model"""
-    train_days: int = 10
-    test_days: int = 1
+    train_period: int | pd.Timedelta | Bars = 10
+    test_period: int | pd.Timedelta | Bars = 1
+    reoptimize_frequency: Optional[int] = 5  # hypertune every Nth iteration
+    test_on_train: bool = False
     forward_bars: int = 5
+    target_threshold: float = 1.005  # upper pct threshold for target (1.005 = up by 0.5%)
+    target_direction: str = "rise"
+    target_reversal_threshold: float = 0.3  # how much retracement to allow (0.3 = 30% retracement terminates the window)
+    target_min_bars: int = 2  # minimum bars to consider a valid movement
+    target_min_profit_threshold: float = 0.0015  # 0.15% minimum profit threshold to maintain
+    target_max_drawdown: float = 0.002  # 0.002 = 0.2% maximum drawdown to allow
+    volatility_window: int = 100
     model_type: str = 'classifier'
     n_classes: int = 3
@@ -507,13 +533,35 @@ class BaseFeatureBuilder(ABC):
 
 class LibraryTradingModel:
     """Main trading model implementation with configuration-based setup"""
-    def __init__(self, config: Optional[ModelConfig] = None, feature_builder: Optional[BaseFeatureBuilder] = None):
+    def __init__(self, config: Optional[ModelConfig] = None, feature_builder: Optional[BaseFeatureBuilder] = None, load_model: Optional[str] = None, save_model: bool = False):
         self.config = config or ModelConfig()
         self.feature_builder = feature_builder
         self.scaler = StandardScaler()
         self.best_params = None
         self.study = None
-
+        self.def_params = {'n_estimators': 192, 'max_depth': 4, 'learning_rate': 0.11955492653616603, 'min_child_weight': 6, 'subsample': 0.7593587243666793, 'colsample_bytree': 0.841538282739158, 'gamma': 9.747926761292942e-06, 'lambda': 2.389116689874295, 'alpha': 0.4036592961514103}
+        self.use_model = None
+        if load_model is not None:
+            self.use_model, self.scaler = self.load_pickled(load_model)
+        self.save_model = save_model
+
+    def load_pickled(self, file):
+        # Load a pickled model/scaler combo from a single file
+        with open(file, "rb") as f:
+            data = pickle.load(f)
+
+        print("Model LOADED from " + file)
+
+        return data["model"], data["scaler"]
+
+    def save_pickled(self, iteration, model):
+        file = "model_scaler_" + str(iteration) + ".pkl"
+
+        # Save both scaler and model
+        with open(file, "wb") as f:
+            pickle.dump({"model": model, "scaler": self.scaler}, f)
+        print("Model SAVED as " + file)
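For reference, a minimal usage sketch of the new configuration surface (illustrative only; the feature builder is omitted for brevity, and the pickle name follows `save_pickled`'s `model_scaler_<iteration>.pkl` scheme):

```python
import pandas as pd
from ttools.models import ModelConfig, Bars, LibraryTradingModel

# The three interchangeable period types for walk-forward windows:
cfg_days = ModelConfig(train_period=10, test_period=1)                   # market days
cfg_time = ModelConfig(train_period=pd.Timedelta(days=14),
                       test_period=pd.Timedelta(days=2))                 # wall-clock spans
cfg_bars = ModelConfig(train_period=Bars(2000), test_period=Bars(200))   # fixed bar counts

# Persist each iteration's model+scaler pair to disk...
tm = LibraryTradingModel(config=cfg_bars, save_model=True)
# ...or resume later from one of the files save_pickled() wrote:
tm = LibraryTradingModel(config=cfg_bars, load_model="model_scaler_3.pkl")
```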
    def get_date_windows(self, data: pd.DataFrame) -> List[Tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp]]:
        """
        Calculate date windows for training and testing using market days.
@@ -541,8 +589,8 @@ class LibraryTradingModel:
 
         while True:
             # Calculate indices for train and test windows
-            train_end_idx = current_idx + self.config.train_days
-            test_end_idx = train_end_idx + self.config.test_days
+            train_end_idx = current_idx + self.config.train_period
+            test_end_idx = train_end_idx + self.config.test_period
 
             # Break if we've reached the end of data
             if test_end_idx >= end_idx:
@@ -556,16 +604,125 @@ class LibraryTradingModel:
             windows.append((current_start, train_end, train_end, test_end))
 
             # Move forward by test period in market days
-            current_idx += self.config.test_days
+            current_idx += self.config.test_period
 
         return windows
 
+    def get_time_windows(self, data: pd.DataFrame) -> List[Tuple[pd.Timestamp, pd.Timestamp, pd.Timestamp, pd.Timestamp]]:
+        """
+        Calculate time windows for training and testing using the data's index.
+        Supports three types of periods:
+        - Integer market days
+        - Time periods (pandas.Timedelta)
+        - Number of bars (Bars class)
+        """
+        from pandas import Timedelta
+
+        def get_bars_offset(current_idx: int, bars: Bars) -> Optional[pd.Timestamp]:
+            if current_idx + bars.count >= len(data.index):
+                return None
+            return data.index[current_idx + bars.count]
+
+        def get_timedelta_offset(start_date: pd.Timestamp, offset: Timedelta) -> Optional[pd.Timestamp]:
+            """Handle timedelta offsets using direct datetime operations"""
+            target_time = start_date + offset
+            next_idx = data.index.searchsorted(target_time)
+
+            # If we're beyond the data range
+            if next_idx >= len(data.index):
+                return None
+
+            # Get the actual available timestamp
+            actual_time = data.index[next_idx]
+
+            # Check if the found timestamp is within an acceptable range
+            # (not more than one original frequency away from the target)
+            # if actual_time - target_time > offset * 0.1:  # 10% tolerance
+            #     return None
+
+            return actual_time
+
+        def get_market_day_offset(start_date: pd.Timestamp, offset_days: int) -> Optional[pd.Timestamp]:
+            start_idx = market_days.searchsorted(start_date)
+            end_idx = start_idx + offset_days
+
+            if end_idx >= len(market_days):
+                return None
+
+            target_date = market_days[end_idx]
+            # Check that we have data points after the target date
+            target_idx = data.index.searchsorted(target_date)
+            if target_idx >= len(data.index):
+                return None
+
+            return target_date
+
+        if isinstance(self.config.train_period, Bars):
+            def get_offset(start_date, period):
+                idx = data.index.searchsorted(start_date)
+                return get_bars_offset(idx, period)
+            training_period = self.config.train_period
+            testing_period = self.config.test_period
+
+        elif isinstance(self.config.train_period, Timedelta):
+            get_offset = get_timedelta_offset
+            training_period = self.config.train_period
+            testing_period = self.config.test_period
+
+        elif isinstance(self.config.train_period, (int, float)):
+            import pandas_market_calendars as mcal
+            nyse = mcal.get_calendar('NYSE')
+
+            schedule = nyse.schedule(
+                start_date=data.index[0].tz_convert('America/New_York'),
+                end_date=data.index[-1].tz_convert('America/New_York')
+            )
+            market_days = pd.DatetimeIndex(schedule.index).tz_localize('US/Eastern')
+
+            get_offset = get_market_day_offset
+            training_period = self.config.train_period
+            testing_period = self.config.test_period
+
+        else:
+            raise ValueError(
+                "train_period and test_period must be either:\n"
+                "- Integer (market days)\n"
+                "- pandas.Timedelta (time period)\n"
+                "- Bars (number of bars)"
+            )
+
+        windows = []
+        if isinstance(self.config.train_period, (int, float)):
+            current_start = market_days[0]
+        else:
+            current_start = data.index[0]
+
+        while True:
+            train_end = get_offset(current_start, training_period)
+            if train_end is None:
+                break
+
+            test_end = get_offset(train_end, testing_period)
+            if test_end is None:
+                break
+
+            windows.append((current_start, train_end, train_end, test_end))
+
+            next_start = get_offset(current_start, testing_period)
+            if next_start is None:
+                break
+            current_start = next_start
+
+        return windows
+
     def create_model(self, trial=None):
         """Create XGBoost model with either default or Optuna-suggested parameters"""
         if self.config.model_type == 'classifier':
+            num_class = self.config.n_classes if self.config.n_classes > 2 else None  # the binary objective takes no num_class
             from xgboost import XGBClassifier
             if trial is None:
-                return XGBClassifier(n_estimators=100, random_state=42, num_class=self.config.n_classes)
+                return XGBClassifier(n_estimators=100, random_state=42, num_class=num_class)
             else:
                 params = {
                     'n_estimators': trial.suggest_int('n_estimators', 50, 300),
@@ -575,9 +732,11 @@ class LibraryTradingModel:
                     'subsample': trial.suggest_float('subsample', 0.6, 1.0),
                     'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
                     'gamma': trial.suggest_float('gamma', 1e-8, 1.0, log=True),
+                    "lambda": trial.suggest_float("lambda", 1e-3, 10, log=True),
+                    "alpha": trial.suggest_float("alpha", 1e-3, 10, log=True),
                     'random_state': 42
                 }
-                return XGBClassifier(**params, num_class=self.config.n_classes)
+                return XGBClassifier(**params, num_class=num_class)
         else:
             from xgboost import XGBRegressor
             if trial is None:
@@ -595,55 +754,176 @@ class LibraryTradingModel:
             }
             return XGBRegressor(**params)
 
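Roughly, the `num_class` guard corresponds to the difference below (an illustrative sketch with random data; the exact error XGBoost raises when `num_class` meets a binary objective varies by version, but the diff's own comment motivates omitting it for two classes):

```python
import numpy as np
from xgboost import XGBClassifier

X = np.random.rand(200, 4)

# 3+ classes: num_class is forwarded together with a multiclass objective.
XGBClassifier(n_estimators=10, objective="multi:softprob", num_class=3).fit(
    X, np.random.randint(0, 3, 200))

# 2 classes: create_model() passes num_class=None so the wrapper can fall
# back to binary:logistic, which does not take a num_class parameter.
XGBClassifier(n_estimators=10).fit(X, np.random.randint(0, 2, 200))
```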
-    def objective(self, trial, X_train, y_train, X_val, y_val):
-        """Optuna objective function for hyperparameter optimization"""
-        model = self.create_model(trial)
-
-        # Train the model
-        model.fit(
-            X_train,
-            y_train,
-            eval_set=[(X_val, y_val)],
-            early_stopping_rounds=50,
-            verbose=False
-        )
-
-        # Evaluate based on model type
-        if self.config.model_type == 'classifier':
-            from sklearn.metrics import accuracy_score
-            pred = model.predict(X_val)
-            score = accuracy_score(y_val, pred)
-        else:
-            from sklearn.metrics import mean_squared_error
-            pred = model.predict(X_val)
-            score = -mean_squared_error(y_val, pred, squared=False)  # Negative RMSE for maximization
-
-        return score
+    def run_rolling_window(self, data: pd.DataFrame, num_iterations: Optional[int] = None) -> Dict:
+        """Run the model using a rolling window approach"""
+        windows = self.get_time_windows(data)
+        if num_iterations:
+            windows = windows[:num_iterations]
+
+        all_results = {}
+
+        # Reset best_params for each rolling window run
+        self.best_params = None
+
+        # Reoptimize hyperparameters every Nth iteration
+        reoptimize_every_n_iterations = self.config.reoptimize_frequency or 5
+
+        # Number of warm-up bars for each iteration
+        warm_period = self.config.warm_up_period if self.config.warm_up_period is not None else 0
+        print("Warmup period:", warm_period)
+
+        for i, (train_start, train_end, test_start, test_end) in enumerate(windows):
+            # If warm_period is 0, use the original timestamps; otherwise prepend the warm-up period
+            if warm_period > 0:
+                train_warmup_data = data[data.index < train_start].tail(warm_period)
+                train_start_with_warmup = train_warmup_data.index[0] if not train_warmup_data.empty else train_start
+
+                test_warmup_data = data[data.index < test_start].tail(warm_period)
+                test_start_with_warmup = test_warmup_data.index[0] if not test_warmup_data.empty else test_start
+            else:
+                train_start_with_warmup = train_start
+                test_start_with_warmup = test_start
+
+            train_mask = (data.index >= train_start_with_warmup) & (data.index < train_end)
+            test_mask = (data.index >= test_start_with_warmup) & (data.index < test_end)
+
+            train_data = data[train_mask]
+            test_data = data[test_mask]
+
+            min_required_bars = max(20, self.config.forward_bars + 1)
+            if len(train_data) < min_required_bars or len(test_data) < 1:
+                print(f"Skipping iteration {i}: Insufficient data")
+                continue
+
+            # Reoptimize hyperparameters periodically
+            if i % reoptimize_every_n_iterations == 0:
+                self.best_params = None  # Force reoptimization
+
+            results, model = self.run_iteration(train_data, test_data, i)
+
+            if results is not None:
+                if self.save_model:  # Save the model if required
+                    self.save_pickled(i, model)
+
+                all_results[i] = {
+                    'train_period': (train_start, train_end),
+                    'test_period': (test_start, test_end),
+                    'results': results,
+                    'model': model,
+                    'hyperparameters': self.best_params.copy() if self.best_params else None
+                }
+
+        return all_results
 
     def optimize_hyperparameters(self, X_train, y_train, n_trials=2):
-        """Run Optuna hyperparameter optimization"""
+        """Run Optuna hyperparameter optimization using time series cross-validation"""
         import optuna
-        from sklearn.model_selection import train_test_split
+        from sklearn.model_selection import TimeSeriesSplit
+        from sklearn.metrics import accuracy_score, mean_squared_error  # used by the CV objective below
+        from xgboost import XGBClassifier, XGBRegressor
 
         print("\nStarting hyperparameter optimization...")
 
-        # Split training data into train and validation sets
-        X_train_opt, X_val, y_train_opt, y_val = train_test_split(
-            X_train, y_train, test_size=0.2, random_state=42
+        # Calculate an appropriate number of splits based on the data size
+        total_samples = len(X_train)
+        test_size = int(len(X_train) * 0.2)  # 20% validation size
+        gap = self.config.forward_bars
+
+        # Calculate the maximum possible number of splits
+        available_samples = total_samples
+        max_splits = 0
+        while available_samples >= 2 * test_size + gap:  # need at least 2x test_size (train and test) plus the gap
+            available_samples -= (test_size + gap)
+            max_splits += 1
+
+        n_splits = min(3, max_splits)  # use at most 3 splits, or fewer if the data doesn't allow more
+
+        print(f"Using {n_splits} splits for time series cross-validation")
+        print(f"Total samples: {total_samples}, Test size: {test_size}, Gap: {gap}")
+
+        # Create time series cross-validation splits
+        tscv = TimeSeriesSplit(
+            n_splits=n_splits,
+            test_size=test_size,
+            gap=gap
         )
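The split-count arithmetic above, mirrored as a standalone function with one worked case (names are hypothetical, the loop is copied from the diff):

```python
def max_ts_splits(total_samples: int, test_size: int, gap: int) -> int:
    # Each split consumes test_size + gap samples, and at least
    # 2*test_size + gap must remain available before carving one off.
    available, splits = total_samples, 0
    while available >= 2 * test_size + gap:
        available -= test_size + gap
        splits += 1
    return splits

# 1000 bars, 20% validation (200), gap = forward_bars = 5:
# 1000 -> 795 -> 590 -> 385 (stop), i.e. 3 candidate splits, then capped at 3.
assert max_ts_splits(1000, 200, 5) == 3
```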
 
-        # Create Optuna study
-        if self.config.model_type == 'classifier':
-            study = optuna.create_study(direction='maximize')  # Maximize accuracy
-        else:
-            study = optuna.create_study(direction='maximize')  # Maximize negative RMSE
+        def objective(trial):
+            params = {
+                'n_estimators': trial.suggest_int('n_estimators', 50, 300),
+                'max_depth': trial.suggest_int('max_depth', 3, 10),
+                'learning_rate': trial.suggest_float('learning_rate', 0.01, 0.3, log=True),
+                'min_child_weight': trial.suggest_int('min_child_weight', 1, 7),
+                'subsample': trial.suggest_float('subsample', 0.6, 1.0),
+                'colsample_bytree': trial.suggest_float('colsample_bytree', 0.6, 1.0),
+                'gamma': trial.suggest_float('gamma', 1e-8, 1.0, log=True),
+                "lambda": trial.suggest_float("lambda", 1e-3, 10, log=True),
+                "alpha": trial.suggest_float("alpha", 1e-3, 10, log=True),
+                'random_state': 42
+            }
+
+            # Store scores from each fold
+            fold_scores = []
+
+            # Time series cross-validation
+            for fold, (train_idx, val_idx) in enumerate(tscv.split(X_train)):
+                X_fold_train = X_train.iloc[train_idx]
+                y_fold_train = y_train.iloc[train_idx]
+                X_fold_val = X_train.iloc[val_idx]
+                y_fold_val = y_train.iloc[val_idx]
+
+                # Create the model
+                if self.config.model_type == 'classifier':
+                    model = XGBClassifier(**params)
+                    if self.config.n_classes > 2:
+                        model.set_params(num_class=self.config.n_classes)
+                else:
+                    model = XGBRegressor(**params)
+
+                # Handle class imbalance for binary classification
+                if self.config.n_classes == 2:
+                    n_0 = sum(y_fold_train == 0)
+                    n_1 = sum(y_fold_train == 1)
+                    scale_pos_weight = n_0 / n_1
+                    model.set_params(scale_pos_weight=scale_pos_weight)
+
+                # Train with early stopping
+                model.fit(
+                    X_fold_train,
+                    y_fold_train,
+                    eval_set=[(X_fold_val, y_fold_val)],
+                    early_stopping_rounds=50,
+                    verbose=False
+                )
+
+                # Calculate the fold score
+                if self.config.model_type == 'classifier':
+                    pred = model.predict(X_fold_val)
+                    score = accuracy_score(y_fold_val, pred)
+                else:
+                    pred = model.predict(X_fold_val)
+                    score = -mean_squared_error(y_fold_val, pred, squared=False)
+
+                fold_scores.append(score)
+
+            # Return the mean score across folds
+            return np.mean(fold_scores)
+
+        print("init optuna study")
 
-        # Run optimization
-        study.optimize(
-            lambda trial: self.objective(trial, X_train_opt, y_train_opt, X_val, y_val),
-            n_trials=n_trials
+        # Create Optuna study
+        study = optuna.create_study(
+            direction='maximize',
+            pruner=optuna.pruners.MedianPruner(
+                n_startup_trials=5,
+                n_warmup_steps=20,
+                interval_steps=10
+            )
         )
 
+        print("starting optimization")
+
+        # Run optimization
+        study.optimize(objective, n_trials=n_trials)
+
         self.study = study
         self.best_params = study.best_params
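The `gap` matters because the target at bar t is built from bars up to t + `forward_bars`; without it, the tail of each training fold would overlap the validation fold's label window. A toy run with scikit-learn's `TimeSeriesSplit` (30 rows, mirroring gap = forward_bars = 5):

```python
import numpy as np
from sklearn.model_selection import TimeSeriesSplit

X = np.arange(30).reshape(-1, 1)
tscv = TimeSeriesSplit(n_splits=3, test_size=5, gap=5)
for train_idx, val_idx in tscv.split(X):
    # 5 rows are always skipped between the last train row and the
    # first validation row: prints 9 -> 15, 14 -> 20, 19 -> 25.
    print(train_idx[-1], "->", val_idx[0])
```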
Target starting") train_target = self.feature_builder.create_target(train_features) print("Target created") + + if self.use_model is None: #when model is provided, dont prepare train data, only test + X_train = train_features + y_train = train_target + + print("TRAIN-----") + print(f"X_train shape: {X_train.shape}", X_train.index[[0,-1]]) + print(f"y_train shape: {y_train.shape}", y_train.index[[0,-1]]) + print("Removing NaNs") + + # Remove NaN values or infinite values + y_train = y_train.replace([np.inf, -np.inf], np.nan) + mask_train = ~y_train.isna() + X_train = X_train[mask_train] + y_train = y_train[mask_train] + + print(f"X_train shape after cleaning: {X_train.shape}", X_train.index[[0,-1]]) + print(f"y_train shape after cleaning: {y_train.shape}", y_train.index[[0,-1]]) + print(f"X_train columns: {X_train.columns}") + train_cols = set(X_train.columns) + train_columns = X_train.columns + + if len(X_train) < self.config.forward_bars + 1: + print(f"Warning: Iteration {iteration_num} - Insufficient training data") + return None, None + + # Scale features + print("Scaling features...") + X_train_scaled = self.scaler.fit_transform(X_train) + X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns, index=X_train.index) - X_train = train_features - y_train = train_target - - print("TRAIN-----") - print(f"X_train shape: {X_train.shape}", X_train.index[[0,-1]]) - print(f"y_train shape: {y_train.shape}", y_train.index[[0,-1]]) - print("Removing NaNs") - - # Remove NaN values or infinite values - y_train = y_train.replace([np.inf, -np.inf], np.nan) - mask_train = ~y_train.isna() - X_train = X_train[mask_train] - y_train = y_train[mask_train] - - print(f"X_train shape after cleaning: {X_train.shape}", X_train.index[[0,-1]]) - print(f"y_train shape after cleaning: {y_train.shape}", y_train.index[[0,-1]]) - print(f"X_train columns: {X_train.columns}") - train_cols = set(X_train.columns) - train_columns = X_train.columns - - if len(X_train) < self.config.forward_bars + 1: - print(f"Warning: Iteration {iteration_num} - Insufficient training data") - return None, None - - # Scale features - X_train_scaled = self.scaler.fit_transform(X_train) - X_train_scaled = pd.DataFrame(X_train_scaled, columns=X_train.columns, index=X_train.index) - - # Run hyperparameter optimization if not done yet - if self.best_params is None: - self.optimize_hyperparameters(X_train_scaled, y_train, self.config.optuna_trials) - - # Create and train model with best parameters - model = self.create_model() - if self.best_params: - model.set_params(**self.best_params) - - model.fit(X_train_scaled, y_train) + # Run hyperparameter optimization if not done yet + if self.best_params is None and self.config.optuna_trials is not None and self.config.optuna_trials > 0: + print("optimization started...") + self.optimize_hyperparameters(X_train_scaled, y_train, self.config.optuna_trials) + + # Create and train model with best parameters + model = self.create_model() + if self.best_params: + model.set_params(**self.best_params) + else: + print("Using default hyperparameters",self.def_params) + model.set_params(**self.def_params) + + #balance unbalanced classes, works for binary:logistics + if self.config.n_classes == 2: + n_0 = sum(y_train == 0) # 900 + n_1 = sum(y_train == 1) # 100 + scale_pos_weight = n_0 / n_1 # 900/100 = 9 + model.set_params(scale_pos_weight=scale_pos_weight) + + print("Model Training...") + model.fit(X_train_scaled, y_train) + else: + print("Using PROVIDED MODEL and SCALER") + model = self.use_model 
print("TEST-----") - test_features, features_cols = self.feature_builder.prepare_features(test_data) - X_test = test_features - y_test = self.feature_builder.create_target(test_features, train_data=train_features) + if self.config.test_on_train: + print("TESTED ON TRAIN DATA!") + X_test = train_features + y_test = train_target + else: + test_features, features_cols = self.feature_builder.prepare_features(test_data) + X_test = test_features + y_test = self.feature_builder.create_target(test_features, train_data=train_features) print(f"X_test shape: {X_test.shape}", X_test.index[[0,-1]]) print(f"y_test shape: {y_test.shape}", y_test.index[[0,-1]]) @@ -738,16 +1044,21 @@ class LibraryTradingModel: print(f"X_test shape after trimming: {X_test.shape}", X_test.index[[0,-1]]) print(f"y_test shape after trimming: {y_test.shape}", y_test.index[[0,-1]]) - # Find columns in test but not in train - extra_in_test = test_cols - train_cols - print("Extra columns in X_test:", extra_in_test) + if self.use_model is None: + # Find columns in test but not in train + extra_in_test = test_cols - train_cols + print("Extra columns in X_test:", extra_in_test) - # Find columns in train but not in test - extra_in_train = train_cols - test_cols - print("Extra columns in X_train:", extra_in_train) + # Find columns in train but not in test + extra_in_train = train_cols - test_cols + print("Extra columns in X_train:", extra_in_train) + + # Reorder X_test columns to match, excpet when model was provided + X_test = X_test[train_columns] + else: + # Assuming X_test is a DataFrame, ensure ordering + X_test = X_test[self.scaler.feature_names_in_] - # Reorder X_test columns to match - X_test = X_test[train_columns] X_test_scaled = self.scaler.transform(X_test) X_test_scaled = pd.DataFrame(X_test_scaled, columns=X_test.columns, index=X_test.index) @@ -782,6 +1093,83 @@ class LibraryTradingModel: print(f"Error in iteration {iteration_num}: {str(e)} - {format_exc()}") return None, None + + def calculate_probability_skew(self, predict_proba_output, weights=None): + """ + Calculate probability skew metrics for each individual prediction. 
+ """ + num_class = self.config.n_classes + # Validate input dimensions + probs = np.array(predict_proba_output) + if probs.shape[1] != num_class: + raise ValueError(f"predict_proba_output shape {probs.shape} doesn't match num_class {num_class}") + + # Create linear weights if not provided + if weights is None: + weights = np.linspace(0, num_class - 1, num_class) + elif len(weights) != num_class: + raise ValueError(f"weights length {len(weights)} doesn't match num_class {num_class}") + + # Calculate weighted score for each prediction + weighted_probs = np.sum(probs * weights, axis=1) + + # Calculate basic metrics for each prediction + max_class_prob = np.max(probs, axis=1) + positive_class_prob = probs[:, -1] + predicted_class = np.argmax(probs, axis=1) + + # Calculate skewness + center = (num_class - 1) / 2 + distances = np.arange(num_class) - center + skewness = np.sum(probs * distances, axis=1) / center + + # Calculate high/low ratio + mid_point = num_class // 2 + higher_probs = np.sum(probs[:, mid_point:], axis=1) + lower_probs = np.sum(probs[:, :mid_point], axis=1) + high_low_ratio = np.where(lower_probs > 0, higher_probs / lower_probs, higher_probs) + + # Calculate Progressive Growth Score (excluding class 0) + def calculate_progress_score(prob_row): + # Extract probabilities excluding class 0 + probs_excl_0 = prob_row[1:] + n = len(probs_excl_0) + + # Compare each probability with all previous ones + total_comparisons = 0 + positive_growths = 0 + + for i in range(1, n): + # Compare with all previous probabilities + comparisons = probs_excl_0[i] > probs_excl_0[:i] + positive_growths += np.sum(comparisons) + total_comparisons += i + + # Normalize score between -1 and 1 + # -1: perfect reverse stairs, 0: random, 1: perfect stairs + return (2 * positive_growths / total_comparisons - 1) if total_comparisons > 0 else 0 + + progress_scores = np.array([calculate_progress_score(p) for p in probs]) + + # Combine metrics into a single array + metrics = np.column_stack([ + weighted_probs, # Weighted probability score + max_class_prob, # Maximum probability + positive_class_prob, # Probability of highest class + predicted_class, # Predicted class + (predicted_class == (num_class-1)), # Binary indicator for highest class + skewness, # Skewness towards higher classes + high_low_ratio, # Ratio of higher to lower class probabilities + progress_scores # Progressive Growth Score + ]) + + # Create column names for reference + metric_names = ['weighted_score', 'max_prob', 'positive_prob', + 'predicted_class', 'is_positive', 'skewness', + 'high_low_ratio', 'progress_score'] + + return metrics, metric_names + def iteration_summary_classifier(self,results, model, predictions_proba, iteration_num): """ Analyze classifier results with focus on directional confidence and class probabilities @@ -954,6 +1342,10 @@ class LibraryTradingModel: prob_df = pd.DataFrame(predictions_proba, columns=class_names) prob_df.index = results.index + #calculate positivity skew (probability skew towards positive class 0 to N, where N is most positive) + pos_skew_metrics, names = self.calculate_probability_skew(predictions_proba) + pos_skew_df = pd.DataFrame(pos_skew_metrics, columns=names, index=results.index) + # # Verification step # def verify_xgb_predictions(model, predictions, predictions_proba): # # Get predicted class from probabilities @@ -992,17 +1384,17 @@ class LibraryTradingModel: # Add directional analysis - dir_metrics = evaluate_directional_accuracy( - results, - predictions_proba, - confidence_thresholds={'high': 
 
     def iteration_summary_classifier(self,results, model, predictions_proba, iteration_num):
         """
         Analyze classifier results with focus on directional confidence and class probabilities
@@ -954,6 +1342,10 @@ class LibraryTradingModel:
         prob_df = pd.DataFrame(predictions_proba, columns=class_names)
         prob_df.index = results.index
 
+        # Calculate positivity skew (probability skew towards the positive class; classes run 0 to N, N most positive)
+        pos_skew_metrics, names = self.calculate_probability_skew(predictions_proba)
+        pos_skew_df = pd.DataFrame(pos_skew_metrics, columns=names, index=results.index)
+
         # # Verification step
         # def verify_xgb_predictions(model, predictions, predictions_proba):
         #     # Get predicted class from probabilities
@@ -992,17 +1384,17 @@ class LibraryTradingModel:
 
         # Add directional analysis
-        dir_metrics = evaluate_directional_accuracy(
-            results,
-            predictions_proba,
-            confidence_thresholds={'high': 0.6, 'medium': 0.3}
-        )
+        # dir_metrics = evaluate_directional_accuracy(
+        #     results,
+        #     predictions_proba,
+        #     confidence_thresholds={'high': 0.6, 'medium': 0.3}
+        # )
 
-        print(dir_metrics)
+        # print(dir_metrics)
 
-        plot_directional_analysis(
-            metrics=dir_metrics,
-            iteration_num=iteration_num)
+        # plot_directional_analysis(
+        #     metrics=dir_metrics,
+        #     iteration_num=iteration_num)
 
         analysis_df = analyze_return_distribution(prob_df, results["actual"])
         fig = plot_distribution_analysis(prob_df, analysis_df, results["actual"])
@@ -1020,6 +1412,7 @@ class LibraryTradingModel:
             right=[(results["close"], "close") if "close" in results.columns else ()],
             left=[],
            middle1=[(results["predicted"],"predicted"),(results["actual"],"actual"),(prob_df,)],
+            middle2=[(pos_skew_df,)]
         ).chart(size="s", precision=6, title=f"Iteration {iteration_num} classes:{self.config.n_classes} forward_bars:{self.config.forward_bars}")
 
         num_classes = self.config.n_classes
@@ -1028,23 +1421,45 @@ class LibraryTradingModel:
         for i in range(num_classes):
             results[f'prob_class_{i}'] = predictions_proba[:, i]
 
-        # Calculate directional probabilities (assuming 5 classes)
-        results['prob_negative'] = results['prob_class_0'] + results['prob_class_1']
-        results['prob_neutral'] = results['prob_class_2']
-        results['prob_positive'] = results['prob_class_3'] + results['prob_class_4']
+        # Calculate class groupings
+        if num_classes < 2:
+            print("Must have at least 2 classes")
+            raise ValueError("Must have at least 2 classes")
 
-        # Calculate directional accuracy
+        # For 2 classes: negative=[0], positive=[1], no neutral
+        # For 3 classes: negative=[0], neutral=[1], positive=[2]
+        # For 4 classes: negative=[0,1], positive=[2,3], no neutral
+        # For 5 classes: negative=[0,1], neutral=[2], positive=[3,4]
+        # And so on...
+
+        negative_classes = list(range(num_classes // 2 if num_classes > 2 else 1))
+        positive_classes = list(range(num_classes - (num_classes // 2 if num_classes > 2 else 1), num_classes))
+
+        # Determine whether there should be a neutral class
+        has_neutral = num_classes > 2 and num_classes % 2 == 1
+        neutral_class = [num_classes // 2] if has_neutral else []
+
+        # Calculate directional probabilities
+        results['prob_negative'] = results[[f'prob_class_{i}' for i in negative_classes]].sum(axis=1)
+        if has_neutral:
+            results['prob_neutral'] = results[f'prob_class_{neutral_class[0]}']
+        else:
+            results['prob_neutral'] = 0.0
+        results['prob_positive'] = results[[f'prob_class_{i}' for i in positive_classes]].sum(axis=1)
+
+        # Define direction mapping function
         def get_direction(x):
-            if x <= 1:  # Classes 0,1
+            if x in negative_classes:
                 return 'negative'
-            elif x >= 3:  # Classes 3,4
+            elif x in positive_classes:
                 return 'positive'
             return 'neutral'
-
+
+        # Calculate directional predictions
         results['predicted_direction'] = results['predicted'].map(get_direction)
         results['actual_direction'] = results['actual'].map(get_direction)
         results['direction_correct'] = results['predicted_direction'] == results['actual_direction']
-
+
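How the grouping rule plays out for common class counts (a standalone mirror of the expressions above; the printed rows match the comments in the diff):

```python
for n in (2, 3, 4, 5):
    half = n // 2 if n > 2 else 1
    negative = list(range(half))
    positive = list(range(n - half, n))
    neutral = [n // 2] if (n > 2 and n % 2 == 1) else []
    print(n, negative, neutral, positive)
# 2 [0] [] [1]
# 3 [0] [1] [2]
# 4 [0, 1] [] [2, 3]
# 5 [0, 1] [2] [3, 4]
```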
         # 1. Print Summary Statistics
         print(f"\n=== Iteration {iteration_num} Summary ===")
         print("\nClass Distribution:")
@@ -1065,7 +1480,8 @@ class LibraryTradingModel:
 
         print("New confusion matrix")
 
-        def plot_threshold_confusion_matrices(results, predictions_proba, thresholds=[0.3, 0.5, 0.8], num_classes=5):
+
+        def plot_threshold_confusion_matrices(results, predictions_proba, thresholds=[0.3, 0.5, 0.8], num_classes=8):
             """
             Plot confusion matrices for different probability thresholds
             """
@@ -1089,8 +1505,23 @@ class LibraryTradingModel:
                 max_probs = np.max(predictions_proba, axis=1)
                 confident_mask = max_probs >= threshold
 
-                # Only assign predictions where confidence meets threshold
-                predicted_classes[confident_mask] = np.argmax(predictions_proba[confident_mask], axis=1)
+                # Debug - print first few samples
+                # print(f"\nThreshold: {threshold}")
+                # print("First 5 probabilities arrays:")
+                # for i in range(5):
+                #     if confident_mask[i]:
+                #         prob_array = predictions_proba[i]
+                #         pred_idx = np.argmax(prob_array)
+                #         print(f"\nSample {i}:")
+                #         print(f"Probabilities: {prob_array}")
+                #         print(f"Max prob: {max_probs[i]:.3f} at index {pred_idx}")
+                #         print(f"Actual class: {results['actual'][i]}")
+
+                # Get the raw argmax predictions where confidence meets the threshold
+                raw_predictions = np.argmax(predictions_proba[confident_mask], axis=1)
+
+                # Assign them only for confident rows; the rest stay -1
+                predicted_classes[confident_mask] = raw_predictions
 
                 # Filter results to only include confident predictions
                 valid_indices = predicted_classes != -1
@@ -1099,8 +1530,12 @@ class LibraryTradingModel:
 
                 if len(filtered_actual) > 0:
                     # Calculate confusion matrix for confident predictions
-                    conf_matrix = confusion_matrix(filtered_actual, filtered_predicted)
+                    conf_matrix = confusion_matrix(filtered_actual, filtered_predicted,
+                                                   labels=range(num_classes))
+
+                    # Calculate percentage matrix
                     conf_matrix_pct = conf_matrix / conf_matrix.sum(axis=1)[:, np.newaxis]
+                    conf_matrix_pct = np.nan_to_num(conf_matrix_pct)  # Handle division by zero
 
                     # Plot heatmap
                     sns.heatmap(conf_matrix_pct, annot=conf_matrix, fmt='d', cmap='YlOrRd',
@@ -1121,10 +1556,9 @@ class LibraryTradingModel:
                     ax.text(-0.2, (positive_start + negative_end)/2, 'Neutral', rotation=90, verticalalignment='center')
                     ax.text(-0.2, (positive_start + num_classes)/2, 'Positive', rotation=90, verticalalignment='center')
 
-                    # Calculate accuracy metrics
+                    # Calculate metrics
                     accuracy = (filtered_predicted == filtered_actual).mean()
 
-                    # Calculate directional metrics
                     def get_direction(x):
                         negative_end = num_classes // 3
                         positive_start = num_classes - (num_classes // 3)
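The confidence-threshold masking above, reduced to a three-row toy case (illustrative values only):

```python
import numpy as np

proba = np.array([[0.20, 0.50, 0.30],
                  [0.10, 0.10, 0.80],
                  [0.34, 0.33, 0.33]])
threshold = 0.5
predicted = np.full(len(proba), -1)          # -1 marks "no confident prediction"
confident = proba.max(axis=1) >= threshold
predicted[confident] = proba[confident].argmax(axis=1)
# predicted -> [1, 2, -1]; the third row stays unclassified and is
# excluded from that threshold's confusion matrix.
```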
@@ -1595,52 +2029,6 @@ class LibraryTradingModel:
         plt.tight_layout()
         plt.show()
 
-    def run_rolling_window(self, data: pd.DataFrame, num_iterations: Optional[int] = None) -> Dict:
-        """Run the model using a rolling window approach"""
-        windows = self.get_date_windows(data)
-        if num_iterations:
-            windows = windows[:num_iterations]
-
-        all_results = {}
-
-        #number of warm up bars for each iteration
-        warm_period = self.config.warm_up_period if self.config.warm_up_period is not None else 0
-        print("Warmup period:", warm_period)
-
-        for i, (train_start, train_end, test_start, test_end) in enumerate(windows):
-            # If warm_period is 0, use original timestamps, otherwise add warm-up period
-            if warm_period > 0:
-                train_warmup_data = data[data.index < train_start].tail(warm_period)
-                train_start_with_warmup = train_warmup_data.index[0] if not train_warmup_data.empty else train_start
-
-                test_warmup_data = data[data.index < test_start].tail(warm_period)
-                test_start_with_warmup = test_warmup_data.index[0] if not test_warmup_data.empty else test_start
-            else:
-                train_start_with_warmup = train_start
-                test_start_with_warmup = test_start
-
-            train_mask = (data.index >= train_start_with_warmup) & (data.index < train_end)
-            test_mask = (data.index >= test_start_with_warmup) & (data.index < test_end)
-
-            train_data = data[train_mask]
-            test_data = data[test_mask]
-
-            min_required_bars = max(20, self.config.forward_bars + 1)
-            if len(train_data) < min_required_bars or len(test_data) < 1:
-                print(f"Skipping iteration {i}: Insufficient data")
-                continue
-
-            results, model = self.run_iteration(train_data, test_data, i)
-
-            if results is not None:
-                all_results[i] = {
-                    'train_period': (train_start, train_end),
-                    'test_period': (test_start, test_end),
-                    'results': results,
-                    'model': model
-                }
-
-        return all_results
 
     def generate_feature_dataset(
         self,