import asyncio import json import os from base64 import b64decode from datetime import datetime from typing import Callable, Union, Literal, List, Optional import pandas as pd import random #from vectorbtpro.indicators import IndicatorFactory from .table import Table from .toolbox import ToolBox from .drawings import Box, HorizontalLine, RayLine, TrendLine, TwoPointDrawing, VerticalLine, VerticalSpan from .topbar import TopBar from .util import ( BulkRunScript, Pane, Events, IDGen, as_enum, jbool, js_json, TIME, NUM, FLOAT, LINE_STYLE, MARKER_POSITION, MARKER_SHAPE, CROSSHAIR_MODE, MARKER_TYPE, PRICE_SCALE_MODE, marker_position, marker_shape, js_data, is_vbt_indicator, apply_opacity, get_next_color ) current_dir = os.path.dirname(os.path.abspath(__file__)) INDEX = os.path.join(current_dir, 'js', 'index.html') class Window: _id_gen = IDGen() handlers = {} def __init__( self, script_func: Optional[Callable] = None, js_api_code: Optional[str] = None, run_script: Optional[Callable] = None ): self.loaded = False self.script_func = script_func self.scripts = [] self.final_scripts = [] self.bulk_run = BulkRunScript(script_func) if run_script: self.run_script = run_script if js_api_code: self.run_script(f'window.callbackFunction = {js_api_code}') def on_js_load(self): if self.loaded: return self.loaded = True if hasattr(self, '_return_q'): while not self.run_script_and_get('document.readyState == "complete"'): continue # scary, but works initial_script = '' self.scripts.extend(self.final_scripts) for script in self.scripts: initial_script += f'\n{script}' self.script_func(initial_script) def run_script(self, script: str, run_last: bool = False): """ For advanced users; evaluates JavaScript within the Webview. """ if self.script_func is None: raise AttributeError("script_func has not been set") if self.loaded: if self.bulk_run.enabled: self.bulk_run.add_script(script) else: self.script_func(script) elif run_last: self.final_scripts.append(script) else: self.scripts.append(script) def run_script_and_get(self, script: str): self.run_script(f'_~_~RETURN~_~_{script}') return self._return_q.get() def create_table( self, width: NUM, height: NUM, headings: tuple, widths: Optional[tuple] = None, alignments: Optional[tuple] = None, position: FLOAT = 'left', draggable: bool = False, background_color: str = '#121417', border_color: str = 'rgb(70, 70, 70)', border_width: int = 1, heading_text_colors: Optional[tuple] = None, heading_background_colors: Optional[tuple] = None, return_clicked_cells: bool = False, func: Optional[Callable] = None ) -> 'Table': return Table(*locals().values()) def create_subchart( self, position: FLOAT = 'left', width: float = 0.5, height: float = 0.5, sync_id: Optional[str] = None, scale_candles_only: bool = False, sync_crosshairs_only: bool = False, toolbox: bool = False, leftScale: bool = False ) -> 'AbstractChart': subchart = AbstractChart( self, width, height, scale_candles_only, toolbox, position=position, leftScale=leftScale ) if not sync_id: return subchart self.run_script(f''' Lib.Handler.syncCharts( {subchart.id}, {sync_id}, {jbool(sync_crosshairs_only)} ) ''', run_last=True) return subchart def style( self, background_color: str = '#0c0d0f', hover_background_color: str = '#3c434c', click_background_color: str = '#50565E', active_background_color: str = 'rgba(0, 122, 255, 0.7)', muted_background_color: str = 'rgba(0, 122, 255, 0.3)', border_color: str = '#3C434C', color: str = '#d8d9db', active_color: str = '#ececed' ): self.run_script(f'Lib.Handler.setRootStyles({js_json(locals())});') class SeriesCommon(Pane): def __init__(self, chart: 'AbstractChart', name: str = ''): super().__init__(chart.win) self._chart = chart if hasattr(chart, '_interval'): self._interval = chart._interval else: self._interval = 1 self._last_bar = None self.name = name self.num_decimals = 2 self.offset = 0 self.data = pd.DataFrame() self.markers = {} def _set_interval(self, df: pd.DataFrame): if not pd.api.types.is_datetime64_any_dtype(df['time']): df['time'] = pd.to_datetime(df['time']) common_interval = df['time'].diff().value_counts() if common_interval.empty: return self._interval = common_interval.index[0].total_seconds() units = [ pd.Timedelta(microseconds=df['time'].dt.microsecond.value_counts().index[0]), pd.Timedelta(seconds=df['time'].dt.second.value_counts().index[0]), pd.Timedelta(minutes=df['time'].dt.minute.value_counts().index[0]), pd.Timedelta(hours=df['time'].dt.hour.value_counts().index[0]), pd.Timedelta(days=df['time'].dt.day.value_counts().index[0]), ] self.offset = 0 for value in units: value = value.total_seconds() if value == 0: continue elif value >= self._interval: break self.offset = value break @staticmethod def _format_labels(data, labels, index, exclude_lowercase): def rename(la, mapper): return [mapper[key] if key in mapper else key for key in la] if 'date' not in labels and 'time' not in labels: labels = labels.str.lower() if exclude_lowercase: labels = rename(labels, {exclude_lowercase.lower(): exclude_lowercase}) if 'date' in labels: labels = rename(labels, {'date': 'time'}) elif 'time' not in labels: data['time'] = index labels = [*labels, 'time'] return labels def _df_datetime_format(self, df: pd.DataFrame, exclude_lowercase=None): df = df.copy() df.columns = self._format_labels(df, df.columns, df.index, exclude_lowercase) self._set_interval(df) if not pd.api.types.is_datetime64_any_dtype(df['time']): df['time'] = pd.to_datetime(df['time']) df['time'] = (df['time'].astype('int64') / 10 ** 9).astype(float) # if 'updated' in df.columns: # df['updated'] = pd.to_datetime(df['updated']).astype('int64') / 10**9 # Iterate over all columns and convert any additional datetime columns to timestamps (for example updated) for col in df.columns: # Skip columns that are explicitly managed elsewhere or meant to be excluded if col == 'time' or not pd.api.types.is_datetime64_any_dtype(df[col]): continue # Convert datetime to timestamp with nanosecond precision after the decimal df[col] = (pd.to_datetime(df[col]).astype('int64') / 10**9).astype(float) return df def _series_datetime_format(self, series: pd.Series, exclude_lowercase=None): series = series.copy() series.index = self._format_labels(series, series.name, series.index, exclude_lowercase) series['time'] = self._single_datetime_format(series['time']) return series def _single_datetime_format(self, arg) -> float: if isinstance(arg, (str, int, float)) or not pd.api.types.is_datetime64_any_dtype(arg): try: arg = pd.to_datetime(arg, unit='ms') except ValueError: arg = pd.to_datetime(arg) arg = self._interval * (arg.timestamp() // self._interval)+self.offset return arg def set(self, df: Optional[Union[pd.DataFrame, pd.Series]] = None, format_cols: bool = True): if df is None or (isinstance(df, pd.DataFrame) and df.empty): self.run_script(f'{self.id}.series.setData([])') self.data = pd.DataFrame() return if is_vbt_indicator(df): df = df.real #if df is pd.Series then convert to df if isinstance(df, pd.Series): df = df.to_frame(name=self.name) if format_cols: df = self._df_datetime_format(df, exclude_lowercase=self.name) if self.name: if self.name and len(df.columns) == 1: #if only one col rename it df.columns = ['value'] elif self.name not in df: raise NameError(f'No column named "{self.name}".') else: df = df.rename(columns={self.name: 'value'}) self.data = df.copy() self._last_bar = df.iloc[-1] self.run_script(f'{self.id}.series.setData({js_data(df)}); ') def update(self, series: pd.Series): series = self._series_datetime_format(series, exclude_lowercase=self.name) if self.name in series.index: series.rename({self.name: 'value'}, inplace=True) if self._last_bar is not None and series['time'] != self._last_bar['time']: self.data.loc[self.data.index[-1]] = self._last_bar self.data = pd.concat([self.data, series.to_frame().T], ignore_index=True) self._last_bar = series self.run_script(f'{self.id}.series.update({js_data(series)})') def _update_markers(self): self.run_script(f'{self.id}.series.setMarkers({json.dumps(list(self.markers.values()))})') def markers_set(self, markers: Union[pd.Series, pd.DataFrame], type: MARKER_TYPE = None, col_name: Optional[str] = None, position: MARKER_POSITION = 'below', shape: MARKER_SHAPE = 'arrow_up', color: str = None, text: str = ''): """ Adds multiple markers from pd series or Dataframe :param markers: A pandas Series or Dataframe with DateTimeIndex and boolean values. The index should be DateTimeIndex and values should be True/False. :param type: The type of the marker to quickly style entries or exits :param col_name: The name of the column to use in case of DataFrame :param position: The position of the marker. :param shape: The shape of the marker. :param color: The color of the marker. :return: a list of marker ids. It adds new markers to the chart. Existing markers will remain. To delete markers use remove_marker() or clear_markers() """ if type is not None: match type: case "entries": position = "below" shape = "arrow_up" color = "blue" if color is None else color case "exits": position = "above" shape = "arrow_down" color = "red" if color is None else color if isinstance(markers, pd.Series): markers = markers.to_frame(name="markers") markers = self._df_datetime_format(markers) # Get the list of columns in the DataFrame - must be datetimeindex #either with one column, or col_name is set columns = markers.columns.tolist() if "time" not in columns: raise ValueError("Time column required in the dataframe.") # If there are exactly two columns if len(columns) == 2: # Rename the non-"time" column to "value" other_col = [col for col in columns if col != "time"][0] markers.rename(columns={other_col: "value"}, inplace=True) # If there are more than two columns elif len(columns) > 2: if col_name in columns: # Keep "time" and col_name column and rename it to "value" markers = markers[["time", col_name]] markers.rename(columns={col_name: "value"}, inplace=True) else: raise ValueError("Specify a column name in the dataframe.") else: raise ValueError("No matching columns in the dataframe.") marker_ids = [] #self.markers = {} valid_rows = markers[markers['value']] # Filter rows where value is True for timestamp in valid_rows['time']: marker_id = self.win._id_gen.generate() self.markers[marker_id] = { "time": timestamp, "position": marker_position(position), # Default position "color": color, # Default color "shape": marker_shape(shape), # Default shape "text": text, # Default text } marker_ids.append(marker_id) # Sort markers by time self.markers = dict(sorted(self.markers.items(), key=lambda item: item[1]["time"])) self._update_markers() return marker_ids def marker_list(self, markers: list): """ Creates multiple markers.\n :param markers: The list of markers to set. These should be in the format:\n [ {"time": "2021-01-21", "position": "below", "shape": "circle", "color": "#2196F3", "text": ""}, {"time": "2021-01-22", "position": "below", "shape": "circle", "color": "#2196F3", "text": ""}, ... ] :return: a list of marker ids. """ markers = markers.copy() marker_ids = [] for marker in markers: marker_id = self.win._id_gen.generate() self.markers[marker_id] = { "time": self._single_datetime_format(marker['time']), "position": marker_position(marker['position']), "color": marker['color'], "shape": marker_shape(marker['shape']), "text": marker['text'], } marker_ids.append(marker_id) self._update_markers() return marker_ids def marker(self, time: Optional[datetime] = None, position: MARKER_POSITION = 'below', shape: MARKER_SHAPE = 'arrow_up', color: str = '#2196F3', text: str = '' ) -> str: """ Creates a new marker.\n :param time: Time location of the marker. If no time is given, it will be placed at the last bar. :param position: The position of the marker. :param color: The color of the marker (rgb, rgba or hex). :param shape: The shape of the marker. :param text: The text to be placed with the marker. :return: The id of the marker placed. """ try: formatted_time = self._last_bar['time'] if not time else self._single_datetime_format(time) except TypeError: raise TypeError('Chart marker created before data was set.') marker_id = self.win._id_gen.generate() self.markers[marker_id] = { "time": formatted_time, "position": marker_position(position), "color": color, "shape": marker_shape(shape), "text": text, } self._update_markers() return marker_id def remove_marker(self, marker_id: str): """ Removes the marker with the given id.\n """ self.markers.pop(marker_id) self._update_markers() def horizontal_line(self, price: NUM, color: str = 'rgb(122, 146, 202)', width: int = 2, style: LINE_STYLE = 'solid', text: str = '', axis_label_visible: bool = True, func: Optional[Callable] = None ) -> 'HorizontalLine': """ Creates a horizontal line at the given price. """ return HorizontalLine(self, price, color, width, style, text, axis_label_visible, func) def trend_line( self, start_time: TIME, start_value: NUM, end_time: TIME, end_value: NUM, round: bool = False, line_color: str = '#1E80F0', width: int = 2, style: LINE_STYLE = 'solid', ) -> TwoPointDrawing: return TrendLine(*locals().values()) def box( self, start_time: TIME, start_value: NUM, end_time: TIME, end_value: NUM, round: bool = False, color: str = '#1E80F0', fill_color: str = 'rgba(255, 255, 255, 0.2)', width: int = 2, style: LINE_STYLE = 'solid', ) -> TwoPointDrawing: return Box(*locals().values()) def ray_line( self, start_time: TIME, value: NUM, round: bool = False, color: str = '#1E80F0', width: int = 2, style: LINE_STYLE = 'solid', text: str = '' ) -> RayLine: # TODO return RayLine(*locals().values()) def vertical_line( self, time: TIME, color: str = '#1E80F0', width: int = 2, style: LINE_STYLE ='solid', text: str = '' ) -> VerticalLine: return VerticalLine(*locals().values()) def clear_markers(self): """ Clears the markers displayed on the data.\n """ self.markers.clear() self._update_markers() def price_line(self, label_visible: bool = True, line_visible: bool = True, title: str = ''): self.run_script(f''' {self.id}.series.applyOptions({{ lastValueVisible: {jbool(label_visible)}, priceLineVisible: {jbool(line_visible)}, title: '{title}', }})''') def precision(self, precision: int): """ Sets the precision and minMove.\n :param precision: The number of decimal places. """ min_move = 1 / (10**precision) self.run_script(f''' {self.id}.series.applyOptions({{ priceFormat: {{precision: {precision}, minMove: {min_move}}} }})''') self.num_decimals = precision def hide_data(self): self._toggle_data(False) def show_data(self): self._toggle_data(True) def _toggle_data(self, arg): self.run_script(f''' {self.id}.series.applyOptions({{visible: {jbool(arg)}}}) if ('volumeSeries' in {self.id}) {self.id}.volumeSeries.applyOptions({{visible: {jbool(arg)}}}) ''') def vertical_span( self, start_time: Union[TIME, tuple, list], end_time: Optional[TIME] = None, color: str = 'rgba(252, 219, 3, 0.2)', round: bool = False ): """ Creates a vertical line or span across the chart.\n Start time and end time can be used together, or end_time can be omitted and a single time or a list of times can be passed to start_time. """ if round: start_time = self._single_datetime_format(start_time) end_time = self._single_datetime_format(end_time) if end_time else None return VerticalSpan(self, start_time, end_time, color) class Line(SeriesCommon): def __init__(self, chart, name, color, style, width, price_line, price_label, crosshair_marker=True, priceScaleId="right"): super().__init__(chart, name) self.color = color #priceScaleId_line = f"priceScaleId: '{priceScaleId}'," if priceScaleId is not None else '' script = f''' {self.id} = {self._chart.id}.createLineSeries( "{name}", {{ color: '{color}', lineStyle: {as_enum(style, LINE_STYLE)}, lineWidth: {width}, lastValueVisible: {jbool(price_label)}, priceScaleId: '{priceScaleId}', priceLineVisible: {jbool(price_line)}, crosshairMarkerVisible: {jbool(crosshair_marker)}, {"""autoscaleInfoProvider: () => ({ priceRange: { minValue: 1_000_000_000, maxValue: 0, }, }), """ if chart._scale_candles_only else ''} }} ) null''' #print(script) self.run_script(script) # def _set_trend(self, start_time, start_value, end_time, end_value, ray=False, round=False): # if round: # start_time = self._single_datetime_format(start_time) # end_time = self._single_datetime_format(end_time) # else: # start_time, end_time = pd.to_datetime((start_time, end_time)).astype('int64') // 10 ** 9 # self.run_script(f''' # {self._chart.id}.chart.timeScale().applyOptions({{shiftVisibleRangeOnNewBar: false}}) # {self.id}.series.setData( # calculateTrendLine({start_time}, {start_value}, {end_time}, {end_value}, # {self._chart.id}, {jbool(ray)})) # {self._chart.id}.chart.timeScale().applyOptions({{shiftVisibleRangeOnNewBar: true}}) # ''') def scale(self, scale_margin_top: float = 0.0, scale_margin_bottom: float = 0.0): self.run_script(f''' {self.id}.series.priceScale().applyOptions({{ scaleMargins: {{top: {scale_margin_top}, bottom: {scale_margin_bottom}}} }})''') def delete(self): """ Irreversibly deletes the line, as well as the object that contains the line. """ self._chart._lines.remove(self) if self in self._chart._lines else None self.run_script(f''' {self.id}legendItem = {self._chart.id}.legend._lines.find((line) => line.series == {self.id}.series) {self._chart.id}.legend._lines = {self._chart.id}.legend._lines.filter((item) => item != {self.id}legendItem) if ({self.id}legendItem) {{ {self._chart.id}.legend.div.removeChild({self.id}legendItem.row) }} {self._chart.id}.chart.removeSeries({self.id}.series) delete {self.id}legendItem delete {self.id} ''') class Histogram(SeriesCommon): def __init__(self, chart, name, color, price_line, price_label, scale_margin_top, scale_margin_bottom): super().__init__(chart, name) self.color = color self.run_script(f''' {self.id} = {chart.id}.createHistogramSeries( "{name}", {{ color: '{color}', lastValueVisible: {jbool(price_label)}, priceLineVisible: {jbool(price_line)}, priceScaleId: '{self.id}', priceFormat: {{type: "volume"}}, }}, // precision: 2, ) {self.id}.series.priceScale().applyOptions({{ scaleMargins: {{top:{scale_margin_top}, bottom: {scale_margin_bottom}}} }})''') def delete(self): """ Irreversibly deletes the histogram. """ self.run_script(f''' {self.id}legendItem = {self._chart.id}.legend._lines.find((line) => line.series == {self.id}.series) {self._chart.id}.legend._lines = {self._chart.id}.legend._lines.filter((item) => item != {self.id}legendItem) if ({self.id}legendItem) {{ {self._chart.id}.legend.div.removeChild({self.id}legendItem.row) }} {self._chart.id}.chart.removeSeries({self.id}.series) delete {self.id}legendItem delete {self.id} ''') def scale(self, scale_margin_top: float = 0.0, scale_margin_bottom: float = 0.0): self.run_script(f''' {self.id}.series.priceScale().applyOptions({{ scaleMargins: {{top: {scale_margin_top}, bottom: {scale_margin_bottom}}} }})''') class Candlestick(SeriesCommon): def __init__(self, chart: 'AbstractChart'): super().__init__(chart) self._volume_up_color = 'rgba(83,141,131,0.8)' self._volume_down_color = 'rgba(200,127,130,0.8)' self.candle_data = pd.DataFrame() # self.run_script(f'{self.id}.makeCandlestickSeries()') def set(self, df: Optional[pd.DataFrame] = None, keep_drawings=False): """ Sets the initial data for the chart.\n :param df: columns: date/time, open, high, low, close, volume (if volume enabled). :param keep_drawings: keeps any drawings made through the toolbox. Otherwise, they will be deleted. """ if df is None or df.empty: self.run_script(f'{self.id}.series.setData([])') self.run_script(f'{self.id}.volumeSeries.setData([])') self.candle_data = pd.DataFrame() return df = self._df_datetime_format(df) self.candle_data = df.copy() self._last_bar = df.iloc[-1] self.run_script(f'{self.id}.series.setData({js_data(df)})') if 'volume' not in df: return volume = df.drop(columns=['open', 'high', 'low', 'close']).rename(columns={'volume': 'value'}) volume['color'] = self._volume_down_color volume.loc[df['close'] > df['open'], 'color'] = self._volume_up_color self.run_script(f'{self.id}.volumeSeries.setData({js_data(volume)})') for line in self._lines: if line.name not in df.columns: continue line.set(df[['time', line.name]], format_cols=False) # set autoScale to true in case the user has dragged the price scale self.run_script(f''' if (!{self.id}.chart.priceScale("right").options.autoScale) {self.id}.chart.priceScale("right").applyOptions({{autoScale: true}}) ''') # TODO keep drawings doesn't work consistenly w if keep_drawings: self.run_script(f'{self._chart.id}.toolBox?._drawingTool.repositionOnTime()') else: self.run_script(f"{self._chart.id}.toolBox?.clearDrawings()") def update(self, series: pd.Series, _from_tick=False): """ Updates the data from a bar; if series['time'] is the same time as the last bar, the last bar will be overwritten.\n :param series: labels: date/time, open, high, low, close, volume (if using volume). """ series = self._series_datetime_format(series) if not _from_tick else series if series['time'] != self._last_bar['time']: self.candle_data.loc[self.candle_data.index[-1]] = self._last_bar self.candle_data = pd.concat([self.candle_data, series.to_frame().T], ignore_index=True) self._chart.events.new_bar._emit(self) self._last_bar = series self.run_script(f'{self.id}.series.update({js_data(series)})') if 'volume' not in series: return volume = series.drop(['open', 'high', 'low', 'close']).rename({'volume': 'value'}) volume['color'] = self._volume_up_color if series['close'] > series['open'] else self._volume_down_color self.run_script(f'{self.id}.volumeSeries.update({js_data(volume)})') def update_from_tick(self, series: pd.Series, cumulative_volume: bool = False): """ Updates the data from a tick.\n :param series: labels: date/time, price, volume (if using volume). :param cumulative_volume: Adds the given volume onto the latest bar. """ series = self._series_datetime_format(series) if series['time'] < self._last_bar['time']: raise ValueError(f'Trying to update tick of time "{pd.to_datetime(series["time"])}", which occurs before the last bar time of "{pd.to_datetime(self._last_bar["time"])}".') bar = pd.Series(dtype='float64') if series['time'] == self._last_bar['time']: bar = self._last_bar bar['high'] = max(self._last_bar['high'], series['price']) bar['low'] = min(self._last_bar['low'], series['price']) bar['close'] = series['price'] if 'volume' in series: if cumulative_volume: bar['volume'] += series['volume'] else: bar['volume'] = series['volume'] else: for key in ('open', 'high', 'low', 'close'): bar[key] = series['price'] bar['time'] = series['time'] if 'volume' in series: bar['volume'] = series['volume'] self.update(bar, _from_tick=True) def price_scale( self, auto_scale: bool = True, mode: PRICE_SCALE_MODE = 'normal', invert_scale: bool = False, align_labels: bool = True, scale_margin_top: float = 0.2, scale_margin_bottom: float = 0.2, border_visible: bool = False, border_color: Optional[str] = None, text_color: Optional[str] = None, entire_text_only: bool = False, visible: bool = True, ticks_visible: bool = False, minimum_width: int = 0 ): self.run_script(f''' {self.id}.series.priceScale().applyOptions({{ autoScale: {jbool(auto_scale)}, mode: {as_enum(mode, PRICE_SCALE_MODE)}, invertScale: {jbool(invert_scale)}, alignLabels: {jbool(align_labels)}, scaleMargins: {{top: {scale_margin_top}, bottom: {scale_margin_bottom}}}, borderVisible: {jbool(border_visible)}, {f'borderColor: "{border_color}",' if border_color else ''} {f'textColor: "{text_color}",' if text_color else ''} entireTextOnly: {jbool(entire_text_only)}, visible: {jbool(visible)}, ticksVisible: {jbool(ticks_visible)}, minimumWidth: {minimum_width} }})''') def candle_style( self, up_color: str = 'rgba(39, 157, 130, 100)', down_color: str = 'rgba(200, 97, 100, 100)', wick_visible: bool = True, border_visible: bool = True, border_up_color: str = '', border_down_color: str = '', wick_up_color: str = '', wick_down_color: str = ''): """ Candle styling for each of its parts.\n If only `up_color` and `down_color` are passed, they will color all parts of the candle. """ border_up_color = border_up_color if border_up_color else up_color border_down_color = border_down_color if border_down_color else down_color wick_up_color = wick_up_color if wick_up_color else up_color wick_down_color = wick_down_color if wick_down_color else down_color self.run_script(f"{self.id}.series.applyOptions({js_json(locals())})") def volume_config(self, scale_margin_top: float = 0.8, scale_margin_bottom: float = 0.0, up_color='rgba(83,141,131,0.8)', down_color='rgba(200,127,130,0.8)'): """ Configure volume settings.\n Numbers for scaling must be greater than 0 and less than 1.\n Volume colors must be applied prior to setting/updating the bars.\n """ self._volume_up_color = up_color if up_color else self._volume_up_color self._volume_down_color = down_color if down_color else self._volume_down_color self.run_script(f''' {self.id}.volumeSeries.priceScale().applyOptions({{ scaleMargins: {{ top: {scale_margin_top}, bottom: {scale_margin_bottom}, }} }})''') class AbstractChart(Candlestick, Pane): def __init__(self, window: Window, width: float = 1.0, height: float = 1.0, scale_candles_only: bool = False, toolbox: bool = False, autosize: bool = True, position: FLOAT = 'left', leftScale: bool = False): Pane.__init__(self, window) self._lines = [] self._scale_candles_only = scale_candles_only self._width = width self._height = height self.events: Events = Events(self) from lightweight_charts.polygon import PolygonAPI self.polygon: PolygonAPI = PolygonAPI(self) self.run_script( f'{self.id} = new Lib.Handler("{self.id}", {width}, {height}, "{position}", {jbool(autosize)}, {jbool(leftScale)})') Candlestick.__init__(self, self) self.topbar: TopBar = TopBar(self) if toolbox: self.toolbox: ToolBox = ToolBox(self) def fit(self): """ Fits the maximum amount of the chart data within the viewport. """ self.run_script(f'{self.id}.chart.timeScale().fitContent()') def create_line( self, name: str = '', color: str = None, style: LINE_STYLE = 'solid', width: int = 2, price_line: bool = False, price_label: bool = False, priceScaleId: str = "right" ) -> Line: """ Creates and returns a Line object. """ if color is None: color = get_next_color() self._lines.append(Line(self, name, color, style, width, price_line, price_label, True, priceScaleId)) return self._lines[-1] def create_histogram( self, name: str = '', color: str = None, price_line: bool = False, price_label: bool = True, scale_margin_top: float = 0.0, scale_margin_bottom: float = 0.0, opacity: float = None, ) -> Histogram: """ Creates and returns a Histogram object. """ if color is None: color = get_next_color() if opacity is not None: color = apply_opacity(color, opacity) return Histogram( self, name, color, price_line, price_label, scale_margin_top, scale_margin_bottom) def lines(self) -> List[Line]: """ Returns all lines for the chart. """ return self._lines.copy() def set_visible_range(self, start_time: TIME, end_time: TIME): self.run_script(f''' {self.id}.chart.timeScale().setVisibleRange({{ from: {pd.to_datetime(start_time).timestamp()}, to: {pd.to_datetime(end_time).timestamp()} }}) ''') def resize(self, width: Optional[float] = None, height: Optional[float] = None): """ Resizes the chart within the window. Dimensions should be given as a float between 0 and 1. """ self._width = width if width is not None else self._width self._height = height if height is not None else self._height self.run_script(f''' {self.id}.scale.width = {self._width} {self.id}.scale.height = {self._height} {self.id}.reSize() ''') def time_scale(self, right_offset: int = 0, min_bar_spacing: float = 0.5, visible: bool = True, time_visible: bool = True, seconds_visible: bool = False, border_visible: bool = True, border_color: Optional[str] = None): """ Options for the timescale of the chart. """ self.run_script(f'''{self.id}.chart.applyOptions({{timeScale: {js_json(locals())}}})''') def layout(self, background_color: str = '#000000', text_color: Optional[str] = None, font_size: Optional[int] = None, font_family: Optional[str] = None): """ Global layout options for the chart. """ self.run_script(f""" document.getElementById('container').style.backgroundColor = '{background_color}' {self.id}.chart.applyOptions({{ layout: {{ background: {{color: "{background_color}"}}, {f'textColor: "{text_color}",' if text_color else ''} {f'fontSize: {font_size},' if font_size else ''} {f'fontFamily: "{font_family}",' if font_family else ''} }}}})""") def grid(self, vert_enabled: bool = True, horz_enabled: bool = True, color: str = 'rgba(29, 30, 38, 5)', style: LINE_STYLE = 'solid'): """ Grid styling for the chart. """ self.run_script(f""" {self.id}.chart.applyOptions({{ grid: {{ vertLines: {{ visible: {jbool(vert_enabled)}, color: "{color}", style: {as_enum(style, LINE_STYLE)}, }}, horzLines: {{ visible: {jbool(horz_enabled)}, color: "{color}", style: {as_enum(style, LINE_STYLE)}, }}, }} }})""") def crosshair( self, mode: CROSSHAIR_MODE = 'normal', vert_visible: bool = True, vert_width: int = 1, vert_color: Optional[str] = None, vert_style: LINE_STYLE = 'large_dashed', vert_label_background_color: str = 'rgb(46, 46, 46)', horz_visible: bool = True, horz_width: int = 1, horz_color: Optional[str] = None, horz_style: LINE_STYLE = 'large_dashed', horz_label_background_color: str = 'rgb(55, 55, 55)' ): """ Crosshair formatting for its vertical and horizontal axes. """ self.run_script(f''' {self.id}.chart.applyOptions({{ crosshair: {{ mode: {as_enum(mode, CROSSHAIR_MODE)}, vertLine: {{ visible: {jbool(vert_visible)}, width: {vert_width}, {f'color: "{vert_color}",' if vert_color else ''} style: {as_enum(vert_style, LINE_STYLE)}, labelBackgroundColor: "{vert_label_background_color}" }}, horzLine: {{ visible: {jbool(horz_visible)}, width: {horz_width}, {f'color: "{horz_color}",' if horz_color else ''} style: {as_enum(horz_style, LINE_STYLE)}, labelBackgroundColor: "{horz_label_background_color}" }} }} }})''') def watermark(self, text: str, font_size: int = 44, color: str = 'rgba(180, 180, 200, 0.5)'): """ Adds a watermark to the chart. """ self.run_script(f''' {self.id}.chart.applyOptions({{ watermark: {{ visible: true, horzAlign: 'center', vertAlign: 'center', ...{js_json(locals())} }} }})''') def legend(self, visible: bool = False, ohlc: bool = True, percent: bool = True, lines: bool = True, color: str = 'rgb(191, 195, 203)', font_size: int = 11, font_family: str = 'Monaco', text: str = '', color_based_on_candle: bool = False): """ Configures the legend of the chart. """ l_id = f'{self.id}.legend' if not visible: self.run_script(f''' {l_id}.div.style.display = "none" {l_id}.ohlcEnabled = false {l_id}.percentEnabled = false {l_id}.linesEnabled = false ''') return self.run_script(f''' {l_id}.div.style.display = 'flex' {l_id}.ohlcEnabled = {jbool(ohlc)} {l_id}.percentEnabled = {jbool(percent)} {l_id}.linesEnabled = {jbool(lines)} {l_id}.colorBasedOnCandle = {jbool(color_based_on_candle)} {l_id}.div.style.color = '{color}' {l_id}.color = '{color}' {l_id}.div.style.fontSize = '{font_size}px' {l_id}.div.style.fontFamily = '{font_family}' {l_id}.text.innerText = '{text}' ''') def spinner(self, visible): self.run_script(f"{self.id}.spinner.style.display = '{'block' if visible else 'none'}'") def hotkey(self, modifier_key: Literal['ctrl', 'alt', 'shift', 'meta', None], keys: Union[str, tuple, int], func: Callable): if not isinstance(keys, tuple): keys = (keys,) for key in keys: key = str(key) if key.isalnum() and len(key) == 1: key_code = f'Digit{key}' if key.isdigit() else f'Key{key.upper()}' key_condition = f'event.code === "{key_code}"' else: key_condition = f'event.key === "{key}"' if modifier_key is not None: key_condition += f'&& event.{modifier_key}Key' self.run_script(f''' {self.id}.commandFunctions.unshift((event) => {{ if ({key_condition}) {{ event.preventDefault() window.callbackFunction(`{modifier_key, keys}_~_{key}`) return true }} else return false }})''') self.win.handlers[f'{modifier_key, keys}'] = func def create_table( self, width: NUM, height: NUM, headings: tuple, widths: Optional[tuple] = None, alignments: Optional[tuple] = None, position: FLOAT = 'left', draggable: bool = False, background_color: str = '#121417', border_color: str = 'rgb(70, 70, 70)', border_width: int = 1, heading_text_colors: Optional[tuple] = None, heading_background_colors: Optional[tuple] = None, return_clicked_cells: bool = False, func: Optional[Callable] = None ) -> Table: args = locals() del args['self'] return self.win.create_table(*args.values()) def screenshot(self) -> bytes: """ Takes a screenshot. This method can only be used after the chart window is visible. :return: a bytes object containing a screenshot of the chart. """ serial_data = self.win.run_script_and_get(f'{self.id}.chart.takeScreenshot().toDataURL()') return b64decode(serial_data.split(',')[1]) def create_subchart(self, position: FLOAT = 'left', width: float = 0.5, height: float = 0.5, sync: Optional[Union[str, bool]] = None, scale_candles_only: bool = False, sync_crosshairs_only: bool = False, toolbox: bool = False, leftScale: bool = False) -> 'AbstractChart': if sync is True: sync = self.id args = locals() del args['self'] return self.win.create_subchart(*args.values())