Skip to content

Commit 6f928c8

Browse files
authored
Trading via Binance (#28)
* binance orders + refactoring * refactoring * fix linting * bracket order
1 parent 4c0aeda commit 6f928c8

File tree

12 files changed

+168
-43
lines changed

12 files changed

+168
-43
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from enum import Enum
2+
3+
4+
class OrderDirection(Enum):
5+
Buy = 1
6+
Sell = 2

src/algotrader/pipeline/sources/binance_realtime.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ def __init__(self, binance_provider: BinanceProvider, symbols: List[str], time_s
1515
self.time_span = time_span
1616
self.queue = Queue()
1717

18+
self._last_received_candle: Dict[str, Candle] = {}
19+
1820
def read(self) -> Iterator[Candle]:
1921
for symbol in self.symbols:
2022
self.binance_provider.start_kline_socket(symbol, self.time_span, self._on_candle)
@@ -23,7 +25,11 @@ def read(self) -> Iterator[Candle]:
2325
yield self.queue.get()
2426

2527
def _on_candle(self, candle: Candle):
26-
self.queue.put(candle)
28+
if candle.symbol in self._last_received_candle and \
29+
candle.timestamp > self._last_received_candle[candle.symbol].timestamp:
30+
self.queue.put(self._last_received_candle[candle.symbol])
31+
32+
self._last_received_candle[candle.symbol] = candle
2733

2834
def serialize(self) -> Dict:
2935
obj = super().serialize()

src/algotrader/pipeline/strategies/history_bucket_compare.py

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
from datetime import datetime
23
from typing import List, Dict
34

@@ -14,22 +15,28 @@
1415
class HistoryBucketCompareStrategy(Strategy):
1516

1617
def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime, timeframe_end: datetime,
17-
indicators_to_compare: List[str], return_field: str, min_event_count: int,
18+
indicators_to_compare: List[str], return_fields: List[str], min_event_count: int,
1819
min_avg_return: float) -> None:
1920
self.timeframe_start = timeframe_start
2021
self.timeframe_end = timeframe_end
2122
self.indicators_to_compare = indicators_to_compare
2223
self.storage_provider = storage_provider
2324
self.indicators_to_compare = indicators_to_compare
24-
self.return_field = return_field
25+
self.return_fields = return_fields
2526
self.min_event_count = min_event_count
2627
self.min_avg_return = min_avg_return
2728

2829
groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare]
29-
return_field = f'attachments.returns.{return_field}'
30+
return_fields = [f'attachments.returns.{return_field}' for return_field in self.return_fields]
3031

31-
self.matchers = storage_provider.get_aggregated_history(timeframe_start, timeframe_end, groupby_fields,
32-
return_field, min_event_count, min_avg_return)
32+
self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start,
33+
timeframe_end,
34+
groupby_fields,
35+
return_fields,
36+
min_event_count,
37+
min_avg_return)
38+
39+
logging.info(f'Found {len(self.long_matchers)} long matchers and {len(self.short_matchers)} short matchers')
3340

3441
def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]:
3542
indicators_buckets: IndicatorsMatchedBuckets = \
@@ -43,7 +50,7 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
4350
candle_buckets_map[f'attachments.indicators_matched_buckets.{indicator}.ident'] = indicators_buckets.get(
4451
indicator).ident
4552

46-
for matcher in self.matchers:
53+
for matcher in self.long_matchers:
4754
match = True
4855
for candle_ind, candle_val in candle_buckets_map.items():
4956
if matcher[candle_ind] != candle_val:
@@ -52,6 +59,15 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
5259
if match:
5360
return [StrategySignal(candle.symbol, SignalDirection.Long)]
5461

62+
for matcher in self.short_matchers:
63+
match = True
64+
for candle_ind, candle_val in candle_buckets_map.items():
65+
if matcher[candle_ind] != candle_val:
66+
match = False
67+
68+
if match:
69+
return [StrategySignal(candle.symbol, SignalDirection.Short)]
70+
5571
return []
5672

5773
def serialize(self) -> Dict:
@@ -61,7 +77,7 @@ def serialize(self) -> Dict:
6177
'timeframe_start': self.timeframe_start,
6278
'timeframe_end': self.timeframe_end,
6379
'indicators_to_compare': self.indicators_to_compare,
64-
'return_field': self.return_field,
80+
'return_fields': self.return_fields,
6581
'min_event_count': self.min_event_count,
6682
'min_avg_return': self.min_avg_return,
6783
})
@@ -72,5 +88,5 @@ def deserialize(cls, data: Dict):
7288
storage_provider: StorageProvider = DeserializationService.deserialize(data.get('storage_provider'))
7389

