Compare commits

...

23 Commits

Author SHA1 Message Date
be7de0ef19 update 2025-07-31 14:01:52 +02:00
36c7c9f68d vault backup: 2025-06-30 14:08:10 2025-06-30 14:08:10 +02:00
e82f639932 vault backup: 2025-06-30 12:14:14 2025-06-30 12:14:14 +02:00
c60a41d7ff change 2025-06-30 12:07:39 +02:00
23c1862610 commit after half a year 2025-06-26 10:14:25 +02:00
28a5e6ecf3 update 2024-10-24 12:32:16 +02:00
76ea5daa6f fix 2024-10-22 16:31:55 +02:00
cbc7b5325f daily update 2024-10-22 15:15:03 +02:00
a05be4933f fix 2024-10-21 20:12:21 +02:00
9abf7dada5 fix 2024-10-18 14:45:22 +02:00
490fc11098 fix 2024-10-18 12:22:59 +02:00
c8d3df7f70 daily fix 2024-10-18 11:17:59 +02:00
3152fcb0b5 daily fix 2024-10-17 09:30:05 +02:00
58b6bde651 fix 2024-10-16 20:36:01 +02:00
cc91d106c3 fix 2024-10-16 20:23:58 +02:00
c49f7e1ed6 daily update 2024-10-16 17:36:59 +02:00
804ade5c29 fix 2024-10-16 15:13:01 +02:00
b64aa93c9b fix 2024-10-16 12:38:24 +02:00
15439bb98f fix 2024-10-16 12:36:47 +02:00
eac1ba13de fix 2024-10-16 12:35:43 +02:00
80b785d661 update 2024-10-16 12:34:41 +02:00
195d9d7ca9 update 2024-10-16 12:32:44 +02:00
48c8f82b32 daily update 2024-10-16 12:32:26 +02:00
18 changed files with 3190 additions and 659 deletions

5
.gitignore vendored

@ -1,2 +1,5 @@
/.venv/
.venv
.venv
/.vscode/
.vscode
.obsidian

49
.vscode/launch.json vendored

@ -1,49 +0,0 @@
{
"version": "0.2.0",
"configurations": [
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"port": 5678, // or the port used by your Jupyter server
"justMyCode": false
},
{
"name": "Python: Aktuální soubor",
"type": "python",
"request": "launch",
"program": "${file}",
"cwd": "${workspaceFolder}",
"env": {
"PYTHONPATH": "${workspaceFolder}:${workspaceFolder}/bld"
},
"console": "integratedTerminal",
"justMyCode": false,
"python": "${command:python.interpreterPath}",
"internalConsoleOptions": "openOnSessionStart"
},
{
"name": "Python: Main",
"type": "python",
"request": "launch",
"program": "v2realbot/main.py",
"justMyCode": false
},
{
"name": "Python: File",
"type": "python",
"request": "launch",
"program": "${file}",
"justMyCode": false
},
{
"name": "Python: Attach",
"type": "python",
"request": "attach",
"connect": {
"host": "localhost",
"port": 5678
}
}
]
}

.vscode/settings.json vendored

@ -1,3 +0,0 @@
{
"git.ignoreLimitWarning": true
}

Binary file not shown (new file, 25 KiB).

Binary file not shown (new file, 25 KiB).

README.md

@ -1,3 +1,3 @@
# snippets
A collection of reusable code snippets
* vbtpro [vbt-snippets.MD](vbt-snippets.MD)
* vbtpro [vbt-snippets.md](vbt-snippets.md)

4
deleteme.md Normal file

@ -0,0 +1,4 @@
[Fetching data](#Fetching%20data)
# Fetching data
fdfd

75
docs.md Normal file

@ -0,0 +1,75 @@
## MCP
### Local vbtpro mcp server
The main code in `vectorbtpro.mcp_server` can run either in `stdio` or in `streamable-http` mode.
Run the MCP server in streamable-http mode (currently started manually in a terminal, as a single process serving all local MCP clients):
```bash
cd /Users/davidbrazda/Documents/Development/python/vectorbt.pro-2025.6.24
source .venv/bin/activate
python -m vectorbtpro.mcp_server --transport streamable-http
```
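For clients that spawn the server process themselves, the same module can also be started in `stdio` mode; the exact flag value below is an assumption based on the `--transport` option shown above:
```bash
# assumption: --transport also accepts "stdio" (one process per client, communicating over stdin/stdout)
python -m vectorbtpro.mcp_server --transport stdio
```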
Claude Desktop (which currently doesn't support remote servers) can use it via `mcp-remote`:
```json
"vectorbtpro_server": {
"command": "npx",
"args": [
"-y",
"mcp-remote",
"http://localhost:8000/mcp"
]
}
```
Other MCP clients (Claude Code, Gemini CLI) can connect directly over HTTP:
```json
{
"mcpServers": {
"vectorbtpro_server": {
"transport": "http",
"url": "http://localhost:8000"
}
}
}
```
## Claude Desktop
**Settings:**
`/Users/davidbrazda/Library/Application Support/Claude/claude_desktop_config.json`
**Logs:**
`/Users/davidbrazda/Library/Logs/Claude` (main.log, mcp.log, mcp_server-name.log)
## Claude code
[Doc](https://docs.anthropic.com/en/docs/claude-code/mcp)
**Settings:**
- User/global settings (`~/.claude.json`)
- Project-specific MCP file (`.mcp.json`)
**Dedicated MCP file:** ~/.claude/mcp_servers.json
**Logs:**
## Claude Code Router
[doc](https://github.com/musistudio/claude-code-router)
**Settings:**
To set which model is used for each task:
`~/.claude-code-router/config.json`
**Logs:**
`~/.claude-code-router/claude-code-router.log`

259
features_targets.md Normal file

@ -0,0 +1,259 @@
Target features are collected here.
> [!NOTE] Note
> Contents
> [!note]- note
> something
> something
> [!example]- Graph: volatility average slope
> ![[Pasted image 20250630140635.png]]
> [!example]- Graph: volatility average slope across 1d to 30d range of windows
> ![[Volatility_average_slope.png]]
# Things to try
TODO:
* better labeling
* continue here https://claude.ai/chat/b3ee78b6-9662-4f25-95f0-ecac4a78a41b
* try the model with other symbols
* try different retraining options (even hourly)
Features:
- add datetime features (useful for a rush-hour model); see the sketch after this list
- add MT features as columns
- use convolutional networks to create features (https://www.youtube.com/watch?v=6wK4q8QvsV4)
Enhance model:
* multi target see xgb doc
* use SL with target price, valid for a few seconds
* how to handle an imbalanced dataset https://xgboost.readthedocs.io/en/stable/tutorials/param_tuning.html
Target:
- maybe add manual labeling
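A minimal sketch of the datetime features mentioned in the list above (column names are illustrative; `features` is the DataFrame built in `prepare_features` below and is assumed to have a `DatetimeIndex`):
```python
# hypothetical datetime features for a rush-hour model
features['hour'] = features.index.hour
features['minute'] = features.index.minute
features['day_of_week'] = features.index.dayofweek  # 0 = Monday
features['minutes_since_open'] = (
    (features.index - features.index.normalize()) / pd.Timedelta("1min") - 9.5 * 60
)  # minutes since the 09:30 market open
# cyclical encoding so late and early hours are not numerically "far apart"
features['hour_sin'] = np.sin(2 * np.pi * features.index.hour / 24)
features['hour_cos'] = np.cos(2 * np.pi * features.index.hour / 24)
```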
# Features
```python
def prepare_features(self, df: pd.DataFrame) -> tuple[pd.DataFrame, list]:
"""Prepare enhanced features from input df with focus on predictive potential"""
features = pd.DataFrame(index=df.index)
# Original ohlcv added to features
features['close'] = df['close']
features['volume'] = df['volume']
features['trades_count'] = df['trades']
features['buy_volume'] = df['buyvolume']
features['sell_volume'] = df['sellvolume']
features['high'] = df['high']
features['low'] = df['low']
# features['log_return'] = np.log(features['close'] / features['close'].shift(1))
# features['returns_1'] = features['close'].pct_change()
# features['returns_5'] = features['close'].pct_change(5)
# features['returns_20'] = features['close'].pct_change(20)
def get_fib_windows():
"""
#TODO based on real time (originally for 1s bars)
Generate Fibonacci window sizes up to ~1 hour; originally in seconds (for 1s bars), currently capped at 3600/60 = 60 bars,
yielding the sequence 3, 5, 8, 13, 21, 34, 55
"""
fib_windows = [3, 5]
while fib_windows[-1] < 3600/60:
next_fib = fib_windows[-1] + fib_windows[-2]
if next_fib > 3600/60:
break
fib_windows.append(next_fib)
return fib_windows
fib_windows = get_fib_windows()
# Base price and returns
features['log_return'] = np.log(features['close'] / features['close'].shift(1))
features['price_velocity'] = (features['close'] - features['close'].shift(1)) / 1.0 # per second
features['price_acceleration'] = features['price_velocity'] - features['price_velocity'].shift(1)
# Fibonacci-based features
for window in fib_windows:
# Price features
features[f'log_return_{window}s'] = np.log(features['close'] / features['close'].shift(window))
features[f'volatility_{window}s'] = features['log_return'].rolling(window).std()
features[f'range_{window}s'] = (features['high'].rolling(window).max() -
features['low'].rolling(window).min()) / features['close']
# Volume features
features[f'volume_momentum_{window}s'] = (
features['volume'].rolling(window).mean() /
features['volume'].rolling(window * 2).mean()
)
features[f'buy_volume_momentum_{window}s'] = (
features['buy_volume'].rolling(window).mean() /
features['buy_volume'].rolling(window * 2).mean()
)
features[f'sell_volume_momentum_{window}s'] = (
features['sell_volume'].rolling(window).mean() /
features['sell_volume'].rolling(window * 2).mean()
)
# Trade features
features[f'trade_intensity_{window}s'] = (
features['trades_count'].rolling(window).mean() /
features['trades_count'].rolling(window * 2).mean()
)
features[f'avg_trade_size_{window}s'] = (
features['volume'].rolling(window).sum() /
features['trades_count'].rolling(window).sum()
)
# Order flow features
features[f'cum_volume_delta_{window}s'] = (
features['buy_volume'] - features['sell_volume']
).rolling(window).sum()
features[f'volume_pressure_{window}s'] = (
features['buy_volume'].rolling(window).sum() /
features['sell_volume'].rolling(window).sum()
)
# Price efficiency
features[f'price_efficiency_{window}s'] = (
np.abs(features['close'] - features['close'].shift(window)) /
(features['high'].rolling(window).max() - features['low'].rolling(window).min())
)
# Moving averages and their crosses
features[f'sma_{window}s'] = features['close'].rolling(window).mean()
if window > 5: # Create MA crosses with shorter timeframe
features[f'ma_cross_5_{window}s'] = (
features['close'].rolling(5).mean() -
features['close'].rolling(window).mean()
)
# MA-based features
ma_lengths = [5, 10, 20, 50]
for length in ma_lengths:
# Regular MAs
features[f'ma_{length}'] = features['close'].rolling(length).mean()
# MA slopes (rate of change)
features[f'ma_{length}_slope'] = features[f'ma_{length}'].pct_change(3)
# Price distance from MA
features[f'price_ma_{length}_dist'] = (features['close'] - features[f'ma_{length}']) / features[f'ma_{length}']
# MA crossovers
if length > 5:
features[f'ma_5_{length}_cross'] = (features['ma_5'] - features[f'ma_{length}']) / features[f'ma_{length}']
# MA convergence/divergence
features['ma_convergence'] = ((features['ma_5'] - features['ma_20']).abs() /
features['ma_20'].rolling(10).mean())
# Volatility features using MAs
features['ma_volatility'] = features['ma_5'].rolling(10).std() / features['ma_20']
# MA momentum
features['ma_momentum'] = (features['ma_5'] / features['ma_5'].shift(5) - 1) * 100
# Cleanup and feature selection
features = features.replace([np.inf, -np.inf], np.nan)
lookback = 1000
if len(features) > lookback:
rolling_corr = features.iloc[-lookback:].corr().abs()
upper = rolling_corr.where(np.triu(np.ones(rolling_corr.shape), k=1).astype(bool))
to_drop = [column for column in upper.columns if any(upper[column] > 0.95)]
print(f"Column highly correlated - maybe drop? {to_drop} ")
#features = features.drop(columns=to_drop)
feature_columns = list(features.columns)
print(f"Features shape before dropna: {features.shape}")
return features.dropna(), feature_columns
```
# Targets
## Unbalanced classes
```python
from xgboost import XGBClassifier
# Compute scale_pos_weight
n_0 = sum(y_train == 0)
n_1 = sum(y_train == 1)
scale_pos_weight = n_0 / n_1
model = XGBClassifier(scale_pos_weight=scale_pos_weight, ...)
```
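`scale_pos_weight` only covers the binary case; for a multi-class target one option is per-sample weights, e.g. via scikit-learn (a sketch, assuming `X_train`/`y_train` hold the training features and class labels):
```python
from sklearn.utils.class_weight import compute_sample_weight
from xgboost import XGBClassifier

# balanced per-sample weights for a multi-class target
sample_weight = compute_sample_weight(class_weight="balanced", y=y_train)

model = XGBClassifier()
model.fit(X_train, y_train, sample_weight=sample_weight)
```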
```python
def create_target_regressor(self, df: pd.DataFrame) -> pd.Series:
"""
https://claude.ai/chat/8e7fe81c-ddbe-4e64-9af0-2bc4764fc5f0
Creates enhanced target variable using adaptive returns based on market conditions.
Key improvements:
1. Multi-timeframe momentum approach
2. Volume-volatility regime adaptation
3. Trend-following vs mean-reversion regime detection
4. Noise reduction through sophisticated filtering
Parameters:
-----------
df : pd.DataFrame
Features df containing required columns: 'close', 'volume', volatility features
Returns:
--------
pd.Series
Enhanced target variable with cross-day targets removed
"""
future_bars= self.config.forward_bars
future_ma_fast = df['close'].shift(-future_bars).rolling(5).mean()
# Calculate forward returns (original approach)
forward_returns = df['close'].shift(-future_bars) / df['close'] - 1
target = forward_returns
# 6. Advanced noise reduction
# Use exponential moving standard deviation for dynamic thresholds
target_std = target.ewm(span=50, min_periods=20).std()
# Adaptive thresholds based on rolling standard deviation
upper_clip = 2.5 * target_std
lower_clip = -2.5 * target_std
# Apply soft clipping using hyperbolic tangent
target = target_std * np.tanh(target / target_std)
# Final hard clips for extreme outliers
target = target.clip(lower=lower_clip, upper=upper_clip)
# 7. Remove cross-day targets and intraday seasonality
target = self.remove_crossday_targets(target, df, future_bars)
#only 10% of extreme values from both sides are kept
#target = target.where((target > target.quantile(0.9)) | (target < target.quantile(0.1)), 0)
print("after target generation", target.index[[0, -1]])
return target
```

BIN
image-1.png Normal file

Binary file not shown (new file, 83 KiB).

BIN
image.png Normal file

Binary file not shown (new file, 100 KiB).

317
ml-snippets.md Normal file

@ -0,0 +1,317 @@
- [Features](#features)
- [Features analysis](#features-analysis)
- [Target to classes](#target-to-classes)
- [Features importance](#features-importance)
- [Features selection](#features-selection)
- [Prediction](#prediction)
- [Evaluation](#evaluation)
- [Calculated returns based on various probability prediction thresholds](#calculated-returns-based-on-various-probability-prediction-thresholds)
- [Cumulative returns based on prob predictions](#cumulative-returns-based-on-prob-predictions)
- [Charts](#charts)
# Features
## Features analysis
```python
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
# a DataFrame `df` with a 'target' column of forward returns is assumed

# Calculate different percentiles
percentiles = [1, 5, 10, 20, 30, 40, 50, 60, 70, 80, 90, 95, 99]
print("\nPercentiles:")
for p in percentiles:
print(f"{p}th percentile: {df['target'].quantile(p/100):.6f}")
# Plot distribution
plt.figure(figsize=(15, 10))
# Plot 1: Overall distribution
plt.subplot(2, 2, 1)
sns.histplot(df['target'], bins=100)
plt.title('Distribution of Returns')
plt.axvline(x=0, color='r', linestyle='--', alpha=0.5)
# Plot 2: Distribution with potential thresholds
plt.subplot(2, 2, 2)
sns.histplot(df['target'], bins=100)
plt.title('Distribution with Potential Thresholds')
# Add lines for different standard deviations
std = df['target'].std()
mean = df['target'].mean()
for i in [0.5, 1.0, 1.5]:
plt.axvline(x=mean + i*std, color='g', linestyle='--', alpha=0.3, label=f'+{i} std')
plt.axvline(x=mean - i*std, color='r', linestyle='--', alpha=0.3, label=f'-{i} std')
plt.legend()
# Let's try different threshold approaches
# Approach 1: Standard deviation based
std_multiplier = 0.2
std_threshold = std_multiplier * std
labels_std = np.where(df['target'] > std_threshold, 1,
np.where(df['target'] < -std_threshold, -1, 0))
# Approach 2: Percentile based
percentile_threshold = 0.2 # top/bottom 20%
top_threshold = df['target'].quantile(1 - percentile_threshold)
bottom_threshold = df['target'].quantile(percentile_threshold)
labels_percentile = np.where(df['target'] > top_threshold, 1,
np.where(df['target'] < bottom_threshold, -1, 0))
# Plot 3: Distribution of STD-based classes
plt.subplot(2, 2, 3)
sns.histplot(data=pd.DataFrame({'return': df['target'], 'class': labels_std}),
x='return', hue='class', bins=100)
plt.title(f'Classes Based on {std_multiplier} Standard Deviation')
plt.axvline(x=std_threshold, color='g', linestyle='--', alpha=0.5)
plt.axvline(x=-std_threshold, color='r', linestyle='--', alpha=0.5)
# Plot 4: Distribution of Percentile-based classes
plt.subplot(2, 2, 4)
sns.histplot(data=pd.DataFrame({'return': df['target'], 'class': labels_percentile}),
x='return', hue='class', bins=100)
plt.title(f'Classes Based on {percentile_threshold*100}th Percentiles')
plt.axvline(x=top_threshold, color='g', linestyle='--', alpha=0.5)
plt.axvline(x=bottom_threshold, color='r', linestyle='--', alpha=0.5)
plt.tight_layout()
plt.show()
# Print class distributions
print("\nClass Distribution (STD-based):")
print(pd.Series(labels_std).value_counts(normalize=True))
print("\nClass Distribution (Percentile-based):")
print(pd.Series(labels_percentile).value_counts(normalize=True))
# Calculate mean return for each class
print("\nMean Return by Class (STD-based):")
std_df = pd.DataFrame({'return': df['target'], 'class': labels_std})
print(std_df.groupby('class')['return'].mean())
print("\nMean Return by Class (Percentile-based):")
perc_df = pd.DataFrame({'return': df['target'], 'class': labels_percentile})
print(perc_df.groupby('class')['return'].mean())
```
<img src="image-1.png" alt="Target distributions" width="300"/>
### Target to classes
Based on std dev
```python
# Read and prepare the data
df = pd.read_csv('model_data.csv')
df = df.drop('ts_event', axis=1)
# Separate features and target
X = df.drop('target', axis=1)
y = df['target']
# Split the data first so we only use train data statistics for thresholds
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Calculate threshold based on training data only
train_std = y_train.std()
threshold = 0.2 * train_std
# Transform targets into classes: use 0, 1, 2 instead of -1, 0, 1
def create_labels(y, threshold):
return np.where(y > threshold, 2,
np.where(y < -threshold, 0, 1))
y_train_classes = create_labels(y_train, threshold)
y_test_classes = create_labels(y_test, threshold)
# Print class distribution
print("Training Class Distribution:")
print(pd.Series(y_train_classes).value_counts(normalize=True))
print("\nTest Class Distribution:")
print(pd.Series(y_test_classes).value_counts(normalize=True))
```
Based on percentile thresholds (sketch below):
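A sketch mirroring the std-dev version above, but with thresholds taken from training-set percentiles (the 20% cut-off is an arbitrary choice):
```python
# percentile thresholds computed on training data only
percentile = 0.2
lower_threshold = y_train.quantile(percentile)
upper_threshold = y_train.quantile(1 - percentile)

def create_labels_percentile(y, lower, upper):
    # 0 = short, 1 = neutral, 2 = long (same encoding as above)
    return np.where(y > upper, 2, np.where(y < lower, 0, 1))

y_train_classes = create_labels_percentile(y_train, lower_threshold, upper_threshold)
y_test_classes = create_labels_percentile(y_test, lower_threshold, upper_threshold)
```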
## Features importance
```python
#XGB top 20 feature importance
feature_importance = pd.DataFrame({
'feature': X.columns,
'importance': xgb_model.feature_importances_
})
feature_importance = feature_importance.sort_values('importance', ascending=False).head(20)
plt.figure(figsize=(12, 6))
sns.barplot(x='importance', y='feature', data=feature_importance)
plt.title('Top 20 Most Important Features')
plt.xlabel('Feature Importance')
plt.tight_layout()
plt.show()
```
## Features selection
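No snippet here yet; a minimal sketch that derives the `selected_features` / `X_test_selected` names used in the prediction snippets below from the XGBoost importances, with a median-importance cut-off as an arbitrary choice:
```python
# keep features whose importance exceeds the median importance
importances = pd.Series(xgb_model.feature_importances_, index=X.columns)
threshold = importances.median()
selected_features = importances[importances > threshold].index.tolist()
print(f"Selected {len(selected_features)} of {len(importances)} features")

X_train_selected = X_train[selected_features]
X_test_selected = X_test[selected_features]
```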
# Prediction
## Evaluation
```python
# Calculate directional accuracy
directional_accuracy = (np.sign(y_pred) == np.sign(y_test)).mean()
print(f"Directional Accuracy: {directional_accuracy:.4f}")
#confusion matrix
from sklearn.metrics import confusion_matrix
# Plot confusion matrix
plt.figure(figsize=(10, 8))
cm = confusion_matrix(y_test_classes, y_pred)
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues')
plt.title('Confusion Matrix')
plt.ylabel('True Label')
plt.xlabel('Predicted Label')
plt.show()
```
### Calculated returns based on various probability prediction thresholds
```python
# .predict_proba() gives the probabilities for each class
print("Predicted probabilities:", model.predict_proba(X_test))
# Output example:
# [
# [0.35, 0.65], # 35% not spam, 65% spam
# [0.70, 0.30], # 70% not spam, 30% spam
# [0.45, 0.55], # 45% not spam, 55% spam
# ]
```
Chart probabilities
```python
# Predict probabilities for each class
probabilities = model.predict_proba(X_test) # Shape: (n_samples, n_classes)
results_df = pd.DataFrame({
'Date': dates_test,
'Short Probability': probabilities[:, 0], # Probability of class 0 (short)
'Neutral Probability': probabilities[:, 1], # Probability of class 1 (neutral)
'Long Probability': probabilities[:, 2] # Probability of class 2 (long)
}).sort_values(by='Date') # Sort by date for time series plotting
fig = go.Figure()
# Add lines for each class probability
fig.add_trace(go.Scatter(
x=results_df['Date'], y=results_df['Short Probability'],
mode='lines', name='Short (Class 0)', line=dict(color='red')
))
fig.add_trace(go.Scatter(
x=results_df['Date'], y=results_df['Neutral Probability'],
mode='lines', name='Neutral (Class 1)', line=dict(color='orange')
))
fig.add_trace(go.Scatter(
x=results_df['Date'], y=results_df['Long Probability'],
mode='lines', name='Long (Class 2)', line=dict(color='green')
))
# Add title and labels
fig.update_layout(
title="Time Series of Predicted Class Probabilities",
xaxis_title="Date",
yaxis_title="Probability",
legend_title="Class"
)
fig.show()
```
### Cumulative returns based on prob predictions
```python
from sklearn.metrics import accuracy_score

# Calculate returns based on probability predictions
def calculate_returns(predictions, actual_returns, confidence_threshold=0.0):
pred_probs = final_model.predict_proba(X_test_selected)
max_probs = np.max(pred_probs, axis=1)
# Only take positions when confidence exceeds threshold
positions = np.zeros_like(predictions, dtype=float)
confident_mask = max_probs > confidence_threshold
# Convert predictions 0->-1, 2->1 for returns calculation
adj_predictions = np.where(predictions == 2, 1, np.where(predictions == 0, -1, 0))
positions[confident_mask] = adj_predictions[confident_mask]
returns = positions * actual_returns
return returns, np.mean(confident_mask)
# Test different confidence thresholds
confidence_thresholds = [0.4, 0.5, 0.6, 0.7, 0.8]
results = []
for conf_threshold in confidence_thresholds:
returns, coverage = calculate_returns(y_pred, y_test.values, conf_threshold)
# Calculate metrics
sharpe = np.sqrt(252) * returns.mean() / returns.std()
accuracy = accuracy_score(y_test_classes[returns != 0],
y_pred[returns != 0])
results.append({
'confidence_threshold': conf_threshold,
'sharpe': sharpe,
'accuracy': accuracy,
'coverage': coverage
})
# Plot cumulative returns for different confidence thresholds
plt.figure(figsize=(12, 6))
for th in confidence_thresholds:
returns, _ = calculate_returns(y_pred, y_test.values, th)
cumulative_returns = (1 + returns).cumprod()
plt.plot(cumulative_returns, label=f'threshold {th}')
plt.title('Cumulative Returns by Confidence Threshold')
plt.xlabel('Trade Number')
plt.ylabel('Cumulative Return')
plt.legend()
plt.grid(True)
plt.show()
results_df = pd.DataFrame(results)
print("\nPerformance at different confidence thresholds:")
print(results_df)
# Plot feature importance
importance_df = pd.DataFrame({
'feature': selected_features,
'importance': final_model.feature_importances_
})
importance_df = importance_df.sort_values('importance', ascending=False)
plt.figure(figsize=(12, 6))
sns.barplot(x='importance', y='feature', data=importance_df)
plt.title('Feature Importance')
plt.xlabel('Importance')
plt.tight_layout()
plt.show()
```
## Charts
```python
# Actual vs predicted values
plt.figure(figsize=(10, 6))
plt.scatter(y_test, y_pred, alpha=0.5)
plt.plot([y_test.min(), y_test.max()], [y_test.min(), y_test.max()], 'r--', lw=2)
plt.xlabel('Actual Returns')
plt.ylabel('Predicted Returns')
plt.title('Actual vs Predicted Returns')
plt.tight_layout()
plt.show()
```

9
pandas-snippets.md Normal file

@ -0,0 +1,9 @@
- [Basics](#basics)
# Basics
```python
df.loc[:, "D"] = np.array([5] * len(df))
df.loc[:, ["A", "B"]]
df.sort_index(axis=1, ascending=False) #axis=1 sort columns by labels, axis=0 sort by index
```

setup.py

@ -1,7 +1,7 @@
from setuptools import find_packages, setup
setup(name='snippets',
version='0.1.2',
version='0.2.2',
description='Snippets for vbtpro',
author='David Brazda',
author_email='davidbrazda61@gmail.com',

vbt-snippets.MD

@ -1,604 +0,0 @@
- [FETCHING DATA](#fetching-data)
- [DISCOVERY](#discovery)
- [DATA/WRAPPER](#datawrapper)
- [create WRAPPER manually](#create-wrapper-manually)
- [RESAMPLING](#resampling)
- [config](#config)
- [REALIGN](#realign)
- [REALIGN\_CLOSING accessors](#realign_closing-accessors)
- [SIGNALS](#signals)
- [ENTRIES/EXITS time based](#entriesexits-time-based)
- [STOPS](#stops)
- [OHLCSTX module](#ohlcstx-module)
- [WINDOW OPEN/CLOSE](#window-openclose)
- [END OF DAY EXITS](#end-of-day-exits)
- [DF/SR ACCESSORS](#dfsr-accessors)
- [Generic](#generic)
- [SIGNAL ACCESSORS](#signal-accessors)
- [RANKING - partitioning](#ranking---partitioning)
- [Base Accessors](#base-accessors)
- [Stoploss/Takeprofit](#stoplosstakeprofit)
- [SL - ATR based](#sl---atr-based)
- [EXIT after time](#exit-after-time)
- [CALLBACKS](#callbacks)
- [MEMORY](#memory)
- [INDICATORS DEV](#indicators-dev)
- [FAV INDICATORS](#fav-indicators)
- [GROUPING - SPLITTING](#grouping---splitting)
- [CHARTING](#charting)
- [MULTIACCOUNT](#multiaccount)
- [CUSTOM SIMULATION](#custom-simulation)
- [ANALYSIS](#analysis)
- [ROBUSTNESS](#robustness)
- [UTILS](#utils)
```python
import vectorbtpro as vbt
from lightweight_charts import Panel, chart, PlotDFAccessor, PlotSRAccessor
t15data = None
if not hasattr(pd.Series, 'lw'):
pd.api.extensions.register_series_accessor("lw")(PlotSRAccessor)
if not hasattr(pd.DataFrame, 'lw'):
pd.api.extensions.register_dataframe_accessor("lw")(PlotDFAccessor)
```
# FETCHING DATA
```python
#fetching from remote db
from lib.db import Connection
SYMBOL = "BAC"
SCHEMA = "ohlcv_1s" #time based 1s; other options: ohlcv_vol_200 (volume based ohlcv with resolution of 200), ohlcv_renko_20 (renko with 20 bricks size) ...
DB = "market_data"
con = Connection(db_name=DB, default_schema=SCHEMA, create_db=True)
basic_data = con.pull(symbols=[SYMBOL], schema=SCHEMA,start="2024-08-01", end="2024-08-08", tz_convert='America/New_York')
#Fetching from YAHOO
symbols = ["AAPL", "MSFT", "AMZN", "TSLA", "AMD", "NVDA", "SPY", "QQQ", "META", "GOOG"]
data = vbt.YFData.pull(symbols, start="2024-09-28", end="now", timeframe="1H", missing_columns="nan")
```
# DISCOVERY
```python
#get parameters of method
vbt.IF.list_locations() #lists categories
vbt.IF.list_indicators(pattern="vbt") #all in category vbt
vbt.IF.list_indicators("*sma")
vbt.phelp(vbt.indicator("talib:MOM").run)
```
# DATA/WRAPPER
Available [methods for data](http://5.161.179.223:8000/vbt-doc/api/data/base/index.html#vectorbtpro.data.base.Data)
**Main data container** (serves as a wrapper for symbol-oriented or feature-oriented data)
```python
data.transform()
data.dropna()
data.feature_oriented vs data.symbol_oriented #returns True/False if cols are features or symbols
data.data #dictionary, either feature oriented or symbol oriented
data.ohlcv #OHLCV mixin - filters only ohlcv features and offers methods http://5.161.179.223:8000/vbt-doc/api/data/base/index.html#vectorbtpro.data.base.OHLCDataMixin
data.base #base mixin - implicitly offers wrapper methods http://5.161.179.223:8000/vbt-doc/api/data/base/index.html#vectorbtpro.data.base.BaseDataMixin
data.symbol_wrapper
data.feature_wrapper
data.features
show(t1data.data["BAC"])
#display returns on top of ohlcv
t1data.ohlcv.data["BAC"].lw.plot(left=[(t1data.returns, "returns")], precision=4)
```
## create WRAPPER manually
[wrapper methods](http://5.161.179.223:8000/vbt-doc/api/base/wrapping/index.html#vectorbtpro.base.wrapping.ArrayWrapper)
```python
#create wrapper from existing objects
wrapper = data.symbol_wrapper # one column for each symbol
wrapper = data.get_symbol_wrapper() # symbol level - one column for each symbol (e.g. BAC with its series beneath it)
wrapper = data.get_feature_wrapper() #feature level, one column for each feature (open,high...)
wrapper = df.vbt.wrapper
#Create an empty array with the same shape, index, and columns as in another array
new_float_df = wrapper.fill(np.nan)
new_bool_df = wrapper.fill(False)
new_int_df = wrapper.fill(-1)
#display df/series
from itables import show
show(t1data.close)
```
# RESAMPLING
## config
```python
from vectorbtpro.utils.config import merge_dicts, Config, HybridConfig
from vectorbtpro import _typing as tp
from vectorbtpro.generic import nb as generic_nb
_feature_config: tp.ClassVar[Config] = HybridConfig(
{
"buyvolume": dict(
resample_func=lambda self, obj, resampler: obj.vbt.resample_apply(
resampler,
generic_nb.sum_reduce_nb,
)
),
"sellvolume": dict(
resample_func=lambda self, obj, resampler: obj.vbt.resample_apply(
resampler,
generic_nb.sum_reduce_nb,
)
),
"trades": dict(
resample_func=lambda self, obj, resampler: obj.vbt.resample_apply(
resampler,
generic_nb.sum_reduce_nb,
)
)
}
)
basic_data._feature_config = _feature_config
```
```python
#1s to 1T
t1data = basic_data[['open', 'high', 'low', 'close', 'volume','vwap','buyvolume','trades','sellvolume']].resample("1T")
t1data = t1data.transform(lambda df: df.between_time('09:30', '16:00').dropna())
#using resampler (with more control over target index)
resampler_s = vbt.Resampler(target_data.index, source_data.index, source_freq="1T", target_freq="1s")
basic_data.resample(resampler_s)
```
# REALIGN
`REALIGN` method - runs on a data object (OHLCV); the open feature realigns left-bound, the rest of the features right-bound (similar to `.resample("1T").first().ffill()`)
```python
# ffill=True  -> same frequency as t1data.index
# ffill=False -> keeps the original frequency but values are moved to where the data become available, i.e. to 15:44 instead of 15:30 for a 15T bar
t15data_realigned = t15data.realign(t1data.index, ffill=True, freq="1T") #freq - target frequency
```
## REALIGN_CLOSING accessors
```python
t15data_realigned_close = t15data.close.vbt.realign_closing(t1data.index, ffill=True, freq="1T")
t15data_realigned_open = t15data.open.vbt.realign_open(t1data.index, ffill=True, freq="1T")
```
```python
#realign_closing accessor just calls
#return self.realign(*args, source_rbound=False, target_rbound=False, **kwargs)
#realign opening
#return self.realign(*args, source_rbound=True, target_rbound=True, **kwargs)
#using RESAMPLER
#or
resampler_s = vbt.Resampler(t15data.index, t1data.index, source_freq="1T", target_freq="1s")
t15close_realigned_with_resampler = t1data.data["BAC"].realign_closing(resampler_s)
```
# SIGNALS
```python
cond1 = data.get("Low") < bb.lowerband
#comparing with previous value
cond2 = bandwidth > bandwidth.shift(1)
#comparing with value week ago
cond2 = bandwidth > bandwidth.vbt.ago("7d")
mask = cond1 & cond2
mask.sum()
```
## ENTRIES/EXITS time based
```python
#create entries/exits based on open of first symbol
entries = pd.DataFrame.vbt.signals.empty_like(data.open.iloc[:,0])
#create entries/exits based on symbol level
symbol_wrapper = data.get_symbol_wrapper()
entries = symbol_wrapper.fill(False)
exits = symbol_wrapper.fill(False)
entries.vbt.set(
True,
every="W-MON",
at_time="00:00:00",
indexer_method="bfill", # this time or after
inplace=True
)
exits.vbt.set(
True,
every="W-MON",
at_time="23:59:59",
indexer_method="ffill", # this time or before
inplace=True
)
```
## STOPS
[doc from_signal](http://5.161.179.223:8000/vbt-doc/api/portfolio/base/#vectorbtpro.portfolio.base.Portfolio.from_signals)
- StopExitPrice (Which price to use when exiting a position upon a stop signal?)
- StopEntryPrice (Which price to use as an initial stop price?) - a usage sketch follows below
```python
price = close.vbt.wrapper.fill()
price[entries] = entry_price
price[exits] = exit_price
```
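A sketch of passing these settings to `from_signals` (string values resolve to the enum members; the concrete choices here are assumptions for illustration):
```python
pf = vbt.PF.from_signals(
    data,
    entries=entries,
    exits=exits,
    sl_stop=0.01,              # assumed 1% stop loss
    stop_entry_price="close",  # use close as the initial stop (entry) price
    stop_exit_price="close",   # use close as the exit price when a stop fires
    price=price,               # custom price array built above
)
```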
## OHLCSTX module
- exit signal generator based on price and stop values
[doc](http://5.161.179.223:8000/vbt-doc/api/signals/generators/ohlcstx/index.html)
## WINDOW OPEN/CLOSE
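Nothing here yet; a minimal sketch that reuses the `vbt.set` pattern from above to trade only inside a daily window (the times are assumptions):
```python
# allow entries only at the window open and force exits at the window close
entries = t1data.get_symbol_wrapper().fill(False)
exits = t1data.get_symbol_wrapper().fill(False)
entries.vbt.set(True, every="D", at_time="09:35:00", indexer_method="bfill", inplace=True)
exits.vbt.set(True, every="D", at_time="15:55:00", indexer_method="ffill", inplace=True)
```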
## END OF DAY EXITS
```python
sr = t1data.data["BAC"]
last_n_daily_rows = sr.groupby(sr.index.date).tail(4) #or N last rows
second_last_daily_row = sr.groupby(sr.index.date).nth(-2) #or Nth last row
second_last_two_rows = sr.groupby(sr.index.date).apply(lambda x: x.iloc[-3:-1]).droplevel(0) #or any slice of rows
#create exit array
exits = t1data.get_symbol_wrapper().fill(False)
exits.loc[last_n_daily_rows.index] = True
#visualize
t1data.ohlcv.data["BAC"].lw.plot(right=[(t1data.close,"close",exits)], size="s")
#REGULAR EXITS -EVERY HOUR/D/WEEK exits
exits.vbt.set(
True,
every="H" # "min" "2min" "2H" "W-MON"+at time "D"+time
#at_time="23:59:59",
indexer_method="ffill", # this time or before
inplace=True
)
```
# DF/SR ACCESSORS
## Generic
- for common taks ([docs](http://5.161.179.223:8000/vbt-doc/api/generic/accessors/index.html#vectorbtpro.generic.accessors.GenericAccessor))
`rolling_apply` - runs a custom function over a rolling window of a fixed size (number of bars or a frequency)
`expanding_apply` - runs a custom function over an expanding window from the start of the data to the current point
```python
from numba import njit
mean_nb = njit(lambda a: np.nanmean(a))
hourly_anchored_expanding_mean = t1data.close.vbt.rolling_apply("1H", mean_nb) #ROLLING to FREQUENCY, or with fixed windows: rolling_apply(10, mean_nb)
t1data.ohlcv.data["BAC"].lw.plot(right=[(t1data.close,"close"),(hourly_anchored_expanding_mean, "hourly_anchored_expanding_mean")], size="s")
#NOTE for anchored "1D" frequency - it measures a timedelta, meaning it requires 1 full day between resets (16:00 market close to 9:30 open is not a full day, so setting 7H is enough)
df['a'].vbt.overlay_with_heatmap(df['b']).show()
```
## SIGNAL ACCESSORS
- http://5.161.179.223:8000/vbt-doc/api/signals/accessors/#vectorbtpro.signals.accessors.SignalsAccessor
## RANKING - partitioning
```python
#pos_rank: -1 when False; 0, 1, ... for consecutive Trues; allow_gaps is False by default
# sample_mask = pd.Series([True, True, False, True, True])
ranked = sample_mask.vbt.signals.pos_rank()
ranked == 1 #select each second signal in each partition
ranked = sample_mask.vbt.signals.pos_rank(allow_gaps=True)
(ranked > -1) & (ranked % 2 == 1) #Select each second signal globally
entries.vbt.signals.first() #selects only first entries in each group
entries.vbt.signals.from_nth(n) # pos_rank >= n in each group, all from Nth
#AFTER - with variants _after which resets partition each reset array
#maximum number of exit signals after each entry signal
exits.vbt.signals.pos_rank_after(entries, reset_wait=0).max() + 1 #Count is the maximum rank plus one since ranks start with zero. We also assume that an entry signal comes before an exit signal if both are at the same timestamp by passing reset_wait=0.
entries.vbt.signals.total_partitions
#partition_pos_rank - all members of each partition have the same rank
ranked = sample_mask.vbt.signals.partition_pos_rank(allow_gaps=True) #0,0,-1,1,1
ranked == 1 # the whole second partition
```
## Base Accessors
* low level accessors - http://5.161.179.223:8000/vbt-doc/api/base/accessors/index.html#vectorbtpro.base.accessors.BaseAccessor
```python
exits.vbt.set(
True,
every="W-MON",
at_time="23:59:59",
indexer_method="ffill", # this time or before
inplace=True
)
```
# Stoploss/Takeprofit
[doc StopOrders](http://5.161.179.223:8000/vbt-doc/documentation/portfolio/from-signals/index.html#stop-orders)
## SL - ATR based
```
atr = data.run("atr").atr
pf = vbt.Portfolio.from_signals(
data,
entries=entries,
sl_stop=atr / sub_data.close
)
```
## EXIT after time
using [from signals](http://5.161.179.223:8000/vbt-doc/cookbook/portfolio/index.html#from-signals)
```python
f = vbt.PF.from_signals(..., td_stop="7 days")
pf = vbt.PF.from_signals(..., td_stop=pd.Timedelta(days=7))
pf = vbt.PF.from_signals(..., td_stop=td_arr)
#EXIT at time
pf = vbt.PF.from_signals(..., dt_stop="16:00") #exit at 16 and later
pf = vbt.PF.from_signals(..., dt_stop=datetime.time(16, 0))
pf = vbt.PF.from_signals( #exit last bar before
...,
dt_stop="16:00",
arg_config=dict(dt_stop=dict(last_before=True))
)
```
## CALLBACKS
- a signal function (`signal_func_nb`)
- can dynamically generate signals (True, True, False, False)
- runs at the beginning of each bar (a minimal sketch follows below this list)
- an adjustment function (`adjust_func_nb`) - [doc](http://5.161.179.223:8000/vbt-doc/documentation/portfolio/from-signals/#adjustment)
- runs only if signal function above was not provided, but entry,exit arrays
- runs before default signal function [ls_signal_func_nb](http://5.161.179.223:8000/vbt-doc/api/portfolio/nb/from_signals/index.html#vectorbtpro.portfolio.nb.from_signals.ls_signal_func_nb)
- can change pending limit orders etc.
- a post-signal function (`post_signal_func_nb`)
- post-segment function (`post_segment_func_nb`)
all of them access [SignalContext](http://5.161.179.223:8000/vbt-doc/api/portfolio/enums/index.html#vectorbtpro.portfolio.enums.SignalContext) `(c)` as a named tuple
SignalContext contains various fields, such as:
* last_limit_info - 1D with latest limit order per column
* order_counts
* last_return ...
"""
### MEMORY
save an information piece at one timestamp and re-use at a later timestamp when using [callbacks memory](http://5.161.179.223:8000/vbt-doc/cookbook/portfolio/index.html#callbacks)
Usecases:
* [MULTIPLE simultaneous LIMIT ORDERS at a TIME](http://5.161.179.223:8000/vbt-doc/cookbook/portfolio/index.html#callbacks)
* [IGNORE ENTRIES number of DAYS after losing trade](http://5.161.179.223:8000/vbt-doc/cookbook/portfolio/index.html#callbacks) - signal function
# INDICATORS DEV
```python
#REGISTER CUSTOM INDICATOR
vbt.IndicatorFactory.register_custom_indicator(
SupportResistance,
name="SUPPRES",
location=None,
if_exists='raise'
)
#RUN INDICATOR on DATA WRAPPER
cdlbreakaway = s1data.run(vbt.indicator("talib:CDLHAMMER"), skipna=True, timeframe=["12s"])
#FROM EXPRESSION http://5.161.179.223:8000/vbt-doc/api/indicators/factory/#vectorbtpro.indicators.factory.IndicatorFactory.from_expr
WMA = vbt.IF(
class_name='WMA',
input_names=['close'],
param_names=['window'],
output_names=['wma']
).from_expr("wm_mean_nb(close, window)")
wma = WMA.run(t1data.close, window=10)
wma.wma
```
# FAV INDICATORS
```python
#for TALIB indicator always use skipna=True
#TALIB INDICATORS can do realign closing: timeframe=["1T"]
mom_multi = vbt.indicator("talib:MOM").run(t1data.close, timeperiod=5, timeframe=["1T","5T"], skipna=True) #returned 5T can be directly compared with 1T
#ANCHORED indicators: vbt.indicator("talib:MOM") becomes AnchoredIndicator("talib:MOM", anchor="D") - freq of pd.Grouper
from ttools import AnchoredIndicator
mom_anch_d = AnchoredIndicator("talib:MOM", anchor='30min').run(t1data.data["BAC"].close, timeperiod=10)
mom = vbt.indicator("talib:MOM").run(t1data.data["BAC"].close, timeperiod=10, skipna=True)
t1data.ohlcv.data["BAC"].lw.plot(auto_scale=[mom_anch_d, mom])
#FIBO RETRACEMENT
fibo = vbt.indicator("technical:FIBONACCI_RETRACEMENTS").run(t1data.close, skipna=True)
#fibo.fibonacci_retracements
fibo_plusclose = t1data.close + fibo.fibonacci_retracements
fibo_minusclose = t1data.close - fibo.fibonacci_retracements
#fibo_plusclose
Panel(
auto_scale=[fibo_plusclose["BAC"]],
ohlcv=(t1data.ohlcv.data["BAC"],),
histogram=[],
right=[(fibo_plusclose["BAC"],),(fibo_minusclose["BAC"],)],
left=[],
middle1=[(fibo.fibonacci_retracements["BAC"],"fibonacci_retracements")],
middle2=[]
).chart(size="xs")
#CHOPINESS indicator
chopiness = vbt.indicator("technical:CHOPINESS").run(s1data.open, s1data.high, s1data.low, s1data.close, t1data.volume, skipna=True)
s1data.ohlcv.data["BAC"].lw.plot(auto_scale=[chopiness])
#anchored VWAP
t1vwap_h = vbt.VWAP.run(t1data.high, t1data.low, t1data.close, t1data.volume, anchor="H")
t1vwap_h_real = t1vwap_h.vwap.vbt.realign_closing(resampler_s)
#BBANDS = vbt.indicator("pandas_ta:BBANDS")
mom_anch_d = AnchoredIndicator("talib:MOM", anchor='30min').run(t1data.data["BAC"].close, timeperiod=10)
mom = vbt.indicator("talib:MOM").run(t1data.data["BAC"].close, timeperiod=10, skipna=True)
#macd = vbt.indicator("talib:MACD").run(t1data.data["BAC"].close) #, timeframe=["1T"]) #,
t1data.ohlcv.data["BAC"].lw.plot(auto_scale=[mom_anch_d, mom])
```
# GROUPING - SPLITTING
```python
#SPLITTER - splitting wrapper based on index
#http://5.161.179.223:8000/vbt-doc/tutorials/cross-validation/splitter/index.html#anchored
daily_splitter = vbt.Splitter.from_grouper(t1data.index, "D", split=None) #DOES contain last DAY
daily_splitter = vbt.Splitter.from_ranges( #doesn't contain the last DAY
t1data.index,
every="D",
split=None
)
daily_splitter.stats()
daily_splitter.plot()
daily_splitter.coverage()
#TAKING and APPLY MANUALLY - run UDF on ALL takes and concatenates
taken = daily_splitter.take(t1data)
inds = []
for series in taken:
mom = vbt.indicator("talib:MOM").run(series.close, timeperiod=10, skipna=True)
inds.append(mom)
mom_daily = vbt.base.merging.row_stack_merge(inds) #merge
mom = vbt.indicator("talib:MOM").run(t1data.close, timeperiod=10, skipna=True)
t1data.ohlcv.data["BAC"].lw.plot(left=[(mom_daily, "daily_splitter"),(mom, "original mom")]) #OHLCV with indicators on top
#TAKING and APPLY AUTOMATIC
daily_splitter = vbt.Splitter.from_grouper(t1data.index, "D", split=None) #DOES contain last DAY
def indi_run(sr):
return vbt.indicator("talib:MOM").run(sr.close, timeperiod=10, skipna=True)
res = daily_splitter.apply(indi_run, vbt.Takeable(t1data), merge_func="row_stack", freq="1T")
#use of IDX accessor (docs:http://5.161.179.223:8000/vbt-doc/api/base/accessors/index.html#vectorbtpro.base.accessors.BaseIDXAccessor)
daily_grouper = t1data.index.vbt.get_grouper("D")
#grouper instance can be iterated over
for name, indices in daily_grouper.iter_groups():
print(name, indices)
#PANDAS GROUPING - series/df grouping resulting in a GroupBy placeholder that can be aggregated (sum, mean), transformed, iterated over or filtered
for name, group in t1data.data["BAC"].close.groupby(pd.Grouper(freq='D')):
print(name, group)
```
# CHARTING
Using [custom lightweight-charts-python](https://github.com/drew2323/lightweight-charts-python)
```python
#LW df/sr accessor
t1data.ohlcv.data["BAC"].lw.plot(left=[(mom_multi, "mom_multi")]) #OHLCV with indicators on top
t5data.ohlcv.data["BAC"].lw.plot(
left=[(mom_multi.real, "mom"),(mom_multi_beztf, "mom_beztf"), (mom_5t_orig, "mom_5t_orig"), (mom_5t_orig_realigned, "mom_5t_orig_realigned")],
right=[(t1data.data["BAC"].close, "t1 close"),(t5data.data["BAC"].close, "t5 close")],
size="s") #.loc[:,(20,"1T","BAC")]
#SINGLE PANEL
Panel(
auto_scale=[cdlbreakaway],
ohlcv=(t1data.ohlcv.data["BAC"],entries),
histogram=[],
right=[],
left=[],
middle1=[],
middle2=[]
).chart(size="xs")
#MULTI PANEL
pane1 = Panel(
#auto_scale=[mom_multi, mom_multi_1t],
#ohlcv=(t1data.data["BAC"],), #(series, entries, exits, other_markers)
#histogram=[(order_imbalance_allvolume, "oivol")], # [(series, name, "rgba(53, 94, 59, 0.6)", opacity)]
right=[(t1data.data["BAC"].close,"close 1T"),(t5data.data["BAC"].close,"close 5T"),(mom_multi_1t.close, "mom multi close")], # [(series, name, entries, exits, other_markers)]
left=[(mom_multi, "mom_multi"), (mom_multi_1t, "mom_multi_1t")],
#middle1=[],
#middle2=[],
#xloc="2024-02-12 09:30",
precision=3
)
pane2 = Panel(....)
ch = chart([pane1, pane2], size="s")
```
# MULTIACCOUNT
Simultaneous LONG and short (hedging)
In vbt a position requires one column of data, so hedging is possible by using two columns representing the same asset but different directions,
and then stacking both portfolios together via [column stacking](http://5.161.179.223:8000/vbt-doc/features/productivity/#column-stacking): `pf_join = vbt.PF.column_stack((pf1, pf2), group_by=True)` - see the sketch below.
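A sketch of the two-column approach (entry/exit names are placeholders; the key points are a separate `direction` per portfolio and `group_by=True` when stacking):
```python
# one portfolio per direction, built on the same data
pf_long = vbt.PF.from_signals(data, entries=long_entries, exits=long_exits, direction="longonly")
pf_short = vbt.PF.from_signals(data, entries=short_entries, exits=short_exits, direction="shortonly")

# stack them so value/stats are aggregated as a single account
pf_join = vbt.PF.column_stack((pf_long, pf_short), group_by=True)
pf_join.stats()
```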
# CUSTOM SIMULATION
# ANALYSIS
## ROBUSTNESS
```python
pf_stats.sort_values(by='Sharpe Ratio', ascending=False).iloc[::-1].vbt.heatmap().show() #works when there are more metrics
```
# UTILS
```python
#MEMORY
sr.info()
#peak memory usage, running once
with vbt.MemTracer() as tracer:
my_pipeline()
print(tracer.peak_usage())
#CACHE
vbt.print_cache_stats()
vbt.print_cache_stats(vbt.PF)
vbt.flush() #clear cache and collect garbage
vbt.clear_cache(pf) #of specific
#TIMING
#running once
with vbt.Timer() as timer:
my_pipeline()
print(timer.elapsed())
#multiple times
print(vbt.timeit(my_pipeline))
#NUMBA
#numba doesn't raise an error when indexing out of bounds; this env variable enables the check
import os
os.environ["NUMBA_BOUNDSCHECK"] = "1"
```

1627
vbt-snippets.md Normal file

File diff suppressed because it is too large.


@ -0,0 +1,893 @@
# Custom Metrics in VectorBT PRO
Custom metrics in VectorBT PRO provide a powerful way to extend portfolio analysis beyond the built-in metrics. They allow you to calculate domain-specific metrics, implement proprietary performance measures, or create metrics tailored to your specific trading strategies.
## 1. Understanding the Metrics System
### Built-in Metrics Structure
VectorBT PRO uses a configuration-based approach where metrics are stored in `Portfolio.metrics` as a `HybridConfig`:
```python
# View all available metrics
print(vbt.Portfolio.metrics)
# Get specific metric configuration
print(vbt.Portfolio.metrics['sharpe_ratio'])
```
### Metric Configuration Structure
Each metric is defined as a dictionary with specific keys:
```python
metric_config = {
'title': 'My Custom Metric', # Display name
'calc_func': calculation_function, # Function to calculate the metric
'tags': ['custom', 'risk'], # Tags for filtering
'apply_to_timedelta': False, # Whether to convert to timedelta
'agg_func': None, # Aggregation function
'resolve_calc_func': True, # Whether to resolve attributes
# ... other configuration options
}
```
## 2. Creating Custom Metrics
### Method 1: Simple Function-Based Metrics
```python
# Add a simple custom metric
vbt.Portfolio.metrics['total_bars'] = dict(
title='Total Bars',
calc_func=lambda self: len(self.wrapper.index)
)
# Add skewness and kurtosis
vbt.Portfolio.metrics['skew'] = dict(
title='Skew',
calc_func='returns.skew'
)
vbt.Portfolio.metrics['kurtosis'] = dict(
title='Kurtosis',
calc_func='returns.kurtosis'
)
```
### Method 2: Complex Custom Calculations
```python
# Custom metric with multiple parameters
def total_return_no_fees(self, orders):
"""Calculate total return without fees"""
return (self.total_profit + orders.fees.sum()) / self.get_init_cash() * 100
vbt.Portfolio.metrics['total_return_no_fees'] = dict(
title='Total Return (No Fees) [%]',
calc_func=total_return_no_fees,
resolve_orders=True # Automatically resolve orders parameter
)
```
### Method 3: Using Lambda Functions with Settings
```python
# PnL in dollar terms (for futures trading)
vbt.Portfolio.metrics['pnl_dollars'] = dict(
title='PnL ($)',
calc_func=lambda self, settings: (self.value.iloc[-1] - self.value.iloc[0]) * 50,
resolve_calc_func=False # Don't resolve attributes automatically
)
```
## 3. Advanced Custom Metrics
### Quantile-Based Metrics
```python
def value_at_risk_custom(returns, confidence_level=0.05):
"""Custom VaR calculation"""
return returns.quantile(confidence_level)
vbt.Portfolio.metrics['custom_var'] = dict(
title='Custom VaR (5%)',
calc_func=value_at_risk_custom,
resolve_returns=True,
confidence_level=0.05
)
```
### Multi-Component Metrics
```python
def comprehensive_trade_stats(trades):
"""Return multiple trade statistics"""
return {
'long_trades': trades.direction_long.count(),
'short_trades': trades.direction_short.count(),
'long_pnl': trades.direction_long.pnl.sum(),
'short_pnl': trades.direction_short.pnl.sum(),
'avg_trade_duration': trades.duration.mean()
}
vbt.Portfolio.metrics['trade_breakdown'] = dict(
title='Trade Breakdown',
calc_func=comprehensive_trade_stats,
resolve_trades=True
)
```
### Time-Based Metrics
```python
def monthly_returns_volatility(returns):
"""Calculate monthly returns volatility"""
monthly_returns = returns.resample('M').sum()
return monthly_returns.std() * np.sqrt(12)
vbt.Portfolio.metrics['monthly_vol'] = dict(
title='Monthly Volatility',
calc_func=monthly_returns_volatility,
resolve_returns=True
)
```
## 4. Metric Resolution and Parameters
### Automatic Parameter Resolution
VectorBT PRO can automatically resolve portfolio attributes as parameters:
```python
# These parameters will be automatically resolved:
vbt.Portfolio.metrics['custom_metric'] = dict(
title='Custom Metric',
calc_func=lambda returns, trades, orders: calculation_logic(returns, trades, orders),
resolve_returns=True, # Passes self.returns
resolve_trades=True, # Passes self.trades
resolve_orders=True # Passes self.orders
)
```
### Common Resolvable Parameters
- `self` - The portfolio instance
- `returns` - Portfolio returns
- `trades` - Trade records
- `orders` - Order records
- `drawdowns` - Drawdown records
- `value` - Portfolio value
- `close` - Close prices
- `init_cash` - Initial cash
- `total_profit` - Total profit
- `wrapper` - Array wrapper (for index/column info); see the sketch below
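For example, a metric that only needs the `wrapper` (a sketch in the same style as the `total_bars` example above; assumes a datetime index):
```python
# number of distinct trading days covered by the portfolio index
vbt.Portfolio.metrics['trading_days'] = dict(
    title='Trading Days',
    calc_func=lambda self: len(np.unique(self.wrapper.index.date))
)
```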
## 5. Global vs Instance-Level Metrics
### Global Metrics (Class-Level)
```python
# Add to all future Portfolio instances
vbt.Portfolio.metrics['my_metric'] = metric_config
# Or modify settings globally
vbt.settings.portfolio['stats']['metrics'] = list(vbt.Portfolio.metrics.items()) + [
('my_metric', metric_config)
]
```
### Instance-Level Metrics
```python
# Add to specific portfolio instance
pf._metrics['my_metric'] = metric_config
# Then use it
pf.stats(['my_metric'])
```
## 6. Using Custom Metrics
### Basic Usage
```python
# Calculate specific custom metrics
pf.stats(['total_bars', 'skew', 'kurtosis'])
# Calculate all metrics including custom ones
pf.stats('all')
# Filter by tags
pf.stats(tags=['custom'])
```
### Advanced Usage with Settings
```python
# Use custom metrics in optimization
results = []
for param in parameter_combinations:
pf = vbt.Portfolio.from_signals(close, entries, exits, **param)
stats = pf.stats(['total_return', 'sharpe_ratio', 'my_custom_metric'])
results.append(stats)
# Create comparison DataFrame
comparison_df = pd.DataFrame(results)
```
## 7. Real-World Examples
### Futures Trading Metrics
```python
# Point-based P&L for futures
vbt.Portfolio.metrics['pnl_points'] = dict(
title='P&L (Points)',
calc_func=lambda self: (self.value.iloc[-1] - self.value.iloc[0]) / self.close.iloc[0] * 10000
)
# Risk-adjusted return for futures
vbt.Portfolio.metrics['risk_adjusted_return'] = dict(
title='Risk Adjusted Return',
calc_func=lambda self, returns: self.total_return / returns.std() * np.sqrt(252),
resolve_returns=True
)
```
### Intraday Strategy Metrics
```python
# Time-of-day analysis
def intraday_performance(orders):
"""Analyze performance by hour of day"""
order_df = orders.records_readable
order_df['hour'] = order_df.index.hour
return order_df.groupby('hour')['PnL'].mean()
vbt.Portfolio.metrics['hourly_performance'] = dict(
title='Hourly Performance',
calc_func=intraday_performance,
resolve_orders=True
)
```
### Market Regime Metrics
```python
def regime_performance(returns, benchmark_returns):
"""Performance in different market regimes"""
bull_mask = benchmark_returns > benchmark_returns.quantile(0.6)
bear_mask = benchmark_returns < benchmark_returns.quantile(0.4)
return {
'bull_return': returns[bull_mask].mean(),
'bear_return': returns[bear_mask].mean(),
'bull_sharpe': returns[bull_mask].mean() / returns[bull_mask].std() * np.sqrt(252),
'bear_sharpe': returns[bear_mask].mean() / returns[bear_mask].std() * np.sqrt(252)
}
vbt.Portfolio.metrics['regime_analysis'] = dict(
title='Market Regime Analysis',
calc_func=regime_performance,
resolve_returns=True,
resolve_bm_returns=True
)
```
## 8. Best Practices
### 1. Naming Conventions
- Use descriptive names: `monthly_volatility` instead of `mv`
- Include units in title: `'Max Drawdown [%]'`
- Use consistent naming patterns
### 2. Error Handling
```python
def robust_metric(returns):
"""Metric with error handling"""
try:
if len(returns) < 2:
return np.nan
return returns.std() * np.sqrt(252)
except Exception as e:
print(f"Error calculating metric: {e}")
return np.nan
```
### 3. Performance Optimization
```python
# Use vectorized operations
def efficient_metric(returns):
"""Efficient vectorized calculation"""
return returns.rolling(30).std().mean()
# Avoid loops when possible
def inefficient_metric(returns):
"""Avoid this approach"""
results = []
for i in range(len(returns)):
results.append(some_calculation(returns.iloc[i]))
return np.mean(results)
```
### 4. Documentation
```python
vbt.Portfolio.metrics['documented_metric'] = dict(
title='Well Documented Metric',
calc_func=lambda returns: returns.std() * np.sqrt(252),
resolve_returns=True,
tags=['custom', 'risk', 'volatility'],
# Add description in comments or docstrings
)
```
## 9. Common Pitfalls and Solutions
### Pitfall 1: Metric Not Available After Creation
```python
# ❌ Wrong: Metric added after portfolio creation
pf = vbt.Portfolio.from_signals(...)
vbt.Portfolio.metrics['my_metric'] = metric_config
pf.stats(['my_metric']) # KeyError!
# ✅ Correct: Add metric before portfolio creation
vbt.Portfolio.metrics['my_metric'] = metric_config
pf = vbt.Portfolio.from_signals(...)
pf.stats(['my_metric']) # Works!
```
### Pitfall 2: Incorrect Parameter Resolution
```python
# ❌ Wrong: Using external variables
portfolio_instance = some_portfolio
vbt.Portfolio.metrics['bad_metric'] = dict(
calc_func=lambda self: portfolio_instance.total_return # External reference
)
# ✅ Correct: Using self parameter
vbt.Portfolio.metrics['good_metric'] = dict(
calc_func=lambda self: self.total_return # Self reference
)
```
### Pitfall 3: Missing Error Handling
```python
# ❌ Wrong: No error handling
def risky_metric(trades):
return trades.pnl.sum() / trades.duration.mean() # Division by zero possible
# ✅ Correct: With error handling
def safe_metric(trades):
if len(trades) == 0 or trades.duration.mean() == 0:
return np.nan
return trades.pnl.sum() / trades.duration.mean()
```
Custom metrics in VectorBT PRO provide unlimited flexibility to analyze your trading strategies exactly how you need. They integrate seamlessly with the existing stats system and can be used in optimization, comparison, and reporting workflows.
# Comprehensive VectorBT PRO `pf.trades` Analysis
The `pf.trades` object in VectorBT PRO provides extensive capabilities for analyzing trading performance. Here's a comprehensive guide focusing on directional analysis, temporal patterns, and advanced trade analytics.
## 1. Basic Trade Analysis
### Trade Counts by Direction
```python
# Basic trade counts
total_trades = pf.trades.count()
long_trades = pf.trades.direction_long.count()
short_trades = pf.trades.direction_short.count()
print(f"Total trades: {total_trades}")
print(f"Long trades: {long_trades}")
print(f"Short trades: {short_trades}")
# Alternative using records
trade_records = pf.trades.records_readable
direction_counts = trade_records['Direction'].value_counts()
print(f"\nDirection breakdown:\n{direction_counts}")
```
### P&L Analysis by Direction
```python
# Total P&L by direction
long_pnl = pf.trades.direction_long.pnl.sum()
short_pnl = pf.trades.direction_short.pnl.sum()
total_pnl = pf.trades.pnl.sum()
print(f"Long P&L: {long_pnl:.2f}")
print(f"Short P&L: {short_pnl:.2f}")
print(f"Total P&L: {total_pnl:.2f}")
# P&L statistics by direction
long_stats = pf.trades.direction_long.pnl.describe()
short_stats = pf.trades.direction_short.pnl.describe()
```
## 2. Daily P&L Analysis
### Daily P&L Calculation
```python
# Method 1: Using trade records with date grouping
trade_records = pf.trades.records_readable
trade_records['exit_date'] = trade_records.index.date
# Daily P&L overall
daily_pnl = trade_records.groupby('exit_date')['PnL'].sum()
# Daily P&L by direction
daily_pnl_by_direction = trade_records.groupby(['exit_date', 'Direction'])['PnL'].sum().unstack(fill_value=0)
print("Daily P&L by Direction:")
print(daily_pnl_by_direction.head())
```
### Daily P&L for Each Direction
```python
# Separate long and short daily P&L
long_trades_records = trade_records[trade_records['Direction'] == 'Long']
short_trades_records = trade_records[trade_records['Direction'] == 'Short']
daily_long_pnl = long_trades_records.groupby('exit_date')['PnL'].sum()
daily_short_pnl = short_trades_records.groupby('exit_date')['PnL'].sum()
# Combine into comprehensive daily analysis
daily_analysis = pd.DataFrame({
'Total_PnL': daily_pnl,
'Long_PnL': daily_long_pnl,
'Short_PnL': daily_short_pnl,
'Long_Trades': long_trades_records.groupby('exit_date').size(),
'Short_Trades': short_trades_records.groupby('exit_date').size()
}).fillna(0)
print("Daily Trade Analysis:")
print(daily_analysis.head())
```
## 3. Hourly P&L Analysis by Exit Time
### Hourly P&L by Direction
```python
# Extract hour from exit time
trade_records = pf.trades.records_readable
trade_records['exit_hour'] = trade_records.index.hour
# Hourly P&L analysis
hourly_pnl_analysis = trade_records.groupby(['exit_hour', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count'],
'Return': ['mean', 'std']
}).round(4)
print("Hourly P&L Analysis by Direction:")
print(hourly_pnl_analysis)
# Separate analysis for each direction
hourly_long_pnl = trade_records[trade_records['Direction'] == 'Long'].groupby('exit_hour')['PnL'].agg(['sum', 'mean', 'count'])
hourly_short_pnl = trade_records[trade_records['Direction'] == 'Short'].groupby('exit_hour')['PnL'].agg(['sum', 'mean', 'count'])
print("\nHourly Long P&L:")
print(hourly_long_pnl)
print("\nHourly Short P&L:")
print(hourly_short_pnl)
```
### Advanced Hourly Analysis
```python
# Create comprehensive hourly performance matrix
def hourly_performance_analysis(trades_records):
"""Comprehensive hourly performance analysis"""
# Add time components
trades_records['exit_hour'] = trades_records.index.hour
trades_records['entry_hour'] = pd.to_datetime(trades_records['Entry Index']).dt.hour
# Hourly exit analysis
hourly_stats = trades_records.groupby(['exit_hour', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count', 'std'],
'Return': ['mean', 'std'],
'Size': 'mean'
}).round(4)
return hourly_stats
hourly_performance = hourly_performance_analysis(trade_records)
```
## 4. Day of Week Analysis
### P&L by Day of Week and Direction
```python
# Add day of week analysis
trade_records['exit_day_of_week'] = trade_records.index.day_name()
trade_records['exit_weekday'] = trade_records.index.weekday # 0=Monday, 6=Sunday
# Day of week P&L analysis
dow_analysis = trade_records.groupby(['exit_day_of_week', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count'],
'Return': ['mean', 'std'],
'Size': 'mean'
}).round(4)
print("Day of Week Analysis:")
print(dow_analysis)
# Pivot for easier viewing
dow_pivot = trade_records.pivot_table(
index='exit_day_of_week',
columns='Direction',
values='PnL',
aggfunc=['sum', 'mean', 'count'],
fill_value=0
)
print("\nDay of Week Pivot Analysis:")
print(dow_pivot)
```
### Advanced Day of Week Patterns
```python
# Create comprehensive day of week analysis
def day_of_week_analysis(trades_records):
"""Comprehensive day of week performance analysis"""
# Ensure we have day names in proper order
day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
    trades_records['exit_day_name'] = pd.to_datetime(trades_records['Exit Index']).dt.day_name()
# Group by day and direction
dow_stats = trades_records.groupby(['exit_day_name', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count', 'std'],
'Return': ['mean', 'std'],
'Size': 'mean',
'Entry Fees': 'mean',
'Exit Fees': 'mean'
}).round(4)
# Reorder by day
dow_stats = dow_stats.reindex(day_order, level=0)
return dow_stats
dow_comprehensive = day_of_week_analysis(trade_records)
```
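Raw P&L by weekday can be dominated by a few outliers, so a per-weekday win rate is a useful complement (plain pandas on the `trade_records` frame prepared above):
```python
# Win rate by weekday and direction; days without any trades appear as NaN rows
weekday_win_rate = (
    trade_records.assign(is_win=trade_records['PnL'] > 0)
    .groupby(['exit_day_of_week', 'Direction'])['is_win']
    .mean()
    .unstack(fill_value=0)
    .reindex(['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'])
)
print("Win rate by day of week:")
print(weekday_win_rate.round(3))
```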
## 5. Advanced Temporal Analysis
### Combined Time Pattern Analysis
```python
# Create comprehensive time pattern analysis
def comprehensive_time_analysis(pf):
"""Complete temporal analysis of trades"""
trades_records = pf.trades.records_readable
    # Add all time components (exit timestamps live in the 'Exit Index' column)
    exit_ts = pd.to_datetime(trades_records['Exit Index'])
    trades_records['exit_hour'] = exit_ts.dt.hour
    trades_records['exit_day_name'] = exit_ts.dt.day_name()
    trades_records['exit_month'] = exit_ts.dt.month
    trades_records['exit_date'] = exit_ts.dt.date
# 1. Hourly analysis
hourly_stats = trades_records.groupby(['exit_hour', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count'],
'Return': ['mean', 'std']
}).round(4)
# 2. Daily analysis
daily_stats = trades_records.groupby(['exit_day_name', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count'],
'Return': ['mean', 'std']
}).round(4)
# 3. Monthly analysis
monthly_stats = trades_records.groupby(['exit_month', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count'],
'Return': ['mean', 'std']
}).round(4)
# 4. Combined hour-day analysis
hour_day_stats = trades_records.groupby(['exit_day_name', 'exit_hour', 'Direction']).agg({
'PnL': ['sum', 'mean', 'count']
}).round(4)
return {
'hourly': hourly_stats,
'daily': daily_stats,
'monthly': monthly_stats,
'hour_day': hour_day_stats
}
# Execute comprehensive analysis
time_analysis = comprehensive_time_analysis(pf)
# Display results
print("=== HOURLY ANALYSIS ===")
print(time_analysis['hourly'])
print("\n=== DAILY ANALYSIS ===")
print(time_analysis['daily'])
print("\n=== MONTHLY ANALYSIS ===")
print(time_analysis['monthly'])
```
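The `hour_day` result is usually too large to print in full; a sketch that surfaces only the strongest and weakest day/hour/direction combinations:
```python
# Top and bottom day-hour-direction combinations by total P&L
hour_day_pnl = time_analysis['hour_day'][('PnL', 'sum')]
print("Top 5 combinations:")
print(hour_day_pnl.nlargest(5))
print("Bottom 5 combinations:")
print(hour_day_pnl.nsmallest(5))
```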
## 6. Custom Metrics for Trade Analysis
### Custom Trade Metrics
```python
# Add custom metrics to Portfolio for directional analysis
vbt.Portfolio.metrics['long_trade_count'] = dict(
title='Long Trade Count',
calc_func=lambda trades: trades.direction_long.count(),
resolve_trades=True
)
vbt.Portfolio.metrics['short_trade_count'] = dict(
title='Short Trade Count',
calc_func=lambda trades: trades.direction_short.count(),
resolve_trades=True
)
vbt.Portfolio.metrics['long_pnl_total'] = dict(
title='Long P&L Total',
calc_func=lambda trades: trades.direction_long.pnl.sum(),
resolve_trades=True
)
vbt.Portfolio.metrics['short_pnl_total'] = dict(
title='Short P&L Total',
calc_func=lambda trades: trades.direction_short.pnl.sum(),
resolve_trades=True
)
# Temporal metrics
vbt.Portfolio.metrics['best_hour_pnl'] = dict(
title='Best Hour P&L',
    calc_func=lambda trades: trades.records_readable.groupby(
        pd.to_datetime(trades.records_readable['Exit Index']).dt.hour
    )['PnL'].sum().max(),
resolve_trades=True
)
vbt.Portfolio.metrics['worst_hour_pnl'] = dict(
title='Worst Hour P&L',
    calc_func=lambda trades: trades.records_readable.groupby(
        pd.to_datetime(trades.records_readable['Exit Index']).dt.hour
    )['PnL'].sum().min(),
resolve_trades=True
)
```
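Once registered, the custom metrics can be requested by name through the regular stats interface; a minimal sketch, assuming the class-level `metrics` config updated above is picked up as-is:
```python
# Compute only the custom directional/temporal metrics
custom_stats = pf.stats(metrics=[
    'long_trade_count', 'short_trade_count',
    'long_pnl_total', 'short_pnl_total',
    'best_hour_pnl', 'worst_hour_pnl'
])
print(custom_stats)
```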
## 7. Performance Analysis Functions
### Comprehensive Trade Performance Function
```python
def analyze_trade_performance(pf):
"""Comprehensive trade performance analysis"""
trades = pf.trades
records = trades.records_readable
# Basic directional statistics
direction_stats = {
'Long': {
'count': trades.direction_long.count(),
'total_pnl': trades.direction_long.pnl.sum(),
'avg_pnl': trades.direction_long.pnl.mean(),
'win_rate': trades.direction_long.win_rate,
'profit_factor': trades.direction_long.profit_factor
},
'Short': {
'count': trades.direction_short.count(),
'total_pnl': trades.direction_short.pnl.sum(),
'avg_pnl': trades.direction_short.pnl.mean(),
'win_rate': trades.direction_short.win_rate,
'profit_factor': trades.direction_short.profit_factor
}
}
# Temporal analysis
    exit_ts = pd.to_datetime(records['Exit Index'])
    records['exit_hour'] = exit_ts.dt.hour
    records['exit_day'] = exit_ts.dt.day_name()
    records['exit_date'] = exit_ts.dt.date
# Hourly P&L by direction
hourly_pnl = records.groupby(['exit_hour', 'Direction'])['PnL'].agg(['sum', 'mean', 'count'])
# Daily P&L by direction
daily_pnl = records.groupby(['exit_day', 'Direction'])['PnL'].agg(['sum', 'mean', 'count'])
# Date-based P&L
date_pnl = records.groupby(['exit_date', 'Direction'])['PnL'].agg(['sum', 'mean', 'count'])
# Best/worst performing times
best_hours = records.groupby(['exit_hour', 'Direction'])['PnL'].sum().groupby('Direction').idxmax()
worst_hours = records.groupby(['exit_hour', 'Direction'])['PnL'].sum().groupby('Direction').idxmin()
return {
'direction_stats': direction_stats,
'hourly_pnl': hourly_pnl,
'daily_pnl': daily_pnl,
'date_pnl': date_pnl,
'best_hours': best_hours,
'worst_hours': worst_hours
}
# Execute analysis
performance_analysis = analyze_trade_performance(pf)
```
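The nested `direction_stats` dictionary reads more easily as a table (a sketch for a single-column portfolio, where each value is a scalar):
```python
# Long vs. short statistics side by side
direction_overview = pd.DataFrame(performance_analysis['direction_stats'])
print(direction_overview.round(4))
```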
## 8. Visualization Functions
### Trade Performance Visualization
```python
def plot_trade_performance(pf):
"""Create comprehensive trade performance plots"""
import plotly.graph_objects as go
from plotly.subplots import make_subplots
records = pf.trades.records_readable
    exit_ts = pd.to_datetime(records['Exit Index'])
    records['exit_hour'] = exit_ts.dt.hour
    records['exit_day'] = exit_ts.dt.day_name()
# Create subplots
fig = make_subplots(
rows=2, cols=2,
subplot_titles=['Hourly P&L by Direction', 'Daily P&L by Direction',
'P&L Distribution', 'Cumulative P&L by Direction'],
specs=[[{"secondary_y": True}, {"secondary_y": True}],
[{"secondary_y": False}, {"secondary_y": False}]]
)
# Hourly P&L
hourly_long = records[records['Direction'] == 'Long'].groupby('exit_hour')['PnL'].sum()
hourly_short = records[records['Direction'] == 'Short'].groupby('exit_hour')['PnL'].sum()
fig.add_trace(go.Bar(x=hourly_long.index, y=hourly_long.values, name='Long Hourly', marker_color='green'), row=1, col=1)
fig.add_trace(go.Bar(x=hourly_short.index, y=hourly_short.values, name='Short Hourly', marker_color='red'), row=1, col=1)
# Daily P&L
day_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
daily_long = records[records['Direction'] == 'Long'].groupby('exit_day')['PnL'].sum().reindex(day_order, fill_value=0)
daily_short = records[records['Direction'] == 'Short'].groupby('exit_day')['PnL'].sum().reindex(day_order, fill_value=0)
fig.add_trace(go.Bar(x=daily_long.index, y=daily_long.values, name='Long Daily', marker_color='lightgreen'), row=1, col=2)
fig.add_trace(go.Bar(x=daily_short.index, y=daily_short.values, name='Short Daily', marker_color='lightcoral'), row=1, col=2)
# P&L Distribution
fig.add_trace(go.Histogram(x=records[records['Direction'] == 'Long']['PnL'], name='Long Distribution', opacity=0.7), row=2, col=1)
fig.add_trace(go.Histogram(x=records[records['Direction'] == 'Short']['PnL'], name='Short Distribution', opacity=0.7), row=2, col=1)
# Cumulative P&L
long_cumulative = records[records['Direction'] == 'Long']['PnL'].cumsum()
short_cumulative = records[records['Direction'] == 'Short']['PnL'].cumsum()
fig.add_trace(go.Scatter(y=long_cumulative.values, mode='lines', name='Long Cumulative', line=dict(color='green')), row=2, col=2)
fig.add_trace(go.Scatter(y=short_cumulative.values, mode='lines', name='Short Cumulative', line=dict(color='red')), row=2, col=2)
fig.update_layout(height=800, title_text="Comprehensive Trade Analysis")
return fig
# Create visualization
# trade_plot = plot_trade_performance(pf)
# trade_plot.show()
```
## 9. Advanced Analytics
### Trade Streaks and Patterns
```python
def analyze_trade_patterns(pf):
"""Analyze trade patterns and streaks"""
trades = pf.trades
records = trades.records_readable
    # Winning and losing streaks: winning_streak/losing_streak are running
    # per-trade counters (e.g. 1, 2, 3, 0, 1, ...), not duration records
    def completed_streaks(counter):
        """Lengths of completed streaks extracted from a running streak counter."""
        s = pd.Series(counter)
        # a streak ends where the counter drops on the next trade (or at the last trade)
        return s[(s > 0) & (s.shift(-1).fillna(0) < s)]
    win_lengths = completed_streaks(trades.winning_streak.values)
    loss_lengths = completed_streaks(trades.losing_streak.values)
    # Pattern analysis
    patterns = {
        'longest_winning_streak': win_lengths.max() if len(win_lengths) > 0 else 0,
        'longest_losing_streak': loss_lengths.max() if len(loss_lengths) > 0 else 0,
        'avg_winning_streak': win_lengths.mean() if len(win_lengths) > 0 else 0,
        'avg_losing_streak': loss_lengths.mean() if len(loss_lengths) > 0 else 0,
    }
# Direction-specific patterns
long_patterns = analyze_direction_patterns(trades.direction_long)
short_patterns = analyze_direction_patterns(trades.direction_short)
return {
'overall_patterns': patterns,
'long_patterns': long_patterns,
'short_patterns': short_patterns
}
def analyze_direction_patterns(direction_trades):
"""Analyze patterns for specific direction"""
if direction_trades.count() == 0:
return {}
return {
'total_trades': direction_trades.count(),
'win_rate': direction_trades.win_rate,
'profit_factor': direction_trades.profit_factor,
'avg_winner': direction_trades.winning.pnl.mean() if direction_trades.winning.count() > 0 else 0,
'avg_loser': direction_trades.losing.pnl.mean() if direction_trades.losing.count() > 0 else 0,
'largest_winner': direction_trades.pnl.max(),
'largest_loser': direction_trades.pnl.min(),
'total_pnl': direction_trades.pnl.sum()
}
# Execute pattern analysis
pattern_analysis = analyze_trade_patterns(pf)
```
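The returned dictionaries are easiest to inspect side by side (again assuming a single-column portfolio so the values are scalars):
```python
# Long vs. short pattern summary plus overall streaks
print("Overall streaks:", pattern_analysis['overall_patterns'])
pattern_overview = pd.DataFrame({
    'Long': pattern_analysis['long_patterns'],
    'Short': pattern_analysis['short_patterns']
})
print(pattern_overview.round(4))
```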
## 10. Summary Report Function
### Comprehensive Trade Report
```python
def generate_trade_report(pf):
"""Generate comprehensive trade analysis report"""
print("="*80)
print("COMPREHENSIVE TRADE ANALYSIS REPORT")
print("="*80)
# Basic Statistics
trades = pf.trades
total_trades = trades.count()
long_trades = trades.direction_long.count()
short_trades = trades.direction_short.count()
print(f"\n📊 BASIC STATISTICS")
print(f"Total Trades: {total_trades}")
print(f"Long Trades: {long_trades} ({long_trades/total_trades*100:.1f}%)")
print(f"Short Trades: {short_trades} ({short_trades/total_trades*100:.1f}%)")
# P&L Analysis
print(f"\n💰 P&L ANALYSIS")
print(f"Total P&L: ${trades.pnl.sum():.2f}")
print(f"Long P&L: ${trades.direction_long.pnl.sum():.2f}")
print(f"Short P&L: ${trades.direction_short.pnl.sum():.2f}")
print(f"Average P&L per Trade: ${trades.pnl.mean():.2f}")
# Temporal Analysis
records = trades.records_readable
    exit_ts = pd.to_datetime(records['Exit Index'])
    records['exit_hour'] = exit_ts.dt.hour
    records['exit_day'] = exit_ts.dt.day_name()
print(f"\n⏰ TEMPORAL ANALYSIS")
# Best/Worst Hours
hourly_pnl = records.groupby('exit_hour')['PnL'].sum()
best_hour = hourly_pnl.idxmax()
worst_hour = hourly_pnl.idxmin()
print(f"Best Hour: {best_hour}:00 (${hourly_pnl[best_hour]:.2f})")
print(f"Worst Hour: {worst_hour}:00 (${hourly_pnl[worst_hour]:.2f})")
# Best/Worst Days
daily_pnl = records.groupby('exit_day')['PnL'].sum()
best_day = daily_pnl.idxmax()
worst_day = daily_pnl.idxmin()
print(f"Best Day: {best_day} (${daily_pnl[best_day]:.2f})")
print(f"Worst Day: {worst_day} (${daily_pnl[worst_day]:.2f})")
# Direction Performance
print(f"\n📈 DIRECTION PERFORMANCE")
if long_trades > 0:
print(f"Long Win Rate: {trades.direction_long.win_rate:.2%}")
print(f"Long Profit Factor: {trades.direction_long.profit_factor:.2f}")
if short_trades > 0:
print(f"Short Win Rate: {trades.direction_short.win_rate:.2%}")
print(f"Short Profit Factor: {trades.direction_short.profit_factor:.2f}")
print("="*80)
# Generate report
generate_trade_report(pf)
```
This comprehensive analysis framework provides all the tools needed to analyze `pf.trades` with particular focus on:
1. **Direction-specific analysis** - Separate analysis for long and short trades
2. **Daily P&L patterns** - Understanding daily performance patterns
3. **Hourly P&L by direction** - Identifying optimal trading hours for each direction
4. **Day of week analysis** - Finding the best/worst days for different directions
5. **Custom metrics** - Extending the analysis with domain-specific metrics
6. **Visualization tools** - Creating comprehensive performance visualizations
7. **Pattern recognition** - Identifying winning/losing streaks and patterns
8. **Comprehensive reporting** - Generating detailed performance reports
The framework is modular, so you can pick only the analyses most relevant to evaluating your trading strategy.