tulipy_ind support, multioutput scale and gzip cache support #106 #115 #114 #112 #107

This commit is contained in:
David Brazda
2024-02-04 17:54:03 +07:00
parent c1145fec5b
commit 8456e6d739
15 changed files with 632 additions and 55 deletions

View File

@ -175,8 +175,162 @@ def is_pivot(source: list, leg_number: int, type: str = "A"):
print("Unknown type")
return False
#upravene a rozsirene o potencialne vetsi confrm body
#puvodni verze odpovida confirm_points = 1
#https://chat.openai.com/c/0e614d96-6af4-40db-a6ec-a8c57ce481b8
# def crossed_up(threshold, list, confirm_points=2):
# """
# Check if the threshold has crossed up in the last few values in price_list.
# A crossover is confirmed if the threshold is below the earlier prices and then crosses above in the later prices.
# The number of confirmation points can be specified; the default is 2.
# """
# try:
# if len(list) < confirm_points * 2:
# # Not enough data to confirm crossover
# return False
def crossed_up(threshold, list):
# # Split the list into two parts for comparison
# earlier_prices = list[-confirm_points*2:-confirm_points]
# later_prices = list[-confirm_points:]
# # Check if threshold was below earlier prices and then crossed above
# was_below = all(threshold < price for price in earlier_prices)
# now_above = all(threshold >= price for price in later_prices)
# return was_below and now_above
# except IndexError:
# # In case of an IndexError, return False
# return False
#recent cross up of two arrays (price1 crossed up price2), fallback to standard
#inputs are numpy arrays
# def crossed_up_numpy(price1, price2):
# if price1.size < 2 or price2.size < 2:
# return False # Not enough data
# # Calculate slopes for the last two points
# x = np.array([price1.size - 2, price1.size - 1])
# slope1, intercept1 = np.polyfit(x, price1[-2:], 1)
# slope2, intercept2 = np.polyfit(x, price2[-2:], 1)
# # Check if lines are almost parallel
# if np.isclose(slope1, slope2):
# return False
# # Calculate intersection point
# x_intersect = (intercept2 - intercept1) / (slope1 - slope2)
# y_intersect = slope1 * x_intersect + intercept1
# # Check if the intersection occurred between the last two points
# if x[0] < x_intersect <= x[1]:
# # Check if line1 crossed up line2
# return price1[-1] > price2[-1] and price1[-2] <= price2[-2]
# return False
#same but more efficient approach
def crossed_up_numpy(price1, price2):
if price1.size < 2 or price2.size < 2:
return False # Not enough data
# Indices for the last two points
x1, x2 = price1.size - 2, price1.size - 1
# Direct calculation of slopes and intercepts
slope1 = (price1[-1] - price1[-2]) / (x2 - x1)
intercept1 = price1[-1] - slope1 * x2
slope2 = (price2[-1] - price2[-2]) / (x2 - x1)
intercept2 = price2[-1] - slope2 * x2
# Check if lines are almost parallel
if np.isclose(slope1, slope2):
return False
# Calculate intersection point (x-coordinate only)
if slope1 != slope2: # Avoid division by zero
x_intersect = (intercept2 - intercept1) / (slope1 - slope2)
# Check if the intersection occurred between the last two points
if x1 < x_intersect <= x2:
# Check if line1 crossed up line2
return price1[-1] > price2[-1] and price1[-2] <= price2[-2]
return False
#recent cross up of two arrays (price1 crossed up price2), fallback to standard
#inputs are numpy arrays
# def crossed_down_numpy(price1, price2):
# if price1.size < 2 or price2.size < 2:
# return False # Not enough data
# # Calculate slopes for the last two points
# x = np.array([price1.size - 2, price1.size - 1])
# slope1, intercept1 = np.polyfit(x, price1[-2:], 1)
# slope2, intercept2 = np.polyfit(x, price2[-2:], 1)
# # Check if lines are almost parallel
# if np.isclose(slope1, slope2):
# return False
# # Calculate intersection point
# x_intersect = (intercept2 - intercept1) / (slope1 - slope2)
# y_intersect = slope1 * x_intersect + intercept1
# # Check if the intersection occurred between the last two points
# if x[0] < x_intersect <= x[1]:
# # Check if line1 crossed down line2
# return price1[-1] < price2[-1] and price1[-2] >= price2[-2]
# return False
#more efficient yet same, price1 - faster, price2 - slower
def crossed_down_numpy(price1, price2):
if price1.size < 2 or price2.size < 2:
return False # Not enough data
# Indices for the last two points
x1, x2 = price1.size - 2, price1.size - 1
# Direct calculation of slopes and intercepts
slope1 = (price1[-1] - price1[-2]) / (x2 - x1)
intercept1 = price1[-1] - slope1 * x2
slope2 = (price2[-1] - price2[-2]) / (x2 - x1)
intercept2 = price2[-1] - slope2 * x2
# Check if lines are almost parallel
if np.isclose(slope1, slope2):
return False
# Calculate intersection point (x-coordinate only)
if slope1 != slope2: # Avoid division by zero
x_intersect = (intercept2 - intercept1) / (slope1 - slope2)
# Check if the intersection occurred between the last two points
if x1 < x_intersect <= x2:
# Check if line1 crossed down line2
return price1[-1] < price2[-1] and price1[-2] >= price2[-2]
return False
#obalka pro crossup listu a thresholdu nebo listu a druheho listu
#value - svcalar or list, primary_line - usually faster
def crossed_up(value, primary_line):
if isinstance(value, list):
return crossed_up_numpy(np.array(primary_line), np.array(value))
else:
return crossed_up_threshold(threshold=value, list=primary_line)
#obalka pro crossdown listu a thresholdu nebo listu a druheho listu
#value - svcalar or list, primary_line - usually faster
def crossed_down(value, primary_line):
if isinstance(value, list):
return crossed_down_numpy(np.array(primary_line), np.array(value))
else:
return crossed_down_threshold(threshold=value, list=primary_line)
def crossed_up_threshold(threshold, list):
"""check if threshold has crossed up last thresholdue in list"""
try:
if threshold < list[-1] and threshold > list[-2]:
@ -190,7 +344,7 @@ def crossed_up(threshold, list):
except IndexError:
return False
def crossed_down(threshold, list):
def crossed_down_threshold(threshold, list):
"""check if threshold has crossed down last thresholdue in list"""
"""
Checks if a threshold has just crossed down a line represented by a list.
@ -383,6 +537,7 @@ def json_serial(obj):
datetime: lambda obj: obj.timestamp(),
UUID: lambda obj: str(obj),
Enum: lambda obj: str(obj),
np.int32: lambda obj: int(obj),
np.int64: lambda obj: int(obj),
np.float64: lambda obj: float(obj),
Order: lambda obj: obj.__dict__,
@ -445,7 +600,7 @@ qu = Queue()
#zoneNY = tz.gettz('America/New_York')
zoneNY = pytz.timezone('US/Eastern')
zoneUTC = pytz.utc
zonePRG = pytz.timezone('Europe/Amsterdam')
def print(*args, **kwargs):