Files
lightweight-charts-python/lightweight_charts/util.py
2024-05-25 14:33:09 +01:00

163 lines
4.8 KiB
Python

import asyncio
import json
from datetime import datetime
from random import choices
from typing import Literal, Union
from numpy import isin
import pandas as pd
class Pane:
def __init__(self, window):
from lightweight_charts import Window
self.win: Window = window
self.run_script = window.run_script
if hasattr(self, 'id'):
return
self.id = Window._id_gen.generate()
class IDGen(list):
ascii = 'abcdefghijklmnopqrstuvwxyz'
def generate(self) -> str:
var = ''.join(choices(self.ascii, k=8))
if var not in self:
self.append(var)
return f'window.{var}'
self.generate()
def parse_event_message(window, string):
name, args = string.split('_~_')
args = args.split(';;;')
func = window.handlers[name]
return func, args
def js_data(data: Union[pd.DataFrame, pd.Series]):
if isinstance(data, pd.DataFrame):
d = data.to_dict(orient='records')
filtered_records = [{k: v for k, v in record.items() if v is not None} for record in d]
else:
d = data.to_dict()
filtered_records = {k: v for k, v in d.items()}
return json.dumps(filtered_records, indent=2)
def snake_to_camel(s: str):
components = s.split('_')
return components[0] + ''.join(x.title() for x in components[1:])
def js_json(d: dict):
filtered_dict = {}
for key, val in d.items():
if key in ('self') or val in (None,):
continue
if '_' in key:
key = snake_to_camel(key)
filtered_dict[key] = val
return f"JSON.parse('{json.dumps(filtered_dict)}')"
def jbool(b: bool): return 'true' if b is True else 'false' if b is False else None
LINE_STYLE = Literal['solid', 'dotted', 'dashed', 'large_dashed', 'sparse_dotted']
MARKER_POSITION = Literal['above', 'below', 'inside']
MARKER_SHAPE = Literal['arrow_up', 'arrow_down', 'circle', 'square']
CROSSHAIR_MODE = Literal['normal', 'magnet', 'hidden']
PRICE_SCALE_MODE = Literal['normal', 'logarithmic', 'percentage', 'index100']
TIME = Union[datetime, pd.Timestamp, str, float]
NUM = Union[float, int]
FLOAT = Literal['left', 'right', 'top', 'bottom']
def as_enum(value, string_types):
types = string_types.__args__
return -1 if value not in types else types.index(value)
def marker_shape(shape: MARKER_SHAPE):
return {
'arrow_up': 'arrowUp',
'arrow_down': 'arrowDown',
}.get(shape) or shape
def marker_position(p: MARKER_POSITION):
return {
'above': 'aboveBar',
'below': 'belowBar',
'inside': 'inBar',
}.get(p)
class Emitter:
def __init__(self):
self._callable = None
def __iadd__(self, other):
self._callable = other
return self
def _emit(self, *args):
if self._callable:
if asyncio.iscoroutinefunction(self._callable):
asyncio.create_task(self._callable(*args))
else:
self._callable(*args)
class JSEmitter:
def __init__(self, chart, name, on_iadd, wrapper=None):
self._on_iadd = on_iadd
self._chart = chart
self._name = name
self._wrapper = wrapper
def __iadd__(self, other):
def final_wrapper(*arg):
other(self._chart, *arg) if not self._wrapper else self._wrapper(other, self._chart, *arg)
async def final_async_wrapper(*arg):
await other(self._chart, *arg) if not self._wrapper else await self._wrapper(other, self._chart, *arg)
self._chart.win.handlers[self._name] = final_async_wrapper if asyncio.iscoroutinefunction(other) else final_wrapper
self._on_iadd(other)
return self
class Events:
def __init__(self, chart):
self.new_bar = Emitter()
self.search = JSEmitter(chart, f'search{chart.id}',
lambda o: chart.run_script(f'''
Handler.makeSpinner({chart.id})
{chart.id}.search = Handler.makeSearchBox({chart.id})
''')
)
salt = chart.id[chart.id.index('.')+1:]
self.range_change = JSEmitter(chart, f'range_change{salt}',
lambda o: chart.run_script(f'''
let checkLogicalRange{salt} = (logical) => {{
{chart.id}.chart.timeScale().unsubscribeVisibleLogicalRangeChange(checkLogicalRange{salt})
let barsInfo = {chart.id}.series.barsInLogicalRange(logical)
if (barsInfo) window.callbackFunction(`range_change{salt}_~_${{barsInfo.barsBefore}};;;${{barsInfo.barsAfter}}`)
setTimeout(() => {chart.id}.chart.timeScale().subscribeVisibleLogicalRangeChange(checkLogicalRange{salt}), 50)
}}
{chart.id}.chart.timeScale().subscribeVisibleLogicalRangeChange(checkLogicalRange{salt})
'''),
wrapper=lambda o, c, *arg: o(c, *[float(a) for a in arg])
)