7490
return cls(storage_provider, data.get('timeframe_start'), data.get('timeframe_end'),
75-
data.get('indicators_to_compare'), data.get('return_field'),
91+
data.get('indicators_to_compare'), data.get('return_fields'),
7692
data.get('min_event_count'), data.get('min_avg_return'))

src/algotrader/pipeline/strategies/history_cosine_similarity.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,10 @@ def __init__(self, storage_provider: StorageProvider, timeframe_start: datetime,
2828
groupby_fields = [f'attachments.indicators_matched_buckets.{ind}.ident' for ind in self.indicators_to_compare]
2929
return_field = f'attachments.returns.{return_field}'
3030

31-
self.matchers = storage_provider.get_aggregated_history(timeframe_start, timeframe_end, groupby_fields,
32-
return_field, min_event_count, min_avg_return)
31+
self.long_matchers, self.short_matchers = storage_provider.get_aggregated_history(timeframe_start,
32+
timeframe_end, groupby_fields,
33+
return_field, min_event_count,
34+
min_avg_return)
3335

3436
def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal]:
3537
indicators_buckets: IndicatorsMatchedBuckets = \
@@ -42,7 +44,7 @@ def process(self, context: SharedContext, candle: Candle) -> List[StrategySignal
4244

4345
candle_values.append(indicators_buckets.get(indicator).ident)
4446

45-
for matcher in self.matchers:
47+
for matcher in self.long_matchers:
4648
matcher_values: list[int] = []
4749
for indicator in self.indicators_to_compare:
4850
matcher_values.append(matcher[f'attachments.indicators_matched_buckets.{indicator}.ident'])

src/algotrader/pipeline/terminators/technicals_binner.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,17 @@
99
from algotrader.entities.candle import Candle
1010
from algotrader.pipeline.processors.candle_cache import CandleCache
1111
from algotrader.pipeline.processors.technicals import IndicatorValue
12-
from algotrader.pipeline.processors.technicals_normalizer import NormalizedIndicators, NORMALIZED_INDICATORS_ATTACHMENT_KEY
12+
from algotrader.pipeline.processors.technicals_normalizer import NormalizedIndicators, \
13+
NORMALIZED_INDICATORS_ATTACHMENT_KEY
1314
from algotrader.pipeline.shared_context import SharedContext
1415
from algotrader.pipeline.terminator import Terminator
1516

1617

1718
class TechnicalsBinner(Terminator):
18-
def __init__(self, symbols: List[str], bins_count: int, output_file_path: str) -> None:
19+
def __init__(self, symbols: List[str], bins_count: int, output_file_path: str,
20+
outliers_removal_percentage: float = 0.05) -> None:
1921
super().__init__()
22+
self.outliers_removal_percentage = outliers_removal_percentage
2023
self.symbols = symbols
2124
self.output_file_path = output_file_path
2225
self.values: Dict[str, List[IndicatorValue]] = {}
@@ -38,6 +41,9 @@ def _process_candle(self, candle: Candle):
3841
normalized_indicators: NormalizedIndicators = candle.attachments.get_attachment(
3942
NORMALIZED_INDICATORS_ATTACHMENT_KEY)
4043

44+
if not normalized_indicators:
45+
return
46+
4147
for indicator, value in normalized_indicators.items():
4248
if indicator not in self.values:
4349
self.values[indicator] = []
@@ -59,7 +65,7 @@ def _calculate_bins(self):
5965
def _get_single_float_bins(self, values: List[float]) -> List[Bucket]:
6066
values.sort()
6167

62-
margins = int(len(values) * 0.05)
68+
margins = int(len(values) * self.outliers_removal_percentage)
6369
values = values[margins:len(values) - margins]
6470

6571
step_size = int(math.floor(len(values) / self.bins_count))
@@ -83,9 +89,11 @@ def serialize(self) -> Dict:
8389
'symbols': self.symbols,
8490
'bins_count': self.bins_count,
8591
'output_file_path': self.output_file_path,
92+
'outliers_removal_percentage': self.outliers_removal_percentage
8693
})
8794
return obj
8895

8996
@classmethod
9097
def deserialize(cls, data: Dict):
91-
return cls(data.get('symbols'), data.get('bins_count'), data.get('output_file_path'))
98+
return cls(data.get('symbols'), data.get('bins_count'), data.get('output_file_path'),
99+
data.get('outliers_removal_percentage'))

src/algotrader/providers/binance.py

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,25 @@
66
from binance.websocket.spot.websocket_client import SpotWebsocketClient as WebsocketClient
77

88
from algotrader.entities.candle import Candle
9+
from algotrader.entities.order_direction import OrderDirection
910
from algotrader.entities.serializable import Deserializable, Serializable
1011
from algotrader.entities.timespan import TimeSpan
1112

1213
StreamedCandleCallback = Callable[[Candle], None]
1314

