- DEBUGGING
- FETCHING DATA
- DISCOVERY
- DATA/WRAPPER
- RESAMPLING
- REALIGN
- SIGNALS
- DF/SR ACCESSORS
- Stoploss/Takeprofit
- Portfolio
- Portfolio analysis
- Optimization
- INDICATORS DEV
- FAV INDICATORS
- GROUPING
- SPLITTING
- CHARTING
- MULTIACCOUNT
- CUSTOM SIMULATION
- ANALYSIS
- UTILS
- Market calendar
import numpy as np
import pandas as pd
import vectorbtpro as vbt
from lightweight_charts import Panel, chart, PlotDFAccessor, PlotSRAccessor
t15data = None
if not hasattr(pd.Series, 'lw'):
pd.api.extensions.register_series_accessor("lw")(PlotSRAccessor)
if not hasattr(pd.DataFrame, 'lw'):
pd.api.extensions.register_dataframe_accessor("lw")(PlotDFAccessor)
DEBUGGING
vbt.pprint(pf.entry_trades) #pretty print of instance
vbt.pdir(pf.entry_trades) #available methods/properties
vbt.phelp(RollCov.run) #input/output attributes of the method
Print which arguments are being passed to apply_func:
def apply_func(*args, **kwargs):
for i, arg in enumerate(args):
print("arg {}: {}".format(i, type(arg)))
for k, v in kwargs.items():
print("kwarg {}: {}".format(k, type(v)))
raise NotImplementedError
RollCov = vbt.IF(
class_name='RollCov',
input_names=['ts1', 'ts2'],
param_names=['w'],
output_names=['rollcov'],
).with_apply_func(apply_func, select_params=False)
RollCov.run(ts1, ts2, [2, 3], some_arg="some_value")
FETCHING DATA
#fetching from remote db
from lib.db import Connection
SYMBOL = "BAC"
SCHEMA = "ohlcv_1s" #time-based 1s; other options: ohlcv_vol_200 (volume-based ohlcv with resolution 200), ohlcv_renko_20 (renko with brick size 20), ...
DB = "market_data"
con = Connection(db_name=DB, default_schema=SCHEMA, create_db=True)
basic_data = con.pull(symbols=[SYMBOL], schema=SCHEMA,start="2024-08-01", end="2024-08-08", tz_convert='America/New_York')
#Fetching from YAHOO
symbols = ["AAPL", "MSFT", "AMZN", "TSLA", "AMD", "NVDA", "SPY", "QQQ", "META", "GOOG"]
data = vbt.YFData.pull(symbols, start="2024-09-28", end="now", timeframe="1H", missing_columns="nan")
#Fetching from local cache
dir = DATA_DIR + "/notebooks/"
import os
files = [f for f in os.listdir(dir) if f.endswith(".parquet")]
print('\n'.join(map(str, files)))
file_name = "ohlcv_df-BAC-2024-10-03T09:30:00-2024-10-16T16:00:00-['4', '7', 'B', 'C', 'F', 'O', 'P', 'U', 'V', 'W', 'Z']-100.parquet"
ohlcv_df = pd.read_parquet(dir+file_name,engine='pyarrow')
basic_data = vbt.Data.from_data(vbt.symbol_dict({"BAC": ohlcv_df}), tz_convert=zoneNY)
basic_data.wrapper.index.normalize().nunique() #numdays
#Fetching Trades and Aggregating custom OHLCV
from ttools import load_data, AggType
from datetime import datetime
#This is how to call the LOAD function
symbol = ["SPY", "BAC"]
#datetime in zoneNY
day_start = datetime(2024, 1, 15, 9, 30, 0)
day_stop = datetime(2024, 10, 20, 16, 0, 0)
day_start = zoneNY.localize(day_start)
day_stop = zoneNY.localize(day_stop)
#requested AGG
resolution = 1 #bar resolution (e.g. 1 = 1s bars for time-based aggregation)
agg_type = AggType.OHLCV #other types AggType.OHLCV_VOL, AggType.OHLCV_DOL, AggType.OHLCV_RENKO
exclude_conditions = ['C','O','4','B','7','V','P','W','U','Z','F','9','M','6'] #None to defaults
minsize = 100 #min trade size to include
main_session_only = False
force_remote = False
data = load_data(symbol = symbol,
agg_type = agg_type,
resolution = resolution,
start_date = day_start,
end_date = day_stop,
#exclude_conditions = None,
minsize = minsize,
main_session_only = main_session_only,
force_remote = force_remote,
return_vbt = True, #returns vbt object
verbose = True
)
REINDEX to main session
Get the main trading session for each day from pandas_market_calendars and reindex the fetched data to the main session only.
import vectorbtpro as vbt
# Start and end dates to use across both the calendar and data fetch
start=data.index[0].to_pydatetime()
end=data.index[-1].to_pydatetime()
timeframe="1m"
import pandas_market_calendars as mcal
# Get the NYSE calendar
nyse = mcal.get_calendar("NYSE")
# Get the market hours data
market_hours = nyse.schedule(start_date=start, end_date=end, tz=nyse.tz)
#market_hours = market_hours.tz_localize(nyse.tz)
# Create a DatetimeIndex at our desired frequency for that schedule. Because the calendar hands back the end of
# the window, you need to subtract that size timeframe to get back to the start
market_klines = mcal.date_range(market_hours, frequency=timeframe) - pd.Timedelta(timeframe)
testData = vbt.YFData.fetch(['MSFT'], start=start, end=end, timeframe=timeframe, tz_convert="US/Eastern")
# Finally, take our DatetimeIndex and use that to pull just the data we're interested in (and ensuring we have rows
# for any empty klines in there, which helps for some time based algorithms that need to have time not exist outside
# of market hours)
testData = testData.transform(lambda x: x.reindex(market_klines))
Smart indexing
signal.vbt.xloc["04-26-2024":"04-29-2024"].get() #pdseries or df timeindex
signal.vbt.xloc[("BAC", "04-26-2024"):("BAC","04-29-2024")].get() #multiindex
entries.vbt.xloc["04-16-2024"].get() #one day
entries.vbt.xloc[slice("2024-08-01","2024-08-03")].obj.info()
data.xloc[slice("9:30","10:00")] #targeting only morning rush
Data manipulation
#add/rename/delete symbols
s12_data = s12_data.rename_symbols("BAC", "BAC-LONG")
s12_data = s12_data.add_symbol("BAC-SHORT", s12_data.data["BAC-LONG"])
s12_data.symbols
s12_data = s12_data.remove_symbols(["BAC-SHORT"])
DISCOVERY
#get parameters of method
vbt.IF.list_locations() #lists categories
vbt.IF.list_indicators(pattern="vbt") #all in category vbt
vbt.IF.list_indicators("*sma")
vbt.phelp(vbt.indicator("talib:MOM").run)
DATA/WRAPPER
Available methods for data
Main data container (serves as a wrapper for symbol-oriented or feature-oriented data)
data.transform()
data.dropna()
data.feature_oriented vs data.symbol_oriented #returns True/False if cols are features or symbols
data.data #dictionary of DataFrames, keyed by symbol or by feature depending on orientation
data.ohlcv #OHLCV mixin filters only ohlcv feature and offers methods http://5.161.179.223:8000/vbt-doc/api/data/base/index.html#vectorbtpro.data.base.OHLCDataMixin
data.base #base mixin - implicit offers functions wrapper methods http://5.161.179.223:8000/vbt-doc/api/data/base/index.html#vectorbtpro.data.base.BaseDataMixin
- data.symbol_wrapper
- data.feature_wrapper
- data.features
show(t1data.data["BAC"])
#display returns on top of ohlcv
t1data.ohlcv.data["BAC"].lw.plot(left=[(t1data.returns, "returns")], precision=4)
create WRAPPER manually
#create wrapper from existing objects
wrapper = data.symbol_wrapper # one column for each symbol
wrapper = data.get_symbol_wrapper() # symbol level, one column for each symbol (BAC, and beneath it the series)
wrapper = data.get_feature_wrapper() #feature level, one column for each feature (open,high...)
wrapper = df.vbt.wrapper
#Create an empty array with the same shape, index, and columns as in another array
new_float_df = wrapper.fill(np.nan)
new_bool_df = wrapper.fill(False)
new_int_df = wrapper.fill(-1)
#display df/series
from itables import show
show(t1data.close)
RESAMPLING
Feature config - defines how custom (non-OHLCV) features are aggregated when resampling (here: summed)
from vectorbtpro.utils.config import merge_dicts, Config, HybridConfig
from vectorbtpro import _typing as tp
from vectorbtpro.generic import nb as generic_nb
_feature_config: tp.ClassVar[Config] = HybridConfig(
{
"buyvolume": dict(
resample_func=lambda self, obj, resampler: obj.vbt.resample_apply(
resampler,
generic_nb.sum_reduce_nb,
)
),
"sellvolume": dict(
resample_func=lambda self, obj, resampler: obj.vbt.resample_apply(
resampler,
generic_nb.sum_reduce_nb,
)
),
"trades": dict(
resample_func=lambda self, obj, resampler: obj.vbt.resample_apply(
resampler,
generic_nb.sum_reduce_nb,
)
)
}
)
basic_data._feature_config = _feature_config
#1s to 1T
t1data = basic_data[['open', 'high', 'low', 'close', 'volume','vwap','buyvolume','trades','sellvolume']].resample("1T")
t1data = t1data.transform(lambda df: df.between_time('09:30', '16:00').dropna())
#using resampler (with more control over target index)
resampler_s = vbt.Resampler(target_data.index, source_data.index, source_freq="1T", target_freq="1s")
basic_data.resample(resampler_s)
REALIGN
REALIGN method - runs on a data object (OHLCV); the open feature realigns left-bound, the remaining features right-bound. Roughly .resample("1T").first().ffill()
ffill=True - result has the same frequency as t1data.index
ffill=False - keeps the original frequency but values move to where they become available, e.g. to 15:44 instead of 15:30 for a 15T bar
t15data_realigned = t15data.realign(t1data.index, ffill=True, freq="1T") #freq - target frequency
REALIGN_CLOSING accessors
t15data_realigned_close = t15data.close.vbt.realign_closing(t1data.index, ffill=True, freq="1T")
t15data_realigned_open = t15data.open.vbt.realign_opening(t1data.index, ffill=True, freq="1T")
#realign_closing just calls self.realign(*args, source_rbound=True, target_rbound=True, **kwargs); realign_opening calls self.realign(*args, source_rbound=False, target_rbound=False, **kwargs)
#using a RESAMPLER
#or: resampler_s = vbt.Resampler(t15data.index, t1data.index, source_freq="1T", target_freq="1s")
t15close_realigned_with_resampler = t1data.data["BAC"].realign_closing(resampler_s)
SIGNALS
Comparing
dvla = np.round(div_vwap_lin_angle.real,4) #ROUNDING to 4 decimals
long_entries = tts.isrisingc(dvla,3).vbt & div_vwap_cum.div_below(0) #strictly rising for 3 bars
short_entries = tts.isfalling(dvla,3).vbt & div_vwap_cum.div_above(0) #strictly falling for 3 bars
long_entries = tts.isrising(dvla,3)#rising for 3 bars including equal values
short_entries = tts.isfalling(dvla,3)#falling for 3 bars including equal values
cond1 = data.get("Low") < bb.lowerband
#comparing with previous value
cond2 = bandwidth > bandwidth.shift(1)
#comparing with value week ago
cond2 = bandwidth > bandwidth.vbt.ago("7d")
mask = cond1 & cond2
mask.sum()
#creating
bandwidth = (bb.upperband - bb.lowerband) / bb.middleband
mask = bandwidth.vbt > vbt.Param([0.15, 0.3], name="threshold") #broadcasts and creates combinations (for scalar params only)
#same but for arrays
mask = bandwidth.vbt.combine(
[0.15, 0.3], #values elements (scalars or array)
combine_func=np.greater,
keys=pd.Index([0.15, 0.3], name="threshold") #keys for the multiindex
)
mask.sum()
GENERATE SIGNALS ITERATIVELY (numba)
Used for 1D arrays. For multiple symbols, create your own indicator or iterate over columns (see below).
from numba import njit
@njit
def generate_mask_1d_nb( #required arrays as inputs
high, low,
uband, mband, lband,
cond2_th, cond4_th
):
out = np.full(high.shape, False)
for i in range(high.shape[0]):
bandwidth = (uband[i] - lband[i]) / mband[i]
cond1 = low[i] < lband[i]
cond2 = bandwidth > cond2_th
cond3 = high[i] > uband[i]
cond4 = bandwidth < cond4_th
signal = (cond1 and cond2) or (cond3 and cond4)
out[i] = signal
return out
mask = generate_mask_1d_nb(
data.get("High")["BTCUSDT"].values,
data.get("Low")["BTCUSDT"].values,
bb.upperband["BTCUSDT"].values,
bb.middleband["BTCUSDT"].values,
bb.lowerband["BTCUSDT"].values,
0.30,
0.15
)
symbol_wrapper = data.get_symbol_wrapper()
mask = symbol_wrapper["BTCUSDT"].wrap(mask)
mask.sum()
or create an extra numba function to iterate over columns
@njit
def generate_mask_nb(
high, low,
uband, mband, lband,
cond2_th, cond4_th
):
out = np.empty(high.shape, dtype=np.bool_)
for col in range(high.shape[1]):
out[:, col] = generate_mask_1d_nb(
high[:, col], low[:, col],
uband[:, col], mband[:, col], lband[:, col],
cond2_th, cond4_th
)
return out
mask = generate_mask_nb(
vbt.to_2d_array(data.get("High")),
vbt.to_2d_array(data.get("Low")),
vbt.to_2d_array(bb.upperband),
vbt.to_2d_array(bb.middleband),
vbt.to_2d_array(bb.lowerband),
0.30,
0.15
)
mask = symbol_wrapper.wrap(mask)
mask.sum()
or as indicators
Works on columns.
MaskGenerator = vbt.IF(
input_names=["high", "low", "uband", "mband", "lband"],
param_names=["cond2_th", "cond4_th"],
output_names=["mask"]
).with_apply_func(generate_mask_1d_nb, takes_1d=True)
mask_generator = MaskGenerator.run(
data.get("High"),
data.get("Low"),
bb.upperband,
bb.middleband,
bb.lowerband,
[0.3, 0.4],
[0.1, 0.2],
param_product=True
)
mask_generator.mask.sum()
ENTRIES/EXITS time based
#create entries/exits based on open of first symbol
entries = pd.DataFrame.vbt.signals.empty_like(data.open.iloc[:,0])
exits = pd.DataFrame.vbt.signals.empty_like(entries)
#OR create entries/exits based on symbol level if needed (for each columns)
symbol_wrapper = data.get_symbol_wrapper()
entries = symbol_wrapper.fill(False)
exits = symbol_wrapper.fill(False)
entries.vbt.set(
True,
every="W-MON",
at_time="00:00:00",
indexer_method="bfill", # this time or after
inplace=True
)
exits.vbt.set(
True,
every="W-MON",
at_time="23:59:59",
indexer_method="ffill", # this time or before
inplace=True
)
STOPS
- StopExitPrice (Which price to use when exiting a position upon a stop signal?)
- StopEntryPrice (Which price to use as an initial stop price?)
price = close.vbt.wrapper.fill()
price[entries] = entry_price
price[exits] = exit_price
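A minimal sketch (my addition, not from the original notes) of wiring these choices into from_signals; the string values "fillprice" and "stop" are assumed to map to the StopEntryPrice/StopExitPrice enums:
pf = vbt.PF.from_signals(
    data,
    entries=entries,
    sl_stop=0.01,
    stop_entry_price="fillprice",  # assumption: the price that seeds the initial stop level
    stop_exit_price="stop",  # assumption: fill the stop exit at the stop price itself
)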
OHLCSTX Module
- exit signal generator based on price and stop values doc
Entry Window and Forced Exit Window
Applying entry window range (denoted by minutes from the session start) to entries and applying forced exit window to exits.
create_mask_from_window with use_cal=True (default) uses market calendar data to determine each day's session start and end. When disabled, it uses a fixed 9:30-16:00 session for each day.
from ttools import create_mask_from_window
entry_window_opens = 3 #in minutes from start of the market
entry_window_closes = 388
forced_exit_start = 387
forced_exit_end = 390
#create mask based on main session that day
entry_window_opened = create_mask_from_window(entries, entry_window_opens, entry_window_closes, use_cal=True)
#limit entries to the window
entries = entries & entry_window_opened
#create forced exits mask
forced_exits_window = create_mask_from_window(exits, forced_exit_start, forced_exit_end, use_cal=True)
#add forced_exits to exits
exits = exits | forced_exits_window
END OF DAY EXITS
Another way to do EOD exits: by the number of bars at the end of the session, assuming the last rows of each day represent the end of the market session.
sr = t1data.data["BAC"]
last_n_daily_rows = sr.groupby(sr.index.date).tail(4) #or N last rows
second_last_daily_row = sr.groupby(sr.index.date).nth(-2) #or Nth last row
second_last_two_rows = sr.groupby(sr.index.date).apply(lambda x: x.iloc[-3:-1]).droplevel(0) #or any slice of rows
#create exit array
exits = t1data.get_symbol_wrapper().fill(False)
exits.loc[last_n_daily_rows.index] = True
#visualize
t1data.ohlcv.data["BAC"].lw.plot(right=[(t1data.close,"close",exits)], size="s")
#which is ALTERNATIVE to
exits = create_mask_from_window(t1data.close, 387, 390, use_cal=False)
t1data.ohlcv.data["BAC"].lw.plot(right=[(t1data.close,"close",exits)], size="s")
REGULAR EXITS
Time based.
#REGULAR EXITS -EVERY HOUR/D/WEEK exits
exits.vbt.set(
    True,
    every="H",  # "min" "2min" "2H" "W-MON"+at_time "D"+time
    #at_time="23:59:59",
    indexer_method="ffill",  # this time or before
    inplace=True
)
DF/SR ACCESSORS
Generic
For common tasks (docs)
- rolling_apply - runs a custom function over a rolling window of fixed size (number of bars or frequency)
- expanding_apply - runs a custom function over a window expanding from the start of the data to the current point (sketched after the rolling example below)
from numba import njit
mean_nb = njit(lambda a: np.nanmean(a))
hourly_anchored_expanding_mean = t1data.close.vbt.rolling_apply("1H", mean_nb) #ROLLING to FREQUENCY, or with fixed windows rolling_apply(10, mean_nb)
t1data.ohlcv.data["BAC"].lw.plot(right=[(t1data.close,"close"),(hourly_anchored_expanding_mean, "hourly_anchored_expanding_mean")], size="s")
#NOTE for anchored "1D" frequency - the window measures a timedelta, i.e. it requires a full day between resets; since the market session runs 9:30-16:00 (not a full day), setting "7H" is enough
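A small sketch of the expanding counterpart mentioned above (assuming expanding_apply accepts the same numba reducer): it runs mean_nb from the start of the data up to each bar.
expanding_mean = t1data.close.vbt.expanding_apply(mean_nb)
t1data.ohlcv.data["BAC"].lw.plot(right=[(t1data.close, "close"), (expanding_mean, "expanding_mean")], size="s")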
#HEATMAP OVERLAY
df['a'].vbt.overlay_with_heatmap(df['b']).show()
SIGNAL ACCESSORS
RANKING - partitioning
#pos_rank: -1 when False; 0, 1, ... for consecutive Trues; allow_gaps is False by default
# sample_mask = pd.Series([True, True, False, True, True])
ranked = sample_mask.vbt.signals.pos_rank()
ranked == 1 #select each second signal in each partition
ranked = sample_mask.vbt.signals.pos_rank(allow_gaps=True)
(ranked > -1) & (ranked % 2 == 1) #Select each second signal globally
entries.vbt.signals.first() #selects only first entries in each group
entries.vbt.signals.from_nth(n) # pos_rank >= n in each group, all from Nth
#AFTER - with variants _after which resets partition each reset array
#maximum number of exit signals after each entry signal
exits.vbt.signals.pos_rank_after(entries, reset_wait=0).max() + 1 #Count is the maximum rank plus one since ranks start with zero. We also assume that an entry signal comes before an exit signal if both are at the same timestamp by passing reset_wait=0.
entries.vbt.signals.total_partitions
#partition_pos_rank - all members of each partition have the same rank
ranked = sample_mask.vbt.signals.partition_pos_rank(allow_gaps=True) #0,0,-1,1,1
ranked == 1 # the whole second partition
Base Accessors
- low level accessors - http://5.161.179.223:8000/vbt-doc/api/base/accessors/index.html#vectorbtpro.base.accessors.BaseAccessor
exits.vbt.set(
True,
every="W-MON",
at_time="23:59:59",
indexer_method="ffill", # this time or before
inplace=True
)
Stoploss/Takeprofit
SL - ATR based
atr = data.run("atr").atr
pf = vbt.Portfolio.from_signals(
data,
entries=entries,
sl_stop=atr / data.close
)
EXIT after time
using from signals
pf = vbt.PF.from_signals(..., td_stop="7 days")
pf = vbt.PF.from_signals(..., td_stop=pd.Timedelta(days=7))
pf = vbt.PF.from_signals(..., td_stop=td_arr)
#EXIT at time
pf = vbt.PF.from_signals(..., dt_stop="16:00") #exit at 16:00 or later
pf = vbt.PF.from_signals(..., dt_stop=datetime.time(16, 0))
pf = vbt.PF.from_signals( #exit last bar before
...,
dt_stop="16:00",
arg_config=dict(dt_stop=dict(last_before=True))
)
CALLBACKS
- a signal function (signal_func_nb) - can dynamically generate signals, returning a tuple like (True, True, False, False) for (long_entry, long_exit, short_entry, short_exit)
  - runs at the beginning of the bar
- an adjustment function (adjust_func_nb) - doc - runs only if the signal function above was not provided, i.e. when entry/exit arrays are used (a minimal sketch follows this list)
  - runs before the default signal function ls_signal_func_nb
  - can change pending limit orders etc.
- a post-signal function (post_signal_func_nb)
- a post-segment function (post_segment_func_nb)
All of them access the SignalContext (c) as a named tuple.
SignalContext contains various metrics, such as:
- last_limit_info - 1D array with the latest limit order per column
- order_counts
- last_return ...
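A minimal adjustment-function sketch (my own illustration; last_sl_info and its "stop" field are assumptions based on the usual stop-adjustment pattern, not taken from these notes):
from numba import njit
import numpy as np

@njit
def adjust_func_nb(c):
    # assumption: c.last_sl_info holds the pending stop-loss record per column
    sl_info = c.last_sl_info[c.col]
    if vbt.pf_nb.in_position_nb(c) and not np.isnan(sl_info["stop"]):
        sl_info["stop"] = 0.02  # e.g. tighten the SL to 2% while in a position
# passed as adjust_func_nb=adjust_func_nb to vbt.PF.from_signals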
"""
MEMORY
Save a piece of information at one timestamp and re-use it at a later timestamp when using callbacks (memory).
Use cases:
- IGNORE ENTRIES for a number of DAYS after a losing trade - signal function (see the memory sketch below)
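A rough sketch of the memory pattern (names are illustrative, not from the original notes): a per-column array is created once per simulation via a template and then shared across callback calls.
import numpy as np
from collections import namedtuple

Memory = namedtuple("Memory", ["last_loss_idx"])

def init_memory(wrapper):
    # one slot per column, created once when the simulation starts
    return Memory(last_loss_idx=np.full(wrapper.shape_2d[1], -1))

# then pass vbt.RepFunc(init_memory) inside signal_args so every callback call
# receives the same Memory instance and can read/write it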
Portfolio
group_by=True puts all columns into the same group, and cash_sharing=True shares capital among them (minimal sketch below).
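A minimal sketch of that grouping (assumes data, entries and exits already exist):
pf = vbt.PF.from_signals(
    data.close,
    entries,
    exits,
    group_by=True,  # all columns form a single group
    cash_sharing=True,  # the group shares one cash balance
)
pf.stats()  # stats are reported per group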
from signals
pf = vbt.Portfolio.from_signals(
close=s12_data.close,
entries=long_entries_cln,
exits=long_exits,
short_entries=short_entries_cln,
short_exits=short_exits,
size=1,
size_type=vbt.pf_enums.SizeType.Amount, # Value, Percent, TargetAmount
price="nextopen", #where the fill happens. Default is "close" of the current bar; can also be a multi-parameter vbt.Param(["close", "nextopen"])
sl_stop=0.3,
tp_stop = 0.4,
delta_format = vbt.pf_enums.DeltaFormat.Percent100, #(Absolute, Percent, Percent100, Target)
fees=0.0167/100,
freq="12s") #sl_stop=sl_stop, tp_stop = sl_stop,, tsl_stop
CALLBACKS
Callbacks functions can be used to place/alter entries/exits and various other things dynamically based on simulation status. All of them contain SignalContext and also can include custom Memory.
Important SignalContext attributes:
- c.i - current index
- c.index - time index (numpy)
- c.last_pos_info[c.col] - named tuple of the last position info: {'names': ['id', 'col', 'size', 'entry_order_id', 'entry_idx', 'entry_price', 'entry_fees', 'exit_order_id', 'exit_idx', 'exit_price', 'exit_fees', 'pnl', 'return', 'direction', 'status', 'parent_id']}
Callback functions:
- signal_func_nb - place/alter entries/exits
- adjust_sl_func_nb - adjust SL at each time stamp
- adjust_func_nb - adjust size
- post_segment_func_nb
More on callbacks in the cookbook.
For exit-dependent entries, the entries can be preprocessed in signal_func_nb; see callbacks in the cookbook or the signal function in the docs.
@njit
def signal_func_nb(c, entries, exits, short_entries, short_exits, cooldown_time, cooldown_bars):
entry = vbt.pf_nb.select_nb(c, entries) #get current value
exit = vbt.pf_nb.select_nb(c, exits)
short_entry = vbt.pf_nb.select_nb(c, short_entries)
short_exit = vbt.pf_nb.select_nb(c, short_exits)
if not vbt.pf_nb.in_position_nb(c): # short for c.last_position == 0
if vbt.pf_nb.has_orders_nb(c):
if c.last_pos_info[c.col]["pnl"] < 0: #current index is c.i
last_exit_idx = c.last_pos_info[c.col]["exit_idx"] # exit index from last_pos_info named tuple
if cooldown_time is not None and c.index[c.i] - c.index[last_exit_idx] < cooldown_time:
return False, exit, False, short_exit #disable entry
elif cooldown_bars is not None and last_exit_idx + cooldown_bars > c.i:
return False, exit, False, short_exit #disable entry
return entry, exit, short_entry, short_exit
cooldown_time = vbt.dt.to_ns(vbt.timedelta("1m"))
cooldown_bars = 3
pf = vbt.Portfolio.from_signals(
close=s12_data.close,
bm_close=data.data["SPY"].close, #explicit benchmark used in pf, ie. pf.plot_cum_returns().show()
entries=long_entries_cln,
exits=long_exits,
short_entries=short_entries_cln,
short_exits=short_exits,
signal_func_nb=signal_func_nb, #or "signal_func_nb.py" together with staticized=True
signal_args=(
vbt.Rep("entries"),
vbt.Rep("exits"),
vbt.Rep("short_entries"),
vbt.Rep("short_exits"),
cooldown_time, # cooldown in timedelta in ns after exit
cooldown_bars #cooldown in number of bars after exit
),
sl_stop=0.3,
tp_stop = 0.4,
delta_format = vbt.pf_enums.DeltaFormat.Percent100, #(Absolute, Percent, Percent100, Target)
fees=0.0167/100,
freq="12s",
#staticized=True
#jitted=False
) #sl_stop=sl_stop, tp_stop = sl_stop,, tsl_stop
Tips:
- To avoid waiting for the compilation, remove the @njit decorator from signal_func_nb and pass jitted=False to from_signals in order to disable Numba
Access running total return from sim
Create an empty array for cumulative returns and populate it inside the post_segment_func_nb callback. The same array, accessed by other callbacks, can be used to get the total return at any time step.
@njit
def adjust_func_nb(c, cum_return):
if c.cash_sharing:
total_return = cum_return[c.group] - 1
else:
total_return = cum_return[c.col] - 1
...
@njit
def post_segment_func_nb(c, cum_return):
if c.cash_sharing:
cum_return[c.group] *= 1 + c.last_return[c.group]
else:
for col in range(c.from_col, c.to_col):
cum_return[col] *= 1 + c.last_return[col]
cum_return = None
def init_cum_return(wrapper):
global cum_return
if cum_return is None:
cum_return = np.full(wrapper.shape_2d[1], 1.0)
return cum_return
pf = vbt.PF.from_signals(
...,
adjust_func_nb=adjust_func_nb,
adjust_args=(vbt.RepFunc(init_cum_return),),
post_segment_func_nb=post_segment_func_nb,
post_segment_args=(vbt.RepFunc(init_cum_return),),
)
Staticization
Callbacks make the function uncacheable; to overcome that:
- define the callback in an external file
signal_func_nb.py
@njit
def signal_func_nb(c, fast_sma, slow_sma):
long = vbt.pf_nb.iter_crossed_above_nb(c, fast_sma, slow_sma)
short = vbt.pf_nb.iter_crossed_below_nb(c, fast_sma, slow_sma)
return long, False, short, False
and then use staticized=True
data = vbt.YFData.pull("BTC-USD")
pf = vbt.PF.from_signals(
data,
signal_func_nb="signal_func_nb.py",
signal_args=(vbt.Rep("fast_sma"), vbt.Rep("slow_sma")),
broadcast_named_args=dict(
fast_sma=data.run("sma", 20, hide_params=True, unpack=True),
slow_sma=data.run("sma", 50, hide_params=True, unpack=True)
),
staticized=True
)
Grouping
Grouping in the signal function (sketch below).
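A sketch of handling a group inside the signal function (my illustration; it assumes c.from_col/c.to_col and c.last_position are available on the context, as referenced in the examples above):
from numba import njit

@njit
def signal_func_nb(c, entries, exits):
    entry = vbt.pf_nb.select_nb(c, entries)
    exit = vbt.pf_nb.select_nb(c, exits)
    # scan all columns of the current group
    group_in_position = False
    for col in range(c.from_col, c.to_col):
        if c.last_position[col] != 0:
            group_in_position = True
    if group_in_position:
        entry = False  # e.g. allow only one open position per group
    return entry, exit, False, False
# entries/exits are passed via signal_args=(vbt.Rep("entries"), vbt.Rep("exits"))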
Portfolio analysis
pf.orders.readable
pf.entry_trades.readable
pf.exit_trades.readable
pf.trades.readable
pf.positions.readable
pf.trade_history #human readable df expanding trades with metrics
dd = pf.get_drawdowns().records_readable
dd[dd["Status"] == "Active"] #Recovered
pf.metrics #get available metrics with their short names & functions
#trades
vbt.pdir(pf.trades) # available methods/properties
#orders
pf.orders.side_buy.count() # pf.order.attribute_value.COUNT()
pf.orders.stats(group_by=True)
#daily returns
pf.daily_returns.sort_values([(2, 'BAC')], ascending=True) #sorting values in levels
pf.daily_returns.sort_values(pf.daily_returns.columns[0], ascending=True) #same using the first column
pf.daily_returns.cumsum()
pf.trades analysis
pf.trades.plot() doc - various options.
fig = pf.trades.plot()
fig.auto_rangebreaks()
fig.show()
df = pf.trades.readable
df["Direction"].value_counts() #count of trades for each Direction
df.groupby("Direction")["PnL"].sum() #sum of pnl for each Direction (Short vs Long) .vbt.barplot() -to plot
#daily PnL
df.groupby(df['Exit Index'].dt.date)['PnL'].sum().sort_index(ascending=False) #daily PnL
#daily PnL for each Direction
df.groupby([df['Exit Index'].dt.date, 'Direction'])['PnL'].sum().sort_index(ascending=False) #daily PnL for each Direction
#same but unstacked, where Long/Short values become columns - for better charting
daily_pnl = df.groupby([df['Exit Index'].dt.date, 'Direction'])['PnL'].sum().sort_index(ascending=False).unstack()
#daily_pnl.vbt.barplot() or
daily_pnl.plot(kind="bar", stacked=True)
#hourly PnL for each Direction, by Exit
hourly_pnl = df.groupby([df['Exit Index'].dt.hour, 'Direction'])['PnL'].sum().sort_index(ascending=False).unstack()
#hourly_pnl.vbt.barplot()
hourly_pnl.plot(kind="bar", stacked=True)
#PnL by Day of the Week and Direction
# Group by day of the week and direction, then sum PnL
pnl_by_day_and_direction_week = df.groupby([df['Exit Index'].dt.day_name(), 'Direction'])['PnL'].sum().unstack()
fig = pnl_by_day_and_direction_week.vbt.barplot()
fig.update_layout(
barmode='stack', # Stack/group/overlay/relative the bars
title='Profit by Day of the Week and Direction',
xaxis_title='Day of the Week',
yaxis_title='Cumulative Profit'
)
PnL by hour of the day (BOXPLOT)
a = df.groupby([df['Exit Index'].dt.day_name(), df['Exit Index'].dt.hour])['PnL'].sum().unstack()
fig = a.vbt.boxplot()
fig.update_layout(
#barmode='stack', # Stack/group/overlay/relative the bars
title='Profit by hour of the day',
xaxis_title='Hour of the day',
yaxis_title='Cumulative Profit'
)
##Profit/Loss (PnL) vs. Trade Duration
# Calculate trade duration in minutes
df['Trade Duration'] = (df['Exit Index'] - df['Entry Index']).dt.total_seconds() / 60
# Scatter plot of PnL vs Trade Duration
import matplotlib.pyplot as plt
plt.style.use('dark_background')
colors = {'Short': 'lightcyan', 'Long': 'yellow'}
plt.scatter(df['Trade Duration'], df['PnL'], c=df['Direction'].map(colors))
# Adding labels and title
plt.title('Trade Duration vs. Profit/Loss')
plt.xlabel('Duration (Minutes)')
plt.ylabel('Profit/Loss')
# Create a legend
handles = [plt.Line2D([0], [0], marker='o', color='w', label='Short', markerfacecolor='lightcyan', markersize=10),
plt.Line2D([0], [0], marker='o', color='w', label='Long', markerfacecolor='yellow', markersize=10)]
plt.legend(title='Type', handles=handles)
plt.tight_layout()
plt.show()
##Cumulative profits vs benchmark
pf.plot_cum_returns().show()
PF resampling
monthly_returns = pf.returns_acc.resample("M").get()
daily_returns = pf.resample("D").returns #alternative
fig = monthly_returns.vbt.boxplot() #box plot of monthly returns
fig = monthly_returns.vbt.heatmap() #heatmap of time vs monthly returns
fig = monthly_returns.vbt.ts_heatmap() #heatmap of returns vs time
PF Plotting
pf.plot_trade_signals().show() #plot long short entries/exits
pf.plot_cum_returns().show() #cum returns vs benchmark
##check whether returns are distributed normally
pf.returns.vbt.qqplot()
#TRADES
pf.trades.plot_mae_returns().show()#MAE/MFE - identify max loss/profit during the trade
pf.trades.plot_expanding_mfe_returns().show() #expanding mae/mfe returns
Plot Edge ratio pf.trades.plot_running_edge_ratio()
Entries/exits visual analysis
#display entry exits for visual analysis
import ttools as tts
trade_entries, trade_exits = tts.trades2entries_exits(pf) #helper to extract info from trades and orders with texts to markers (notext=True can be used)
Panel(
ohlcv=(s12_data.ohlcv.data["BAC"],),
right=[(s12_data.close, "close", trade_entries, trade_exits)],
middle1=[(pf.returns.cumsum(), "returns")],
).chart(precision=4)
#or alternative display just markers with no text
trade_entries = pd.Series(index=pf.trades.readable["Entry Index"], dtype=bool, data=True)
trade_exits = pd.Series(index=pf.trades.readable["Exit Index"], dtype=bool, data=True)
#then call Panel same as above
Configuration
Changing year freq for stocks
vbt.settings.returns.year_freq = pd.Timedelta(hours=6.5) * 252
Optimization
Param configuration
tp_stop = vbt.Param(tp_stop, condition="tp_stop > sl_stop") #conditional hyper parameters
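A sketch of passing such a conditional grid directly to from_signals (my illustration; assumes broadcastable arguments like sl_stop/tp_stop accept vbt.Param):
import numpy as np
sl_stop = vbt.Param(np.arange(0.01, 0.05, 0.01), name="sl_stop")
tp_stop = vbt.Param(np.arange(0.02, 0.10, 0.02), name="tp_stop", condition="tp_stop > sl_stop")
pf = vbt.PF.from_signals(data, entries, exits, sl_stop=sl_stop, tp_stop=tp_stop)
pf.sharpe_ratio  # one value per surviving parameter combination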
Pipeline
@vbt.parameterized(merge_func="concat")
def sma_crossover_perf(data, fast_window, slow_window):
fast_sma = data.run("sma", fast_window, short_name="fast_sma")
slow_sma = data.run("sma", slow_window, short_name="slow_sma")
entries = fast_sma.real_crossed_above(slow_sma)
exits = fast_sma.real_crossed_below(slow_sma)
pf = vbt.Portfolio.from_signals(
data, entries, exits, direction="both")
return pf.sharpe_ratio
#Let's test a grid of fast_window and slow_window combinations on one year of that data:
perf = sma_crossover_perf(
data["2020":"2020"],
vbt.Param(np.arange(5, 50), condition="x < slow_window"),
vbt.Param(np.arange(5, 50)),
_execute_kwargs=dict(
show_progress=True,
clear_cache=50,
collect_garbage=50
)
)
perf
INDICATORS DEV
#REGISTER CUSTOM INDICATOR
vbt.IndicatorFactory.register_custom_indicator(
SupportResistance,
name="SUPPRES",
location=None,
if_exists='raise'
)
#RUN INDICATOR on DATA WRAPPER
cdlbreakaway = s1data.run(vbt.indicator("talib:CDLHAMMER"), skipna=True, timeframe=["12s"])
#FROM EXPRESSION http://5.161.179.223:8000/vbt-doc/api/indicators/factory/#vectorbtpro.indicators.factory.IndicatorFactory.from_expr
WMA = vbt.IF(
class_name='WMA',
input_names=['close'],
param_names=['window'],
output_names=['wma']
).from_expr("wm_mean_nb(close, window)")
wma = WMA.run(t1data.close, window=10)
wma.wma
Custom ind
#simple
from numba import jit
@jit
def apply_func(high, low, close):
return (high + low + close + close) / 4
HLCC4 = vbt.IF(
class_name='hlcc4',
input_names=['high', 'low', 'close'],
output_names=['out']
).with_apply_func(
apply_func,
timeperiod=10, #single default
high=vbt.Ref('close')) #default from another input)
ind = HLCC4.run(s12_data.high, s12_data.low, s12_data.close)
#1D apply function
import talib
def apply_func_1d(close, timeperiod):
return talib.SMA(close.astype(np.double), timeperiod)
SMA = vbt.IF(
input_names=['ts'],
param_names=['timeperiod'],
output_names=['sma']
).with_apply_func(apply_func_1d, takes_1d=True)
sma = SMA.run(ts, [3, 4])
sma.sma
#with grouping and keep_pd (inputs are pd.series)
def apply_func(ts, group_by):
return ts.vbt.demean(group_by=group_by)
Demeaner = vbt.IF(
input_names=['ts'],
param_names=['group_by'],
output_names=['out']
).with_apply_func(apply_func, keep_pd=True) #if takes_1D it sends pd.series, otherwise df with symbol as columns
ts_wide = pd.DataFrame({
'a': [1, 2, 3, 4, 5],
'b': [5, 4, 3, 2, 1],
'c': [3, 2, 1, 2, 3],
'd': [1, 2, 3, 2, 1]
}, index=pd.date_range("2020-01-01", periods=5))
demeaner = Demeaner.run(ts_wide, group_by=[(0, 0, 1, 1), True])
demeaner.out
register custom ind
vbt.IF.register_custom_indicator(sma_indicator) #name=classname
vbt.IF.register_custom_indicator(sma_indicator, "rolling:SMA")
vbt.IF.deregister_custom_indicator("rolling:SMA")
VWAP anchored example
import numpy as np
from vectorbtpro import _typing as tp
from vectorbtpro.base.wrapping import ArrayWrapper
from vectorbtpro.utils.template import RepFunc
def substitute_anchor(wrapper: ArrayWrapper, anchor: tp.Optional[tp.FrequencyLike]) -> tp.Array1d:
"""Substitute reset frequency by group lens. It is array of number of elements of each group."""
if anchor is None:
return np.array([wrapper.shape[0]])
return wrapper.get_index_grouper(anchor).get_group_lens()
@jit(nopython=True)
def vwap_cum(high, low, close, volume, group_lens):
#anchor based grouping - prepare group indexes
group_end_idxs = np.cumsum(group_lens)
group_start_idxs = group_end_idxs - group_lens
#prepare output
out = np.full(volume.shape, np.nan, dtype=np.float_)
hlcc4 = (high + low + close + close) / 4
#iterate over groups
for group in range(len(group_lens)):
from_i = group_start_idxs[group]
to_i = group_end_idxs[group]
nom_cumsum = 0
denum_cumsum = 0
#for each group do this (it is just np.cumsum(hlcc4 * volume) / np.sum(volume) iteratively)
for i in range(from_i, to_i):
nom_cumsum += volume[i] * hlcc4[i]
denum_cumsum += volume[i]
if denum_cumsum == 0:
out[i] = np.nan
else:
out[i] = nom_cumsum / denum_cumsum
return out
vwap_ind = vbt.IF(
class_name='CUVWAP',
input_names=['high', 'low', 'close', 'volume'],
param_names=['anchor'],
output_names=['vwap']
).with_apply_func(vwap_cum,
takes_1d=True,
param_settings=dict(
anchor=dict(template=RepFunc(substitute_anchor)),
),
anchor="D",
)
%timeit vwap_cum = vwap_ind.run(s12_data.high, s12_data.low, s12_data.close, s12_data.volume, anchor="min")
vbt.IF.register_custom_indicator(vwap_ind)
Use ttools indicators
from ttools.vbtindicators import register_custom_inds
register_custom_inds(if_exists="skip") #register all, skip or override when exists
#register_custom_inds("CVWAP", "skip") #register one, skip if exists
#register_custom_inds() #deregister all
vbt.IF.list_indicators("ttools")
vwap_cum = vbt.indicator("ttools:CUVWAP").run(s12_data.high, s12_data.low, s12_data.close, s12_data.volume, anchor="D")
vwap_cum.vwap
div_vwap_cum = vbt.indicator("ttools:DIVERGENCE").run(s12_data.close, vwap_cum_d.vwap, divtype=vbt.Default(value="reln"), hide_default=True) #hide default levels
FAV INDICATORS
#for TALIB indicator always use skipna=True
#TALIB INDICATORS can do realign closing: timeframe=["1T"]
mom_multi = vbt.indicator("talib:MOM").run(t1data.close, timeperiod=5, timeframe=["1T","5T"], skipna=True) #returned 5T can be directly compared with 1T
#ANCHORED indicators: vbt.indicator("talib:MOM") becomes AnchoredIndicator("talib:MOM", anchor="D") - freq of pd.Grouper
from ttools import AnchoredIndicator
mom_anch_d = AnchoredIndicator("talib:MOM", anchor='30min').run(t1data.data["BAC"].close, timeperiod=10)
mom = vbt.indicator("talib:MOM").run(t1data.data["BAC"].close, timeperiod=10, skipna=True)
t1data.ohlcv.data["BAC"].lw.plot(auto_scale=[mom_anch_d, mom])
#FIBO RETRACEMENT
fibo = vbt.indicator("technical:FIBONACCI_RETRACEMENTS").run(t1data.close, skipna=True)
#fibo.fibonacci_retracements
fibo_plusclose = t1data.close + fibo.fibonacci_retracements
fibo_minusclose = t1data.close - fibo.fibonacci_retracements
#fibo_plusclose
Panel(
auto_scale=[fibo_plusclose["BAC"]],
ohlcv=(t1data.ohlcv.data["BAC"],),
histogram=[],
right=[(fibo_plusclose["BAC"],),(fibo_minusclose["BAC"],)],
left=[],
middle1=[(fibo.fibonacci_retracements["BAC"],"fibonacci_retracements")],
middle2=[]
).chart(size="xs")
#CHOPINESS indicator
chopiness = vbt.indicator("technical:CHOPINESS").run(s1data.open, s1data.high, s1data.low, s1data.close, s1data.volume, skipna=True)
s1data.ohlcv.data["BAC"].lw.plot(auto_scale=[chopiness])
#anchored VWAP
t1vwap_h = vbt.VWAP.run(t1data.high, t1data.low, t1data.close, t1data.volume, anchor="H")
t1vwap_h_real = t1vwap_h.vwap.vbt.realign_closing(resampler_s)
#BBANDS = vbt.indicator("pandas_ta:BBANDS")
mom_anch_d = AnchoredIndicator("talib:MOM", anchor='30min').run(t1data.data["BAC"].close, timeperiod=10)
mom = vbt.indicator("talib:MOM").run(t1data.data["BAC"].close, timeperiod=10, skipna=True)
#macd = vbt.indicator("talib:MACD").run(t1data.data["BAC"].close) #, timeframe=["1T"]) #,
t1data.ohlcv.data["BAC"].lw.plot(auto_scale=[mom_anch_d, mom])
GROUPING
Group wrapper index based on freq:
#returns an array with the number of elements in each consecutive group
group_lens = s12_data.wrapper.get_index_grouper("D").get_group_lens()
group_end_idxs = np.cumsum(group_lens) #end indices (exclusive) of each group
group_start_idxs = group_end_idxs - group_lens #start indices of each group
#iteration skeleton (as used inside the anchored VWAP apply function above)
out = np.full(s12_data.wrapper.shape[0], np.nan, dtype=np.float_)
#iterate over groups
for group in range(len(group_lens)):
    from_i = group_start_idxs[group]
    to_i = group_end_idxs[group]
    #iterate over elements of the group
    for i in range(from_i, to_i):
        out[i] = np.nan
SPLITTING
#SPLITTER - splitting wrapper based on index
#http://5.161.179.223:8000/vbt-doc/tutorials/cross-validation/splitter/index.html#anchored
#based on GROUPER
daily_splitter = vbt.Splitter.from_grouper(t1data.index, "D", split=None) #DOES contain last DAY
daily_splitter = vbt.Splitter.from_ranges( #does NOT contain the last DAY
t1data.index,
every="D",
split=None
)
daily_splitter.stats()
daily_splitter.plot()
daily_splitter.coverage()
daily_splitter.get_bounds(index_bounds=True) #shows the exact times
daily_splitter.get_bounds_arr()
daily_splitter.get_range_coverage(relative=True)
#TAKING and APPLYING MANUALLY - run a UDF on ALL takes and concatenate the results
taken = daily_splitter.take(t1data)
inds = []
for series in taken:
mom = vbt.indicator("talib:MOM").run(series.close, timeperiod=10, skipna=True)
inds.append(mom)
mom_daily = vbt.base.merging.row_stack_merge(inds) #merge
mom = vbt.indicator("talib:MOM").run(t1data.close, timeperiod=10, skipna=True)
t1data.ohlcv.data["BAC"].lw.plot(left=[(mom_daily, "daily_splitter"),(mom, "original mom")]) #OHLCV with indicators on top
#TAKING and APPLYING AUTOMATICALLY
daily_splitter = vbt.Splitter.from_grouper(t1data.index, "D", split=None) #DOES contain last DAY
def indi_run(sr):
return vbt.indicator("talib:MOM").run(sr.close, timeperiod=10, skipna=True)
res = daily_splitter.apply(indi_run, vbt.Takeable(t1data), merge_func="row_stack", freq="1T")
#use of IDX accessor (docs:http://5.161.179.223:8000/vbt-doc/api/base/accessors/index.html#vectorbtpro.base.accessors.BaseIDXAccessor)
daily_grouper = t1data.index.vbt.get_grouper("D")
#grouper instance can be iterated over
for name, indices in daily_grouper.iter_groups():
print(name, indices)
#PANDAS GROUPING - series/df grouping resulting in a GroupBy placeholder that can be aggregated (sum, mean), transformed, iterated over, or filtered
for name, group in t1data.data["BAC"].close.groupby(pd.Grouper(freq='D')):
print(name, group)
CHARTING
Using custom lightweight-charts-python
#LW df/sr accessor
t1data.ohlcv.data["BAC"].lw.plot(left=[(mom_multi, "mom_multi")]) #OHLCV with indicators on top
t5data.ohlcv.data["BAC"].lw.plot(
left=[(mom_multi.real, "mom"),(mom_multi_beztf, "mom_beztf"), (mom_5t_orig, "mom_5t_orig"), (mom_5t_orig_realigned, "mom_5t_orig_realigned")],
right=[(t1data.data["BAC"].close, "t1 close"),(t5data.data["BAC"].close, "t5 close")],
size="s") #.loc[:,(20,"1T","BAC")]
#SINGLE PANEL
Panel(
auto_scale=[cdlbreakaway],
ohlcv=(t1data.ohlcv.data["BAC"],entries),
histogram=[],
right=[],
left=[],
middle1=[],
middle2=[]
).chart(size="xs")
#MULTI PANEL
pane1 = Panel(
#auto_scale=[mom_multi, mom_multi_1t],
#ohlcv=(t1data.data["BAC"],), #(series, entries, exits, other_markers)
#histogram=[(order_imbalance_allvolume, "oivol")], # [(series, name, "rgba(53, 94, 59, 0.6)", opacity)]
right=[(t1data.data["BAC"].close,"close 1T"),(t5data.data["BAC"].close,"close 5T"),(mom_multi_1t.close, "mom multi close")], # [(series, name, entries, exits, other_markers)]
left=[(mom_multi, "mom_multi"), (mom_multi_1t, "mom_multi_1t")],
#middle1=[],
#middle2=[],
#xloc="2024-02-12 09:30",
precision=3
)
pane2 = Panel(....)
ch = chart([pane1, pane2], size="s")
standard vbt plot
#skip gaps automatically
vbt.settings.plotting.auto_rangebreaks = True
vbt.settings.set_theme("dark")
data.plot(symbol="SPY", yaxis=dict(type="log")).show()
#skip non-business hours and weekends
fig = df.vbt.plot()
fig.update_xaxes(
rangebreaks=[
dict(bounds=['sat', 'mon']),
dict(bounds=[16, 9.5], pattern='hour'),
]
)
MULTIACCOUNT
Simultaneous LONG and SHORT (hedging). In vbt a position requires one column of data, so hedging is possible by using two columns representing the same asset but different directions, then stacking both portfolios together with column stacking: pf_join = vbt.PF.column_stack((pf1, pf2), group_by=True) (sketch below)
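A sketch of that setup (assumes long/short signal arrays already exist):
pf_long = vbt.PF.from_signals(data.close, entries=long_entries, exits=long_exits, direction="longonly")
pf_short = vbt.PF.from_signals(data.close, entries=short_entries, exits=short_exits, direction="shortonly")
pf_join = vbt.PF.column_stack((pf_long, pf_short), group_by=True)  # analyze both legs as one account
pf_join.stats()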
CUSTOM SIMULATION
ANALYSIS
ROBUSTNESS
pf_stats.sort_values(by='Sharpe Ratio', ascending=False).iloc[::-1].vbt.heatmap().show() #works when there are more metrics
UTILS
#use plotly resampler
vbt.settings.plotting["use_resampler"] = True
#RELOAD module in ipynb
%load_ext autoreload
%autoreload 2
#MEMORY
sr.info()
#peak memory usage, running once
with vbt.MemTracer() as tracer:
my_pipeline()
print(tracer.peak_usage())
#CACHE
vbt.print_cache_stats()
vbt.print_cache_stats(vbt.PF)
vbt.flush() #clear cache and collect garbage
vbt.clear_cache(pf) #of specific
vbt.clear_pycache()
#TIMING
#running once
with vbt.Timer() as timer:
my_pipeline()
print(timer.elapsed())
#multiple times
print(vbt.timeit(my_pipeline))
#in notebook
%timeit function(x)
%%time
function(x)
#NUMBA
#numba doesn't raise an error when indexing out of bounds; this enables bounds checking so it does
import os
os.environ["NUMBA_BOUNDSCHECK"] = "1"
Market calendar
from pandas.tseries.offsets import CustomBusinessDay
from pandas_market_calendars import get_calendar
# Get the NYSE trading calendar
nyse = get_calendar('NYSE')
# Create a CustomBusinessDay object using the NYSE trading calendar
custom_bd = CustomBusinessDay(holidays=nyse.holidays().holidays, weekmask=nyse.weekmask, calendar=nyse)
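A small usage sketch: the CustomBusinessDay can be used as a pandas frequency, e.g. to enumerate NYSE trading days.
import pandas as pd
trading_days = pd.date_range("2024-08-01", "2024-08-31", freq=custom_bd)
print(trading_days)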
