Optimization¶
Optimization involves executing a function on a set of configurations, with the aim of optimizing the performance of a strategy and/or the CPU or RAM usage of a pipeline.
Question
Learn more in Pairs trading tutorial.
Parameterization¶
The first and easiest approach revolves around testing a single parameter combination at a time. It utilizes as little RAM as possible but may take longer to run if the function isn't written in pure Numba and has a fixed overhead (e.g., conversion from Pandas to NumPy and back) that adds to the total execution time on each run. For this, create a pipeline function that takes a single value per parameter and decorate it with @vbt.parameterized. To test multiple parameters, wrap each parameter argument with Param.
Example
See an example in Parameterized decorator.
Decoration¶
To parameterize any function, we have to decorate (or wrap) it with @vbt.parameterized. This will return a new function with the same name and arguments as the original one. The only difference: this new function will process passed arguments, build parameter combinations, call the original function on each parameter combination, and merge the outputs of all combinations.
@vbt.parameterized
def my_pipeline(data, fast_window, slow_window): # (1)!
...
return output # (2)!
outputs = my_pipeline( # (3)!
data,
vbt.Param(fast_windows), # (4)!
vbt.Param(slow_windows)
)
- Arguments can be anything. Here we're expecting a data instance and two parameters: fast_window and slow_window, which will be passed by the decorator as single values.
- Do some calculations on the received parameter combination and return an output, which can be anything
- Run the function the same way as without the decorator
- Wrap the multiple values with vbt.Param under each parameter
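For instance, here's a minimal end-to-end sketch of such a pipeline that backtests an SMA crossover and returns its Sharpe ratio. The data-pulling step and the portfolio simulation via Portfolio.from_signals are illustrative assumptions; any function body that consumes one parameter combination works the same way.
import vectorbtpro as vbt
data = vbt.YFData.pull("BTC-USD")  # assumed input; any Data instance works
@vbt.parameterized
def sma_sharpe(data, fast_window, slow_window):
    # receives one window pair per call
    fast_sma = data.run("talib:sma", fast_window, unpack=True)
    slow_sma = data.run("talib:sma", slow_window, unpack=True)
    entries = fast_sma.vbt.crossed_above(slow_sma)
    exits = fast_sma.vbt.crossed_below(slow_sma)
    pf = vbt.Portfolio.from_signals(data, entries, exits)
    return pf.sharpe_ratio
outputs = sma_sharpe(
    data,
    vbt.Param([10, 20, 30]),
    vbt.Param([40, 50, 60])
)  # a list with one Sharpe ratio per window combination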
To keep the original function separate from the decorated one, we can decorate it after it has been defined and give the decorated function another name.
def my_pipeline(data, fast_window, slow_window):
...
return output
my_param_pipeline = vbt.parameterized(my_pipeline)
outputs = my_param_pipeline(...)
Merging¶
The code above returns a list of outputs, one per parameter combination. To return the grid of parameter combinations as well, pass return_param_index=True to the decorator. Alternatively, let VBT merge the outputs into one or more Pandas objects and attach the grid to their index or columns by specifying the merging function (see resolve_merge_func).
@vbt.parameterized(return_param_index=True) # (1)!
def my_pipeline(...):
...
return output
outputs, param_index = my_pipeline(...)
# ______________________________________________________________
@vbt.parameterized(merge_func="concat") # (2)!
def my_pipeline(...):
...
return pf.sharpe_ratio
sharpe_ratio = my_pipeline(...)
# ______________________________________________________________
@vbt.parameterized(merge_func="concat")
def my_pipeline(...):
...
return pf.sharpe_ratio, pf.win_rate
sharpe_ratio, win_rate = my_pipeline(...)
# ______________________________________________________________
@vbt.parameterized(merge_func="column_stack") # (3)!
def my_pipeline(...):
...
return entries, exits
entries, exits = my_pipeline(...)
# ______________________________________________________________
@vbt.parameterized(merge_func="row_stack") # (4)!
def my_pipeline(...):
...
return pf.value
value = my_pipeline(...)
# ______________________________________________________________
@vbt.parameterized(merge_func=("concat", "column_stack")) # (5)!
def my_pipeline(...):
...
return pf.sharpe_ratio, pf.value
sharpe_ratio, value = my_pipeline(...)
# ______________________________________________________________
def merge_func(outputs, param_index):
return pd.Series(outputs, index=param_index)
@vbt.parameterized(
merge_func=merge_func, # (6)!
merge_kwargs=dict(param_index=vbt.Rep("param_index")) # (7)!
)
def my_pipeline(...):
...
return pf.sharpe_ratio
sharpe_ratio = my_pipeline(...)
- Return the outputs along with the parameter grid
- If the function returns a single number (or a tuple of such), concatenate all numbers into a Series with parameter combinations as index. Useful for returning metrics such as Sharpe ratio.
- If the function returns an array (or a tuple of such), stack all arrays along columns into a DataFrame with parameter combinations as an outermost column level. Useful for returning indicator arrays.
- If the function returns an array (or a tuple of such), stack all arrays along rows into a Series/DataFrame with parameter combinations as an outermost index level. Useful for cross-validation.
- If the function returns a number and an array, return a Series of concatenated numbers and a DataFrame of arrays stacked along columns
- Pass a custom merging function
- Use an expression template to pass the parameter index as a keyword argument
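To make the effect of merging tangible, here's a toy sketch of the "concat" case; the arithmetic stands in for a real backtest:
@vbt.parameterized(merge_func="concat")
def window_spread(fast_window, slow_window):
    # toy stand-in for a pipeline returning a single metric
    return slow_window - fast_window
spread = window_spread(
    vbt.Param([10, 20]),
    vbt.Param([30, 40])
)
# spread is a Pandas Series indexed by the parameter grid, roughly:
# fast_window  slow_window
# 10           30             20
#              40             30
# 20           30             10
#              40             20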
We can also use annotations to specify the merging function(s).
@vbt.parameterized
def my_pipeline(...) -> "concat": # (1)!
...
return output
# ______________________________________________________________
@vbt.parameterized
def my_pipeline(...) -> ("concat", "column_stack"): # (2)!
...
return output1, output2
# ______________________________________________________________
@vbt.parameterized
def my_pipeline(...) -> ( # (3)!
vbt.MergeFunc("concat", wrap=False),
vbt.MergeFunc("column_stack", wrap=False)
):
...
return output1, output2
- Concatenate outputs
- Concatenate instances of the first output and column-stack instances of the second output
- Same as above but provide keyword arguments to each merging function
Generation¶
The grid of parameter combinations can be controlled by individual parameters. By default, vectorbtpro will build a Cartesian product of all parameters. To avoid building the product between some parameters, they can be assigned to the same product level. To filter out unwanted parameter configurations, specify the condition as a boolean expression where variables are parameter names. Such a condition will be evaluated on each parameter combination, and if it returns True, the combination will be kept. To change the appearance of a parameter in the parameter index, keys with human-readable strings can be provided. A parameter can also be hidden entirely by setting hide=True.
sma_crossover( # (1)!
data=data,
fast_window=vbt.Param(windows, condition="fast_window < slow_window"),
slow_window=vbt.Param(windows),
)
# ______________________________________________________________
sma_crossover( # (2)!
data=vbt.Param([data.select(k) for k in data.symbols], keys=data.symbols),
fast_window=vbt.Param(windows, condition="fast_window < slow_window"),
slow_window=vbt.Param(windows),
)
# ______________________________________________________________
from itertools import combinations
fast_windows, slow_windows = zip(*combinations(windows, 2)) # (3)!
sma_crossover(
data=vbt.Param([data.select(k) for k in data.symbols], keys=data.symbols, level=0),
fast_window=vbt.Param(fast_windows, level=1),
slow_window=vbt.Param(slow_windows, level=1),
)
# ______________________________________________________________
bbands_indicator( # (4)!
data=data,
timeperiod=vbt.Param(timeperiods, level=0),
upper_threshold=vbt.Param(thresholds, level=1, keys=pd.Index(thresholds, name="threshold")),
lower_threshold=vbt.Param(thresholds, level=1, hide=True),
_random_subset=1_000 # (5)!
)
- Build a product of fast and slow windows while removing those where the fast window is longer than the slow window (e.g., 20 and 50 is ok but 50 and 20 doesn't make sense)
- Same as above but test only one symbol at a time
- Same as above but build the window combinations manually. The window parameters are now on the same level and won't build another product.
- Test two parameters: time periods and thresholds. The upper and lower thresholds should both share the same values, and only one threshold level should be displayed in the parameter index. Also, select a random subset of 1000 parameter combinations.
- Arguments that are normally passed to the decorator can also be passed to the function itself by prepending an underscore
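Here's a toy sketch showing how a condition prunes the grid; the sum stands in for a real pipeline:
@vbt.parameterized(merge_func="concat")
def window_sum(fast_window, slow_window):
    # toy stand-in for a real pipeline
    return fast_window + slow_window
out = window_sum(
    vbt.Param([10, 20, 30], condition="fast_window < slow_window"),
    vbt.Param([10, 20, 30])
)
# out covers only the combinations where fast_window < slow_window:
# (10, 20), (10, 30), and (20, 30), i.e., 3 rows instead of 9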
Example
See an example in Conditional parameters.
Warning
Testing 6 parameters with only 10 values each would generate a staggering 1 million parameter combinations, so make sure that your grids are not too wide; otherwise, the generation step alone will take forever to run. This warning doesn't apply when you use random_subset, though: in this case, VBT won't build the full grid but will select random combinations dynamically. See an example in Lazy parameter grids.
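For example, a wide grid can be sampled instead of fully materialized (a sketch reusing the toy function from above):
out = window_sum(
    vbt.Param(list(range(10, 100))),
    vbt.Param(list(range(100, 1000))),
    _random_subset=100  # sample 100 out of 90 * 900 = 81,000 combinations
)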
We can also use annotations to specify which arguments are parameters and their default configuration.
@vbt.parameterized
def sma_crossover(
data,
fast_window: vbt.Param(condition="fast_window < slow_window"),
slow_window: vbt.Param,
) -> "column_stack":
fast_sma = data.run("talib:sma", fast_window, unpack=True)
slow_sma = data.run("talib:sma", slow_window, unpack=True)
upper_crossover = fast_sma.vbt.crossed_above(slow_sma)
lower_crossover = fast_sma.vbt.crossed_below(slow_sma)
signals = upper_crossover | lower_crossover
return signals
signals = sma_crossover(data, fast_windows, slow_windows)
Execution¶
Each parameter combination involves one call of the pipeline function. To perform multiple calls in parallel, pass a dictionary named execute_kwargs with keyword arguments that should be forwarded to the function execute, which takes care of chunking and executing the function calls.
@vbt.parameterized(execute_kwargs=dict(show_progress=True)) # (1)!
def my_pipeline(...):
...
# ______________________________________________________________
@vbt.parameterized(execute_kwargs=dict(chunk_len="auto", engine="threadpool")) # (2)!
@njit(nogil=True)
def my_pipeline(...):
...
# ______________________________________________________________
@vbt.parameterized(execute_kwargs=dict(n_chunks="auto", distribute="chunks", engine="pathos")) # (3)!
def my_pipeline(...):
...
# ______________________________________________________________
@vbt.parameterized # (4)!
@njit(nogil=True)
def my_pipeline(...):
...
my_pipeline(
...,
_execute_kwargs=dict(show_progress=True)
)
my_pipeline(
...,
_execute_kwargs=dict(chunk_len="auto", engine="threadpool")
)
# ______________________________________________________________
@vbt.parameterized(execute_kwargs=dict(show_progress=True)) # (5)!
@njit(nogil=True)
def my_pipeline(...):
...
my_pipeline(
...,
_execute_kwargs=dict(chunk_len="auto", engine="threadpool") # (6)!
)
my_pipeline(
...,
_execute_kwargs=vbt.atomic_dict(chunk_len="auto", engine="threadpool") # (7)!
)
- Show a progress bar while executing parameter combinations serially
- Distribute parameter combinations into chunks of an optimal length, and execute all parameter combinations within each chunk in parallel with multithreading (i.e., one parameter combination per thread) while executing chunks themselves serially
- Divide parameter combinations into an optimal number of chunks, and execute all chunks in parallel with multiprocessing (i.e., one chunk per process) while executing all parameter combinations within each chunk serially
- Parallelization can be enabled/disabled sporadically by prepending an underscore to execute_kwargs and passing it directly to the function
- If there are already execute_kwargs active in the decorator, both dicts will be merged together. To avoid merging, wrap any of the dicts with vbt.atomic_dict.
- Here both dicts are merged, so show_progress=True from the decorator remains in effect
- Here the atomic dict replaces the decorator's dict entirely, which effectively results in show_progress=False
Note
Threads are easier and faster to spawn than processes. Also, to execute a function in its own process, all the passed inputs and parameters need to be serialized and then deserialized, which takes time. Thus, multithreading is preferred, but it requires the function to release the GIL, which means either compiling the function with Numba and setting the nogil flag to True, or using exclusively NumPy.
If this isn't possible, use multiprocessing but make sure that the function either doesn't take large arrays, or that one parameter combination takes a considerable amount of time to run. Otherwise, you may find parallelization making the execution even slower.
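Below is a minimal sketch of a GIL-releasing pipeline executed with multithreading. The close array and the computation inside are assumptions for illustration; any Numba function compiled with nogil=True fits this pattern.
import numpy as np
from numba import njit
close = np.asarray(data.get("Close"))  # assumed: a 1D array of closing prices for a single symbol
@vbt.parameterized(
    merge_func="concat",
    execute_kwargs=dict(chunk_len="auto", engine="threadpool", show_progress=True)
)
@njit(nogil=True)
def mean_spread(close, fast_window, slow_window):
    # toy stand-in: difference between the two trailing means
    return np.mean(close[-fast_window:]) - np.mean(close[-slow_window:])
spread = mean_spread(
    close,
    vbt.Param([10, 20, 30], condition="fast_window < slow_window"),
    vbt.Param([20, 30, 40])
)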
To run code before/after the entire processing, or even before/after each individual chunk, execute offers a number of callbacks.
def post_chunk_func(chunk_idx, flush_every):
if (chunk_idx + 1) % flush_every == 0:
vbt.flush()
@vbt.parameterized(
post_chunk_func=post_chunk_func,
post_chunk_kwargs=dict(
chunk_idx=vbt.Rep("chunk_idx", sub_id="post_chunk_kwargs"),
flush_every=3
),
chunk_len=10 # (1)!
)
def my_pipeline(...):
...
- Put 10 calls into one chunk, that is, flush after every 30 calls (every 3 chunks)
def pre_execute_func(clear_cache=False):
if clear_cache:
vbt.remove_dir("chunk_data", missing_ok=True, with_contents=True)
def pre_chunk_func(chunk_idx, call_indices): # (1)!
fname = "chunk_data/chunk_" + str(chunk_idx)
if vbt.file_exists(fname):
return [object()] * len(call_indices)
return None
def post_chunk_func(chunk_idx, call_outputs, chunk_executed): # (2)!
if chunk_executed:
fname = "chunk_data/chunk_" + str(chunk_idx)
vbt.save(call_outputs, fname, mkdir_kwargs=dict(mkdir=True))
for i in range(len(call_outputs)):
call_outputs[i] = object()
def post_execute_func(n_chunks): # (3)!
outputs = []
for chunk_idx in range(n_chunks):
fname = "chunk_data/chunk_" + str(chunk_idx)
outputs.extend(vbt.load(fname))
return outputs
@vbt.parameterized(
pre_execute_func=pre_execute_func,
pre_execute_kwargs=dict(
clear_cache=False # (4)!
),
pre_chunk_func=pre_chunk_func,
pre_chunk_kwargs=dict(
chunk_idx=vbt.Rep("chunk_idx", sub_id="pre_chunk_kwargs"), # (5)!
call_indices=vbt.Rep("call_indices", sub_id="pre_chunk_kwargs")
),
post_chunk_func=post_chunk_func,
post_chunk_kwargs=dict(
chunk_idx=vbt.Rep("chunk_idx", sub_id="post_chunk_kwargs"),
call_outputs=vbt.Rep("call_outputs", sub_id="post_chunk_kwargs"),
chunk_executed=vbt.Rep("chunk_executed", sub_id="post_chunk_kwargs"),
),
post_execute_func=post_execute_func,
post_execute_kwargs=dict(
n_chunks=vbt.Rep("n_chunks", sub_id="post_execute_kwargs")
),
chunk_len=10
)
def my_pipeline(...):
...
- Before processing a chunk: check whether a cache for this chunk index already exists on disk, and if so, return something other than None (such as dummy objects) to prevent executing the chunk. We could have also loaded the actual objects and returned them, but we do not want to clog RAM.
- After processing a chunk: if the chunk was executed, cache it to disk and replace the actual objects by some dummy objects to release RAM
- After processing all the chunks: load the actual objects from disk and return them
- Set to True to remove the cache directory and execute the chunks
- Pass various information from the context using templates
Tip
This works not only with @vbt.parameterized but also with other functions that use execute!
Hybrid (mono-chunks)¶
The approach above calls the original function on each single parameter combination, which makes it slow when dealing with a large number of combinations, especially when each function call carries an overhead, such as when a NumPy array gets converted to a Pandas object. Remember that 1 millisecond of overhead translates into roughly 17 minutes of additional execution time for one million combinations.
There's nothing (apart from parallelization) we can do to speed up functions that take only one combination at a time. But if the function can be adapted to accept multiple combinations, where each parameter argument becomes an array instead of a single value, we can instruct @vbt.parameterized to merge all combinations into chunks and call the function on each chunk. This way, we can reduce the number of function calls significantly.
@vbt.parameterized(mono_n_chunks=?, mono_chunk_len=?, mono_chunk_meta=?) # (1)!
def my_pipeline(data, fast_windows, slow_windows): # (2)!
...
return output # (3)!
outputs = my_pipeline( # (4)!
data,
vbt.Param(fast_windows),
vbt.Param(slow_windows)
)
# ______________________________________________________________
@vbt.parameterized(mono_n_chunks="auto") # (5)!
...
# ______________________________________________________________
@vbt.parameterized(mono_chunk_len=100) # (6)!
...
- Instruct VBT to build chunks out of parameter combinations. You can use mono_n_chunks to specify the target number of chunks, or mono_chunk_len to specify the max number of combinations in each chunk, or mono_chunk_meta to specify the chunk metadata directly.
- The function now must take multiple values fast_windows and slow_windows instead of single values fast_window and slow_window. One set of values will contain the combinations that belong to the same chunk.
- Do some calculations on the received parameter combinations and return an output (which should contain an output for each parameter combination)
- Run the function the same way as before
- Build the same number of chunks as there are CPU cores
- Build chunks with at most 100 combinations each
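Here's a toy sketch of a mono-chunked pipeline where the function loops over the values it receives for the current chunk and returns one output per combination:
@vbt.parameterized(mono_chunk_len=2)
def window_sums(fast_windows, slow_windows):
    # each argument holds the values of the current chunk
    return [fast + slow for fast, slow in zip(fast_windows, slow_windows)]
outputs = window_sums(
    vbt.Param([10, 20]),
    vbt.Param([30, 40])
)
# without a merging function, outputs should be a list with one element per chunk:
# [[40, 50], [50, 60]]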
By default, parameter values are passed as lists to the original function. To pass them as arrays or in any other format instead, set a merging function mono_merge_func for each parameter.
my_pipeline(
param_a=vbt.Param(param_a), # (1)!
param_b=vbt.Param(param_b, mono_reduce=True), # (2)!
param_c=vbt.Param(param_c, mono_merge_func="concat"), # (3)!
param_d=vbt.Param(param_d, mono_merge_func="row_stack"), # (4)!
param_e=vbt.Param(param_e, mono_merge_func="column_stack"), # (5)!
param_f=vbt.Param(param_f, mono_merge_func=vbt.MergeFunc(...)) # (6)!
)
- Will put chunk values into a list
- Same as above but will return a single value if all values in the chunk are the same
- Will concatenate values into a NumPy array or Pandas Series
- Will stack chunk values along rows into a NumPy array or Pandas Series/DataFrame
- Will stack chunk values along columns into a NumPy array or Pandas DataFrame
- Will merge chunk values using a custom merging function
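Building on the sketch above, requesting "concat" merging per parameter makes the chunk values arrive as arrays, so the function body can be vectorized:
@vbt.parameterized(mono_chunk_len=2)
def window_sums(fast_windows, slow_windows):
    # both arguments now arrive as NumPy arrays (or Pandas Series)
    return fast_windows + slow_windows
outputs = window_sums(
    vbt.Param([10, 20], mono_merge_func="concat"),
    vbt.Param([30, 40], mono_merge_func="concat")
)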
Execution is done in the same way as in Parameterization, and chunks can be easily parallelized; just keep an eye on RAM consumption, since multiple parameter combinations are now executed at the same time.
Example
See an example in Mono-chunks.
Chunking¶
Chunking revolves around splitting a value (such as an array) of one or more arguments into many parts (or chunks), calling the function on each part, and then merging all parts together. This way, we can instruct VBT to process only a subset of data at a time, which is helpful in both reducing RAM consumption and increasing performance by utilizing parallelization. Chunking is also highly convenient: usually, you don't have to change your function in any way, and you'll get the same outputs regardless of whether chunking was enabled or disabled. To use chunking, create a pipeline function, decorate it with @vbt.chunked, and specify how exactly arguments should be chunked and outputs should be merged.
Example
See an example in Chunking.
Decoration¶
To make any function chunkable, we have to decorate (or wrap) it with @vbt.chunked. This will return a new function with the same name and arguments as the original one. The only difference: this new function will process passed arguments, chunk the arguments, call the original function on each chunk of the arguments, and merge the outputs of all chunks.
@vbt.chunked
def my_pipeline(data, fast_windows, slow_windows): # (1)!
...
return output # (2)!
outputs = my_pipeline( # (3)!
data,
vbt.Chunked(fast_windows), # (4)!
vbt.Chunked(slow_windows)
)
- Arguments can be anything. Here we're expecting a data instance, and already combined fast and slow windows, as in Hybrid (mono-chunks)
- Do some calculations on the received chunk of values and return an output, which can be anything
- Run the function the same way as without the decorator
- Wrap any chunkable argument with vbt.Chunked or another chunking class
To keep the original function separate from the decorated one, we can decorate it after it has been defined and give the decorated function another name.
def my_pipeline(data, fast_windows, slow_windows):
...
return output
my_chunked_pipeline = vbt.chunked(my_pipeline)
outputs = my_chunked_pipeline(...)
Specification¶
To chunk an argument, we must provide a chunking specification for that argument. There are three main ways to provide such a specification.
Approach 1: Pass a dictionary arg_take_spec to the decorator. This is the most capable approach, as it allows chunking any nested objects of arbitrary depth, such as lists inside lists.
@vbt.chunked(
arg_take_spec=dict( # (1)!
array1=vbt.ChunkedArray(axis=1), # (2)!
array2=vbt.ChunkedArray(axis=1),
combine_func=vbt.NotChunked # (3)!
),
size=vbt.ArraySizer(arg_query="array1", axis=1), # (4)!
merge_func="column_stack" # (5)!
)
def combine_arrays(array1, array2, combine_func):
return combine_func(array1, array2)
new_array = combine_arrays(array1, array2, np.add)
- Dictionary where keys are argument names and values are chunking rules for those arguments
- Split arguments array1 and array2 along columns. They must be multidimensional NumPy or Pandas arrays.
- Provide rules for all arguments. If any argument is missing in arg_take_spec, a warning will be thrown.
- Specify where the total size should be taken from. It's required to build chunks. This is mostly optional as newer versions of VBT can parse it automatically.
- The merging function must depend on how the arrays are chunked. Here, we should stack the columns of the output arrays back together.
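For example, assuming two wide arrays, the call below should produce the same result as the unchunked computation, just processed one subset of columns at a time (a sketch):
import numpy as np
array1 = np.random.uniform(size=(1000, 100))
array2 = np.random.uniform(size=(1000, 100))
new_array = combine_arrays(array1, array2, np.add)
# new_array should match array1 + array2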
Approach 2: Annotate the function. The most convenient approach as you can specify chunking rules next to their respective arguments directly in the function definition.
@vbt.chunked
def combine_arrays(
array1: vbt.ChunkedArray(axis=1) | vbt.ArraySizer(axis=1), # (1)!
array2: vbt.ChunkedArray(axis=1),
combine_func
) -> "column_stack":
return combine_func(array1, array2)
new_array = combine_arrays(array1, array2, np.add)
- Multiple VBT annotations can be combined with the | operator. Also, it doesn't matter whether a chunking annotation is provided as a class or an instance. Providing the sizer is mostly optional as newer versions of VBT can parse it automatically.
Approach 3: Wrap argument values directly. Allows switching chunking rules on the fly.
@vbt.chunked
def combine_arrays(array1, array2, combine_func):
return combine_func(array1, array2)
new_array = combine_arrays( # (1)!
vbt.ChunkedArray(array1),
vbt.ChunkedArray(array2),
np.add,
_size=len(array1), # (2)!
_merge_func="concat"
)
new_array = combine_arrays( # (3)!
vbt.ChunkedArray(array1, axis=0),
vbt.ChunkedArray(array2, axis=0),
np.add,
_size=array1.shape[0],
_merge_func="row_stack"
)
new_array = combine_arrays( # (4)!
vbt.ChunkedArray(array1, axis=1),
vbt.ChunkedArray(array2, axis=1),
np.add,
_size=array1.shape[1],
_merge_func="column_stack"
)
- Split one-dimensional input arrays and concatenate output arrays back together
- Providing the total size is mostly optional as newer versions of VBT can parse it automatically
- Split two-dimensional input arrays along rows and stack rows of output arrays back together
- Split two-dimensional input arrays along columns and stack columns of output arrays back together
Merging and execution are done in the same way as in Parameterization.
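For instance, chunks can be parallelized by forwarding keyword arguments to execute, just like with @vbt.parameterized. Below is a sketch reusing combine_arrays from the last example; the call-time _execute_kwargs override is an assumption, mirroring how _size and _merge_func are passed above:
new_array = combine_arrays(
    vbt.ChunkedArray(array1, axis=1),
    vbt.ChunkedArray(array2, axis=1),
    np.add,
    _merge_func="column_stack",
    _execute_kwargs=dict(engine="threadpool", show_progress=True)  # run chunks in parallel with multithreading
)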