|
| 1 | +# Drakkar-Software OctoBot-Tentacles |
| 2 | +# Copyright (c) Drakkar-Software, All rights reserved. |
| 3 | +# |
| 4 | +# This library is free software; you can redistribute it and/or |
| 5 | +# modify it under the terms of the GNU Lesser General Public |
| 6 | +# License as published by the Free Software Foundation; either |
| 7 | +# version 3.0 of the License, or (at your option) any later version. |
| 8 | +# |
| 9 | +# This library is distributed in the hope that it will be useful, |
| 10 | +# but WITHOUT ANY WARRANTY; without even the implied warranty of |
| 11 | +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| 12 | +# Lesser General Public License for more details. |
| 13 | +# |
| 14 | +# You should have received a copy of the GNU Lesser General Public |
| 15 | +# License along with this library. |
| 16 | +import asyncio |
| 17 | +import time |
| 18 | +import typing |
| 19 | +import copy |
| 20 | + |
| 21 | +import octobot_services.channel as services_channel |
| 22 | +import octobot_services.constants as services_constants |
| 23 | +import octobot_services.service_feeds as service_feeds |
| 24 | + |
| 25 | +import octobot_trading.api as trading_api |
| 26 | +import octobot_trading.personal_data as personal_data |
| 27 | + |
| 28 | +import tentacles.Services.Services_bases as Services_bases |
| 29 | + |
| 30 | + |
| 31 | +class ExchangeServiceFeedChannel(services_channel.AbstractServiceFeedChannel): |
| 32 | + pass |
| 33 | + |
| 34 | + |
| 35 | +class ExchangeProfile: |
| 36 | + def __init__(self, exchange_name: str, profile_id: str, refresh_time: float): |
| 37 | + self.exchange_name: str = exchange_name |
| 38 | + self.profile_id: str = profile_id |
| 39 | + self.refresh_time: float = refresh_time |
| 40 | + self.next_refresh: float = time.time() |
| 41 | + |
| 42 | + self.positions: typing.List[personal_data.Position] = [] |
| 43 | + self.closed_positions: typing.List[personal_data.Position] = [] |
| 44 | + self.trades: typing.List[personal_data.Trade] = [] |
| 45 | + self.portfolio: typing.Optional[personal_data.Portfolio] = None |
| 46 | + self.orders: typing.List[personal_data.Order] = [] |
| 47 | + |
| 48 | + def __str__(self): |
| 49 | + return f"{self.exchange_name} {self.profile_id}" |
| 50 | + |
| 51 | + |
| 52 | +class ExchangeServiceFeed(service_feeds.AbstractServiceFeed): |
| 53 | + FEED_CHANNEL = ExchangeServiceFeedChannel |
| 54 | + REQUIRED_SERVICES = [Services_bases.ExchangeService] |
| 55 | + |
| 56 | + def __init__(self, config, main_async_loop, bot_id: str): |
| 57 | + super().__init__(config, main_async_loop, bot_id) |
| 58 | + self.exchange_profiles: typing.List[ExchangeProfile] = [] |
| 59 | + |
| 60 | + # merge new config into existing config |
| 61 | + def update_feed_config(self, config): |
| 62 | + self.exchange_profiles.extend(profile |
| 63 | + for profile in config[services_constants.CONFIG_EXCHANGE_PROFILE_IDS] |
| 64 | + if profile not in self.exchange_profiles) |
| 65 | + |
| 66 | + def _get_profile_exchange_manager(self, profile: ExchangeProfile): |
| 67 | + return trading_api.get_exchange_manager_from_exchange_name(profile.exchange_name) |
| 68 | + |
| 69 | + def _something_to_watch(self): |
| 70 | + # TODO later: check if exchange supports profiles fetching before considering profiles as to be watched |
| 71 | + return bool(self.exchange_profiles) |
| 72 | + |
| 73 | + def _get_sleep_time_before_next_wakeup(self): |
| 74 | + closest_wakeup = min(profile.next_refresh for profile in self.exchange_profiles) |
| 75 | + return max(0, closest_wakeup - time.time()) |
| 76 | + |
| 77 | + async def _get_exchange_profile(self, profile: ExchangeProfile) -> bool: |
| 78 | + updated = False |
| 79 | + self.logger.debug(f"Fetching exchange profile on {profile.exchange_name} {profile.profile_id}") |
| 80 | + exchange_manager = self._get_profile_exchange_manager(profile) |
| 81 | + profile.next_refresh = time.time() + profile.refresh_time |
| 82 | + exchange_has_positions = True |
| 83 | + if exchange_has_positions: |
| 84 | + previous_positions = copy.deepcopy(profile.positions) |
| 85 | + profile.positions = await exchange_manager.exchange.get_user_positions(profile.profile_id) |
| 86 | + if previous_positions != profile.positions: |
| 87 | + updated = True |
| 88 | + else: |
| 89 | + # Update portfolio |
| 90 | + pass |
| 91 | + |
| 92 | + # TODO: fetch orders ? |
| 93 | + # TODO: fetch trades ? |
| 94 | + return updated |
| 95 | + |
| 96 | + async def _push_update_and_wait_exchange_profiles(self): |
| 97 | + for profile in self.exchange_profiles: |
| 98 | + if time.time() >= profile.next_refresh: |
| 99 | + profile_updated = await self._get_exchange_profile(profile) |
| 100 | + if profile_updated: |
| 101 | + await self._async_notify_consumers( |
| 102 | + { |
| 103 | + services_constants.FEED_METADATA: profile |
| 104 | + } |
| 105 | + ) |
| 106 | + |
| 107 | + async def _push_update_and_wait(self): |
| 108 | + if self.exchange_profiles: |
| 109 | + await self._push_update_and_wait_exchange_profiles() |
| 110 | + await asyncio.sleep(self._get_sleep_time_before_next_wakeup()) |
| 111 | + |
| 112 | + async def _update_loop(self): |
| 113 | + while not self.should_stop: |
| 114 | + try: |
| 115 | + await self._push_update_and_wait() |
| 116 | + except Exception as e: |
| 117 | + self.logger.exception(e, True, f"Error when receiving exchange feed: ({e})") |
| 118 | + self.should_stop = True |
| 119 | + return False |
| 120 | + |
| 121 | + async def _start_service_feed(self): |
| 122 | + try: |
| 123 | + asyncio.create_task(self._update_loop()) |
| 124 | + except Exception as e: |
| 125 | + self.logger.exception(e, True, f"Error when initializing exchange feed: {e}") |
| 126 | + return False |
| 127 | + return True |
0 commit comments