|
| 1 | +# Drakkar-Software OctoBot-Tentacles |
| 2 | +# Copyright (c) Drakkar-Software, All rights reserved. |
| 3 | +# |
| 4 | +# This library is free software; you can redistribute it and/or |
| 5 | +# modify it under the terms of the GNU Lesser General Public |
| 6 | +# License as published by the Free Software Foundation; either |
| 7 | +# version 3.0 of the License, or (at your option) any later version. |
| 8 | +# |
| 9 | +# This library is distributed in the hope that it will be useful, |
| 10 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 12 | +# Lesser General Public License for more details. |
| 13 | +# |
| 14 | +# You should have received a copy of the GNU Lesser General Public |
| 15 | +# License along with this library. |
| 16 | +import asyncio |
| 17 | +import time |
| 18 | +import typing |
| 19 | +import copy |
| 20 | + |
| 21 | +import octobot_services.channel as services_channel |
| 22 | +import octobot_services.constants as services_constants |
| 23 | +import octobot_services.service_feeds as service_feeds |
| 24 | + |
| 25 | +import octobot_trading.api as trading_api |
| 26 | +import octobot_trading.personal_data as personal_data |
| 27 | +import octobot_trading.exchanges as exchanges |
| 28 | + |
| 29 | +class ExchangeServiceFeedChannel(services_channel.AbstractServiceFeedChannel): |
| 30 | + pass |
| 31 | + |
| 32 | + |
| 33 | +class ExchangeProfile: |
| 34 | + DEFAULT_EXCHANGE_PROFILE_REFRESH_TIME = 60 |
| 35 | + |
| 36 | + def __init__(self, profile_id: str, exchange_name: str, refresh_time: float = DEFAULT_EXCHANGE_PROFILE_REFRESH_TIME, next_refresh: typing.Optional[float] = None): |
| 37 | + self.profile_id: str = profile_id |
| 38 | + self.exchange_name: str = exchange_name |
| 39 | + self.refresh_time: float = refresh_time |
| 40 | + self.next_refresh: float = next_refresh if next_refresh is not None else time.time() + refresh_time |
| 41 | + |
| 42 | + self.positions: typing.List[personal_data.Position] = [] |
| 43 | + self.closed_positions: typing.List[personal_data.Position] = [] |
| 44 | + self.trades: typing.List[personal_data.Trade] = [] |
| 45 | + self.portfolio: typing.Optional[personal_data.Portfolio] = None |
| 46 | + self.orders: typing.List[personal_data.Order] = [] |
| 47 | + |
| 48 | + def update_next_refresh(self): |
| 49 | + self.next_refresh = time.time() + self.refresh_time |
| 50 | + |
| 51 | + def __str__(self): |
| 52 | + return f"{self.exchange_name} {self.profile_id}" |
| 53 | + |
| 54 | + |
| 55 | +class ExchangeServiceFeed(service_feeds.AbstractServiceFeed): |
| 56 | + FEED_CHANNEL = ExchangeServiceFeedChannel |
| 57 | + REQUIRED_SERVICES = False |
| 58 | + |
| 59 | + DEFAULT_REFRESH_TIME = 10 |
| 60 | + |
| 61 | + def __init__(self, config, main_async_loop, bot_id: str): |
| 62 | + super().__init__(config, main_async_loop, bot_id) |
| 63 | + self.exchange_profiles: typing.Dict[str, ExchangeProfile] = {} |
| 64 | + |
| 65 | + def _initialize(self): |
| 66 | + pass |
| 67 | + |
| 68 | + # merge new config into existing config |
| 69 | + def update_feed_config(self, config): |
| 70 | + for profile in config[services_constants.CONFIG_EXCHANGE_PROFILES]: |
| 71 | + if profile[services_constants.CONFIG_EXCHANGE_PROFILE_ID] not in self.exchange_profiles.keys(): |
| 72 | + self.exchange_profiles[profile[services_constants.CONFIG_EXCHANGE_PROFILE_ID]] = ExchangeProfile( |
| 73 | + profile_id=profile[services_constants.CONFIG_EXCHANGE_PROFILE_ID], |
| 74 | + exchange_name=profile[services_constants.CONFIG_EXCHANGE_EXCHANGE_NAME] |
| 75 | + ) |
| 76 | + else: |
| 77 | + self.exchange_profiles[profile[services_constants.CONFIG_EXCHANGE_PROFILE_ID]].update(profile) |
| 78 | + |
| 79 | + def _get_profile_exchange_manager(self, exchange_name: str) -> typing.Optional[exchanges.ExchangeManager]: |
| 80 | + exchange_configurations = list(trading_api.get_exchange_configurations_from_exchange_name(exchange_name).values()) |
| 81 | + if exchange_configurations: |
| 82 | + return exchange_configurations[0].exchange_manager # TODO: is there a better way to get the exchange manager ? |
| 83 | + return None |
| 84 | + |
| 85 | + def _something_to_watch(self): |
| 86 | + service_config = self.config[services_constants.CONFIG_CATEGORY_SERVICES][services_constants.CONFIG_EXCHANGE] |
| 87 | + # update feed config before checking if there is something to watch |
| 88 | + self.update_feed_config(service_config) |
| 89 | + # TODO later: check if exchange supports profiles fetching before considering profiles as to be watched |
| 90 | + return bool(self.exchange_profiles.keys()) |
| 91 | + |
| 92 | + def _get_sleep_time_before_next_wakeup(self): |
| 93 | + if not self.exchange_profiles.keys(): |
| 94 | + return self.DEFAULT_REFRESH_TIME |
| 95 | + closest_wakeup = min(profile.next_refresh for profile in self.exchange_profiles.values()) |
| 96 | + return max(0, closest_wakeup - time.time()) |
| 97 | + |
| 98 | + async def _fetch_exchange_profile(self, profile_id: str) -> bool: |
| 99 | + updated = False |
| 100 | + current_profile = self.exchange_profiles.get(profile_id, None) |
| 101 | + if current_profile is None: |
| 102 | + self.logger.error(f"Exchange profile {profile_id} not found") |
| 103 | + return False |
| 104 | + |
| 105 | + self.logger.debug(f"Fetching exchange profile on {current_profile.exchange_name} {current_profile.profile_id}") |
| 106 | + exchange_manager = self._get_profile_exchange_manager(current_profile.exchange_name) |
| 107 | + current_profile.update_next_refresh() |
| 108 | + exchange_has_positions = True |
| 109 | + if exchange_has_positions: |
| 110 | + previous_positions = copy.deepcopy(current_profile.positions) if current_profile is not None else [] |
| 111 | + current_profile.positions = await exchange_manager.exchange.get_user_positions(current_profile.profile_id) |
| 112 | + if previous_positions != current_profile.positions: |
| 113 | + updated = True |
| 114 | + else: |
| 115 | + # Update portfolio |
| 116 | + pass |
| 117 | + |
| 118 | + # TODO: fetch orders ? |
| 119 | + # TODO: fetch trades ? |
| 120 | + return updated |
| 121 | + |
| 122 | + async def _push_update_and_wait_exchange_profiles(self): |
| 123 | + for profile in self.exchange_profiles.values(): |
| 124 | + if time.time() >= profile.next_refresh: |
| 125 | + profile_updated = await self._fetch_exchange_profile(profile.profile_id) |
| 126 | + if profile_updated: |
| 127 | + await self._async_notify_consumers( |
| 128 | + { |
| 129 | + services_constants.FEED_METADATA: profile |
| 130 | + } |
| 131 | + ) |
| 132 | + |
| 133 | + async def _push_update_and_wait(self): |
| 134 | + if self.exchange_profiles.keys(): |
| 135 | + await self._push_update_and_wait_exchange_profiles() |
| 136 | + await asyncio.sleep(self._get_sleep_time_before_next_wakeup()) |
| 137 | + |
| 138 | + async def _update_loop(self): |
| 139 | + while not self.should_stop: |
| 140 | + try: |
| 141 | + await self._push_update_and_wait() |
| 142 | + except Exception as e: |
| 143 | + self.logger.exception(e, True, f"Error when receiving exchange feed: ({e})") |
| 144 | + self.should_stop = True |
| 145 | + return False |
| 146 | + |
| 147 | + async def _start_service_feed(self): |
| 148 | + try: |
| 149 | + asyncio.create_task(self._update_loop()) |
| 150 | + except Exception as e: |
| 151 | + self.logger.exception(e, True, f"Error when initializing exchange feed: {e}") |
| 152 | + return False |
| 153 | + return True |
0 commit comments