15+
PRODUCTION = 'https://api.binance.com'
16+
TESTNET = 'https://testnet.binance.vision'
17+
1418

1519
class BinanceProvider(Serializable, Deserializable):
1620
logger = logging.getLogger('BinanceProvider')
1721

18-
def __init__(self, api_key: Optional[str] = '', api_secret: Optional[str] = '', enable_websocket: bool = False):
22+
def __init__(self, api_key: Optional[str] = '', api_secret: Optional[str] = '',
23+
enable_websocket: bool = False, testnet: bool = False):
1924
self.api_key = api_key
2025
self.api_secret = api_secret
2126
self.enable_websocket = enable_websocket
22-
self.client = Spot(api_key, api_secret)
27+
self.client = Spot(api_key, api_secret, base_url=TESTNET if testnet else PRODUCTION)
2328

2429
self.wsManager = WebsocketClient()
2530
if enable_websocket:
@@ -70,6 +75,35 @@ def _deserialize_candle(self, symbol: str, interval: TimeSpan, data: Dict) -> Ca
7075

7176
return Candle(symbol, interval, timestamp, open, close, high, low, volume)
7277

78+
def send_bracket_order(self, symbol: str, direction: OrderDirection, quantity: float,
79+
triggering_price: float, position_entry_grace: float, spread: float,
80+
time_in_force: str = 'GTC'):
81+
82+
grace_price = triggering_price * (1 + position_entry_grace) if direction == OrderDirection.BUY else \
83+
triggering_price * (1 - position_entry_grace)
84+
85+
take_profit_price = triggering_price * (1 + spread) if direction == OrderDirection.BUY else \
86+
triggering_price * (1 - spread)
87+
88+
stop_loss_price = triggering_price * (1 - spread) if direction == OrderDirection.BUY else \
89+
triggering_price * (1 + spread)
90+
91+
side = self._direction_to_side(direction)
92+
logging.info(f'Sending order for {symbol} {side} {quantity} at {grace_price}...')
93+
order_response = self.client.new_order(symbol=symbol, side=side, type='LIMIT',
94+
quantity=quantity, price=grace_price,
95+
timeInForce=time_in_force)
96+
97+
logging.info(f'Order response: {order_response}')
98+
if order_response['status'] == 'FILLED':
99+
logging.info(f'Order filled, sending take profit and stop loss... '
100+
f'take profit: {take_profit_price}, stop loss: {stop_loss_price}')
101+
102+
opposite_side = self._direction_to_opposite_side(direction)
103+
self.client.new_oco_order(symbol=symbol, side=opposite_side, quantity=quantity, price=take_profit_price,
104+
stopPrice=stop_loss_price, time_in_force='GTC')
105+
return order_response
106+
73107
def get_symbol_history(self, symbol: str, interval: TimeSpan, start_time: datetime,
74108
end_time: datetime = datetime.now()) -> List[Candle]:
75109
self.logger.info(f'Getting {symbol} history from {start_time} to {end_time}...')
@@ -111,6 +145,14 @@ def deserialize(cls, data: Dict):
111145
def _timestamp_to_datetime(timestamp: int) -> datetime:
112146
return datetime.fromtimestamp(timestamp / 1000)
113147

148+
@staticmethod
149+
def _direction_to_side(direction: OrderDirection) -> str:
150+
return 'BUY' if direction == OrderDirection.Buy else 'SELL'
151+
152+
@staticmethod
153+
def _direction_to_opposite_side(direction: OrderDirection) -> str:
154+
return 'SELL' if direction == OrderDirection.Buy else 'BUY'
155+
114156
@staticmethod
115157
def _timespan_to_interval(timespan: TimeSpan) -> str:
116158
if timespan == TimeSpan.Second:

src/algotrader/storage/inmemory_storage.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,19 @@ def __init__(self) -> None:
1313
self.candles: Dict[str, List[Candle]] = {}
1414

1515
def get_symbol_candles(self, symbol: str, time_span: TimeSpan, from_timestamp: datetime,
16-
to_timestamp: datetime) -> List[Candle]:
16+
to_timestamp: datetime, limit: int = 0) -> List[Candle]:
1717

1818
if symbol not in self.candles:
1919
return []
2020

21-
return list(filter(lambda candle:
22-
candle.time_span == time_span and
23-
from_timestamp <= candle.timestamp <= to_timestamp, self.candles[symbol]))
21+
results = list(filter(lambda candle:
22+
candle.time_span == time_span and
23+
from_timestamp <= candle.timestamp <= to_timestamp, self.candles[symbol]))
24+
25+
if limit > 0:
26+
return results[:limit]
27+
28+
return results
2429

2530
def get_candles(self, time_span: TimeSpan, from_timestamp: datetime, to_timestamp: datetime) -> List[Candle]:
2631

0 commit comments

Comments
 (0)