65 KiB
65 KiB
Signal development¶
Generation¶
Comparison¶
In [ ]:
from vectorbtpro import * # whats_imported() vbt.settings.set_theme("dark")
In [ ]:
data = vbt.BinanceData.pull( ["BTCUSDT", "ETHUSDT"], start="2021-01-01", end="2022-01-01" ) print(data.get("Low"))
In [ ]:
bb = vbt.talib("BBANDS").run( data.get("Close"), timeperiod=vbt.Default(14), nbdevup=vbt.Default(2), nbdevdn=vbt.Default(2) ) print(bb.lowerband)
In [ ]:
mask = data.get("Low") < bb.lowerband print(mask)
In [ ]:
mask.sum()
In [ ]:
bb_mult = vbt.talib("BBANDS").run( data.get("Close"), timeperiod=vbt.Default(14), nbdevup=[2, 3], nbdevdn=[2, 3] ) # mask = data.get("Low") < bb_mult.lowerband
In [ ]:
mask = data.get("Low").vbt < bb_mult.lowerband print(mask)
In [ ]:
mask.sum()
In [ ]:
mask = bb_mult.lowerband_above(data.get("Low")) mask.sum()
Thresholds¶
In [ ]:
bandwidth = (bb.upperband - bb.lowerband) / bb.middleband mask = bandwidth.vbt > vbt.Param([0.15, 0.3], name="threshold") mask.sum()
In [ ]:
mask = bandwidth.vbt.combine( [0.15, 0.3], combine_func=np.greater, keys=pd.Index([0.15, 0.3], name="threshold") ) mask.sum()
In [ ]:
mask = pd.concat( (bandwidth > 0.15, bandwidth > 0.3), keys=pd.Index([0.15, 0.3], name="threshold"), axis=1 ) mask.sum()
Crossovers¶
In [ ]:
low_below_lband = data.get("Low") < bb.lowerband mask = low_below_lband.vbt.signals.first() mask.sum()
In [ ]:
btc_low = data.get("Low", "BTCUSDT").rename("Low") btc_lowerband = bb.lowerband["BTCUSDT"].rename("Lower Band") btc_mask = mask["BTCUSDT"].rename("Signals") fig = btc_low.vbt.plot() btc_lowerband.vbt.plot(fig=fig) btc_mask.vbt.signals.plot_as_markers( y=btc_low, trace_kwargs=dict( marker=dict( color="#DFFF00" ) ), fig=fig ).show_svg()
In [ ]:
mask = low_below_lband.vbt.signals.first(after_false=True) mask.sum()
In [ ]:
sample_low = pd.Series([10, 9, 8, 9, 8]) sample_lband = pd.Series([np.nan, np.nan, 9, 8, 9]) sample_mask = sample_low < sample_lband sample_mask.vbt.signals.first(after_false=True)
In [ ]:
sample_mask[sample_lband.ffill().isnull()] = True sample_mask.vbt.signals.first(after_false=True)
In [ ]:
buffer = sample_lband.ffill().isnull().sum(axis=0).max() buffer
In [ ]:
sample_buf_mask = sample_low.iloc[buffer:] < sample_lband.iloc[buffer:] sample_buf_mask = sample_buf_mask.vbt.signals.first(after_false=True) sample_mask = sample_low.vbt.wrapper.fill(False) sample_mask.loc[sample_buf_mask.index] = sample_buf_mask sample_mask
In [ ]:
mask = data.get("Low").vbt.crossed_below(bb.lowerband, wait=1) mask.sum()
In [ ]:
mask = bb.lowerband_crossed_above(data.get("Low"), wait=1) mask.sum()
Logical operators¶
In [ ]:
cond1 = data.get("Low") < bb.lowerband cond2 = bandwidth > 0.3 cond3 = data.get("High") > bb.upperband cond4 = bandwidth < 0.15 mask = (cond1 & cond2) | (cond3 & cond4) mask.sum()
In [ ]:
cond1 = data.get("Low").vbt < bb.lowerband cond2 = bandwidth.vbt > vbt.Param([0.3, 0.3, 0.4, 0.4], name="cond2_th") cond3 = data.get("High").vbt > bb.upperband cond4 = bandwidth.vbt < vbt.Param([0.1, 0.2, 0.1, 0.2], name="cond4_th") mask = (cond1.vbt & cond2).vbt | (cond3.vbt & cond4) mask.sum()
In [ ]:
cond1 = data.get("Low").vbt < bb.lowerband cond2 = bandwidth.vbt > vbt.Param([0.3, 0.4], name="cond2_th") cond3 = data.get("High").vbt > bb.upperband cond4 = bandwidth.vbt < vbt.Param([0.1, 0.2], name="cond4_th")
In [ ]:
i1 = np.split(np.arange(len(cond1.columns)), len(cond1.columns) // 2) i2 = np.split(np.arange(len(cond2.columns)), len(cond2.columns) // 2) i3 = np.split(np.arange(len(cond3.columns)), len(cond3.columns) // 2) i4 = np.split(np.arange(len(cond4.columns)), len(cond4.columns) // 2)
In [ ]:
print(i1) print(i2) print(i3) print(i4)
In [ ]:
i1, i2, i3, i4 = zip(*product(i1, i2, i3, i4))
In [ ]:
print(i1) print(i2) print(i3) print(i4)
In [ ]:
i1 = np.asarray(i1).flatten() i2 = np.asarray(i2).flatten() i3 = np.asarray(i3).flatten() i4 = np.asarray(i4).flatten()
In [ ]:
print(i1) print(i2) print(i3) print(i4)
In [ ]:
cond1 = cond1.iloc[:, i1] cond2 = cond2.iloc[:, i2] cond3 = cond3.iloc[:, i3] cond4 = cond4.iloc[:, i4]
In [ ]:
mask = (cond1.vbt & cond2).vbt | (cond3.vbt & cond4) mask.sum()
In [ ]:
MaskGenerator = vbt.IF.from_expr(""" upperband, middleband, lowerband = @res_talib_bbands bandwidth = (upperband - lowerband) / middleband cond1 = low < lowerband cond2 = bandwidth > @p_cond2_th cond3 = high > upperband cond4 = bandwidth < @p_cond4_th @out_mask:(cond1 & cond2) | (cond3 & cond4) """) print(vbt.format_func(MaskGenerator.run, incl_doc=False))
In [ ]:
mask_generator = MaskGenerator.run( high=data.get("High"), low=data.get("Low"), close=data.get("Close"), cond2_th=[0.3, 0.4], cond4_th=[0.1, 0.2], bbands_timeperiod=vbt.Default(14), param_product=True ) mask_generator.mask.sum()
Shifting¶
In [ ]:
cond1 = data.get("Low") < bb.lowerband cond2 = bandwidth > bandwidth.shift(1) mask = cond1 & cond2 mask.sum()
In [ ]:
cond2 = bandwidth > bandwidth.rolling("7d").apply(lambda x: x[0]) mask = cond1 & cond2 mask.sum()
In [ ]:
def exactly_ago(sr): if sr.index[0] == sr.index[-1] - vbt.timedelta("7d"): return sr.iloc[0] return np.nan cond_7d_ago = bandwidth.rolling("8d").apply(exactly_ago, raw=False) cond2 = bandwidth > cond_7d_ago mask = cond1 & cond2 mask.sum()
In [ ]:
@njit def exactly_ago_meta_nb(from_i, to_i, col, index, freq, arr): if index[from_i] == index[to_i - 1] - freq: return arr[from_i, col] return np.nan cond_7d_ago = vbt.pd_acc.rolling_apply( "8d", exactly_ago_meta_nb, bandwidth.index.values, vbt.timedelta("7d").to_timedelta64(), vbt.to_2d_array(bandwidth), wrapper=bandwidth.vbt.wrapper ) cond2 = bandwidth > cond_7d_ago mask = cond1 & cond2 mask.sum()
In [ ]:
cond2 = bandwidth > bandwidth.vbt.ago("7d") mask = cond1 & cond2 mask.sum()
In [ ]:
bandwidth.iloc[-8]
In [ ]:
bandwidth.vbt.ago("7d").iloc[-1]
Truth value testing¶
In [ ]:
cond2 = data.get("Close").vbt.crossed_below(bb.middleband) cond2 = cond2.rolling(5, min_periods=1).max().astype(bool) mask = cond1 & cond2 mask.sum()
In [ ]:
cond2 = data.get("Close").vbt.crossed_below(bb.middleband) cond2 = cond2.vbt.rolling_any(5) mask = cond1 & cond2 mask.sum()
In [ ]:
cond2 = data.get("Close").vbt.crossed_below(bb.middleband) cond2 = cond2.vbt.rolling_apply( "W", "any", minp=1, wrap_kwargs=dict(fillna=0, dtype=bool) ) mask = cond1 & cond2 mask.sum()
In [ ]:
anchor_points = data.wrapper.get_index_points( every="M", start=0, exact_start=True ) anchor_points
In [ ]:
left_bound = np.full(len(data.wrapper.index), np.nan) left_bound[anchor_points] = anchor_points left_bound = vbt.dt.to_ns(vbt.nb.ffill_1d_nb(left_bound)) left_bound = bandwidth.index[left_bound] left_bound
In [ ]:
right_bound = data.wrapper.index right_bound
In [ ]:
mask = (bandwidth <= 0.1).vbt.resample_between_bounds( left_bound, right_bound, "any", closed_lbound=True, closed_rbound=True, wrap_with_lbound=False, wrap_kwargs=dict(fillna=0, dtype=bool) ) mask.index = right_bound mask.astype(int).vbt.ts_heatmap().show_svg()
Periodically¶
In [ ]:
min_data = vbt.BinanceData.pull( ["BTCUSDT", "ETHUSDT"], start="2021-01-01 UTC", end="2021-02-01 UTC", timeframe="1h" ) index = min_data.wrapper.index tuesday_index = index[index.weekday == 1] tuesday_index
In [ ]:
tuesday_1800_index = tuesday_index[tuesday_index.hour == 18] tuesday_1800_index
In [ ]:
tuesday_1730_index = index[ (index.weekday == 1) & (index.hour == 17) & (index.minute == 30) ] tuesday_1730_index
In [ ]:
index.get_indexer([vbt.timestamp("2021-01-07", tz=index.tz)])
In [ ]:
index.get_indexer([vbt.timestamp("2021-01-07 17:30:00", tz=index.tz)])
In [ ]:
index[index.get_indexer( [vbt.timestamp("2021-01-07 17:30:00", tz=index.tz)], method="ffill" )]
In [ ]:
index[index.get_indexer( [vbt.timestamp("2021-01-07 17:30:00", tz=index.tz)], method="bfill" )]
In [ ]:
each_tuesday = vbt.date_range(index[0], index[-1], freq="tuesday") each_tuesday_1730 = each_tuesday + pd.Timedelta(hours=17, minutes=30) each_tuesday_1730
In [ ]:
positions = index.get_indexer(each_tuesday_1730, method="bfill") min_symbol_wrapper = min_data.get_symbol_wrapper() mask = min_symbol_wrapper.fill(False) mask.iloc[positions] = True mask.sum()
In [ ]:
mask[mask.any(axis=1)].index.strftime("%A %T")
In [ ]:
tuesday_after_1700 = (index.weekday == 1) & (index.hour >= 17) wednesday_before_1700 = (index.weekday == 2) & (index.hour < 17) main_cond = tuesday_after_1700 | wednesday_before_1700 mask = min_symbol_wrapper.fill(False) mask[main_cond] = True mask = mask.vbt.signals.first() mask[mask.any(axis=1)].index.strftime("%A %T")
In [ ]:
mask = min_symbol_wrapper.fill(False) mask.vbt.set( True, every="tuesday", at_time="17:30", inplace=True ) mask[mask.any(axis=1)].index.strftime("%A %T")
In [ ]:
mask = min_symbol_wrapper.fill(False) mask.vbt.set( True, every="tuesday", at_time="18:00", add_delta=pd.Timedelta(nanoseconds=1), inplace=True ) mask[mask.any(axis=1)].index.strftime("%A %T")
In [ ]:
mask = min_symbol_wrapper.fill(False) mask.vbt.set_between( True, every="monday", start_time="12:00", end_time="17:00", add_end_delta=pd.Timedelta(days=1), inplace=True ) mask[mask.any(axis=1)].index.strftime("%A %T")
In [ ]:
mask = min_symbol_wrapper.fill(False) mask.vbt.set( True, on="January 7th 2021 UTC", indexer_method=None, inplace=True ) mask[mask.any(axis=1)].index
In [ ]:
mask = min_symbol_wrapper.fill(False) mask.vbt.set_between( True, start=["2021-01-01 12:00:00", "2021-01-07 12:00:00"], end=["2021-01-02 12:00:00", "2021-01-08 12:00:00"], inplace=True ) mask[mask.any(axis=1)].index
In [ ]:
mask = min_symbol_wrapper.fill(False) mask.vbt.set_between( True, every="monday", split_every=False, add_end_delta="2h", inplace=True ) mask[mask.any(axis=1)].index
Iteratively¶
In [ ]:
@njit def generate_mask_1d_nb( high, low, uband, mband, lband, cond2_th, cond4_th ): out = np.full(high.shape, False) for i in range(high.shape[0]): bandwidth = (uband[i] - lband[i]) / mband[i] cond1 = low[i] < lband[i] cond2 = bandwidth > cond2_th cond3 = high[i] > uband[i] cond4 = bandwidth < cond4_th signal = (cond1 and cond2) or (cond3 and cond4) out[i] = signal return out mask = generate_mask_1d_nb( data.get("High")["BTCUSDT"].values, data.get("Low")["BTCUSDT"].values, bb.upperband["BTCUSDT"].values, bb.middleband["BTCUSDT"].values, bb.lowerband["BTCUSDT"].values, 0.30, 0.15 ) symbol_wrapper = data.get_symbol_wrapper() mask = symbol_wrapper["BTCUSDT"].wrap(mask) mask.sum()
In [ ]:
@njit def generate_mask_nb( high, low, uband, mband, lband, cond2_th, cond4_th ): out = np.empty(high.shape, dtype=np.bool_) for col in range(high.shape[1]): out[:, col] = generate_mask_1d_nb( high[:, col], low[:, col], uband[:, col], mband[:, col], lband[:, col], cond2_th, cond4_th ) return out mask = generate_mask_nb( vbt.to_2d_array(data.get("High")), vbt.to_2d_array(data.get("Low")), vbt.to_2d_array(bb.upperband), vbt.to_2d_array(bb.middleband), vbt.to_2d_array(bb.lowerband), 0.30, 0.15 ) mask = symbol_wrapper.wrap(mask) mask.sum()
In [ ]:
MaskGenerator = vbt.IF( input_names=["high", "low", "uband", "mband", "lband"], param_names=["cond2_th", "cond4_th"], output_names=["mask"] ).with_apply_func(generate_mask_1d_nb, takes_1d=True) mask_generator = MaskGenerator.run( data.get("High"), data.get("Low"), bb.upperband, bb.middleband, bb.lowerband, [0.3, 0.4], [0.1, 0.2], param_product=True ) mask_generator.mask.sum()
In [ ]:
@njit def value_ago_1d_nb(arr, ago): out = np.empty(arr.shape, dtype=np.float_) for i in range(out.shape[0]): if i - ago >= 0: out[i] = arr[i - ago] else: out[i] = np.nan return out arr = np.array([1, 2, 3]) value_ago_1d_nb(arr, 1)
In [ ]:
@njit def any_in_window_1d_nb(arr, window): out = np.empty(arr.shape, dtype=np.bool_) for i in range(out.shape[0]): from_i = max(0, i + 1 - window) to_i = i + 1 out[i] = np.any(arr[from_i:to_i]) return out arr = np.array([False, True, True, False, False]) any_in_window_1d_nb(arr, 2)
In [ ]:
@njit def any_in_var_window_1d_nb(arr, index, freq): out = np.empty(arr.shape, dtype=np.bool_) from_i = 0 for i in range(out.shape[0]): if index[from_i] <= index[i] - freq: for j in range(from_i + 1, index.shape[0]): if index[j] > index[i] - freq: from_i = j break to_i = i + 1 out[i] = np.any(arr[from_i:to_i]) return out arr = np.array([False, True, True, False, False]) index = vbt.date_range("2020", freq="5min", periods=len(arr)).values freq = vbt.timedelta("10min").to_timedelta64() any_in_var_window_1d_nb(arr, index, freq)
In [ ]:
any_in_var_window_1d_nb(arr, vbt.dt.to_ns(index), vbt.dt.to_ns(freq))
In [ ]:
vbt.dt.to_ns(index)
In [ ]:
vbt.dt.to_ns(index - np.datetime64(0, "ns"))
In [ ]:
vbt.dt.to_ns(freq)
In [ ]:
vbt.dt.to_ns(freq) / 1000 / 1000 / 1000 / 60
Generators¶
In [ ]:
@njit def place_func_nb(c, index): for out_i in range(len(c.out)): i = c.from_i + out_i weekday = vbt.dt_nb.weekday_nb(index[i]) hour = vbt.dt_nb.hour_nb(index[i]) if weekday == 2 and hour == 17: c.out[out_i] = True return out_i return -1 mask = vbt.pd_acc.signals.generate( symbol_wrapper.shape, place_func_nb, vbt.dt.to_ns(symbol_wrapper.index), wrapper=symbol_wrapper ) mask.sum()
In [ ]:
@njit def place_func_nb(c, index): last_i = -1 for out_i in range(len(c.out)): i = c.from_i + out_i weekday = vbt.dt_nb.weekday_nb(index[i]) hour = vbt.dt_nb.hour_nb(index[i]) if weekday == 2 and hour == 17: c.out[out_i] = True last_i = out_i else: past_target_midnight = vbt.dt_nb.past_weekday_nb(index[i], 2) past_target = past_target_midnight + 17 * vbt.dt_nb.h_ns if (i > 0 and index[i - 1] < past_target) and \ index[i] > past_target: c.out[out_i] = True last_i = out_i return last_i mask = vbt.pd_acc.signals.generate( symbol_wrapper.shape, place_func_nb, vbt.dt.to_ns(symbol_wrapper.index), wrapper=symbol_wrapper ) mask.sum()
In [ ]:
mask.index[mask.any(axis=1)].strftime('%A %m/%d/%Y')
In [ ]:
@njit def place_func_nb(c, weekday, hour, index): last_i = -1 for out_i in range(len(c.out)): i = c.from_i + out_i weekday_now = vbt.dt_nb.weekday_nb(index[i]) hour_now = vbt.dt_nb.hour_nb(index[i]) if weekday_now == weekday and hour_now == hour: c.out[out_i] = True last_i = out_i return last_i EntryGenerator = vbt.SignalFactory( mode="entries", param_names=["weekday", "hour"] ).with_place_func( entry_place_func_nb=place_func_nb, entry_settings=dict( pass_params=["weekday", "hour"], ), var_args=True ) entry_generator = EntryGenerator.run( symbol_wrapper.shape, 2, [0, 17], vbt.dt.to_ns(symbol_wrapper.index), input_index=symbol_wrapper.index, input_columns=symbol_wrapper.columns ) entry_generator.entries.sum()
In [ ]:
entry_generator.plot(column=(2, 0, "BTCUSDT")).show_svg()
Exits¶
In [ ]:
@njit def exit_place_func_nb(c): c.out[0] = True return 0 entries = symbol_wrapper.fill(False) entries.vbt.set(True, every="Q", inplace=True) entries.index[entries.any(axis=1)]
In [ ]:
exits = entries.vbt.signals.generate_exits(exit_place_func_nb) exits.index[exits.any(axis=1)]
In [ ]:
exits = entries.vbt.signals.generate_exits( exit_place_func_nb, wait=0 ) exits.index[exits.any(axis=1)]
In [ ]:
@njit def exit_place_func_nb(c, index, wait_td): last_i = -1 for out_i in range(len(c.out)): i = c.from_i + out_i if index[i] >= index[c.from_i] + wait_td: c.out[out_i] = True last_i = out_i break return last_i exits = entries.vbt.signals.generate_exits( exit_place_func_nb, vbt.dt.to_ns(entries.index), vbt.dt.to_ns(vbt.timedelta("7d")), wait=0 ) exits.index[exits.any(axis=1)]
In [ ]:
entries = symbol_wrapper.fill(False) entries.vbt.set(True, every="5d", inplace=True) exits = entries.vbt.signals.generate_exits( exit_place_func_nb, vbt.dt.to_ns(entries.index), vbt.dt.to_ns(vbt.timedelta("7d")), wait=0 ) exits.index[exits.any(axis=1)]
In [ ]:
exits = entries.vbt.signals.generate_exits( exit_place_func_nb, vbt.dt.to_ns(entries.index), vbt.dt.to_ns(vbt.timedelta("7d")), wait=0, until_next=False ) exits.index[exits.any(axis=1)]
In [ ]:
exits = entries.vbt.signals.generate_exits( exit_place_func_nb, vbt.dt.to_ns(entries.index), vbt.dt.to_ns(vbt.timedelta("7d")), wait=0, until_next=False, skip_until_exit=True ) exits.index[exits.any(axis=1)]
In [ ]:
@njit def exit_place_func_nb(c, wait_td, index): last_i = -1 for out_i in range(len(c.out)): i = c.from_i + out_i if index[i] >= index[c.from_i] + wait_td: c.out[out_i] = True last_i = out_i break return last_i ExitGenerator = vbt.SignalFactory( mode="exits", param_names=["wait_td"] ).with_place_func( exit_place_func_nb=exit_place_func_nb, exit_settings=dict( pass_params=["wait_td"], ), var_args=True, wait=0, until_next=False, skip_until_exit=True, param_settings=dict( wait_td=dict( post_index_func=lambda x: x.map(lambda y: str(vbt.timedelta(y))) ) ), ) exit_generator = ExitGenerator.run( entries, [ vbt.timedelta("3d").to_timedelta64(), vbt.timedelta("7d").to_timedelta64() ], symbol_wrapper.index.values ) exit_generator.exits.sum()
In [ ]:
new_entries = exit_generator.entries.vbt.signals.first( reset_by=exit_generator.exits, allow_gaps=True, ) new_entries.index[new_entries[("7 days 00:00:00", "BTCUSDT")]]
Both¶
In [ ]:
@njit def entry_place_func_nb(c, low, close, th): if c.from_i == 0: c.out[0] = True return 0 exit_price = close[c.from_i - c.wait, c.col] hit_price = exit_price * (1 - th) last_i = -1 for out_i in range(len(c.out)): i = c.from_i + out_i if low[i, c.col] <= hit_price: c.out[out_i] = True last_i = out_i break return last_i @njit def exit_place_func_nb(c, high, close, th): entry_price = close[c.from_i - c.wait, c.col] hit_price = entry_price * (1 + th) last_i = -1 for out_i in range(len(c.out)): i = c.from_i + out_i if high[i, c.col] >= hit_price: c.out[out_i] = True last_i = out_i break return last_i entries, exits = vbt.pd_acc.signals.generate_both( symbol_wrapper.shape, entry_place_func_nb=entry_place_func_nb, entry_place_args=(vbt.Rep("low"), vbt.Rep("close"), 0.1), exit_place_func_nb=exit_place_func_nb, exit_place_args=(vbt.Rep("high"), vbt.Rep("close"), 0.2), wrapper=symbol_wrapper, broadcast_named_args=dict( high=data.get("High"), low=data.get("Low"), close=data.get("Close") ), broadcast_kwargs=dict( post_func=vbt.to_2d_array ) ) fig = data.plot( symbol="BTCUSDT", ohlc_trace_kwargs=dict(opacity=0.5), plot_volume=False ) entries["BTCUSDT"].vbt.signals.plot_as_entries( y=data.get("Close", "BTCUSDT"), fig=fig) exits["BTCUSDT"].vbt.signals.plot_as_exits( y=data.get("Close", "BTCUSDT"), fig=fig) fig.show_svg()
In [ ]:
BothGenerator = vbt.SignalFactory( mode="both", input_names=["high", "low", "close"], param_names=["entry_th", "exit_th"] ).with_place_func( entry_place_func_nb=entry_place_func_nb, entry_settings=dict( pass_inputs=["low", "close"], pass_params=["entry_th"], ), exit_place_func_nb=exit_place_func_nb, exit_settings=dict( pass_inputs=["high", "close"], pass_params=["exit_th"], ) ) both_generator = BothGenerator.run( data.get("High"), data.get("Low"), data.get("Close"), [0.1, 0.2], [0.2, 0.3], param_product=True ) fig = data.plot( symbol="BTCUSDT", ohlc_trace_kwargs=dict(opacity=0.5), plot_volume=False ) both_generator.plot( column=(0.1, 0.3, "BTCUSDT"), entry_y=data.get("Close", "BTCUSDT"), exit_y=data.get("Close", "BTCUSDT"), fig=fig ).show_svg()
Chained exits¶
In [ ]:
@njit def exit_place_func_nb(c, low, request_price, fill_price_out): _request_price = request_price[c.from_i - c.wait, c.col] last_i = -1 for out_i in range(len(c.out)): i = c.from_i + out_i if low[i, c.col] <= _request_price: fill_price_out[i, c.col] = _request_price c.out[out_i] = True last_i = out_i break return last_i ChainGenerator = vbt.SignalFactory( mode="chain", input_names=["low", "request_price"], in_output_names=["fill_price_out"] ).with_place_func( exit_place_func_nb=exit_place_func_nb, exit_settings=dict( pass_inputs=["low", "request_price"], pass_in_outputs=["fill_price_out"], ), fill_price_out=np.nan )
In [ ]:
fast_ma = vbt.talib("SMA").run( data.get("Close"), vbt.Default(10), short_name="fast_ma" ) slow_ma = vbt.talib("SMA").run( data.get("Close"), vbt.Default(20), short_name="slow_ma" ) entries = fast_ma.real_crossed_above(slow_ma) entries.sum()
In [ ]:
chain_generator = ChainGenerator.run( entries, data.get("Low"), data.get("Close") * (1 - 0.1) ) request_mask = chain_generator.new_entries request_mask.sum()
In [ ]:
request_price = chain_generator.request_price print(request_price[request_mask.any(axis=1)])
In [ ]:
fill_mask = chain_generator.exits fill_mask.sum()
In [ ]:
fill_price = chain_generator.fill_price_out print(fill_price[fill_mask.any(axis=1)])
Preset generators¶
Random¶
In [ ]:
btcusdt_wrapper = symbol_wrapper["BTCUSDT"] mask = vbt.pd_acc.signals.generate_random( btcusdt_wrapper.shape, prob=1 / 10, wrapper=btcusdt_wrapper, seed=42 ) mask_index = mask.index[mask] (mask_index[1:] - mask_index[:-1]).mean()
In [ ]:
monday_mask = btcusdt_wrapper.fill(False) monday_mask.vbt.set(True, every="monday", inplace=True) mask = monday_mask.vbt.signals.generate_random_exits(wait=0) mask_index = mask.index[mask] mask_index.strftime("%W %A")
In [ ]:
prob = np.linspace(0, 1, len(symbol_wrapper.index)) rprob = vbt.RPROB.run( symbol_wrapper.shape, vbt.Default(vbt.to_2d_pr_array(prob)), seed=42, input_index=symbol_wrapper.index, input_columns=symbol_wrapper.columns ) rprob.entries.astype(int).vbt.ts_heatmap().show_svg()
In [ ]:
rprob = vbt.RPROB.run( symbol_wrapper.shape, [0.5, vbt.to_2d_pr_array(prob)], seed=42, input_index=symbol_wrapper.index, input_columns=symbol_wrapper.columns ) rprob.entries.sum()
Stops¶
In [ ]:
new_entries, exits = entries.vbt.signals.generate_stop_exits( data.get("Close"), data.get("High"), stop=0.1, chain=True ) print(new_entries[new_entries.any(axis=1)])
In [ ]:
print(exits[exits.any(axis=1)])
In [ ]:
out_dict = {} new_entries, exits = entries.vbt.signals.generate_stop_exits( data.get("Close"), data.get("High"), stop=0.1, chain=True, out_dict=out_dict ) print(out_dict["stop_ts"][exits.any(axis=1)])
In [ ]:
stcx = vbt.STCX.run( entries, data.get("Open"), ts=data.get("Low"), follow_ts=data.get("High"), stop=-0.1, trailing=[False, True], wait=0 ) fig = data.plot( symbol="BTCUSDT", ohlc_trace_kwargs=dict(opacity=0.5), plot_volume=False ) stcx.plot( column=(-0.1, True, "BTCUSDT"), entry_y="entry_ts", exit_y="stop_ts", fig=fig ).show_svg()
In [ ]:
ohlcstcx = vbt.OHLCSTCX.run( entries, data.get("Close"), data.get("Open"), data.get("High"), data.get("Low"), data.get("Close"), sl_stop=vbt.Default(0.1), tsl_stop=vbt.Default(0.15), is_entry_open=False ) ohlcstcx.plot(column=("BTCUSDT")).show_svg()
In [ ]:
print(ohlcstcx.stop_type_readable[ohlcstcx.exits.any(axis=1)])
In [ ]:
ohlcstcx = vbt.OHLCSTCX.run( entries, data.get("Close"), sl_stop=vbt.Default(0.1), tsl_stop=vbt.Default(0.15), is_entry_open=False ) ohlcstcx.plot(column=("BTCUSDT")).show_svg()
In [ ]:
entry_pos_rank = entries.vbt.signals.pos_rank(allow_gaps=True) short_entries = (entry_pos_rank >= 0) & (entry_pos_rank % 2 == 1) ohlcstcx = vbt.OHLCSTCX.run( entries, data.get("Close"), data.get("Open"), data.get("High"), data.get("Low"), data.get("Close"), tsl_th=vbt.Default(0.2), tsl_stop=vbt.Default(0.1), reverse=vbt.Default(short_entries), is_entry_open=False ) ohlcstcx.plot(column=("BTCUSDT")).show_svg()
In [ ]:
long_entries = ohlcstcx.new_entries.vbt & (~short_entries) long_exits = ohlcstcx.exits.vbt.signals.first_after(long_entries) short_entries = ohlcstcx.new_entries.vbt & short_entries short_exits = ohlcstcx.exits.vbt.signals.first_after(short_entries)
In [ ]:
fig = data.plot( symbol="BTCUSDT", ohlc_trace_kwargs=dict(opacity=0.5), plot_volume=False ) long_entries["BTCUSDT"].vbt.signals.plot_as_entries( ohlcstcx.entry_price["BTCUSDT"], trace_kwargs=dict(marker=dict(color="limegreen"), name="Long entries"), fig=fig ) long_exits["BTCUSDT"].vbt.signals.plot_as_exits( ohlcstcx.stop_price["BTCUSDT"], trace_kwargs=dict(marker=dict(color="orange"), name="Long exits"), fig=fig ) short_entries["BTCUSDT"].vbt.signals.plot_as_entries( ohlcstcx.entry_price["BTCUSDT"], trace_kwargs=dict(marker=dict(color="magenta"), name="Short entries"), fig=fig ) short_exits["BTCUSDT"].vbt.signals.plot_as_exits( ohlcstcx.stop_price["BTCUSDT"], trace_kwargs=dict(marker=dict(color="red"), name="Short exits"), fig=fig ).show_svg()
Pre-analysis¶
Ranking¶
In [ ]:
@njit def rank_func_nb(c): if c.sig_in_part_cnt == 1: return 1 return 0 sample_mask = pd.Series([True, True, False, True, True]) ranked = sample_mask.vbt.signals.rank(rank_func_nb) ranked
In [ ]:
ranked == 1
In [ ]:
ranked = sample_mask.vbt.signals.rank(rank_func_nb, after_false=True) ranked == 1
In [ ]:
sample_entries = pd.Series([True, True, True, True, True]) sample_exits = pd.Series([False, False, True, False, False]) ranked = sample_entries.vbt.signals.rank( rank_func_nb, reset_by=sample_exits ) ranked == 1
In [ ]:
ranked = sample_entries.vbt.signals.rank( rank_func_nb, reset_by=sample_exits, after_reset=True ) ranked == 1
Preset rankers¶
In [ ]:
sample_mask = pd.Series([True, True, False, True, True]) ranked = sample_mask.vbt.signals.pos_rank() ranked
In [ ]:
ranked == 1
In [ ]:
ranked = sample_mask.vbt.signals.pos_rank(allow_gaps=True) ranked
In [ ]:
(ranked > -1) & (ranked % 2 == 1)
In [ ]:
ranked = sample_mask.vbt.signals.partition_pos_rank(allow_gaps=True) ranked
In [ ]:
ranked == 1
In [ ]:
entry_cond1 = data.get("Low") < bb.lowerband entry_cond2 = bandwidth > 0.3 entry_cond3 = data.get("High") > bb.upperband entry_cond4 = bandwidth < 0.15 entries = (entry_cond1 & entry_cond2) | (entry_cond3 & entry_cond4) entries.vbt.signals.from_nth(0).sum()
In [ ]:
entries.vbt.signals.from_nth(1).sum()
In [ ]:
entries.vbt.signals.from_nth(2).sum()
In [ ]:
exit_cond1 = data.get("High") > bb.upperband exit_cond2 = bandwidth > 0.3 exit_cond3 = data.get("Low") < bb.lowerband exit_cond4 = bandwidth < 0.15 exits = (exit_cond1 & exit_cond2) | (exit_cond3 & exit_cond4)
In [ ]:
exits.vbt.signals.pos_rank_after(entries, reset_wait=0).max() + 1
In [ ]:
entries.vbt.signals.pos_rank_after(exits).max() + 1
In [ ]:
ranked = exits.vbt.signals.pos_rank_after(entries, reset_wait=0) highest_ranked = ranked == ranked.max() print(ranked[highest_ranked.any(axis=1)])
In [ ]:
exits_after = exits.vbt.signals.from_nth_after(0, entries, reset_wait=0) (exits ^ exits_after).sum()
Mapped ranks¶
In [ ]:
mask = bandwidth.vbt > vbt.Param(np.arange(1, 10) / 10, name="bw_th") mapped_ranks = mask.vbt.signals.pos_rank(as_mapped=True) mapped_ranks.max(group_by=vbt.ExceptLevel("symbol"))
Cleaning¶
In [ ]:
new_exits = exits.vbt.signals.first_after(entries, reset_wait=0) new_entries = entries.vbt.signals.first_after(exits)
In [ ]:
symbol = "ETHUSDT" fig = data.plot( symbol=symbol, ohlc_trace_kwargs=dict(opacity=0.5), plot_volume=False ) entries[symbol].vbt.signals.plot_as_entries( y=data.get("Close", symbol), fig=fig) exits[symbol].vbt.signals.plot_as_exits( y=data.get("Close", symbol), fig=fig) new_entries[symbol].vbt.signals.plot_as_entry_marks( y=data.get("Close", symbol), fig=fig, trace_kwargs=dict(name="New entries")) new_exits[symbol].vbt.signals.plot_as_exit_marks( y=data.get("Close", symbol), fig=fig, trace_kwargs=dict(name="New exits")).show_svg()
In [ ]:
new_entries, new_exits = entries.vbt.signals.clean(exits)
Duration¶
In [ ]:
ranges = entries.vbt.signals.between_ranges() print(ranges.records)
In [ ]:
ranges.start_idx.min(wrap_kwargs=dict(to_index=True))
In [ ]:
print(ranges.duration.describe(wrap_kwargs=dict(to_timedelta=True)))
In [ ]:
ranges = entries.vbt.signals.between_ranges(target=exits) ranges.avg_duration
In [ ]:
new_ranges = new_entries.vbt.signals.between_ranges(target=new_exits) new_ranges.avg_duration
In [ ]:
ranges = entries.vbt.signals.between_ranges(target=exits, relation="manyone") ranges.avg_duration
In [ ]:
new_ranges = new_entries.vbt.signals.between_ranges(target=new_exits, relation="manyone") new_ranges.avg_duration
In [ ]:
ranges = entries.vbt.signals.partition_ranges() print(ranges.duration.describe())
In [ ]:
new_ranges = new_entries.vbt.signals.partition_ranges() print(new_ranges.duration.describe())
In [ ]:
ranges = entries.vbt.signals.between_partition_ranges() print(ranges.duration.describe(wrap_kwargs=dict(to_timedelta=True)))
Overview¶
In [ ]:
entries.vbt.signals.stats(column="BTCUSDT")
In [ ]:
entries.vbt.signals.stats(column="BTCUSDT", settings=dict(target=exits))
In [ ]: