new ind basestats + indname cond s bar dictionary

This commit is contained in:
David Brazda
2023-09-21 22:06:05 +02:00
parent a0cbeb2bc6
commit 2bfd51845f
4 changed files with 176 additions and 21 deletions

View File

@ -6,7 +6,7 @@ from v2realbot.enums.enums import RecordType, StartBarAlign, Mode, Account, Orde
from v2realbot.indicators.indicators import ema, natr, roc
from v2realbot.indicators.oscillators import rsi
from v2realbot.common.PrescribedTradeModel import Trade, TradeDirection, TradeStatus, TradeStoplossType
from v2realbot.utils.utils import ltp, isrising, isfalling,trunc,AttributeDict, zoneNY, price2dec, print, safe_get, round2five, is_open_rush, is_close_rush, is_still, is_window_open, eval_cond_dict, crossed_down, crossed_up, crossed, is_pivot, json_serial, pct_diff
from v2realbot.utils.utils import ltp, isrising, isfalling,trunc,AttributeDict, zoneNY, price2dec, print, safe_get, round2five, is_open_rush, is_close_rush, is_still, is_window_open, eval_cond_dict, crossed_down, crossed_up, crossed, is_pivot, json_serial, pct_diff, create_new_bars
from v2realbot.utils.directive_utils import get_conditions_from_configuration
from v2realbot.common.model import SLHistory
from datetime import datetime, timedelta
@ -22,6 +22,7 @@ from msgpack import packb, unpackb
import asyncio
import os
from traceback import format_exc
from collections import defaultdict
print(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
""""
@ -89,7 +90,7 @@ def next(data, state: StrategyState):
return value
elif isinstance(value, str):
try:
#pokud existuje MA bereme MA jinak standard
#pokud existuje v indikatoru MA bereme MA jinak indikator, pokud neexistuje bereme bar
ret = get_source_or_MA(indicator=value)[-1]
state.ilog(lvl=0,e=f"Pro porovnani bereme posledni hodnotu {ret} z indikatoru {value}")
except Exception as e :
@ -108,6 +109,8 @@ def next(data, state: StrategyState):
#OBECNE DIREKTIVY - REUSOVATELNE
if directive.endswith("above"):
cond[cond_type][directive+"_"+indname+"_"+str(value)] = get_source_or_MA(indname)[-1] > value_or_indicator(value)
elif directive.endswith("equals"):
cond[cond_type][directive+"_"+indname+"_"+str(value)] = get_source_or_MA(indname)[-1] == value_or_indicator(value)
elif directive.endswith("below"):
cond[cond_type][directive+"_"+indname+"_"+str(value)] = get_source_or_MA(indname)[-1] < value_or_indicator(value)
elif directive.endswith("falling"):
@ -170,10 +173,14 @@ def next(data, state: StrategyState):
def get_source_or_MA(indicator):
#pokud ma, pouzije MAcko, pokud ne tak standardni indikator
#pokud to jmeno neexistuje, tak pripadne bere z barů (close,open,hlcc4, vwap atp.)
try:
return state.indicators[indicator+"MA"]
except KeyError:
return state.indicators[indicator]
try:
return state.indicators[indicator]
except KeyError:
return state.bars[indicator]
# #vrati true pokud dany indikator krosnul obema smery
# def buy_if_crossed(indicator, value):
# res = crossed(threshold=value, list=get_source_or_MA(indicator))
@ -256,7 +263,6 @@ def next(data, state: StrategyState):
else:
return
#WIP -
def populate_dynamic_custom_indicator(name):
ind_type = "custom"
@ -396,9 +402,53 @@ def next(data, state: StrategyState):
# return 0, Average(source_series[-lookback:])
#abs/rel divergence of two indicators
def divergence(params):
funcName = "indicatorDivergence"
#indicator to run on bar multiples
#např. umožní RSI na 5min
#params: resolution (bar multiples)
def upscaledrsi(params):
funcName = "upscaledrsi"
#new res in seconds
new_resolution = safe_get(params, "resolution", None)
old_resolution = state.bars["resolution"][-1]
#pokud potrebuju vsechny bary, tak si je dotahnu
new_bars = {}
new_bars = create_new_bars(state.bars, new_resolution)
#val = rsi(bars.)
#pokud potrebuju jen close nebo open muzu pouzit toto
# vezme to N-th element z pole
def resample_close_prices(bars, new_resolution):
# Check that the new resolution is a multiple of the old resolution.
if new_resolution % bars['resolution'][-1] != 0:
raise ValueError('New resolution must be a multiple of the old resolution.')
# Calculate the step size for selecting every Nth element.
step = new_resolution // bars['resolution'][-1]
# Extract close prices at the new resolution.
new_close_prices = bars['close'][::step]
#optimizied - but works only for numpy arrays, prevedeni z listu na numpy is costly - bars_array = np.array(bars)
#new_close_prices = np.take(bars['close'], np.arange(0, len(bars['close']), step), axis=0)
return new_close_prices
#pokud je vstup jedna hodnota - muzu brat close,open v danem rozliseni tzn. jen N-th hodnotu zde
# Check that the new resolution is a multiple of the old resolution.
if new_resolution % state.bars["resolution"][-1] != 0:
raise ValueError('The new resolution must be a multiple of the old resolution.')
#get the number of bars in the new resolution.
n = new_resolution // old_resolution
# Calculate the new resolution values.
new_resolution_values = old_resolution_values.reshape((-1, new_resolution // len(old_resolution_values)))
# Select the N-th values from the new resolution values.
new_resolution_values[:, n]
source1 = safe_get(params, "source1", None)
if source1 in ["open","high","low","close","vwap","hlcc4"]:
source1_series = state.bars[source1]
@ -411,6 +461,16 @@ def next(data, state: StrategyState):
source2_series = state.indicators[source2]
mode = safe_get(params, "type")
state.ilog(lvl=0,e=f"INSIDE {funcName} {source1=} {source2=} {mode=}", **params)
#abs/rel divergence of two indicators
def divergence(params):
funcName = "indicatorDivergence"
source1 = safe_get(params, "source1", None)
source1_series = get_source_series(source1)
source2 = safe_get(params, "source2", None)
source2_series = get_source_series(source2)
mode = safe_get(params, "type")
state.ilog(lvl=0,e=f"INSIDE {funcName} {source1=} {source2=} {mode=}", **params)
val = 0
if mode == "abs":
val = round(abs(float(source1_series[-1]) - float(source2_series[-1])),4)
@ -426,16 +486,65 @@ def next(data, state: StrategyState):
val = pct_diff(num1=float(source1_series[-1]),num2=float(source2_series[-1]))
return 0, val
#indicator allowing to be based on any bar parameter (index, high,open,close,trades,volume, etc.)
def barparams(params):
funcName = "barparams"
if params is None:
return -2, "params required"
source = safe_get(params, "source", None)
if source is None:
return -2, "source required"
try:
return 0, state.bars[source][-1]
except Exception as e:
return -2, str(e)+format_exc()
#vstupem je bud indicator nebo bar parametr
#na tomto vstupu dokaze provest zakladni statisticke funkce pro subpole X hodnot zpatky
#podporovane functions: min, max, mean
def basestats(params):
funcName = "basestats"
#name of indicator or
source = safe_get(params, "source", None)
lookback = safe_get(params, "lookback", None)
func = safe_get(params, "function", None)
source_dict = defaultdict(list)
source_dict[source] = get_source_series(source)
if lookback is None:
source_array = source_dict[source]
else:
try:
source_array = source_dict[source][-lookback-1:]
except IndexError:
source_array = source_dict[source]
if func == "min":
val = np.amin(source_array)
elif func == "max":
val = np.amax(source_array)
elif func == "mean":
val = np.mean(source_array)
else:
return -2, "wrong function"
return 0, val
def get_source_series(source):
try:
return state.bars[source]
except KeyError:
return state.indicators[source]
#strength, absolute change of parameter between current value and lookback value (n-past)
#used for example to measure unusual peaks
def delta(params):
funcName = "delta"
source = safe_get(params, "source", None)
lookback = safe_get(params, "lookback",1)
if source in ["open","high","low","close","vwap","hlcc4"]:
source_series = state.bars[source]
else:
source_series = state.indicators[source]
source_series = get_source_series(source)
lookbackval = source_series[-lookback-1]
currval = source_series[-1]
@ -448,19 +557,11 @@ def next(data, state: StrategyState):
def slope(params):
funcName = "slope"
source = safe_get(params, "source", None)
if source in ["open","high","low","close","vwap","hlcc4"]:
source_series = state.bars[source]
else:
source_series = state.indicators[source]
source_series = get_source_series(source)
lookback = safe_get(params, "lookback", 5)
lookback_priceline = safe_get(params, "lookback_priceline", None)
if lookback_priceline is None:
lookback_series = source_series
elif lookback_priceline in ["open","high","low","close","vwap","hlcc4"]:
lookback_series = state.bars[lookback_priceline]
else:
lookback_series = state.indicators[lookback_priceline]
lookback_series = get_source_series(lookback_priceline)
try:
lookbackprice = lookback_series[-lookback-1]

View File

@ -25,6 +25,58 @@ import numpy as np
import pandas as pd
from collections import deque
import numpy as np
def create_new_bars(bars, new_resolution):
"""Creates new bars dictionary in the new resolution.
Args:
bars: A dictionary representing ohlcv bars.
new_resolution: A new resolution in seconds.
Returns:
A dictionary representing ohlcv bars in the new resolution.
"""
# Check that the new resolution is a multiple of the old resolution.
if new_resolution % bars['resolution'][0] != 0:
raise ValueError('New resolution must be a multiple of the old resolution.')
# Calculate the number of bars in the new resolution.
new_bar_count = int(len(bars['time']) / (new_resolution / bars['resolution'][0]))
# Create a new dictionary to store the new bars.
new_bars = {'high': np.empty(new_bar_count),
'low': np.empty(new_bar_count),
'volume': np.empty(new_bar_count),
'close': np.empty(new_bar_count),
'open': np.empty(new_bar_count),
'time': np.empty(new_bar_count),
'resolution': [new_resolution]}
# Calculate the start and end time of each new bar.
new_bar_start_times = np.arange(0, new_bar_count) * new_resolution
new_bar_end_times = new_bar_start_times + new_resolution
# Find all the old bars that are within each new bar.
old_bar_indices_in_new_bars = np.searchsorted(bars['time'], new_bar_start_times, side='right') - 1
# Calculate the high, low, volume, and close of each new bar.
new_bar_highs = np.amax(bars['high'][old_bar_indices_in_new_bars:], axis=1)
new_bar_lows = np.amin(bars['low'][old_bar_indices_in_new_bars:], axis=1)
new_bar_volumes = np.sum(bars['volume'][old_bar_indices_in_new_bars:], axis=1)
new_bar_closes = bars['close'][old_bar_indices_in_new_bars[:,-1]]
# Add the new bars to the new dictionary.
new_bars['high'] = new_bar_highs
new_bars['low'] = new_bar_lows
new_bars['volume'] = new_bar_volumes
new_bars['close'] = new_bar_closes
new_bars['open'] = new_bar_closes[:-1]
new_bars['time'] = new_bar_start_times
return new_bars
def pct_diff(num1: float, num2: float, decimals: int = 3, absolute: bool = False):
if num1 == 0:
return 0
@ -246,6 +298,8 @@ def json_serial(obj):
return str(obj)
if isinstance(obj, Enum):
return str(obj)
if isinstance(obj, np.int64):
return int(obj)
if type(obj) is Order:
return obj.__dict__
if type(obj) is TradeUpdate: