tick based support including gui preview, custom suppoer, new classed tickbased inds,#85

This commit is contained in:
David Brazda
2023-12-11 19:24:06 +01:00
parent 5cc3a1c318
commit 3158cdb68b
17 changed files with 431 additions and 122 deletions

View File

@ -30,12 +30,26 @@ def classed(state, params, name):
init_params = safe_get(params, "init", None) #napr sekce obcahuje threshold = 1222, ktere jdou kwargs do initu fce
#next_params = safe_get(params, "next", None)
source = safe_get(params, "source", None) #source, ktery jde do initu
source = get_source_series(state, source)
#lookback = int(value_or_indicator(state, lookback))
#List of sources, ktere jde do nextu (muze jit i vice serie)
#Do nextu jde ve stejnojmenném parametru
next_sources = safe_get(params, "next", []) #this will map to the sources_dict
next_mapping = safe_get(params, "next_mapping", next_sources) #this will dictate the final name of the key in sources_dict
#ukládáme si do cache incializaci
cache = safe_get(params, "CACHE", None)
if cache is None:
if len(next_sources) != len(next_mapping):
return -2, "next and next_mapping length must be the same"
# Vytvorime dictionary pro kazdy source a priradime serii
#source_dict = {name: get_source_series(state, name) for name in next_sources}
#TBD toto optimalizovat aby se nevolalo pri kazde iteraci
source_dict = {new_key: get_source_series(state, name)
for name, new_key in zip(next_sources, next_mapping)}
params["CACHE"] = {}
params["CACHE"]["source_dict"] = source_dict
else:
source_dict = params["CACHE"]["source_dict"]
#class_next_params = safe_get(params, "class_next_params", None)
try:
if name not in state.classed_indicators:
classname = name
@ -46,8 +60,8 @@ def classed(state, params, name):
state.classed_indicators[name] = instance
state.ilog(lvl=1,e=f"IND CLASS {name} INITIALIZED", **params)
if source is not None:
val = state.classed_indicators[name].next(source[-1])
if len(source_dict) >0:
val = state.classed_indicators[name].next(**source_dict)
else:
val = state.classed_indicators[name].next()
@ -56,5 +70,4 @@ def classed(state, params, name):
except Exception as e:
printanyway(str(e)+format_exc())
return -2, str(e)+format_exc()
return -2, str(e)+format_exc()

View File

@ -7,7 +7,8 @@ class CUSUM(IndicatorBase):
self.cumulative_sum = 0
self.previous_price = None
def next(self, new_price):
def next(self, close):
new_price = close[-1]
if self.previous_price is None:
# First data point, no previous price to compare with
self.previous_price = new_price

View File

@ -0,0 +1,37 @@
from collections import deque
#import time
from v2realbot.strategyblocks.indicators.custom.classes.indicatorbase import IndicatorBase
class TickTimeBasedROC(IndicatorBase):
def __init__(self, state, window_size_seconds=5):
"""
Initialize the TimeBasedROC class.
:param window_size_seconds: Window size in seconds for the rate of change.
"""
super().__init__(state)
self.window_size_seconds = window_size_seconds
self.tick_data = deque() # Efficient deque for (timestamp, price)
def next(self, time, close):
"""
Update the ROC with a new tick time and price.
:param new_time: Timestamp of the new tick (float with up to 6 decimals).
:param new_price: Price of the new tick.
:return: The updated ROC value, or None if the window is not yet full.
"""
new_time = time[-1]
new_price = close[-1]
# Add new tick data
self.tick_data.append((new_time, new_price))
# Remove old data outside the time window efficiently
while self.tick_data and new_time - self.tick_data[0][0] > self.window_size_seconds:
self.tick_data.popleft()
if len(self.tick_data) >= 2:
# Compute ROC using the earliest and latest prices in the window
old_time, old_price = self.tick_data[0]
roc = ((new_price - old_price) / old_price) * 100 if old_price != 0 else 0
return round(float(roc),5)
else:
return 0 # ROC is undefined until the window has enough data

View File

@ -0,0 +1,34 @@
import numpy as np
from v2realbot.strategyblocks.indicators.custom.classes.indicatorbase import IndicatorBase
#usecase - pocitat variance ticku
# v ramci BARu - posilame sem index a resetujeme pri naslednem indxu
# do budoucna mo
class TickVariance(IndicatorBase):
def __init__(self, state, window_size=1):
"""
Initialize the TickPriceVariance class.
:param window_size: The size of the window for calculating variance - zatim mame jeden bar, do budoucna X
"""
super().__init__(state)
self.window_size = window_size
self.window_prices = []
self.prev_index = None
def next(self, close, index):
close = close[-1]
index = index[-1]
# Add new price to the window
self.window_prices.append(close)
if self.prev_index is not None and self.prev_index != index:
self.window_prices = []
self.prev_index = index
# Calculate the variance for the current window
if len(self.window_prices) > 1:
return round(float(np.var(self.window_prices)),5)
else:
return 0 # Variance is undefined for a single data point

View File

@ -0,0 +1,33 @@
from v2realbot.utils.utils import isrising, isfalling,zoneNY, price2dec, print, safe_get, is_still, is_window_open, eval_cond_dict, crossed_down, crossed_up, crossed, is_pivot, json_serial, pct_diff, create_new_bars, slice_dict_lists
from v2realbot.strategy.base import StrategyState
from v2realbot.indicators.indicators import ema as ext_ema
from v2realbot.strategyblocks.indicators.helpers import get_source_series
from rich import print as printanyway
from traceback import format_exc
import numpy as np
from v2realbot.indicators.oscillators import rsi as ind_rsi
from collections import defaultdict
from v2realbot.strategyblocks.indicators.helpers import value_or_indicator
#strength, absolute change of parameter between current value and lookback value (n-past)
#used for example to measure unusual peaks
def rsi(state, params, name):
req_source = safe_get(params, "source", "vwap")
rsi_length = safe_get(params, "length",14)
start = safe_get(params, "start","linear") #linear/sharp
#lookback muze byt odkaz na indikator, pak berem jeho hodnotu
rsi_length = int(value_or_indicator(state, rsi_length))
source = get_source_series(state, req_source)
delka = len(source)
if delka > rsi_length or start == "linear":
if delka <= rsi_length and start == "linear":
rsi_length = delka
rsi_res = ind_rsi(source, rsi_length)
val = rsi_res[-1] if np.isfinite(rsi_res[-1]) else 0
return 0, round(val,4)
else:
state.ilog(lvl=0,e=f"IND {name} RSI necháváme 0", message="not enough source data", source=source, rsi_length=rsi_length)
return -2, "necháváma 0 nedostatek hodnot"

View File

@ -13,17 +13,31 @@ def slope(state, params, name):
source = safe_get(params, "source", None)
source_series = get_source_series(state, source)
lookback_type = safe_get(params, "lookback_type", "positions")
lookback = safe_get(params, "lookback", 5)
lookback_priceline = safe_get(params, "lookback_priceline", None)
lookback_priceline = safe_get(params, "lookback_priceline", None) #bars|close
lookback_series = get_source_series(state, lookback_priceline)
try:
lookbackprice = lookback_series[-lookback-1]
lookbacktime = state.bars.updated[-lookback-1]
except IndexError:
max_delka = len(lookback_series)
lookbackprice =lookback_series[-max_delka]
lookbacktime = state.bars.updated[-max_delka]
match lookback_type:
case "positions":
try:
lookbackprice = lookback_series[-lookback-1]
lookbacktime = state.bars.updated[-lookback-1]
except IndexError:
max_delka = len(lookback_series)
lookbackprice =lookback_series[-max_delka]
lookbacktime = state.bars.updated[-max_delka]
case "seconds":
#předpokládáme, že lookback_priceline je ve formě #bars|close
#abychom ziskali relevantní time
split_index = lookback_priceline.find("|")
if split_index == -1:
return -2, "for time it is required in format bars|close"
dict_name = lookback_priceline[:split_index]
time_series = getattr(state, dict_name)["time"]
lookback_idx = find_index_optimized(time_list=time_series, seconds=lookback)
lookbackprice = lookback_series[lookback_idx]
lookbacktime = time_series[lookback_idx]
#výpočet úhlu - a jeho normalizace
currval = source_series[-1]
@ -32,3 +46,24 @@ def slope(state, params, name):
state.ilog(lvl=1,e=f"INSIDE {name}:{funcName} {slope} {source=} {lookback=}", currval_source=currval, lookbackprice=lookbackprice, lookbacktime=lookbacktime, **params)
return 0, slope
"""
TODO pripadne dat do
Finds index of first value less than X seconds
This version assumes:
time_list is always non-empty and sorted.
There's always a timestamp at least 5 seconds before the current time.
"""
def find_index_optimized(time_list, seconds):
current_time = time_list[-1]
threshold = current_time - seconds
left, right = 0, len(time_list) - 1
while left < right:
mid = (left + right) // 2
if time_list[mid] < threshold:
left = mid + 1
else:
right = mid
return left if time_list[left] >= threshold else None

View File

@ -27,6 +27,16 @@ def populate_dynamic_custom_indicator(data, state: StrategyState, name):
#if MA is required
MA_length = safe_get(options, "MA_length", None)
output = safe_get(options, "output", "bar")
match output:
case "bar":
indicators_dict = state.indicators
case "tick":
indicators_dict = state.cbar_indicators
case _:
state.ilog(lvl=1,e=f"Output must be bar or tick for {name} in stratvars")
return
active = safe_get(options, 'active', True)
if not active:
return
@ -121,7 +131,7 @@ def populate_dynamic_custom_indicator(data, state: StrategyState, name):
if should_run:
#TODO get custom params
custom_params = safe_get(options, "cp", None)
#vyplnime last_run_time a last_run_index
#vyplnime last_run_time a last_run_index do stratvars
state.vars.indicators[name]["last_run_time"] = datetime.fromtimestamp(data["updated"]).astimezone(zoneNY)
state.vars.indicators[name]["last_run_index"] = data["index"]
@ -135,14 +145,14 @@ def populate_dynamic_custom_indicator(data, state: StrategyState, name):
custom_function = eval(subtype)
res_code, new_val = custom_function(state, custom_params, name)
if res_code == 0:
state.indicators[name][-1-save_to_past]=new_val
indicators_dict[name][-1-save_to_past]=new_val
state.ilog(lvl=1,e=f"IND {name} {subtype} VAL FROM FUNCTION: {new_val}", lastruntime=state.vars.indicators[name]["last_run_time"], lastrunindex=state.vars.indicators[name]["last_run_index"], save_to_past=save_to_past)
#prepocitame MA if required
if MA_length is not None:
src = state.indicators[name][-MA_length:]
src = indicators_dict[name][-MA_length:]
MA_res = ema(src, MA_length)
MA_value = round(MA_res[-1],7)
state.indicators[name+"MA"][-1-save_to_past]=MA_value
indicators_dict[name+"MA"][-1-save_to_past]=MA_value
state.ilog(lvl=0,e=f"IND {name}MA {subtype} {MA_value}",save_to_past=save_to_past)
else:
@ -150,20 +160,20 @@ def populate_dynamic_custom_indicator(data, state: StrategyState, name):
raise Exception(err)
except Exception as e:
if len(state.indicators[name]) >= 2:
state.indicators[name][-1]=state.indicators[name][-2]
if MA_length is not None and len(state.indicators[name+"MA"])>=2:
state.indicators[name+"MA"][-1]=state.indicators[name+"MA"][-2]
if len(indicators_dict[name]) >= 2:
indicators_dict[name][-1]=indicators_dict[name][-2]
if MA_length is not None and len(indicators_dict[name+"MA"])>=2:
indicators_dict[name+"MA"][-1]=indicators_dict[name+"MA"][-2]
state.ilog(lvl=1,e=f"IND ERROR {name} {subtype} necháváme původní", message=str(e)+format_exc())
else:
state.ilog(lvl=0,e=f"IND {name} {subtype} COND NOT READY: {msg}")
#not time to run - copy last value
if len(state.indicators[name]) >= 2:
state.indicators[name][-1]=state.indicators[name][-2]
if len(indicators_dict[name]) >= 2:
indicators_dict[name][-1]=indicators_dict[name][-2]
if MA_length is not None and len(state.indicators[name+"MA"])>=2:
state.indicators[name+"MA"][-1]=state.indicators[name+"MA"][-2]
if MA_length is not None and len(indicators_dict[name+"MA"])>=2:
indicators_dict[name+"MA"][-1]=indicators_dict[name+"MA"][-2]
state.ilog(lvl=0,e=f"IND {name} {subtype} NOT TIME TO RUN - value(and MA) still original")

View File

@ -60,6 +60,8 @@ def evaluate_directive_conditions(state, work_dict, cond_type):
return eval_cond_dict(cond)
#TODO toto pripadne sloucit s get_source_series - revidovat dopady
def get_source_or_MA(state, indicator):
#pokud ma, pouzije MAcko, pokud ne tak standardni indikator
#pokud to jmeno neexistuje, tak pripadne bere z barů (close,open,hlcc4, vwap atp.)
@ -69,7 +71,10 @@ def get_source_or_MA(state, indicator):
try:
return state.indicators[indicator]
except KeyError:
return state.bars[indicator]
try:
return state.bars[indicator]
except KeyError:
return state.cbar_indicators[indicator]
def get_source_series(state, source: str):
"""
@ -85,7 +90,10 @@ def get_source_series(state, source: str):
try:
return state.indicators[source]
except KeyError:
return None
try:
return state.cbar_indicators[source]
except KeyError:
return None
else:
dict_name = source[:split_index]
key = source[split_index + 1:]

View File

@ -63,12 +63,11 @@ def populate_all_indicators(data, state: StrategyState):
else:
pass
#toto je spíše interní ukládání tick_price a tick_volume - s tím pak mohou pracovat jak bar based tak tick based indikatory
#TODO do budoucna prejmenovat state.cbar_indicators na state.tick_indicators
populate_cbar_tick_price_indicator(data, state)
#TBD nize predelat na typizovane RSI (a to jak na urovni CBAR tak confirmed)
#populate_cbar_rsi_indicator()
#populate indicators, that have type in stratvars.indicators
#populate indicators, that have type in stratvars.indicators - pridana podpora i pro CBAR typu CUSTOM
populate_dynamic_indicators(data, state)
#vytiskneme si indikatory

View File

@ -23,13 +23,23 @@ def initialize_dynamic_indicators(state):
##ßprintanyway(state.vars, state)
dict_copy = state.vars.indicators.copy()
for indname, indsettings in dict_copy.items():
#inicializace indikatoru na dane urovni
output = safe_get(indsettings, 'output', "bar")
match output:
case "bar":
indicators_dict = state.indicators
case "tick":
indicators_dict = state.cbar_indicators
case _:
raise(f"ind output must be bar or tick {indname}")
indicators_dict[indname] = []
#pokud ma MA_length incializujeme i MA variantu
if safe_get(indsettings, 'MA_length', False):
indicators_dict[indname+"MA"] = []
#Specifické Inicializace dle type
for option,value in list(indsettings.items()):
#inicializujeme nejenom typizovane
#if option == "type":
state.indicators[indname] = []
#pokud ma MA_length incializujeme i MA variantu
if safe_get(indsettings, 'MA_length', False):
state.indicators[indname+"MA"] = []
#specifika pro slope
if option == "type":
if value == "slope":