multioutput indicators #15 + talib custom indicator support

This commit is contained in:
David Brazda
2024-01-16 15:17:14 +01:00
parent 5d47a7ac58
commit c1145fec5b
28 changed files with 598 additions and 141 deletions

View File

@ -10,7 +10,7 @@ import math
from collections import defaultdict
#indicator allowing to be based on any bar parameter (index, high,open,close,trades,volume, etc.)
def barparams(state, params, name):
def barparams(state, params, name, returns):
funcName = "barparams"
if params is None:
return -2, "params required"

View File

@ -9,22 +9,25 @@ from collections import defaultdict
from scipy.stats import linregress
from scipy.fft import fft
from v2realbot.strategyblocks.indicators.helpers import value_or_indicator
import pywt
#vstupem je bud indicator nebo bar parametr
#na tomto vstupu dokaze provest zakladni statisticke funkce pro subpole X hodnot zpatky
#podporovane functions: min, max, mean
def basestats(state, params, name):
def basestats(state, params, name, returns):
funcName = "basestats"
#name of indicator or
source = safe_get(params, "source", None)
lookback = safe_get(params, "lookback", None)
func = safe_get(params, "function", None)
returns = safe_get(params, "returns", None)
source_dict = defaultdict(list)
source_dict[source] = get_source_series(state, source)
self = state.indicators[name]
try:
self = state.indicators[name]
except KeyError:
self = state.cbar_indicators[name]
if lookback is None:
source_array = source_dict[source]
@ -69,7 +72,23 @@ def basestats(state, params, name):
#val = 2 * (angle_deg / 180) - 1
elif func =="stdev":
val = np.std(source_array)
#linregres slope
#linregress mutlioutput
# slope : float, Slope of the regression line.
# intercept : float, Intercept of the regression line.
# rvalue : float, The Pearson correlation coefficient. The square of rvalue is equal to the coefficient of determination.
# pvalue : float, The p-value for a hypothesis test whose null hypothesis is that the slope is zero, using Wald Test with t-distribution of the test statistic. See alternative above for alternative hypotheses.
# stderr : float
elif func == "linregress":
if len(source_array) < 4:
return -2, "less than 4 elmnts"
try:
val = []
np.seterr(all="raise")
slope, intercept, rvalue, pvalue, stderr = linregress(np.arange(len(source_array)), source_array)
val = [slope*1000, intercept,rvalue, pvalue, stderr]
except FloatingPointError:
return -2, "FloatingPointError"
#linregres slope DECOMM
elif func == "slope":
if len(source_array) < 4:
return -2, "less than 4 elmnts"
@ -79,17 +98,82 @@ def basestats(state, params, name):
val = val*1000
except FloatingPointError:
return -2, "FloatingPointError"
#zatim takto, dokud nebudou podporovany indikatory s vice vystupnimi
#zatim takto, dokud nebudou podporovany indikatory s vice vystupnimi - DECOMM
elif func == "intercept":
if len(source_array) < 4:
return -2, "less than 4 elmnts"
try:
np.seterr(all="raise")
_, val, _, _, _ = linregress(np.arange(len(source_array)), source_array)
val = round(val, 4)
except FloatingPointError:
return -2, "FloatingPointError"
#work with different wavelet names and change max_scale
#https://chat.openai.com/c/44b917d7-43df-4d80-be2f-01a5ee92158b
elif func == "wavelet":
def extract_wavelet_features(time_series, wavelet_name='morl', max_scale=64):
scales = np.arange(1, max_scale + 1)
coefficients, frequencies = pywt.cwt(time_series, scales, wavelet_name)
# Extract features - for instance, mean and variance of coefficients at each scale
mean_coeffs = np.mean(coefficients, axis=1)[-1] # Last value of mean coefficients
var_coeffs = np.var(coefficients, axis=1)[-1] # Last value of variance of coefficients
# Energy distribution for the latest segment
energy = np.sum(coefficients**2, axis=1)[-1]
# Entropy for the latest segment
entropy = -np.sum((coefficients**2) * np.log(coefficients**2), axis=1)[-1]
# Dominant and mean frequency for the latest segment
dominant_frequency = frequencies[np.argmax(energy)]
mean_frequency = 0 # np.average(frequencies, weights=energy)
return [energy, entropy, dominant_frequency, mean_frequency,mean_coeffs, var_coeffs]
time_series = np.array(source_array)
wavelet_name = "morl"
max_scale = 64
features = extract_wavelet_features(time_series)
return 0, features
#better fourier for frequency bins as suggested here https://chat.openai.com/c/44b917d7-43df-4d80-be2f-01a5ee92158b
elif func == "fourier":
def compute_fft_features(time_series, num_bins):
n = len(time_series)
yf = fft(time_series)
# Frequency values for FFT output
xf = np.linspace(0.0, 1.0/(2.0), n//2)
# Compute power spectrum
power_spectrum = np.abs(yf[:n//2])**2
# Define frequency bins
max_freq = 1.0 / 2.0
bin_edges = np.linspace(0, max_freq, num_bins + 1)
# Initialize feature array
features = np.zeros(num_bins)
# Compute power in each bin
for i in range(num_bins):
# Find indices of frequencies in this bin
indices = np.where((xf >= bin_edges[i]) & (xf < bin_edges[i+1]))[0]
features[i] = np.sum(power_spectrum[indices])
return features
# Example usage
time_series = np.array(source_array) # Replace with your data
num_bins = 20 # Example: 10 frequency bins
features = compute_fft_features(time_series, num_bins)
return 0, features.tolist()
#returns X frequencies
elif func == "fourier_old":
time_series = np.array(source_array)
n = len(time_series)
@ -97,30 +181,50 @@ def basestats(state, params, name):
yf = fft(time_series)
xf = np.linspace(0.0, 1.0/(2.0), n//2)
#three most dominant frequencies
dominant_frequencies = xf[np.argsort(np.abs(yf[:n//2]))[-3:]]
state.ilog(lvl=1,e=f"IND {name}:{funcName} 3 dominant freq are {str(dominant_frequencies)}", **params)
#rt = dict(zip(returns, dominant_frequencies.tolist()))
return 0, dominant_frequencies.tolist()
if returns is not None:
#vracime druhou
if returns == "second":
if len(dominant_frequencies) > 1:
val = dominant_frequencies[-2]
else:
val = 0
else:
#vracime most dominant
val = float(np.max(dominant_frequencies))
return 0, val
# if returns is not None:
# #vracime druhou
# if returns == "second":
# if len(dominant_frequencies) > 1:
# val = dominant_frequencies[-2]
# else:
# val = 0
# else:
# #vracime most dominant
# val = float(np.max(dominant_frequencies))
# return 0, val
#returns histogram bins https://chat.openai.com/share/034f8742-b091-4859-8c3e-570edb9c1006
# pocet vyskytu v danem binu
elif func == "histogram":
#takes only first N - items
# Convert to numpy array
dt = np.array(source_array)
#creates 4 buckets
bins = 4
mean_of_4th_bin = np.mean(dt[np.where(np.histogram(dt, bins)[1][3] <= dt)[0]])
if not np.isfinite(mean_of_4th_bin):
mean_of_4th_bin = 0
return 0, float(mean_of_4th_bin)
# Create 4 bins
bins = np.histogram_bin_edges(dt, bins=4)
# Assign elements to bins
bin_indices = np.digitize(dt, bins)
# Calculate mean for each bin
means = [dt[bin_indices == i].mean() if dt[bin_indices == i].size > 0 else 0 for i in range(1, len(bins))]
return 0, dict(zip(returns, means))
# #takes only first N - items
# dt = np.array(source_array)
# #creates 4 buckets
# bins = 4
# mean_of_4th_bin = np.mean(dt[np.where(np.histogram(dt, bins)[1][3] <= dt)[0]])
# if not np.isfinite(mean_of_4th_bin):
# mean_of_4th_bin = 0
# return 0, float(mean_of_4th_bin)
elif func == "maxima":
if len(source_array) < 3:

View File

@ -22,7 +22,7 @@ import importlib
#OBECNA trida pro statefull indicators - realized by class with the same name, deriving from parent IndicatorBase class
#todo v initu inicializovat state.classed_indicators a ve stopu uklidit - resetovat
def classed(state, params, name):
def classed(state, params, name, returns):
try:
funcName = "classed"
if params is None:

View File

@ -23,7 +23,7 @@ from collections import defaultdict
#novy podminkovy indikator, muze obsahovat az N podminek ve stejne syntaxy jako u signalu
#u kazde podminky je hodnota, ktera se vraci pokud je true
#hodi se pro vytvareni binarnich targetu pro ML
def conditional(state, params, name):
def conditional(state, params, name, returns):
funcName = "conditional"
if params is None:
return -2, "params required"

View File

@ -9,7 +9,7 @@ from collections import defaultdict
#strength, absolute change of parameter between current value and lookback value (n-past)
#used for example to measure unusual peaks
def delta(state, params, name):
def delta(state, params, name, returns):
funcName = "delta"
source = safe_get(params, "source", None)
lookback = safe_get(params, "lookback",1)

View File

@ -8,7 +8,7 @@ import numpy as np
from collections import defaultdict
#abs/rel divergence of two indicators
def divergence(state, params, name):
def divergence(state, params, name, returns):
funcName = "indicatorDivergence"
source1 = safe_get(params, "source1", None)
source1_series = get_source_series(state, source1)

View File

@ -9,7 +9,7 @@ from collections import defaultdict
from v2realbot.strategyblocks.indicators.helpers import value_or_indicator
#strength, absolute change of parameter between current value and lookback value (n-past)
#used for example to measure unusual peaks
def ema(state, params, name):
def ema(state, params, name, returns):
funcName = "ema"
source = safe_get(params, "source", None)
lookback = safe_get(params, "lookback",14)

View File

@ -15,7 +15,7 @@ from copy import deepcopy
#eval nyni umi i user-defined function, string operation and control statements
#teroeticky se dá pouzit i SYMPY - kde se daji vytvorit jednotlive symboly s urcitou funkcni
def expression(state: StrategyState, params, name):
def expression(state: StrategyState, params, name, returns):
try:
funcName = "expression"
#indicator name
@ -56,7 +56,17 @@ def expression(state: StrategyState, params, name):
val = eval(operation, {'state': state, 'np': np, 'utls': utls, 'math' : math}, temp_ind_mapping)
#printanyway(val)
val = 0 if not np.isfinite(val) else val
#toto dát nejspíš do custom_hubu asi te automaticky aplikovalo na vše
if isinstance(val, list):
for index, value in enumerate(val):
val[index] = 0 if not np.isfinite(value) else value
elif isinstance(val, dict):
for key, value in val.items():
val[key] = 0 if not np.isfinite(value) else value
else:
val = 0 if not np.isfinite(val) else val
#val = ne.evaluate(operation, state.ind_mapping)
state.ilog(lvl=1,e=f"IND {name}:{funcName} {operation=} res:{val}", **params)

View File

@ -8,10 +8,11 @@ import numpy as np
from collections import defaultdict
from v2realbot.strategyblocks.indicators.helpers import value_or_indicator
# from talib import BBANDS, MACD, RSI, MA_Type
from talib import BBANDS
#IMPLEMENTS different types of moving averages in package v2realbot.indicators.moving_averages
def ma(state, params, name):
def ma(state, params, name, returns):
funcName = "ma"
type = safe_get(params, "type", "ema")
source = safe_get(params, "source", None)

View File

@ -4,7 +4,7 @@ from v2realbot.strategyblocks.indicators.helpers import get_source_series, value
#allows basic mathematical operators to one or more indicators (add two indicator, add value to a indicator etc.)
#REPLACED by EXPRESSION
def mathop(state, params, name):
def mathop(state, params, name, returns):
funcName = "mathop"
#indicator name
source1 = safe_get(params, "source1", None)

View File

@ -10,7 +10,7 @@ from collections import defaultdict
"""
"""
def model(state, params, ind_name):
def model(state, params, ind_name, returns):
funcName = "model"
if params is None:
return -2, "params required"

View File

@ -9,7 +9,7 @@ from collections import defaultdict
#WIP -
#testing custom indicator CODE
def opengap(state, params, name):
def opengap(state, params, name, returns):
funcName = "opengap"
param1 = safe_get(params, "param1")
param2 = safe_get(params, "param2")

View File

@ -10,7 +10,7 @@ from collections import defaultdict
from v2realbot.strategyblocks.indicators.helpers import value_or_indicator
#strength, absolute change of parameter between current value and lookback value (n-past)
#used for example to measure unusual peaks
def rsi(state, params, name):
def rsi(state, params, name, returns):
req_source = safe_get(params, "source", "vwap")
rsi_length = safe_get(params, "length",14)
start = safe_get(params, "start","linear") #linear/sharp

View File

@ -10,7 +10,7 @@ import bisect
#strength, absolute change of parameter between current value and lookback value (n-past)
#used for example to measure unusual peaks
def sameprice(state, params, name):
def sameprice(state, params, name, returns):
funcName = "sameprice"
typ = safe_get(params, "type", None)

View File

@ -8,7 +8,7 @@ import numpy as np
from collections import defaultdict
#rate of change - last value of source indicator vs lookback value of lookback_priceline indicator
def slope(state, params, name):
def slope(state, params, name, returns):
funcName = "slope"
source = safe_get(params, "source", None)
source_series = get_source_series(state, source)

View File

@ -0,0 +1,77 @@
from v2realbot.utils.utils import isrising, isfalling,zoneNY, price2dec, print, safe_get, is_still, is_window_open, eval_cond_dict, crossed_down, crossed_up, crossed, is_pivot, json_serial, pct_diff, create_new_bars, slice_dict_lists
from v2realbot.strategy.base import StrategyState
import v2realbot.indicators.moving_averages as mi
from v2realbot.strategyblocks.indicators.helpers import get_source_series
from rich import print as printanyway
from traceback import format_exc
import numpy as np
from collections import defaultdict
from v2realbot.strategyblocks.indicators.helpers import value_or_indicator
# from talib import BBANDS, MACD, RSI, MA_Type
import talib
# příklad toml pro indikátor ATR(high, low, close, timeperiod=14)
# POSITION MATTERS
# params.series.high = "high"   //series key určuje, že jde o série
# params.series.low = "low"
# params.series.low = "close"
# params.val.timeperiod = 14  //val key určuje, že jde o konkrétní hodnotu, tzn. buď hodnotu nebo název série, ze které vezmu poslední hodnotu (v tom případě by to byl string)
# params.series = ["high","low","close"] #pozicni parametry
# params.keys.timeperiod = 14 #keyword argumenty
#TA-lib prijma positional arguments (zejmena teda ty series)m tzn. series musi byt pozicni
# lookback se aplikuje na vsechy ?
#IMPLEMENTS usiong of any indicator from TA-lib library
def talib_ind(state, params, name, returns):
funcName = "ma"
type = safe_get(params, "type", "SMA")
#ßsource = safe_get(params, "source", None)
lookback = safe_get(params, "lookback",None) #celkovy lookback pro vsechny vstupni serie
start = safe_get(params, "start","linear") #linear/sharp
defval = safe_get(params, "defval",0)
params = safe_get(params, "params", dict(series=[], keys=[]))
#lookback muze byt odkaz na indikator, pak berem jeho hodnotu
lookback = int(value_or_indicator(state, lookback))
defval = int(value_or_indicator(state, defval))
#TODO dopracovat caching, tzn. jen jednou pri inicializaci (linkuje se list) nicmene pri kazde iteraci musime prevest na numpy
#NOTE doresit, kdyz je val indiaktor, aby se i po inicializaci bral z indikatoru (doresit az pokud bude treba)
#NOTE doresit lookback, zda se aplikuje na vsechny pred volanim funkce nebo kdy?
series_list = []
keyArgs = {}
for index, item in enumerate(params.get("series",[])):
source_series = get_source_series(state, item)
#upravujeme lookback pokud not enough values (staci jen pro prvni - jsou vsechny stejne)
if index == 0 and lookback is not None:
akt_pocet = len(source_series)
if akt_pocet < lookback and start == "linear":
lookback = akt_pocet
series_list.append(np.array(source_series[-lookback:] if lookback is not None else source_series))
for key, val in params.get("keys",{}).items():
keyArgs[key] = int(value_or_indicator(state, val))
type = "talib."+type
talib_function = eval(type)
ma_value = talib_function(*series_list, **keyArgs)
if not np.isfinite(ma_value[-1]):
val = defval
else:
val = round(ma_value[-1],4)
if val == 0:
val = defval
state.ilog(lvl=1,e=f"INSIDE {name}:{funcName} {val} {type=} {lookback=}", **params)
return 0, val

View File

@ -20,7 +20,7 @@ pct_change_full_scale = #pct change that is considered 1, used in scaler to dete
TODO musi se signal trochu tahnout az kde se opravdu rozjede, aby si to model spojil
"""""
def target(state, params, name):
def target(state, params, name, returns):
funcName = "target"
source = safe_get(params, "source", "vwap")
source_series = get_source_series(state, source)

View File

@ -17,7 +17,7 @@ Where
- start is last crossing of source with ema and
- end is the current position -1
"""""
def targetema(state, params, name):
def targetema(state, params, name, returns):
try:
funcName = "targetema"
window_length_value = safe_get(params, "window_length_value", None)

View File

@ -10,7 +10,7 @@ from collections import defaultdict
from v2realbot.strategyblocks.indicators.helpers import value_or_indicator
# Volume(or reference_source) Weighted moving Average
def vwma(state, params, name):
def vwma(state, params, name, returns):
funcName = "vwma"
source = safe_get(params, "source", None)
ref_source = safe_get(params, "ref_source", "volume")

View File

@ -46,6 +46,8 @@ def populate_dynamic_custom_indicator(data, state: StrategyState, name):
save_to_past = int(safe_get(options, "save_to_past", 0))
save_to_past_unit = safe_get(options, "save_to_past_unit", "position")
#pokud neni multioutput, davame vystup do stejnojmenne serie
returns = safe_get(options, 'returns', [name])
def is_time_to_run():
# on_confirmed_only = true (def. False)
@ -139,15 +141,17 @@ def populate_dynamic_custom_indicator(data, state: StrategyState, name):
state.vars.indicators[name]["last_run_index"] = data["index"]
#pomocna funkce
def save_to_past_func(indicators_dict,name,save_to_past_unit, steps, new_val):
#pomocna funkce (returns je pole toho , co indikator vraci a ret_val je dictionary, kde key je item z pole a val hodnota)
def save_to_past_func(indicators_dict,name,save_to_past_unit, steps, ret_val):
if save_to_past_unit == "position":
indicators_dict[name][-1-steps]=new_val
for ind_name, ind_value in ret_val.items():
indicators_dict[ind_name][-1-steps]=ind_value
#time
else:
##find index X seconds ago
lookback_idx = find_index_optimized(time_list=indicators_dict["time"], seconds=steps)
indicators_dict[name][lookback_idx]=new_val
for ind_name, ind_value in ret_val.items():
indicators_dict[ind_name][lookback_idx]=ind_value
# - volame custom funkci pro ziskani hodnoty indikatoru
# - tu ulozime jako novou hodnotu indikatoru a prepocteme MAcka pokud je pozadovane
@ -157,40 +161,61 @@ def populate_dynamic_custom_indicator(data, state: StrategyState, name):
subtype = "ci."+subtype+"."+subtype
custom_function = eval(subtype)
res_code, new_val = custom_function(state, custom_params, name)
res_code, ret_val = custom_function(state, custom_params, name, returns)
if res_code == 0:
save_to_past_func(indicators_dict,name,save_to_past_unit, save_to_past, new_val)
state.ilog(lvl=1,e=f"IND {name} {subtype} VAL FROM FUNCTION: {new_val}", lastruntime=state.vars.indicators[name]["last_run_time"], lastrunindex=state.vars.indicators[name]["last_run_index"], save_to_past=save_to_past)
#prepocitame MA if required
if MA_length is not None:
src = indicators_dict[name][-MA_length:]
MA_res = ema(src, MA_length)
MA_value = round(MA_res[-1],7)
#ret_val byl puvodne jedna hodnota
#nyni podporujeme multi output ve format dict(indName:value, indName2:value2...)
#podporujeme vystup (list, dict a single value) - vse se transformuje do dict formatu
# pri listu zipneme s return) a vytvorime dict (v pripade mismatch sizes se matchnou jen kratsi)
if isinstance(ret_val, list):
ret_val = dict(zip(returns, ret_val))
#pokud je to neco jineho nez dict (float,int..) jde o puvodni single output udelame z toho dict s hlavnim jmenem as key
elif not isinstance(ret_val, dict):
ret_val = {name: ret_val}
#v ostatnich pripadech predpokladame jiz dict
save_to_past_func(indicators_dict,name+"MA",save_to_past_unit, save_to_past, MA_value)
state.ilog(lvl=0,e=f"IND {name}MA {subtype} {MA_value}",save_to_past=save_to_past)
save_to_past_func(indicators_dict,name,save_to_past_unit, save_to_past, ret_val)
state.ilog(lvl=1,e=f"IND {name} {subtype} VAL FROM FUNCTION: {ret_val}", lastruntime=state.vars.indicators[name]["last_run_time"], lastrunindex=state.vars.indicators[name]["last_run_index"], save_to_past=save_to_past)
#prepocitame MA if required
#pokud je MA nastaveno, tak pocitame MAcka pro vsechny multiouputy, tzn. vytvorime novem multioutput dict (ma_val)
if MA_length is not None:
ma_val = {}
for ind_name, ind_val in ret_val.items():
src = indicators_dict[ind_name][-MA_length:]
MA_res = ema(src, MA_length)
MA_value = round(MA_res[-1],7)
ma_val[ind_name+"MA"] = MA_value
save_to_past_func(indicators_dict,name+"MA",save_to_past_unit, save_to_past, ma_val)
state.ilog(lvl=0,e=f"IND {name}MA {subtype} {ma_val}",save_to_past=save_to_past)
return
else:
err = f"IND ERROR {name} {subtype}Funkce {custom_function} vratila {res_code} {new_val}."
err = f"IND ERROR {name} {subtype}Funkce {custom_function} vratila {res_code} {ret_val}."
raise Exception(err)
except Exception as e:
if len(indicators_dict[name]) >= 2:
indicators_dict[name][-1]=indicators_dict[name][-2]
if MA_length is not None and len(indicators_dict[name+"MA"])>=2:
indicators_dict[name+"MA"][-1]=indicators_dict[name+"MA"][-2]
state.ilog(lvl=1,e=f"IND ERROR {name} {subtype} necháváme původní", message=str(e)+format_exc())
use_last_values(indicators_dict, name, returns, MA_length)
state.ilog(lvl=1,e=f"IND ERROR {name} {subtype} necháváme původní u vsech z returns", returns=str(returns), message=str(e)+format_exc())
else:
state.ilog(lvl=0,e=f"IND {name} {subtype} COND NOT READY: {msg}")
state.ilog(lvl=0,e=f"IND {name} {subtype} COND NOT READY: {msg}", returns=returns)
#not time to run - copy last value
use_last_values(indicators_dict, name, returns, MA_length)
state.ilog(lvl=0,e=f"IND {name} {subtype} NOT TIME TO RUN - value(and MA) still original", returns=returns)
#zde nechavame puvodni (pri multiinputu nastavime predchozi hodnoty u vsech vystupu v defaultnim returns)
def use_last_values(indicators_dict, name, returns, MA_length):
def use_last_values_(indicators_dict, name, MA_length):
if len(indicators_dict[name]) >= 2:
indicators_dict[name][-1]=indicators_dict[name][-2]
if MA_length is not None and len(indicators_dict[name+"MA"])>=2:
indicators_dict[name+"MA"][-1]=indicators_dict[name+"MA"][-2]
state.ilog(lvl=0,e=f"IND {name} {subtype} NOT TIME TO RUN - value(and MA) still original")
if returns is not None and len(returns)>0:
for ind_name in returns:
use_last_values_(indicators_dict, ind_name, MA_length)
else:
use_last_values_(indicators_dict, name, MA_length)

View File

@ -32,10 +32,21 @@ def initialize_dynamic_indicators(state):
case _:
raise(f"ind output must be bar or tick {indname}")
#inicializujeme vzdy main
indicators_dict[indname] = []
#inicializujeme multioutputs
returns = safe_get(indsettings, 'returns', [])
for ind_name in returns:
indicators_dict[ind_name] = []
#pokud ma MA_length incializujeme i MA variantu
if safe_get(indsettings, 'MA_length', False):
indicators_dict[indname+"MA"] = []
#inicializujeme bud v hlavni serii
if len(returns)==0:
indicators_dict[indname+"MA"] = []
#nebo v multivystupech
else:
for ind_name in returns:
indicators_dict[ind_name+"MA"] = []
#Specifické Inicializace dle type
for option,value in list(indsettings.items()):