Files
lightweight-charts-python/lightweight_charts/util.py
2024-06-27 13:47:12 +02:00

325 lines
13 KiB
Python

import asyncio
import orjson
from datetime import datetime
from random import choices
from typing import Literal, Union
from numpy import isin
import pandas as pd
import re
from matplotlib.colors import to_rgba
# # Predefined colors that stand out well on dark backgrounds
# COLORS = [
# 'rgba(255, 0, 0, 0.6)', # Red
# 'rgba(0, 255, 0, 0.6)', # Green
# 'rgba(0, 0, 255, 0.6)', # Blue
# 'rgba(255, 255, 0, 0.6)', # Yellow
# 'rgba(255, 165, 0, 0.6)', # Orange
# 'rgba(75, 0, 130, 0.6)', # Indigo
# 'rgba(238, 130, 238, 0.6)', # Violet
# 'rgba(0, 255, 255, 0.6)', # Cyan
# 'rgba(255, 192, 203, 0.6)', # Pink
# 'rgba(0, 128, 128, 0.6)', # Teal
# 'rgba(128, 0, 128, 0.6)', # Purple
# 'rgba(255, 215, 0, 0.6)', # Gold
# 'rgba(173, 255, 47, 0.6)', # Green Yellow
# ]
# def get_next_color():
# return random.choice(COLORS)
# Predefined pool of colors
COLORS = [
"#63AA57", "#8F8AB0", "#E24AEE", "#D06AA6", "#7891BA", "#A39A34", "#8A94A2", "#61BB2F",
"#FD569D", "#1EB6E1", "#379AC9", "#FD6F2E", "#8C9858", "#39A4A3", "#6D97F4", "#1ECB01", "#FA5B16", "#A6891C",
"#48CF10", "#D27B26", "#D56B55", "#FE3AB8", "#E35C51", "#EC4FE6", "#E250A3", "#BA618E", "#1BC074", "#C57784",
"#888BC5", "#4FA452", "#80885C", "#B97272", "#33BF98", "#B7961D", "#A07284", "#02E54E", "#AF7F35", "#F852EF",
"#6D955B", "#E0676E", "#F73DEC", "#CE53FD", "#9773D3", "#649E81", "#D062CE", "#AB73E7", "#A4729C", "#E76A07",
"#E85CCB", "#A16FB1", "#4BB859", "#B25EE2", "#8580CE", "#A275EF", "#AC9245", "#4D988D", "#B672C9", "#4CA96E",
"#C9873E", "#5BB147", "#10C783", "#D7647D", "#CB893A", "#A586BA", "#28C0A2", "#61A755", "#0EB7C5", "#2DADBC",
"#17BB71", "#2BC733", "#2BB890", "#F04EF8", "#699580", "#A88809", "#EB3FF6", "#A75ED3", "#859171", "#BB6285",
"#81A147", "#AD7CD2", "#65B630", "#C9616C", "#BD5EFA", "#7A9F30", "#2AB6AB", "#FC496A", "#687FC7", "#DB40E7",
"#07BCE9", "#509F63", "#EC4FDD", "#A079BE", "#C17297", "#E447C2", "#E95AD9", "#9FA01E", "#7E86CF", "#21E316",
"#1CABF9", "#17C24F", "#9C9254", "#C97994", "#4BA9DA", "#0DD595", "#13BEA8", "#C2855D", "#DF6C13", "#60B370",
"#0FC3F6", "#C1830E", "#3AC917", "#0EBBB0", "#CC50B4", "#B768EC", "#D47F49", "#B47BC5", "#38ADBD", "#05DC53",
"#44CD4E", "#838E65", "#49D70F", "#2DADBE", "#2CB0C9", "#DA703E", "#06B5CA", "#7BAF3E", "#918E79", "#2AA5E5",
"#C37F5E", "#07B8C9", "#4CBA27", "#E752C6", "#7F93B2", "#4798CD", "#45AA4C", "#4DB666", "#7683A7", "#758685",
"#4B9FAD", "#9280FD", "#6682DD", "#42ACBE", "#C1609F", "#D850DB", "#649A62", "#54CC22", "#AD81C1", "#BF7A43",
"#0FCEA5", "#D06DAF", "#87799B", "#4DA94E", "#2FD654", "#07D587", "#21CF0C", "#03CF34", "#42C771", "#D563CD",
"#6D9E9A", "#C76C59", "#68B368", "#11BCE5", "#0DCFB3", "#9266D8", "#BF67F6", "#88A04E", "#73BE17", "#67B437",
"#8586E4", "#9F8749", "#479CA5", "#CC777E", "#4FAF46", "#9D9836", "#918DAF", "#D167B8", "#6F9DA5", "#2BB167",
"#16B8BC", "#B4861F", "#A08487", "#67B357", "#5CAA5C", "#20CA49", "#D18813", "#15D63F", "#C8618F", "#887E92",
"#21C457", "#4EA8CE", "#53BE49", "#5A86D5", "#BD7E4E", "#27B0A1", "#33CF42", "#709083", "#38A8DE", "#4CA762",
"#1EA4FF", "#DE3EE4", "#70A860", "#39A3C8", "#6BBB39", "#F053F4", "#8C7FB5", "#969F21", "#B19841", "#E57148",
"#C25DA7", "#6DA979", "#B27D73", "#7F9786", "#41AC99", "#C58848", "#948F9E", "#6BB620", "#81AB3B", "#09DE44",
"#43A9D2", "#41B0D7", "#20ACAA", "#649FCB", "#CD8345", "#A88669", "#3EA5E7", "#F36A19", "#E06B48", "#8388BD",
"#EC6153", "#639082", "#52CA32", "#878BAA", "#02BCDB", "#828FD9", "#3DC07F", "#29D46A", "#9C7CC1", "#EB7713",
"#F95F6A", "#E25F4C", "#589994", "#D45AB7", "#DE66AB", "#B8715F", "#E850F4", "#FB6420", "#C2832C", "#6383C5",
"#D57A58", "#EF652C", "#02D71A", "#ED664D", "#60A526"
]
# Iterator to keep track of the current color index
color_index = 0
def get_next_color():
global color_index
# Get the next color from the list
color = COLORS[color_index]
# Convert the color from HEX to RGBA format
color_index = (color_index + 1) % len(COLORS)
return hex_to_rgba(color)
def hex_to_rgba(hex_color, alpha=0.5):
hex_color = hex_color.lstrip('#')
r, g, b = int(hex_color[0:2], 16), int(hex_color[2:4], 16), int(hex_color[4:6], 16)
return f'rgba({r}, {g}, {b}, {alpha})'
def apply_opacity(color, opacity):
"""
Converts any color format (named, hex, RGB, or RGBA) to RGBA format and applies a specified opacity.
Parameters:
- color (str): The color in named, hex, RGB, or RGBA format.
- opacity (float): The opacity value to apply, ranging from 0.0 to 1.0.
Returns:
- str: The color in 'rgba(r, g, b, opacity)' format with the specified opacity applied.
Raises:
- ValueError: If the opacity is not within the range of 0 to 1.
"""
# Validate the opacity
if not (0 <= opacity <= 1):
raise ValueError("Opacity must be between 0 and 1")
# Check if color is already in rgba format
rgba_regex = r'rgba?\((\d{1,3}),\s*(\d{1,3}),\s*(\d{1,3})(?:,\s*([0-9\.]+))?\)'
match = re.match(rgba_regex, color)
if match:
# Color is in RGB or RGBA format
r, g, b = int(match.group(1)), int(match.group(2)), int(match.group(3))
current_opacity = float(match.group(4)) if match.group(4) else 1
# Apply the new opacity by multiplying with the current opacity if it exists
final_opacity = current_opacity * opacity
else:
# Color is a named color or hex; convert it using matplotlib
rgba = to_rgba(color)
r, g, b, _ = [int(255 * x) for x in rgba]
final_opacity = opacity # Directly use the given opacity
return f"rgba({r}, {g}, {b}, {final_opacity})"
def is_vbt_indicator(variable):
# Get the module path of the variable's type
module_path = variable.__class__.__module__
# Check if it starts with 'vectorbtpro.indicators'
return module_path.startswith('vectorbtpro.indicators')
class Pane:
def __init__(self, window):
from lightweight_charts import Window
self.win: Window = window
self.run_script = window.run_script
self.bulk_run = window.bulk_run
if hasattr(self, 'id'):
return
self.id = Window._id_gen.generate()
class IDGen(list):
ascii = 'abcdefghijklmnopqrstuvwxyz'
def generate(self) -> str:
var = ''.join(choices(self.ascii, k=8))
if var not in self:
self.append(var)
return f'window.{var}'
self.generate()
def parse_event_message(window, string):
name, args = string.split('_~_')
args = args.split(';;;')
func = window.handlers[name]
return func, args
# def js_data(data: Union[pd.DataFrame, pd.Series]):
# if isinstance(data, pd.DataFrame):
# d = data.to_dict(orient='records')
# filtered_records = [{k: v for k, v in record.items() if v is not None and not pd.isna(v)} for record in d]
# else:
# d = data.to_dict()
# filtered_records = {k: v for k, v in d.items()}
# return json.dumps(filtered_records)
def js_data(data: Union[pd.DataFrame, pd.Series]):
if isinstance(data, pd.DataFrame):
# Converting DataFrame to a list of dictionaries, filtering out NaN values
filtered_records = data.dropna().to_dict(orient='records')
else:
# For pd.Series, convert to dict and drop NaN values
filtered_records = data.dropna().to_dict()
# Serialize using orjson, which returns bytes
# Decode bytes to string if necessary (JavaScript consumption requires string)
return orjson.dumps(filtered_records).decode('utf-8')
def snake_to_camel(s: str):
components = s.split('_')
return components[0] + ''.join(x.title() for x in components[1:])
# def js_json(d: dict):
# filtered_dict = {}
# for key, val in d.items():
# if key in ('self') or val in (None,):
# continue
# if '_' in key:
# key = snake_to_camel(key)
# filtered_dict[key] = val
# return f"JSON.parse('{json.dumps(filtered_dict)}')"
def js_json(d: dict):
filtered_dict = {}
for key, val in d.items():
if key == 'self' or val in (None,):
continue
if '_' in key:
key = snake_to_camel(key)
filtered_dict[key] = val
# Serialize the dictionary using orjson, automatically handling types that orjson can serialize
# Decode the bytes to string for use in JavaScript, escaping single quotes for JavaScript consumption
json_str = orjson.dumps(filtered_dict).decode('utf-8').replace("'", "\\'")
return f"JSON.parse('{json_str}')"
def jbool(b: bool): return 'true' if b is True else 'false' if b is False else None
MARKER_TYPE = Literal['entries', 'exits']
LINE_STYLE = Literal['solid', 'dotted', 'dashed', 'large_dashed', 'sparse_dotted']
MARKER_POSITION = Literal['above', 'below', 'inside']
MARKER_SHAPE = Literal['arrow_up', 'arrow_down', 'circle', 'square']
CROSSHAIR_MODE = Literal['normal', 'magnet', 'hidden']
PRICE_SCALE_MODE = Literal['normal', 'logarithmic', 'percentage', 'index100']
TIME = Union[datetime, pd.Timestamp, str, float]
NUM = Union[float, int]
FLOAT = Literal['left', 'right', 'top', 'bottom']
def as_enum(value, string_types):
types = string_types.__args__
return -1 if value not in types else types.index(value)
def marker_shape(shape: MARKER_SHAPE):
return {
'arrow_up': 'arrowUp',
'arrow_down': 'arrowDown',
}.get(shape) or shape
def marker_position(p: MARKER_POSITION):
return {
'above': 'aboveBar',
'below': 'belowBar',
'inside': 'inBar',
}.get(p)
class Emitter:
def __init__(self):
self._callable = None
def __iadd__(self, other):
self._callable = other
return self
def _emit(self, *args):
if self._callable:
if asyncio.iscoroutinefunction(self._callable):
asyncio.create_task(self._callable(*args))
else:
self._callable(*args)
class JSEmitter:
def __init__(self, chart, name, on_iadd, wrapper=None):
self._on_iadd = on_iadd
self._chart = chart
self._name = name
self._wrapper = wrapper
def __iadd__(self, other):
def final_wrapper(*arg):
other(self._chart, *arg) if not self._wrapper else self._wrapper(other, self._chart, *arg)
async def final_async_wrapper(*arg):
await other(self._chart, *arg) if not self._wrapper else await self._wrapper(other, self._chart, *arg)
self._chart.win.handlers[self._name] = final_async_wrapper if asyncio.iscoroutinefunction(other) else final_wrapper
self._on_iadd(other)
return self
class Events:
def __init__(self, chart):
self.new_bar = Emitter()
self.search = JSEmitter(chart, f'search{chart.id}',
lambda o: chart.run_script(f'''
Lib.Handler.makeSpinner({chart.id})
{chart.id}.search = Lib.Handler.makeSearchBox({chart.id})
''')
)
salt = chart.id[chart.id.index('.')+1:]
self.range_change = JSEmitter(chart, f'range_change{salt}',
lambda o: chart.run_script(f'''
let checkLogicalRange{salt} = (logical) => {{
{chart.id}.chart.timeScale().unsubscribeVisibleLogicalRangeChange(checkLogicalRange{salt})
let barsInfo = {chart.id}.series.barsInLogicalRange(logical)
if (barsInfo) window.callbackFunction(`range_change{salt}_~_${{barsInfo.barsBefore}};;;${{barsInfo.barsAfter}}`)
setTimeout(() => {chart.id}.chart.timeScale().subscribeVisibleLogicalRangeChange(checkLogicalRange{salt}), 50)
}}
{chart.id}.chart.timeScale().subscribeVisibleLogicalRangeChange(checkLogicalRange{salt})
'''),
wrapper=lambda o, c, *arg: o(c, *[float(a) for a in arg])
)
self.click = JSEmitter(chart, f'subscribe_click{salt}',
lambda o: chart.run_script(f'''
let clickHandler{salt} = (param) => {{
if (!param.point) return;
const time = {chart.id}.chart.timeScale().coordinateToTime(param.point.x)
const price = {chart.id}.series.coordinateToPrice(param.point.y);
window.callbackFunction(`subscribe_click{salt}_~_${{time}};;;${{price}}`)
}}
{chart.id}.chart.subscribeClick(clickHandler{salt})
'''),
wrapper=lambda func, c, *args: func(c, *[float(a) for a in args])
)
class BulkRunScript:
def __init__(self, script_func):
self.enabled = False
self.scripts = []
self.script_func = script_func
def __enter__(self):
self.enabled = True
def __exit__(self, *args):
self.enabled = False
self.script_func('\n'.join(self.scripts))
self.scripts = []
def add_script(self, script):
self.scripts.append(script)