From 2c7d2793037f91c8bdefeff570954e1b60a16570 Mon Sep 17 00:00:00 2001 From: David Brazda Date: Thu, 13 Jul 2023 22:40:41 +0200 Subject: [PATCH] refactor direktiv - skoro hotovo --- testy/getdirective.py | 46 ++ testy/pricepivot.py | 64 +- testy/testcrosseddown.py | 10 + .../ENTRY_Vykladaci_RSI_MYSELL_type_NEW.py | 620 +++++++----------- v2realbot/main.py | 2 +- v2realbot/slicingtest.py | 7 + v2realbot/static/js/utils.js | 2 + .../utils/__pycache__/utils.cpython-310.pyc | Bin 11210 -> 11736 bytes v2realbot/utils/utils.py | 39 +- 9 files changed, 353 insertions(+), 437 deletions(-) create mode 100644 testy/getdirective.py create mode 100644 testy/testcrosseddown.py create mode 100644 v2realbot/slicingtest.py diff --git a/testy/getdirective.py b/testy/getdirective.py new file mode 100644 index 0000000..d001973 --- /dev/null +++ b/testy/getdirective.py @@ -0,0 +1,46 @@ +#ROZPRACOVANE pri vytvareni OR/AND dynamickych buysignalu +# indicator.ema +# length = 3 +# buy_if_below = 15 +# OR.buy_if_below = 15 #totozne s predchozim, staci kdyz jedna podminka plati +# AND.buy_if_below = 15 #musi platiti vsechny podminky u vsech indikatoru + + + + +#TOTO dodelat + +#nejspis v INITU nahrat vsechny buysignal directivy do jednoho dictu +#a ten pouzivat v celem lifecyclu + +#tato funkce by je mohla vytvorit +# vysledek: {'AND': [('ema', 'ema20')], 'OR': []} + +#tato funkce vytvori dictionary typu podminek (OR/AND) a indikatoru s direktivami buy_if +# do OR jsou dane i bez prefixu +# {'AND': [('nazev indikatoru', 'nazev direktivy', 'hodnotadirektivy')], 'OR': []} +def get_indicators_with_buy_directive(): + starts_with = "buy_if" + reslist = dict(AND=[], OR=[]) + + for indname, indsettings in vars["indicators"].items(): + for option,value in indsettings.items(): + if option.startswith(starts_with): + reslist["OR"].append((indname, option, value)) + if option == "AND": + #vsechny buy direktivy, ktere jsou pod AND + for key, val in value.items(): + if key.startswith(starts_with): + reslist["AND"].append((indname, key, val)) + if option == "OR" : + #vsechny buy direktivy, ktere jsou pod OR + for key, val in value.items(): + if key.startswith(starts_with): + reslist["OR"].append((indname, key, val)) + return reslist + + +vars = {'maxpozic': 2000, 'def_mode_from': 2000, 'chunk': 100, 'profit': 0.014, 'def_profit': 0.01, 'max_profit': 0.03, 'vykladka': 1, 'curve': [0.03, 0.01, 0.01, 0.0, 0.02, 0.02, 0.01, 0.01, 0.01, 0.03, 0.01, 0.01, 0.01, 0.04, 0.01, 0.01, 0.01, 0.05, 0.01, 0.01, 0.01, 0.01, 0.06, 0.01, 0.01, 0.01, 0.01], 'curve_def': [0.02, 0.03, 0.02, 0, 0, 0.02, 0, 0, 0, 0.02], 'ticks2reset': 0.06, 'consolidation_bar_count': 5, 'first_buy_market': True, 'first_buy_market_def_mode': False, 'market_buy_multiplier': 1, 'rsi_dont_buy_above': 70, 'bigwave_slope_above': 0.12, 'minimum_slope': -0.1, 'open_rush': 30, 'close_rush': 30, 'lastbuy_offset': 5, 'last_buy_offset_reset_after_sell': False, 'buy_only_on_confirmed': False, 'indicators': {'ema': {'type': 'EMA', 'source': 'close', 'length': 5, 'on_confirmed_only': False, 'AND': {'buy_if_crossed_down': 'ema20'}}, 'ema20': {'type': 'EMA', 'source': 'close', 'length': 20, 'on_confirmed_only': False}, 'RSI14': {'type': 'RSI', 'RSI_length': 14, 'source': 'vwap', 'MA_length': 5}, 'slope': {'type': 'slope', 'on_confirmed_only': False, 'MA_length': 5, 'slope_lookback': 10, 'lookback_offset': 3, 'minimum_slope': -0.1, 'maximum_slope': 0.2}, 'slopeLP': {'type': 'slopeLP', 'on_confirmed_only': False, 'leftpoint': 'baropen', 'slope_lookback': 8, 'lookback_offset': 4, 'minimum_slope': -0.09, 'maximum_slope': 0.2}, 'slope10': {'type': 'slope', 'on_confirmed_only': False, 'MA_length': 5, 'slope_lookback': 60, 'lookback_offset': 20, 'minimum_slope': -0.1, 'maximum_slope': 0.45}, 'slope20': {'type': 'slope', 'on_confirmed_only': False, 'MA_length': 5, 'slope_lookback': 120, 'lookback_offset': 25, 'minimum_slope': -0.1, 'dont_buy_above': 0.3, 'maximum_slope': 0.45}}, 'sell_protection': {'enabled': False, 'slopeMA_rising': 2, 'rsi_not_falling': 3}, 'sell_in_progress': False, 'mode': None, 'last_tick_price': 48.41, 'last_50_deltas': [0.027251, 2.050578, 0.205957, 2.226359, 0.00071, 4.034629, 0.0, 0.387769, 2.779256, 1.735168, 1.208257, 0.729469, 1.215035, 2.510941, 4.126465, 0.0, 0.89464, 0.863604, 0.363522, 0.808834, 3.007671, 1.450134, 5.292132, 2.532213, 0.0, 2.185073, 3.185483, 1.664617, 1.972669, 1.552979, 0.45697, 1.370411, 1.125679, 0.0, 0.390609, 0.770417, 0.050253, 1.652531, 0.729488, 0.788619, 0.873404, 1.013351, 0.045697, 2.992692, 4.046475, 0.165393, 0.0, 5.635894, 0.0, 3.558663], 'last_tick_volume': 200, 'next_new': 1, 'lastbuyindex': 103, 'last_update_time': 1689001643.057809, 'reverse_position_waiting_amount': 0, 'pendingbuys': {}, 'limitka': None, 'limitka_price': None, 'jevylozeno': 0, 'blockbuy': 0, 'ticks2reset_backup': 0.06} + +a = get_indicators_with_buy_directive() +print(a) \ No newline at end of file diff --git a/testy/pricepivot.py b/testy/pricepivot.py index a9a583c..628334f 100644 --- a/testy/pricepivot.py +++ b/testy/pricepivot.py @@ -1,4 +1,4 @@ - +from v2realbot.utils.utils import isfalling, isrising test1_threshold = 28.905 bacma = [] @@ -10,54 +10,32 @@ bacma.append([28.91,28.90,28.89,28.88,28.87]) bacma.append([28.91,28.90,28.89,28.88,28.87,28.86]) -#is_pivot function to check if there is A shaped pivot in the list, each leg consists of N points -def is_pivot(list, leg): - """check if there is A shaped pivot in the list, each leg consists of N points""" - try: - if len(list) < leg*2+1: - return False - else: - if list[-leg-1] < list[-leg] and list[-leg] > list[-leg+1] and list[-leg-1] > list[-leg-2] and list[-leg] > list[-leg+2]: - return True - else: - return False - except IndexError: - return False +pole = [1,2,3,2,1,2,3,4,5,6,2,1] - - -def crossed_up(threshold, list): - """check if threshold has crossed up last thresholdue in list""" - try: - if threshold < list[-1] and threshold >= list[-2]: - return True - else: - return False - except IndexError: +#is_pivot function to check if there is A(V) shaped pivot in the list, each leg consists of N points +#middle point is the shared one [1,2,3,2,1] - one leg is [1,2,3] second leg is [3,2,1] +def is_pivot(stock_prices_list, leg_number, type: str = "A"): + if len(stock_prices_list) < (2 * leg_number)-1: + print("Not enough values in the list") return False -def crossed_down(threshold, list): - """check if threshold has crossed down last thresholdue in list""" - try: - if threshold > list[-1] and threshold <= list[-2]: + left_leg = stock_prices_list[-2*leg_number+1:-leg_number+1] + print(left_leg) + right_leg = stock_prices_list[-leg_number:] + print(right_leg) + + if type == "A": + if isrising(left_leg) and isfalling(right_leg): + return True + else: + return False + elif type == "V": + if isfalling(left_leg) and isrising(right_leg): return True else: return False - except IndexError: - return False - -def crossed(threshold, list): - """check if threshold has crossed last thresholdue in list""" - if crossed_down(threshold, list) or crossed_up(threshold, list): - return True else: + print("Unknown type") return False -for i in bacma: - print(i) - print(f"threshold crossed down {i}", threshold_crossed_down(test1_threshold, i)) - print(f"threshold crossed up {i}", threshold_crossed_up(test1_threshold, i)) - - - - +print(is_pivot(pole, 3)) \ No newline at end of file diff --git a/testy/testcrosseddown.py b/testy/testcrosseddown.py new file mode 100644 index 0000000..7c687cc --- /dev/null +++ b/testy/testcrosseddown.py @@ -0,0 +1,10 @@ +from v2realbot.utils.utils import crossed_down + +pole = [48.495432098765434, 48.48296296296296, 48.480617283950615, 48.47475308641976, 48.467543209876546] +thr = 48.4778923123465 + + + +res = crossed_down(threshold=thr, list=pole) + +print(res) \ No newline at end of file diff --git a/v2realbot/ENTRY_Vykladaci_RSI_MYSELL_type_NEW.py b/v2realbot/ENTRY_Vykladaci_RSI_MYSELL_type_NEW.py index d886acd..4599b1c 100644 --- a/v2realbot/ENTRY_Vykladaci_RSI_MYSELL_type_NEW.py +++ b/v2realbot/ENTRY_Vykladaci_RSI_MYSELL_type_NEW.py @@ -5,7 +5,7 @@ from v2realbot.strategy.StrategyOrderLimitVykladaciNormalizedMYSELL import Strat from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Account, OrderSide, OrderType from v2realbot.indicators.indicators import ema from v2realbot.indicators.oscillators import rsi -from v2realbot.utils.utils import ltp, isrising, isfalling,trunc,AttributeDict, zoneNY, price2dec, print, safe_get, get_tick, round2five, is_open_rush, is_close_rush, eval_cond_dict, Average, crossed_down, crossed_up +from v2realbot.utils.utils import ltp, isrising, isfalling,trunc,AttributeDict, zoneNY, price2dec, print, safe_get, get_tick, round2five, is_open_rush, is_close_rush, eval_cond_dict, Average, crossed_down, crossed_up, is_pivot from datetime import datetime #from icecream import install, ic #from rich import print @@ -217,7 +217,7 @@ def next(data, state: StrategyState): last_price = price2dec(state.interface.get_last_price(state.symbol)) #profit = float(state.vars.profit) price = last_price - state.ilog(e="BUY Vykladame", msg=f"first price {price=} {vykladka=}", curve=curve, ema=state.indicators.ema[-1], trend=state.vars.Trend, price=price, vykladka=vykladka) + state.ilog(e="BUY Vykladame", msg=f"first price {price=} {vykladka=}", curve=curve, price=price, vykladka=vykladka) ##prvni se vyklada na aktualni cenu, další jdou podle krivky, nula v krivce zvyšuje množství pro následující iteraci ##VAR - na zaklade conf. muzeme jako prvni posilat MARKET order @@ -352,154 +352,6 @@ def next(data, state: StrategyState): modded_section = section return safe_get(modded_section, name, safe_get(section, name, default)) - def populate_ema_indicator(): - #BAR EMA INDICATOR - - #plnime MAcko - nyni posilame jen N poslednich hodnot - #zaroven osetrujeme pripady, kdy je malo dat a ukladame nulu - try: - ma = int(state.vars.MA) - #poslednich ma hodnot - source = state.bars.close[-ma:] #state.bars.vwap - ema_value = ema(source, ma) - - ##pokus MACKO zakrouhlit na tri desetina a petku - state.indicators.ema[-1]=round2five(ema_value[-1]) - ##state.indicators.ema[-1]=trunc(ema_value[-1],3) - #state.ilog(e=f"EMA {state.indicators.ema[-1]}", ema_last=state.indicators.ema[-6:]) - except Exception as e: - state.ilog(e="EMA nechavame 0", message=str(e)+format_exc()) - #state.indicators.ema[-1]=(0) - #evaluate buy signal - #consolidation - - # [stratvars.indicators.slope] - # lookback - # lookback_offset - - def populate_slow_slope_indicator(): - options = safe_get(state.vars.indicators, 'slow_slope', None) - if options is None: - state.ilog(e="No options for slow slope in stratvars") - return - - - #SLOW SLOPE INDICATOR - #úhel stoupání a klesání vyjádřený mezi -1 až 1 - #pravý bod přímky je aktuální cena, levý je průměr X(lookback offset) starších hodnot od slope_lookback. - #obsahuje statický indikátor (angle) pro vizualizaci - try: - slow_slope = 99 - slope_lookback = safe_get(options, 'slope_lookback', 100) - minimum_slope = safe_get(options, 'minimum_slope', 25) - maximum_slope = safe_get(options, "maximum_slope",0.9) - lookback_offset = safe_get(options, 'lookback_offset', 25) - - if len(state.bars.close) > (slope_lookback + lookback_offset): - array_od = slope_lookback + lookback_offset - array_do = slope_lookback - lookbackprice_array = state.bars.vwap[-array_od:-array_do] - #obycejný prumer hodnot - lookbackprice = round(sum(lookbackprice_array)/lookback_offset,3) - lookbacktime = state.bars.time[-slope_lookback] - else: - #kdyz neni dostatek hodnot, pouzivame jako levy bod open hodnotu close[0] - lookbackprice = state.bars.close[0] - lookbacktime = state.bars.time[0] - state.ilog(e="Slow Slope - not enough data bereme left bod open", slope_lookback=slope_lookback, slope=state.indicators.slope, slopeMA=state.indicators.slopeMA) - - #výpočet úhlu - a jeho normalizace - slope = ((state.bars.close[-1] - lookbackprice)/lookbackprice)*100 - slope = round(slope, 4) - state.indicators.slow_slope[-1]=slope - - #angle je ze slope - state.statinds.angle_slow = dict(time=state.bars.time[-1], price=state.bars.close[-1], lookbacktime=lookbacktime, lookbackprice=lookbackprice, minimum_slope=minimum_slope, maximum_slope=maximum_slope) - - #slope MA vyrovna vykyvy ve slope, dále pracujeme se slopeMA - slope_MA_length = safe_get(options, 'MA_length', 5) - source = state.indicators.slow_slope[-slope_MA_length:] - slopeMAseries = ema(source, slope_MA_length) #state.bars.vwap - slopeMA = slopeMAseries[-1] - state.indicators.slow_slopeMA[-1]=slopeMA - - state.ilog(e=f"SLOW {slope=} {slopeMA=}", msg=f"{lookbackprice=}", lookbackoffset=lookback_offset, minimum_slope=minimum_slope, last_slopes=state.indicators.slope[-10:], last_slopesMA=state.indicators.slopeMA[-10:]) - #dale pracujeme s timto MAckovanym slope - #slope = slopeMA - - except Exception as e: - print("Exception in NEXT Slow Slope Indicator section", str(e)) - state.ilog(e="EXCEPTION", msg="Exception in Slow Slope Indicator section" + str(e) + format_exc()) - - def populate_slope_indicator(): - #populuje indikator typu SLOPE - - - #SLOPE INDICATOR - #úhel stoupání a klesání vyjádřený mezi -1 až 1 - #pravý bod přímky je aktuální cena, levý je průměr X(lookback offset) starších hodnot od slope_lookback. - #obsahuje statický indikátor (angle) pro vizualizaci - try: - slope = 99 - slope_lookback = int(state.vars.slope_lookback) - minimum_slope = float(state.vars.minimum_slope) - lookback_offset = int(state.vars.lookback_offset) - - if len(state.bars.close) > (slope_lookback + lookback_offset): - array_od = slope_lookback + lookback_offset - array_do = slope_lookback - lookbackprice_array = state.bars.vwap[-array_od:-array_do] - #obycejný prumer hodnot - lookbackprice = round(sum(lookbackprice_array)/lookback_offset,3) - - #výpočet úhlu - a jeho normalizace - slope = ((state.bars.close[-1] - lookbackprice)/lookbackprice)*100 - slope = round(slope, 4) - state.indicators.slope[-1]=slope - - #angle je ze slope - state.statinds.angle = dict(time=state.bars.time[-1], price=state.bars.close[-1], lookbacktime=state.bars.time[-slope_lookback], lookbackprice=lookbackprice, minimum_slope=minimum_slope, maximum_slope=safe_get(state.vars, "bigwave_slope_above",0.20)) - - #slope MA vyrovna vykyvy ve slope, dále pracujeme se slopeMA - slope_MA_length = 5 - source = state.indicators.slope[-slope_MA_length:] - slopeMAseries = ema(source, slope_MA_length) #state.bars.vwap - slopeMA = slopeMAseries[-1] - state.indicators.slopeMA[-1]=slopeMA - - state.ilog(e=f"{slope=} {slopeMA=}", msg=f"{lookbackprice=}", lookbackoffset=lookback_offset, minimum_slope=minimum_slope, last_slopes=state.indicators.slope[-10:], last_slopesMA=state.indicators.slopeMA[-10:]) - - #dale pracujeme s timto MAckovanym slope - slope = slopeMA - else: - #pokud plnime historii musime ji plnit od zacatku, vsehcny idenitifkatory maji spolecny time - #kvuli spravnemu zobrazovani na gui - #state.indicators.slopeMA[-1]=0 - #state.indicators.slopeMA.append(0) - state.ilog(e="Slope - not enough data", slope_lookback=slope_lookback, slope=state.indicators.slope, slopeMA=state.indicators.slopeMA) - except Exception as e: - print("Exception in NEXT Slope Indicator section", str(e)) - state.ilog(e="EXCEPTION", msg="Exception in Slope Indicator section" + str(e) + format_exc()) - - #TODO predelat na dynamicky type=RSI, source = ["close", "vwap","hlcc4"] - - def populate_rsi_indicator(): - #RSI14 INDICATOR - try: - rsi_length = int(safe_get(state.vars, "rsi_length",14)) - source = state.bars.vwap #[-rsi_length:] #state.bars.vwap - - #cekame na dostatek dat - if len(source) > rsi_length: - rsi_res = rsi(source, rsi_length) - rsi_value = trunc(rsi_res[-1],3) - state.indicators.RSI14[-1]=rsi_value - else: - state.ilog(e=f"RSI {rsi_length=} necháváme 0", message="not enough source data") - #state.ilog(e=f"RSI {rsi_length=} {rsi_value=} {rsi_dont_buy=} {rsi_buy_signal=}", rsi_indicator=state.indicators.RSI14[-5:]) - except Exception as e: - state.ilog(e=f"RSI {rsi_length=} necháváme 0", message=str(e)+format_exc()) - #state.indicators.RSI14[-1]=0 - def populate_cbar_rsi_indicator(): #CBAR RSI indicator options = safe_get(state.vars.indicators, 'crsi', None) @@ -520,108 +372,24 @@ def next(data, state: StrategyState): state.ilog(e=f"CRSI {crsi_length=} necháváme 0", message=str(e)+format_exc()) #state.indicators.RSI14[-1]=0 - # def populate_secondary_rsi_indicator(): - # #SBAR RSI indicator - # try: - # srsi_length = int(safe_get(state.vars, "srsi_length",14)) - # source = state.secondary_indicators.sec_price #[-rsi_length:] #state.bars.vwap - # srsi_res = rsi(source, srsi_length) - # srsi_value = trunc(srsi_res[-1],3) - # state.secondary_indicators.SRSI[-1]=srsi_value - # #state.ilog(e=f"RSI {rsi_length=} {rsi_value=} {rsi_dont_buy=} {rsi_buy_signal=}", rsi_indicator=state.indicators.RSI14[-5:]) - # except Exception as e: - # state.ilog(e=f"SRSI {srsi_length=} necháváme 0", message=str(e)+format_exc()) - # #state.indicators.RSI14[-1]=0 - - #TODO predelat na dynamicky - tzn. vstup je nazev slopu a i z nej se dotahne minimum slope - - - #gets all indicators of type slow and check which they have dont_buy_below_minimum - # [stratvars.indicators.slope] - # type = "slope" - # dont_buy_below = -0.10 - #get all indicators of type slow and check whether they have dont_buy_below_minimum - # if so, it evaluates if it is below current value - def slope_too_low(): - retboolList = [] - desc = "" - for indname, indsettings in state.vars.indicators.items(): - for option,value in indsettings.items(): - if (option == "type" and value == "slope") or (option == "type" and value == "slopeLP"): - #pokud zde mame dont_buy_below - minimum_val = safe_get(indsettings, "dont_buy_below", None) - if minimum_val is not None: - #minimum_val = float(safe_get(indsettings, "minimum_slope", 1)) - - #pokud exisuje MA, měříme na MA, jinak na standardu - try: - curr_val = state.indicators[indname+"MA"][-1] - except KeyError: - curr_val = state.indicators[indname][-1] - - ret = (curr_val < minimum_val) - if ret: - desc += f"ID:{indname}/{curr_val} below {minimum_val=} /" - else: - desc += f"ID:{indname}{curr_val} OK above {minimum_val=} /" - retboolList.append(ret) - else: - desc += f"ID:{indname} - no min set /" - retboolList.append(False) - #pokud obsahuje aspon jedno true - slopelow = any(retboolList) - - #DEBUG - poté zapsat jen když je True - if slopelow: - state.ilog(e=f"SLOPELOW {slopelow}", msg=desc) - - return slopelow - - #gets all indicators of type slow and check which they have dont_buy_above - # [stratvars.indicators.slope] - # type = "slope" - # dont_buy_above = 0.20 - #get all indicators of type slow and check whether they have dont_buy_above - # if so, it evaluates if it is above current value - def slope_too_high(): - retboolList = [] - desc = "" - for indname, indsettings in state.vars.indicators.items(): - for option,value in indsettings.items(): - if (option == "type" and value == "slope") or (option == "type" and value == "slopeLP"): - #pokud zde mame dont_buy_above - maximum_val = safe_get(indsettings, "dont_buy_above", None) - if maximum_val is not None: - #pokud exisuje MA, měříme na MA, jinak na standardu - try: - curr_val = state.indicators[indname+"MA"][-1] - except KeyError: - curr_val = state.indicators[indname][-1] - - ret = (curr_val > maximum_val) - if ret: - desc += f"ID:{indname}/{curr_val} above {maximum_val=} /" - else: - desc += f"ID:{indname}{curr_val} OK below {maximum_val=} /" - retboolList.append(ret) - else: - desc += f"ID:{indname} - no max set /" - retboolList.append(False) - #pokud obsahuje aspon jedno true - slopehigh = any(retboolList) - - #DEBUG - poté zapsat jen když je True - if slopehigh: - state.ilog(e=f"SLOPEHIGH {slopehigh}", msg=desc) - - return slopehigh - #resetujeme, kdyz 1) je aktivni buy protection 2) kdyz to ujede #TODO mozna tick2reset spoustet jednou za X opakovani def pendingbuys_optimalization(): if len(state.vars.pendingbuys)>0: - if buy_protection_enabled(): - #state.ilog(e="PENDINGBUYS reset", message=inspect.currentframe().f_code.co_name) + #misto volani buy_protection vytvarime direktivu cancel_pendingbuys + + #pouze jako OR + #cancel_pendingbuys_above + #cancel_pendingbuys_below + + pb_dict= get_work_dict_with_directive(starts_with="cancel_pendingbuys_if") + + state.ilog(e=f"CANCEL PB work_dict", message=pb_dict) + #u techto ma smysl pouze OR + cond = create_conditions_from_directives(pb_dict, "OR") + result, conditions_met = eval_cond_dict(cond) + state.ilog(e=f"CANCEL PB =OR= {result}", **conditions_met) + if result: res = asyncio.run(state.cancel_pending_buys()) state.ilog(e="CANCEL pendingbuyes", pb=state.vars.pendingbuys, res=res) else: @@ -645,18 +413,6 @@ def next(data, state: StrategyState): state.vars.jevylozeno = 0 state.ilog(e="PB prazdne nastavujeme: neni vylozeno", jevylozeno=state.vars.jevylozeno) - ##kdy nesmí být žádné nákupní objednávky - zruší se - def buy_protection_enabled(): - dont_buy_when = dict(AND=dict(), OR=dict()) - ##add conditions here - dont_buy_when['rsi_too_high'] = state.indicators.RSI14[-1] > safe_get(state.vars, "rsi_dont_buy_above",50) - dont_buy_when['slope_too_low'] = slope_too_low() - - result, cond_met = eval_cond_dict(dont_buy_when) - if result: - state.ilog(e=f"BUY_PROTECTION {cond_met}", message=cond_met) - return result - def sell_protection_enabled(): options = safe_get(state.vars, 'sell_protection', None) if options is None: @@ -669,148 +425,223 @@ def next(data, state: StrategyState): disable_sell_proteciton_when['disabled_in_config'] = safe_get(options, 'enabled', False) is False #too good to be true (maximum profit) #disable_sell_proteciton_when['tgtbt_reached'] = safe_get(options, 'tgtbt', False) is False - + disable_sell_proteciton_when['disable_if_positions_above'] = int(safe_get(options, 'disable_if_positions_above', 0)) < state.positions #testing preconditions result, conditions_met = eval_cond_dict(disable_sell_proteciton_when) if result: - state.ilog(e=f"SELL_PROTECTION DISABLED by precondition {conditions_met}") + state.ilog(e=f"SELL_PROTECTION DISABLED by {conditions_met}", **conditions_met) return False - - dont_sell_when = dict(AND=dict(), OR=dict()) - ##add conditions here - - #IDENTIFIKOVAce rustoveho MOMENTA - pokud je momentum, tak prodávat později - #pokud je slope too high, pak prodavame jakmile slopeMA zacne klesat, napr. 4MA (TODO 3) + work_dict_dont_sell_if = get_work_dict_with_directive(starts_with="dont_sell_if") + state.ilog(e=f"SELL PROTECTION work_dict", message=work_dict_dont_sell_if) - #TODO zkusit pro pevny profit, jednoduse pozdrzet prodej - dokud tick_price roste nebo se drzi tak neprodavat, pokud klesne prodat - #mozna mit dva mody - pri vetsi volatilite pouzivat momentum, pri mensi nebo kdyz potrebuju pryc, tak prodat hned - - - #toto docasne pryc dont_sell_when['slope_too_high'] = slope_too_high() and not isfalling(state.indicators.slopeMA,4) - dont_sell_when['AND']['slopeMA_rising'] = isrising(state.indicators.slopeMA,safe_get(options, 'slopeMA_rising', 2)) - dont_sell_when['AND']['rsi_not_falling'] = not isfalling(state.indicators.RSI14,safe_get(options, 'rsi_not_falling',3)) - #dont_sell_when['rsi_dont_buy'] = state.indicators.RSI14[-1] > safe_get(state.vars, "rsi_dont_buy_above",50) - - result, conditions_met = eval_cond_dict(dont_sell_when) + or_cond = create_conditions_from_directives(work_dict_dont_sell_if, "OR") + result, conditions_met = eval_cond_dict(or_cond) + state.ilog(e=f"SELL PROTECTION =OR= {result}", **conditions_met) if result: - state.ilog(e=f"SELL_PROTECTION {conditions_met} enabled") - return result + return True + + #OR neprosly testujeme AND + and_cond = create_conditions_from_directives(work_dict_dont_sell_if, "AND") + result, conditions_met = eval_cond_dict(and_cond) + state.ilog(e=f"SELL PROTECTION =AND= {result}", **conditions_met) + return result + + # #IDENTIFIKOVAce rustoveho MOMENTA - pokud je momentum, tak prodávat později + + # #pokud je slope too high, pak prodavame jakmile slopeMA zacne klesat, napr. 4MA (TODO 3) + + # #TODO zkusit pro pevny profit, jednoduse pozdrzet prodej - dokud tick_price roste nebo se drzi tak neprodavat, pokud klesne prodat + # #mozna mit dva mody - pri vetsi volatilite pouzivat momentum, pri mensi nebo kdyz potrebuju pryc, tak prodat hned + + #puvodni nastaveni +# slopeMA_rising = 2 +#     rsi_not_falling = 3 + + + # #testing preconditions + # result, conditions_met = eval_cond_dict(disable_sell_proteciton_when) + # if result: + # state.ilog(e=f"SELL_PROTECTION DISABLED by precondition {conditions_met}") + # return False + + # dont_sell_when = dict(AND=dict(), OR=dict()) + # ##add conditions here + + # #IDENTIFIKOVAce rustoveho MOMENTA - pokud je momentum, tak prodávat později + + # #pokud je slope too high, pak prodavame jakmile slopeMA zacne klesat, napr. 4MA (TODO 3) + + # #TODO zkusit pro pevny profit, jednoduse pozdrzet prodej - dokud tick_price roste nebo se drzi tak neprodavat, pokud klesne prodat + # #mozna mit dva mody - pri vetsi volatilite pouzivat momentum, pri mensi nebo kdyz potrebuju pryc, tak prodat hned + + # #TOTO do budoucna zdynamictit + + # #toto docasne pryc dont_sell_when['slope_too_high'] = slope_too_high() and not isfalling(state.indicators.slopeMA,4) + # dont_sell_when['AND']['slopeMA_rising'] = isrising(state.indicators.slopeMA,safe_get(options, 'slopeMA_rising', 2)) + # dont_sell_when['AND']['rsi_not_falling'] = not isfalling(state.indicators.RSI14,safe_get(options, 'rsi_not_falling',3)) + # #dont_sell_when['rsi_dont_buy'] = state.indicators.RSI14[-1] > safe_get(state.vars, "rsi_dont_buy_above",50) + + # result, conditions_met = eval_cond_dict(dont_sell_when) + # if result: + # state.ilog(e=f"SELL_PROTECTION {conditions_met} enabled") + # return result #preconditions and conditions of BUY SIGNAL - def buy_conditions_met(): - #preconditions - + def conditions_met(): + #preconditions - TODO zdynamictit dont_buy_when = dict(AND=dict(), OR=dict()) + #OBECNE DONT BUYS if safe_get(state.vars, "buy_only_on_confirmed",True): dont_buy_when['bar_not_confirmed'] = (data['confirmed'] == 0) #od posledniho vylozeni musi ubehnout N baru dont_buy_when['last_buy_offset_too_soon'] = data['index'] < (int(state.vars.lastbuyindex) + int(safe_get(state.vars, "lastbuy_offset",3))) dont_buy_when['blockbuy_active'] = (state.vars.blockbuy == 1) dont_buy_when['jevylozeno_active'] = (state.vars.jevylozeno == 1) - dont_buy_when['rsi_too_high'] = state.indicators.RSI14[-1] > safe_get(state.vars, "rsi_dont_buy_above",50) - dont_buy_when['slope_too_low'] = slope_too_low() - dont_buy_when['slope_too_high'] = slope_too_high() dont_buy_when['open_rush'] = is_open_rush(datetime.fromtimestamp(data['updated']).astimezone(zoneNY), safe_get(state.vars, "open_rush",0)) dont_buy_when['close_rush'] = is_close_rush(datetime.fromtimestamp(data['updated']).astimezone(zoneNY), safe_get(state.vars, "close_rush",0)) - dont_buy_when['rsi_is_zero'] = (state.indicators.RSI14[-1] == 0) - dont_buy_when['reverse_position_waiting_amount_not_0'] = (state.vars.reverse_position_waiting_amount != 0) - + #testing preconditions result, cond_met = eval_cond_dict(dont_buy_when) if result: - state.ilog(e=f"BUY precondition not met {cond_met}", message=cond_met) + state.ilog(e=f"BUY PRECOND GENERAL not met {cond_met}", message=cond_met) return False - #conditions - bud samostatne nebo v groupe - ty musi platit dohromady - buy_cond = dict(AND=dict(), OR=dict()) - ##add buy conditions here - #cond groups ["AND"] - #cond groups ["OR"] - #no cond group - takes first - #TEST BUY SIGNALu z cbartick_price - 3klesave za sebou - #buy_cond['tick_price_falling_trend'] = isfalling(state.cbar_indicators.tick_price,state.vars.Trend) + #SPECIFICKE DONT BUYS - direktivy zacinajici dont_buy + #dont_buy_below = value nebo nazev indikatoru + #dont_buy_above = value nebo hazev indikatoru - #slopeMA jde dolu, rsi jde nahoru - #buy mame kazdy potvrzeny, tzn. rsi falling muze byt jen 2 + #do INITU + work_dict_dont_buy = get_work_dict_with_directive(starts_with="dont_buy_if") - #buy_cond['AND']['slopeMA_falling'] = isfalling(state.indicators.slopeMA,3) - #buy_cond['AND']['rsi_is_rising'] = isrising(state.indicators.RSI14,2) - #buy_cond["AND"]["rsi_buy_signal_below"] = state.indicators.RSI14[-1] < safe_get(state.vars, "rsi_buy_signal_below",40) + state.ilog(e=f"BUY PRECOND DONTBUY work_dict", message=work_dict_dont_buy) + #u techto ma smysl pouze OR + precond = create_conditions_from_directives(work_dict_dont_buy, "OR") + result, conditions_met = eval_cond_dict(precond) + state.ilog(e=f"BUY PRECOND DONTBUY =OR= {result}", **conditions_met) + if result: + return False - #puvodni buy conditiony RSI pod + EMA klesajici - #buy_cond["AND"]["rsi_buy_signal_below"] = state.indicators.RSI14[-1] < safe_get(state.vars, "rsi_buy_signal_below",40) - #buy_cond["AND"]["ema_trend_is_falling"] = isfalling(state.indicators.ema,state.vars.Trend) + #tyto timto nahrazeny - dat do konfigurace + #dont_buy_when['rsi_too_high'] = state.indicators.RSI14[-1] > safe_get(state.vars, "rsi_dont_buy_above",50) + #dont_buy_when['slope_too_low'] = slope_too_low() + #dont_buy_when['slope_too_high'] = slope_too_high() + #dont_buy_when['rsi_is_zero'] = (state.indicators.RSI14[-1] == 0) + #dont_buy_when['reverse_position_waiting_amount_not_0'] = (state.vars.reverse_position_waiting_amount != 0) - #cela tato sekce prepracovana do dynamickych podminek - #buy_cond["AND"]["rsi_buy_signal_below"] = state.indicators.RSI14[-1] < safe_get(state.vars, "rsi_buy_signal_below",40) - #buy_cond["AND"]["rsi_is_falling"] = isfalling(state.indicators.RSI14,state.vars.Trend) - #buy_cond["AND"]['crsi_below_crsi_buy_limit'] = state.cbar_indicators.CRSI[-1] < safe_get(state.vars, "crsi_buy_signal_below",25) - #buy_cond["AND"]['slopeMA_is_below_limit'] = state.indicators.slopeMA[-1] < safe_get(state.vars, "slopeMA_buy_signal_below",1) - - #TBD nize prepracovat do funkce a pouzit obecne - # do pole podminek pridame podminky z indikatoru, ktere maji nastavenou direktivu buy_if_cross_down(up) - #buy_cond["AND"].update(get_conditions_from_indicators_with_directive("buy_if_cross_down")) #u indikatoru muzoun byt tyto directivy pro generovani buysignalu - # buy_if_crossed_down - kdyz prekrocil dolu - # buy_if_crossed_up - kdyz prekrocil nahoru - # buy_if_is_falling - kdyz je klesajici po N - # buy_if_is_rising - kdyz je rostouci po N - # buy_if_is_below - kdyz je pod prahem - # buy_if_is_above - kdyz je nad prahem + # buy_if_crossed_down - kdyz prekrocil dolu, VALUE: hodnota nebo nazev indikatoru + # buy_if_crossed_up - kdyz prekrocil nahoru, VALUE: hodnota nebo nazev indikatoru + # buy_if_falling - kdyz je klesajici po N, VALUE: hodnota + # buy_if_rising - kdyz je rostouci po N, VALUE: hodnota + # buy_if_below - kdyz je pod prahem, VALUE: hodnota nebo nazev indikatoru + # buy_if_above - kdyz je nad prahem, VALUE: hodnota nebo nazev indikatoru + # buy_if_pivot_a - kdyz je pivot A. VALUE: delka nohou + # buy_if_pivot_v - kdyz je pivot V. VALUE: delka nohou + + # direktivy se mohou nachazet v podsekci AND nebo OR - daneho indikatoru (nebo na volno, pak = OR) + # OR - staci kdyz plati jedna takova podminka a buysignal je aktivni + # AND - musi platit vsechny podminky ze vsech indikatoru, aby byl buysignal aktivni - #sekce pro dynamicke vytvareni podminek - pokud indikatory maji nastavenou buysignal direktivu - #dotahneme si jejich value a vytvorime podminku (plati AND) + #populate work dict - muze byt i jen jednou v INIT nebo 1x za cas + #dict oindexovane podminkou (OR/AND) obsahuje vsechny buy_if direktivy v tuplu (nazevind,direktiva,hodnota + # {'AND': [('nazev indikatoru', 'nazev direktivy', 'hodnotadirektivy')], 'OR': []} + work_dict_buy_if = get_work_dict_with_directive(starts_with="buy_if") + state.ilog(e=f"BUY SIGNAL work_dict", message=work_dict_buy_if) - indicators_with_is_above = get_indicators_with_directive("buy_if_is_above") - for indicator, value in indicators_with_is_above: - buy_cond["AND"]["buy_if_is_above_"+indicator] = get_source_or_MA(indicator)[-1] > value + buy_or_cond = create_conditions_from_directives(work_dict_buy_if, "OR") + result, conditions_met = eval_cond_dict(buy_or_cond) + state.ilog(e=f"BUY SIGNAL =OR= {result}", **conditions_met) + if result: + return True + + #OR neprosly testujeme AND + buy_and_cond = create_conditions_from_directives(work_dict_buy_if, "AND") + result, conditions_met = eval_cond_dict(buy_and_cond) + state.ilog(e=f"BUY SIGNAL =AND= {result}", **conditions_met) + return result - indicators_with_is_below = get_indicators_with_directive("buy_if_is_below") - for indicator, value in indicators_with_is_below: - buy_cond["AND"]["buy_if_is_below_"+indicator] = get_source_or_MA(indicator)[-1] < value + #preklad direktivy podle typu, pokud je int anebo float - je to primo hodnota + #pokud je str, jde o indikator a dotahujeme posledni hodnotu z nej + def value_or_indicator(value): + if isinstance(value, (int, float)): + return value + elif isinstance(value, str): + try: + #pokud existuje MA bereme MA jinak standard + ret = get_source_or_MA(indicator=value)[-1] + state.ilog(e=f"Pro porovnani bereme posledni hodnotu {ret} z indikatoru {value}") + except Exception as e : + ret = 0 + state.ilog(e=f"Neexistuje indikator s nazvem {value} vracime 0" + str(e) + format_exc()) + return ret - indicators_with_is_falling = get_indicators_with_directive("buy_if_is_falling") - for indicator, value in indicators_with_is_falling: - buy_cond["AND"]["buy_if_is_falling_"+indicator] = isfalling(get_source_or_MA(indicator),value) + #funkce vytvori podminky (bud pro AND/OR) z pracovniho dict + def create_conditions_from_directives(work_dict, cond_type): + cond = {} + cond[cond_type] = {} + for indname, directive, value in work_dict[cond_type]: + #direktivy zobecnime ve tvaru prefix_ACTION + # ACTIONS = is_above, is_below, is_falling, is_rising, crossed_up, crossed_down, is_pivot_a, is_pivot_v + + #OBECNE DIREKTIVY - REUSOVATELNE + if directive.endswith("above"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = get_source_or_MA(indname)[-1] > value_or_indicator(value) + elif directive.endswith("below"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = get_source_or_MA(indname)[-1] < value_or_indicator(value) + elif directive.endswith("falling"): + if directive.endswith("not_falling"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = not isfalling(get_source_or_MA(indname),value) + else: + cond[cond_type][directive+"_"+indname+"_"+str(value)] = isfalling(get_source_or_MA(indname),value) + elif directive.endswith("rising"): + if directive.endswith("not_rising"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = not isrising(get_source_or_MA(indname),value) + else: + cond[cond_type][directive+"_"+indname+"_"+str(value)] = isrising(get_source_or_MA(indname),value) + elif directive.endswith("crossed_down"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = buy_if_crossed_down(indname, value_or_indicator(value)) + elif directive.endswith("crossed_up"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = buy_if_crossed_up(indname, value_or_indicator(value)) + elif directive.endswith("pivot_a"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = is_pivot(source=get_source_or_MA(indname), leg_number=value, type="A") + elif directive.endswith("pivot_v"): + cond[cond_type][directive+"_"+indname+"_"+str(value)] = is_pivot(source=get_source_or_MA(indname), leg_number=value, type="V") + + #PRIPADNE DALSI SPECIFICKE ZDE + # elif directive == "buy_if_necospecifckeho": + # pass + + return cond - indicators_with_is_rising = get_indicators_with_directive("buy_if_is_rising") - for indicator, value in indicators_with_is_rising: - buy_cond["AND"]["buy_if_is_rising_"+indicator] = isrising(get_source_or_MA(indicator),value) + #tato funkce vytvori dictionary typu podminek (OR/AND) + # z indikatoru, ktere obsahuji direktivami daneho typu(buy_if, dont_buy_when) + # v tuplu (nazevind,direktiva,hodnota) + # do OR jsou dane i bez prefixu + # {'AND': [('nazev indikatoru', 'nazev direktivy', 'hodnotadirektivy')], 'OR': []} + def get_work_dict_with_directive(starts_with: str): + reslist = dict(AND=[], OR=[]) - indicators_with_cross_down = get_indicators_with_directive("buy_if_crossed_down") - for indicator, value in indicators_with_cross_down: - buy_cond["AND"]["buy_if_crossed_down_"+indicator] = buy_if_crossed_down(indicator, value) - - indicators_with_cross_up = get_indicators_with_directive("buy_if_crossed_up") - for indicator, value in indicators_with_cross_up: - buy_cond["AND"]["buy_if_crossed_up_"+indicator] = buy_if_crossed_up(indicator, value) - - - #slopME klesa a RSI začalo stoupat - # buy_cond["AND"]["rsi_is_rising2"] = isrising(state.indicators.RSI14,2) - # buy_cond['AND']['slopeMA_falling_Trend'] = isfalling(state.indicators.slopeMA,state.vars.Trend) - # buy_cond["AND"]["rsi_buy_signal_below"] = state.indicators.RSI14[-1] < safe_get(state.vars, "rsi_buy_signal_below",40) - - #zkusit jako doplnkovy BUY SIGNAL 3 klesavy cbar RSI pripadne TICK PRICE - result, conditions_met = eval_cond_dict(buy_cond) - #if result: - state.ilog(e=f"BUY SIGNAL {result} {conditions_met}") - return result - - #vrati vsechny indikatory, ktere obsahuji danou directivu a vrati v listu tuple (nazev, hodnota) - def get_indicators_with_directive(directive): - reslist = [] for indname, indsettings in state.vars.indicators.items(): for option,value in indsettings.items(): - if option == directive: - reslist.append((indname, value)) - return reslist + if option.startswith(starts_with): + reslist["OR"].append((indname, option, value)) + if option == "AND": + #vsechny buy direktivy, ktere jsou pod AND + for key, val in value.items(): + if key.startswith(starts_with): + reslist["AND"].append((indname, key, val)) + if option == "OR" : + #vsechny buy direktivy, ktere jsou pod OR + for key, val in value.items(): + if key.startswith(starts_with): + reslist["OR"].append((indname, key, val)) + return reslist def get_source_or_MA(indicator): #pokud ma, pouzije MAcko, pokud ne tak standardni indikator @@ -832,7 +663,7 @@ def next(data, state: StrategyState): return res def eval_buy(): - if buy_conditions_met(): + if conditions_met(): vyloz() def populate_cbar_tick_price_indicator(): @@ -863,11 +694,11 @@ def next(data, state: StrategyState): #print(state.indicators.items()) for key in state.indicators: if key != 'time': - last_ind_vals[key] = state.indicators[key][-5:] + last_ind_vals[key] = state.indicators[key][-6:] for key in state.cbar_indicators: if key != 'time': - last_ind_vals[key] = state.cbar_indicators[key][-5:] + last_ind_vals[key] = state.cbar_indicators[key][-6:] # for key in state.secondary_indicators: # if key != 'time': @@ -918,14 +749,14 @@ def next(data, state: StrategyState): if on_confirmed_only is False or (on_confirmed_only is True and data['confirmed']==1): try: source = state.bars[req_source][-ema_length:] - if len(source) > ema_length: - ema_value = ema(source, ema_length) - val = ema_value[-1] - state.indicators[name][-1]= val - #state.indicators[name][-1]= round2five(val) - state.ilog(e=f"IND {name} EMA {val} {ema_length=}") - else: - state.ilog(e=f"IND {name} EMA necháváme 0", message="not enough source data", source=source, ema_length=ema_length) + #if len(source) > ema_length: + ema_value = ema(source, ema_length) + val = round(ema_value[-1],4) + state.indicators[name][-1]= val + #state.indicators[name][-1]= round2five(val) + state.ilog(e=f"IND {name} EMA {val} {ema_length=}") + #else: + # state.ilog(e=f"IND {name} EMA necháváme 0", message="not enough source data", source=source, ema_length=ema_length) except Exception as e: state.ilog(e=f"IND ERROR {name} EMA necháváme 0", message=str(e)+format_exc()) @@ -958,14 +789,14 @@ def next(data, state: StrategyState): #cekame na dostatek dat if len(source) > rsi_length: rsi_res = rsi(source, rsi_length) - rsi_value = trunc(rsi_res[-1],4) + rsi_value = round(rsi_res[-1],4) state.indicators[name][-1]=rsi_value state.ilog(e=f"IND {name} RSI {rsi_value}") if rsi_MA_length is not None: src = state.indicators[name][-rsi_MA_length:] rsi_MA_res = ema(src, rsi_MA_length) - rsi_MA_value = rsi_MA_res[-1] + rsi_MA_value = round(rsi_MA_res[-1],4) state.indicators[name+"MA"][-1]=rsi_MA_value state.ilog(e=f"IND {name} RSIMA {rsi_MA_value}") @@ -1077,7 +908,7 @@ def next(data, state: StrategyState): if slope_MA_length is not None: source = state.indicators[name][-slope_MA_length:] slopeMAseries = ema(source, slope_MA_length) #state.bars.vwap - slopeMA = slopeMAseries[-1] + slopeMA = round(slopeMAseries[-1],5) state.indicators[name+"MA"][-1]=slopeMA last_slopesMA = state.indicators[name+"MA"][-10:] @@ -1089,7 +920,6 @@ def next(data, state: StrategyState): print(f"Exception in {name} slope Indicator section", str(e)) state.ilog(e=f"EXCEPTION in {name}", msg="Exception in slope Indicator section" + str(e) + format_exc()) - def populate_dynamic_slope_indicator(name): options = safe_get(state.vars.indicators, name, None) if options is None: @@ -1167,7 +997,7 @@ def next(data, state: StrategyState): if slope_MA_length is not None: source = state.indicators[name][-slope_MA_length:] slopeMAseries = ema(source, slope_MA_length) #state.bars.vwap - slopeMA = slopeMAseries[-1] + slopeMA = round(slopeMAseries[-1],4) state.indicators[name+"MA"][-1]=slopeMA last_slopesMA = state.indicators[name+"MA"][-10:] @@ -1231,7 +1061,8 @@ def next(data, state: StrategyState): #lp = state.interface.get_last_price(symbol=state.symbol) lp = data['close'] state.ilog(e="ENTRY", msg=f"LP:{lp} P:{state.positions}/{round(float(state.avgp),3)} profit:{round(float(state.profit),2)} Trades:{len(state.tradeList)} DEF:{str(is_defensive_mode())}", pb=str(state.vars.pendingbuys), last_price=lp, data=data, stratvars=str(state.vars)) - state.ilog(e="Indikatory", msg=str(get_last_ind_vals())) + inds = get_last_ind_vals() + state.ilog(e="Indikatory", **inds) eval_buy() pendingbuys_optimalization() @@ -1244,16 +1075,20 @@ def init(state: StrategyState): #pro vsechny indikatory, ktere maji ve svych stratvars TYPE inicializujeme for indname, indsettings in state.vars.indicators.items(): for option,value in indsettings.items(): - if option == "type": - state.indicators[indname] = [] - #pokud ma MA_length incializujeme i MA variantu - if safe_get(indsettings, 'MA_length', False): - state.indicators[indname+"MA"] = [] - #specifika pro slope - if value == "slope": - #inicializujeme statinds (pro uhel na FE) - state.statinds[indname] = dict(minimum_slope=safe_get(indsettings, 'minimum_slope', -1), maximum_slope=safe_get(indsettings, 'maximum_slope', 1)) + #inicializujeme nejenom typizovane + #if option == "type": + state.indicators[indname] = [] + #pokud ma MA_length incializujeme i MA variantu + if safe_get(indsettings, 'MA_length', False): + state.indicators[indname+"MA"] = [] + #specifika pro slope + if value == "slope": + #inicializujeme statinds (pro uhel na FE) + state.statinds[indname] = dict(minimum_slope=safe_get(indsettings, 'minimum_slope', -1), maximum_slope=safe_get(indsettings, 'maximum_slope', 1)) + + #TODO presunout inicializaci work_dict u podminek - sice hodnoty nepujdou zmenit, ale zlepsi se performance + #pripadne udelat refresh kazdych x-iterací state.vars['sell_in_progress'] = False state.vars.mode = None state.vars.last_tick_price = 0 @@ -1263,14 +1098,21 @@ def init(state: StrategyState): state.vars.lastbuyindex = 0 state.vars.last_update_time = 0 state.vars.reverse_position_waiting_amount = 0 + #INIT promenne, ktere byly zbytecne ve stratvars + state.vars.pendingbuys={} + state.vars.limitka = None + state.vars.limitka_price=0 + state.vars.jevylozeno=0 + state.vars.blockbuy = 0 + state.vars["ticks2reset_backup"] = state.vars.ticks2reset #state.cbar_indicators['ivwap'] = [] state.cbar_indicators['tick_price'] = [] state.cbar_indicators['tick_volume'] = [] state.cbar_indicators['CRSI'] = [] #state.secondary_indicators['SRSI'] = [] - state.indicators['ema'] = [] - state.indicators['RSI14'] = [] + #state.indicators['ema'] = [] + #state.indicators['RSI14'] = [] initialize_dynamic_indicators() diff --git a/v2realbot/main.py b/v2realbot/main.py index 5e50b31..669a4aa 100644 --- a/v2realbot/main.py +++ b/v2realbot/main.py @@ -209,7 +209,7 @@ def _modify_stratin(stratin: StrategyInstance, stratin_id: UUID): if cs.is_stratin_running(id=stratin_id): res,id = cs.modify_stratin_running(si=stratin, id=stratin_id) else: - res, id = cs.modify_stratin(si=stratin, id=stratin_id) + res, id = cs.modify_stratin(si=stratin, id=stratin_id) if res == 0: return id elif res == -2: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail=f"Error not found: {res}:{id}") diff --git a/v2realbot/slicingtest.py b/v2realbot/slicingtest.py new file mode 100644 index 0000000..f359609 --- /dev/null +++ b/v2realbot/slicingtest.py @@ -0,0 +1,7 @@ +word = "buy_if_not_something" + + +if word.endswith("something") and word[:-len] == "not_": + print("Word meets the condition.") +else: + print("Word does not meet the condition.") \ No newline at end of file diff --git a/v2realbot/static/js/utils.js b/v2realbot/static/js/utils.js index fd8444d..4bcd3bb 100644 --- a/v2realbot/static/js/utils.js +++ b/v2realbot/static/js/utils.js @@ -22,6 +22,7 @@ settings = {} settings //ostatni indicatory nez vwap, volume a bary indConfig = [ {name: "ema", titlevisible: false, embed: true, display: true, priceScaleId: "right", lastValueVisible: false}, + {name: "ema20", titlevisible: false, embed: true, display: true, priceScaleId: "right", lastValueVisible: false}, {name: "tick_volume", histogram: true, titlevisible: true, embed: true, display: true, priceScaleId: '', lastValueVisible: false}, {name: "tick_price", titlevisible: true, embed: true, display: true, priceScaleId: "right", lastValueVisible: false}, {name: "ivwap", titlevisible: true, embed: true, display: false, priceScaleId: "right", lastValueVisible: false}, @@ -38,6 +39,7 @@ indConfig = [ {name: "ema", titlevisible: false, embed: true, display: true, pri {name: "emaSlow", titlevisible: true, embed: true, display: true, priceScaleId: "right", lastValueVisible: false}, {name: "emaFast", titlevisible: true, embed: true, display: true, priceScaleId: "right", lastValueVisible: false}, {name: "RSI14", titlevisible: true, embed: true, display: true, priceScaleId: "middle", lastValueVisible: false}, + {name: "RSI14MA", titlevisible: true, embed: true, display: true, priceScaleId: "middle", lastValueVisible: false}, {name: "CRSI", titlevisible: true, embed: true, display: true, priceScaleId: "middle", lastValueVisible: false}, {name: "aroon", titlevisible: true, embed: true, display: true, priceScaleId: "middle", lastValueVisible: false}, {name: "apo", titlevisible: true, embed: true, display: true, priceScaleId: "middle", lastValueVisible: false}, diff --git a/v2realbot/utils/__pycache__/utils.cpython-310.pyc b/v2realbot/utils/__pycache__/utils.cpython-310.pyc index 58d3f8672fb3806b7b705e94a376f4cf2e924c68..74fa1378374325bcd0d543ff69344fd3b6882b3b 100644 GIT binary patch delta 4201 zcma)9d2AHd8J{;ZyB@FC%L0opjJ<2{u{MVRa~RBF<8YY79Ohi7OqP9b?ZNAr<;^Sx z$H=616Xj4t^3pbGnkH$|wEm+s)KXiur4&_4dZ?F3t8|*SBCRS}>K~PeRH;%zzi$?U z*-)vA{CnSg@4Mgk4t{a;{r-?uS{jhxcjd|@x~lcLP&N7Ja?5fuo}yvaM(4A2bOBqR z_S1zlGU-cfpo?hPq?Fi5%V`CS?X;3s!MKT1TFo}oC2VtAr8Tq`WIJe-ZJ{x?m2CqH za-{?_b#&>Zn%GX4(R$G8q>MJu#z~UsqD?dooE@~8w!qjuPM4z+T|rlZ!UMY`x{9_= zO0;!CPV8jrgp}CDg0JnSt6wLHJ#-CS3rZoH_JUXkLB}8vKV45ZKp;KH1zQ_wJJ{Mw zH$k8=whqGCOgn({AcV1nZiO%&q62h0^U+Q=j?-Oq2dMYnVIr@aJ^;LZpt6%~0i(O< zZus60-+Q25J)YUU^g-YqxQF);?FHUJx{ndi+D{LF<3331Ank*+`XH@C^bkD^X&eUb zk^S^2u#d7W6H4Mpu|$utBlI|14{;xXyb`n@)Q{4K!DXJ?2Helk6Tm%2b@~XHIZl)G zB#a3v_et@=X-#P%<1XoR{kE0kDdq+;Y(O`2!zUScRp(rmr4($0+dm(SD*Ww$Kl=p$ zM;enR2qj}AB2l?Q8j^U}AykRL5lM zACh^SBaeYHU*jlaDvq?MFKuf5pgS8iBSa%AsggrPlfyu%vPN!?vPyU}h@@n3dSON4AdsUqST;&3vCyR+4aFSt z*_Ctn)gG-NpI0CCst4%ngOzinb4^6Tf5$z}h!ppaZW0&$wc?T*E#gbT$IQp#w`xN> zZvr|GaiH8lid(kLsGiHNh3}go09kI8{lqT^iJ!=~_!?l{KUj56uw+iKD!&^n`M-kg z@PhrwSJTx3-n@YMa-a_Fm}Prk9fT?BA#w47OlbBRx}VN(5JRj z=)S1)4CBcG_678gJ0eVYuNV(r-n0^xltdt9Wil)Uk(+!su-qcb(#c%rQ!NS}B+)VZx7kTF25I2D^YKhrfZ|BLGl25+t%5fesImGNN68 zrs>CIrn~1|C!sEo;gtwc0MRhNqY3MV$yp~)6((15%C=11X52_-ro~h9*OHUs$Me5P zTEyW6>q(b*WZ$Qcp1IQ2?#s@p{g8WHX;e#R> zi#)&Z6xkw%7QWYf3S_2aSG8Ft?aJwFp=nLm4byOR{ZGP(Jkxd>y$>UB09U2C;VefR z+(9UaOSO|{*bNb1v;@5ETGW#CB(ZpW9;(mN^`vQ94mO)<>pCC8DGZyBAdDj1wdql~ zeYpYfKK$7MfB-Z?m;^}B&kb>XQ7_pm>dG48clA6KJoIKz1^XV?Zh_(*gghorm;J2S z>;2Ed)Es}FUOVvqg?iX=RX*N+r(G|UU)&S}W}NWJ{NNe6*RLDkR?ehr(#9F~?^!by ze*U=Vsrc4*>=9Q>+Pag=W|*t7vsuezrUNFQfT_Y*CV+SAjuDklhz*tLXn{kL^}$VH z zPy=#+_&=#rqlCXHh8O=%7ii)l!V3srLcsfpzliW`bCpdiYu>db&27UPn{9P9RtjS7U5ev zM7p+R9LqDMcxNu}2HI6djVv&}jmF+a*n@Cyq`VP$8=>Aeaqr;H36Usl2alhUfj9fZ znnU%x=MldQMBLZAGXo9+b9zXMZ+{=N&mZqVZ26Pajig}0{PN4X!| zN4Noa+fn$3IN2bsMo)!*R3InXV~Yd3K*-yY5uw+Wi(G7Z*P|#mhHxIi+l=0N-YtMf zdE5FLi1eThl+fTn&U)jR96itu;x$EZ3zTK9L-gB5_+ zmCw#)8Rze!g%N}k$X9p{=?e&t0Zb9MScm3M+IZyM4GA^+#W$BV9=%9m za7%CNrdTF3tM3{4IXdu?!vGGj<7p z&sisY!p8d}a?u6d+ssxiaR9&euCcfmfyI2 z1ytp8Rq}p503rTogufyb7!IZeS!&3;;Fgc~vdPrYN_N)asLOvUi6!yf{&A$A6hrZ5Ki1976cY#f)+RSeVN-oiDb@EU|R zgmnm45jG>>D#K>+E;IF{Wo38|vU?Hm9OV57ClLk_41k1uI_LV}G3Jcs&tSZ9`Qr$1 z-;GQBStQ>?cnjeYLL)*0!cP#cA-sctt)K5R>?}Y~(qNUVVV(1<9TCGVOSB4~R;HC`CE{Ysva(2dL<<##6ip2)a;VH-7S@76?Z1c9 BvHJi3 delta 3620 zcma);du&tJ8Nlzk*LH3k$2h?Yc@pPIY!e6+2%&*E;Smbu8N$O1F2?6LapL%zbM1tr zuvAM{wzb;mPKQRjc4L2ZO|=(I%hXoYiZ*SCt=*=*y4(FVZPS>hNmKurI*GCGJAot^ zCM}VF_nhy3&-1(=9KU(QYZe!~75H0H_bs|6{H(W{d~h|ql1wh4zG)@8j+W3;c&?{Q zsUMyj=m}cJ+UYXZo^a4|S~2a6Zlsm0gI2Lk>`~U4P%9L8Q%!589nsBnIjse&E%YP} z(7I_7?V|Ox0VEOHNSolJYU~V!&gXj z2VF^9!N^MunZ#<=3t`(6F4{)dKq5O)3eMKjN5I)*bR8sG#d;x~^>hPBc0n5LbR(p( zo1UVZn3Fy#-*(c?VEy=gvdJy93xs>XB*HqtX*cbG{$9G3?W5bI*><`EobF#F>7_eC za)3U@2w3c*yPaRftvO}NquxL-3KKc0_lEw0Hg<4$CMWBFI4D3)=&FbC#2k; zaIK?~q_6U^vD z={e;bp`=0?R=7{LVRD9@+;>EwTAu=R^eGM{tBh9@1^$_gPF%#LdVuO3-YnXx{9?bWFh2vwq72CYmBEdI zuFW+NfiDLf+QDnFU599#M-#Mc&%Ly$IhL01R-i(QOyz{0Y;VDCGvZ<91$WI?fr8}% z_*DmSs604(Tkjy9VYFKbu$?@XO`7@9^#PVOvdQ@HdbHO8y6zyl<{^CbEO*~lowk-@ zX^}0i+t`L~v=M8-);P=B+Atf(mX+_*%SK} z@T6O@1T&z(^Ppvw0aVRPJj4N0t!i%KBPxuPdYkvoPA_R9WT$x5*Ik!4JwA&;9a+=J zvJ@NOv5{wlr=*7Lm*p6hg|odSH_4KAaLy|*8AoL83>ed7df;{OQ-1|HDPj$N@v(oXU=3L9jT6!BDi@#r#I?{V=+CIO!(yp~ttK-0 z9(BV=rjuF2cvqY*d*q_J@XU9^%J304@wm7lezt5c ziHNH5=HP;zG${hj_t6CR2J8${s&SOvQpHgDjgZ_Q^Ja}OR&<(Tko#l z%bU)EG)VXpa}Vj@{+PC#o4GVyn@EnbKiqRs?&9ag*2-^_cJa^3%BD|&JU=9^_&?3- zJNN_~0f|7KZ<6^NsyG1Uq?GWd#I>qFk_+N;b%^%`D3p zG;`dF>O5x+W|Jct0D(hyi2|6h@Vnyrs9HcVsV z7Qcv^m__n01z_kw>vEK#GkygzuM%|fFQecR0#^gaC6=q%d@JnDcqj5YRc?ZDOCXyKtd=mw)Ahsa#GAm44iC8KHyB@AWRewSrd5T-xX)->(*jY{CkMk z9#ppVq{Wl4&Iany;&s$`L;RqA$agJIS#&g16}Nzr+*5wS--5#_(sKsYIKo264JD^% zp~!{I-UgL0xKS=hx9z9zTaruJK_Mj3UMWGsG@5U?s<3HyLv=eWCX1XI7Z?Pi1!ii zA!JAyYy)&*7ySyQ7=r&C@e9O5?j-5rFc#zo4V)=AtjW2bPhP`PBMushmA7&@q0L!1oaz?Qjo-&?Yk1zI!!X zz+N<31fNQoSbTUDJDcTb#NPqf&NDeC!r^XL4n=H#Q zYm53(T!lC+)~~Fe{53RfS0L4-3y2pH zFC*ki!UKrEfp`mnJD9(VxQ;l1kc;g@Y~i3TngKZ%avSTNBW4lJrPvl+FIHNx)-^6& g)!n*VywuuQR$5l-^18g5?(k`)&jzey}Z3jhEB diff --git a/v2realbot/utils/utils.py b/v2realbot/utils/utils.py index b41da08..1a66ac5 100644 --- a/v2realbot/utils/utils.py +++ b/v2realbot/utils/utils.py @@ -23,10 +23,36 @@ import numpy as np import pandas as pd from collections import deque + +#is_pivot function to check if there is A(V) shaped pivot in the list, each leg consists of N points +#middle point is the shared one [1,2,3,2,1] - one leg is [1,2,3] second leg is [3,2,1] +def is_pivot(source: list, leg_number: int, type: str = "A"): + if len(source) < (2 * leg_number)-1: + print("Not enough values in the list") + return False + + left_leg = source[-2*leg_number+1:-leg_number+1] + right_leg = source[-leg_number:] + + if type == "A": + if isrising(left_leg) and isfalling(right_leg): + return True + else: + return False + elif type == "V": + if isfalling(left_leg) and isrising(right_leg): + return True + else: + return False + else: + print("Unknown type") + return False + def crossed_up(threshold, list): """check if threshold has crossed up last thresholdue in list""" try: - if threshold < list[-1] and threshold >= list[-2]: + #upraveno, ze threshold muze byt vetsi nez predpredposledni + if threshold < list[-1] and threshold >= list[-2] or threshold < list[-1] and threshold >= list[-3]: return True else: return False @@ -36,7 +62,8 @@ def crossed_up(threshold, list): def crossed_down(threshold, list): """check if threshold has crossed down last thresholdue in list""" try: - if threshold > list[-1] and threshold <= list[-2]: + #upraveno, ze threshold muze byt mensi nez predpredposledni + if threshold > list[-1] and threshold <= list[-2] or threshold > list[-1] and threshold <= list[-3]: return True else: return False @@ -281,13 +308,17 @@ def is_open_hours(dt, business_hours: dict = None): and dt.date() not in holidays \ and business_hours["from"] <= dt.time() < business_hours["to"] -def isfalling(pole: list, pocet: int): +#vraci zda dane pole je klesajici (bud cele a nebo jen pocet poslednich) +def isfalling(pole: list, pocet: int = None): + if pocet is None: pocet = len(pole) if len(pole) j for i, j in zip(pole, pole[1:])) return res -def isrising(pole: list, pocet: int): +#vraci zda dane pole je roustouci (bud cele a nebo jen pocet poslednich) +def isrising(pole: list, pocet: int = None): + if pocet is None: pocet = len(pole) if len(pole)