|
| 1 | +# pylint: disable=missing-class-docstring,missing-function-docstring |
| 2 | +# Drakkar-Software OctoBot-Commons |
| 3 | +# Copyright (c) Drakkar-Software, All rights reserved. |
| 4 | +# |
| 5 | +# This library is free software; you can redistribute it and/or |
| 6 | +# modify it under the terms of the GNU Lesser General Public |
| 7 | +# License as published by the Free Software Foundation; either |
| 8 | +# version 3.0 of the License, or (at your option) any later version. |
| 9 | +# |
| 10 | +# This library is distributed in the hope that it will be useful, |
| 11 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 12 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 13 | +# Lesser General Public License for more details. |
| 14 | +# |
| 15 | +# You should have received a copy of the GNU Lesser General Public |
| 16 | +# License along with this library. |
| 17 | +import typing |
| 18 | +import dataclasses |
| 19 | +import numpy as np |
| 20 | + |
| 21 | +import octobot_commons.constants |
| 22 | +import octobot_commons.errors |
| 23 | +import octobot_commons.logging |
| 24 | +import octobot_commons.enums as commons_enums |
| 25 | +import octobot_commons.dsl_interpreter as dsl_interpreter |
| 26 | +import octobot_trading.exchanges |
| 27 | +import octobot_trading.exchange_data |
| 28 | +import octobot_trading.api |
| 29 | +import octobot_trading.constants |
| 30 | + |
| 31 | +import tentacles.Meta.DSL_operators.exchange_operators.exchange_operator as exchange_operator |
| 32 | + |
| 33 | + |
| 34 | +@dataclasses.dataclass |
| 35 | +class ExchangeDataDependency(dsl_interpreter.InterpreterDependency): |
| 36 | + exchange_manager_id: str |
| 37 | + symbol: typing.Optional[str] |
| 38 | + time_frame: typing.Optional[str] |
| 39 | + data_source: str = octobot_trading.constants.OHLCV_CHANNEL |
| 40 | + |
| 41 | + def __hash__(self) -> int: |
| 42 | + return hash((self.exchange_manager_id, self.symbol, self.time_frame, self.data_source)) |
| 43 | + |
| 44 | + |
| 45 | +class OHLCVOperator(exchange_operator.ExchangeOperator): |
| 46 | + def __init__(self, *parameters: dsl_interpreter.OperatorParameterType, **kwargs: typing.Any): |
| 47 | + super().__init__(*parameters, **kwargs) |
| 48 | + self.value: dsl_interpreter_operator.ComputedOperatorParameterType = exchange_operator.UNINITIALIZED_VALUE # type: ignore |
| 49 | + |
| 50 | + @staticmethod |
| 51 | + def get_library() -> str: |
| 52 | + # this is a contextual operator, so it should not be included by default in the get_all_operators function return values |
| 53 | + return octobot_commons.constants.CONTEXTUAL_OPERATORS_LIBRARY |
| 54 | + |
| 55 | + @staticmethod |
| 56 | + def get_parameters() -> list[dsl_interpreter.OperatorParameter]: |
| 57 | + return [ |
| 58 | + dsl_interpreter.OperatorParameter(name="symbol", description="the symbol to get the OHLCV data for", required=False, type=str), |
| 59 | + dsl_interpreter.OperatorParameter(name="time_frame", description="the time frame to get the OHLCV data for", required=False, type=str), |
| 60 | + ] |
| 61 | + |
| 62 | + def get_symbol_and_time_frame(self) -> typing.Tuple[typing.Optional[str], typing.Optional[str]]: |
| 63 | + if parameters := self.get_computed_parameters(): |
| 64 | + symbol = parameters[0] if len(parameters) > 0 else None |
| 65 | + time_frame = parameters[1] if len(parameters) > 1 else None |
| 66 | + return ( |
| 67 | + str(symbol) if symbol is not None else None, |
| 68 | + str(time_frame) if time_frame is not None else None |
| 69 | + ) |
| 70 | + return None, None |
| 71 | + |
| 72 | + def compute(self) -> dsl_interpreter.ComputedOperatorParameterType: |
| 73 | + if self.value is exchange_operator.UNINITIALIZED_VALUE: |
| 74 | + raise octobot_commons.errors.DSLInterpreterError("{self.__class__.__name__} has not been initialized") |
| 75 | + return self.value |
| 76 | + |
| 77 | + |
| 78 | +def create_ohlcv_operators( |
| 79 | + exchange_manager: typing.Optional[octobot_trading.exchanges.ExchangeManager], |
| 80 | + symbol: str, |
| 81 | + time_frame: str, |
| 82 | + candle_manager_by_time_frame_by_symbol: typing.Optional[ |
| 83 | + typing.Dict[str, typing.Dict[str, octobot_trading.exchange_data.CandlesManager]] |
| 84 | + ] = None |
| 85 | +) -> typing.List[type[OHLCVOperator]]: |
| 86 | + |
| 87 | + if exchange_manager is None and candle_manager_by_time_frame_by_symbol is None: |
| 88 | + raise octobot_commons.errors.InvalidParametersError("exchange_manager or candle_manager_by_time_frame_by_symbol must be provided") |
| 89 | + |
| 90 | + def _get_candles_values_with_latest_kline_if_available( |
| 91 | + input_symbol: typing.Optional[str], input_time_frame: typing.Optional[str], |
| 92 | + value_type: commons_enums.PriceIndexes, limit: int = -1 |
| 93 | + ) -> np.ndarray: |
| 94 | + _symbol = input_symbol or symbol |
| 95 | + _time_frame = input_time_frame or time_frame |
| 96 | + if exchange_manager is None: |
| 97 | + if candle_manager_by_time_frame_by_symbol is not None: |
| 98 | + candles_manager = candle_manager_by_time_frame_by_symbol[_time_frame][_symbol] |
| 99 | + symbol_data = None |
| 100 | + else: |
| 101 | + symbol_data = octobot_trading.api.get_symbol_data( |
| 102 | + exchange_manager, _symbol, allow_creation=False |
| 103 | + ) |
| 104 | + candles_manager = octobot_trading.api.get_symbol_candles_manager( |
| 105 | + symbol_data, _time_frame |
| 106 | + ) |
| 107 | + candles_values = _get_candles_values(candles_manager, value_type, limit) |
| 108 | + if symbol_data is not None and (kline := _get_kline(symbol_data, _time_frame)): |
| 109 | + kline_time = kline[commons_enums.PriceIndexes.IND_PRICE_TIME.value] |
| 110 | + last_candle_time = candles_manager.time_candles[candles_manager.time_candles_index - 1] |
| 111 | + if kline_time == last_candle_time: |
| 112 | + # kline is an update of the last candle |
| 113 | + return _adapt_last_candle_value(candles_manager, value_type, candles_values, kline) |
| 114 | + else: |
| 115 | + tf_seconds = commons_enums.TimeFramesMinutes[commons_enums.TimeFrames(_time_frame)] * octobot_commons.constants.MINUTE_TO_SECONDS |
| 116 | + if kline_time == last_candle_time + tf_seconds: |
| 117 | + # kline is a new candle |
| 118 | + kline_value = kline[value_type.value] |
| 119 | + return np.append(candles_values[1:], kline_value) |
| 120 | + else: |
| 121 | + octobot_commons.logging.get_logger(OHLCVOperator.__name__).error( |
| 122 | + f"{exchange_manager.exchange_name + '' if exchange_manager is not None else ''}{_symbol} {_time_frame} " |
| 123 | + f"kline time ({kline_time}) is not equal to last candle time not the last time + {time_frame} " |
| 124 | + f"({last_candle_time} + {tf_seconds}) seconds. Kline has been ignored." |
| 125 | + ) |
| 126 | + return candles_values |
| 127 | + |
| 128 | + def _get_dependencies() -> typing.List[ExchangeDataDependency]: |
| 129 | + return [ |
| 130 | + ExchangeDataDependency( |
| 131 | + exchange_manager_id=octobot_trading.api.get_exchange_manager_id(exchange_manager), |
| 132 | + symbol=symbol, |
| 133 | + time_frame=time_frame |
| 134 | + ) |
| 135 | + ] |
| 136 | + |
| 137 | + class _LocalOHLCVOperator(OHLCVOperator): |
| 138 | + PRICE_INDEX: commons_enums.PriceIndexes = None # type: ignore |
| 139 | + |
| 140 | + def get_dependencies(self) -> typing.List[dsl_interpreter.InterpreterDependency]: |
| 141 | + return super().get_dependencies() + _get_dependencies() |
| 142 | + |
| 143 | + async def pre_compute(self) -> None: |
| 144 | + await super().pre_compute() |
| 145 | + self.value = _get_candles_values_with_latest_kline_if_available(*self.get_symbol_and_time_frame(), self.PRICE_INDEX, -1) |
| 146 | + |
| 147 | + class _OpenPriceOperator(_LocalOHLCVOperator): |
| 148 | + PRICE_INDEX = commons_enums.PriceIndexes.IND_PRICE_OPEN |
| 149 | + |
| 150 | + @staticmethod |
| 151 | + def get_name() -> str: |
| 152 | + return "open" |
| 153 | + |
| 154 | + class _HighPriceOperator(_LocalOHLCVOperator): |
| 155 | + PRICE_INDEX = commons_enums.PriceIndexes.IND_PRICE_HIGH |
| 156 | + |
| 157 | + @staticmethod |
| 158 | + def get_name() -> str: |
| 159 | + return "high" |
| 160 | + |
| 161 | + class _LowPriceOperator(_LocalOHLCVOperator): |
| 162 | + PRICE_INDEX = commons_enums.PriceIndexes.IND_PRICE_LOW |
| 163 | + |
| 164 | + @staticmethod |
| 165 | + def get_name() -> str: |
| 166 | + return "low" |
| 167 | + |
| 168 | + class _ClosePriceOperator(_LocalOHLCVOperator): |
| 169 | + PRICE_INDEX = commons_enums.PriceIndexes.IND_PRICE_CLOSE |
| 170 | + |
| 171 | + @staticmethod |
| 172 | + def get_name() -> str: |
| 173 | + return "close" |
| 174 | + |
| 175 | + class _VolumePriceOperator(_LocalOHLCVOperator): |
| 176 | + PRICE_INDEX = commons_enums.PriceIndexes.IND_PRICE_VOL |
| 177 | + |
| 178 | + @staticmethod |
| 179 | + def get_name() -> str: |
| 180 | + return "volume" |
| 181 | + |
| 182 | + class _TimePriceOperator(_LocalOHLCVOperator): |
| 183 | + PRICE_INDEX = commons_enums.PriceIndexes.IND_PRICE_TIME |
| 184 | + |
| 185 | + @staticmethod |
| 186 | + def get_name() -> str: |
| 187 | + return "time" |
| 188 | + |
| 189 | + return [_OpenPriceOperator, _HighPriceOperator, _LowPriceOperator, _ClosePriceOperator, _VolumePriceOperator, _TimePriceOperator] |
| 190 | + |
| 191 | +def _get_kline( |
| 192 | + symbol_data: octobot_trading.exchange_data.ExchangeSymbolData, _time_frame: str |
| 193 | +) -> typing.Optional[list]: |
| 194 | + try: |
| 195 | + return octobot_trading.api.get_symbol_klines(symbol_data, _time_frame) |
| 196 | + except KeyError: |
| 197 | + return None |
| 198 | + |
| 199 | + |
| 200 | +def _get_candles_values( |
| 201 | + candles_manager: octobot_trading.exchange_data.CandlesManager, |
| 202 | + candle_value: commons_enums.PriceIndexes, limit: int = -1 |
| 203 | +) -> np.ndarray: |
| 204 | + match candle_value: |
| 205 | + case commons_enums.PriceIndexes.IND_PRICE_CLOSE: |
| 206 | + return candles_manager.get_symbol_close_candles(limit) |
| 207 | + case commons_enums.PriceIndexes.IND_PRICE_OPEN: |
| 208 | + return candles_manager.get_symbol_open_candles(limit) |
| 209 | + case commons_enums.PriceIndexes.IND_PRICE_HIGH: |
| 210 | + return candles_manager.get_symbol_high_candles(limit) |
| 211 | + case commons_enums.PriceIndexes.IND_PRICE_LOW: |
| 212 | + return candles_manager.get_symbol_low_candles(limit) |
| 213 | + case commons_enums.PriceIndexes.IND_PRICE_VOL: |
| 214 | + return candles_manager.get_symbol_volume_candles(limit) |
| 215 | + case commons_enums.PriceIndexes.IND_PRICE_TIME: |
| 216 | + return candles_manager.get_symbol_time_candles(limit) |
| 217 | + case _: |
| 218 | + raise octobot_commons.errors.InvalidParametersError(f"Invalid candle value: {candle_value}") |
| 219 | + |
| 220 | +def _adapt_last_candle_value( |
| 221 | + candles_manager: octobot_trading.exchange_data.CandlesManager, |
| 222 | + candle_value: commons_enums.PriceIndexes, |
| 223 | + candles_values: np.ndarray, |
| 224 | + kline: list |
| 225 | +) -> np.ndarray: |
| 226 | + match candle_value: |
| 227 | + case commons_enums.PriceIndexes.IND_PRICE_CLOSE: |
| 228 | + candles_values[candles_manager.close_candles_index - 1] = kline[commons_enums.PriceIndexes.IND_PRICE_CLOSE.value] |
| 229 | + case commons_enums.PriceIndexes.IND_PRICE_OPEN: |
| 230 | + candles_values[candles_manager.open_candles_index - 1] = kline[commons_enums.PriceIndexes.IND_PRICE_OPEN.value] |
| 231 | + case commons_enums.PriceIndexes.IND_PRICE_HIGH: |
| 232 | + candles_values[candles_manager.high_candles_index - 1] = kline[commons_enums.PriceIndexes.IND_PRICE_HIGH.value] |
| 233 | + case commons_enums.PriceIndexes.IND_PRICE_LOW: |
| 234 | + candles_values[candles_manager.low_candles_index - 1] = kline[commons_enums.PriceIndexes.IND_PRICE_LOW.value] |
| 235 | + case commons_enums.PriceIndexes.IND_PRICE_VOL: |
| 236 | + candles_values[candles_manager.volume_candles_index - 1] = kline[commons_enums.PriceIndexes.IND_PRICE_VOL.value] |
| 237 | + case commons_enums.PriceIndexes.IND_PRICE_TIME: |
| 238 | + # nothing to do for time (this value is constant) |
| 239 | + pass |
| 240 | + case _: |
| 241 | + raise octobot_commons.errors.InvalidParametersError(f"Invalid candle value: {candle_value}") |
| 242 | + return candles_values |
0 commit comments