Skip to content

Commit 689420a

Browse files
authored
add real time candle processing from Binance (#23)
* add real time candle processing from binance * lint
1 parent 774b884 commit 689420a

File tree

8 files changed

+118
-6
lines changed

8 files changed

+118
-6
lines changed

README.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,14 @@ Running this example pipeline will load historical data from Binance to MongoDB:
155155
```shell
156156
python main.py pipeline run examples/pipeline-templates/build_daily_binance_loader.json
157157
```
158+
159+
* Realtime Crypto pipeline from Binance can be found in [examples/pipeline-templates/build_realtime_binance.json](src/examples/pipeline-templates/build_realtime_binance.json).
160+
Running this example pipeline will process realtime, second candles from Binance:
161+
162+
```shell
163+
python main.py pipeline run examples/pipeline-templates/build_realtime_binance.json
164+
```
165+
158166

159167

160168
## Virtual environment
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
{
2+
"__class__": "pipeline.pipeline:Pipeline",
3+
"source": {
4+
"__class__": "pipeline.sources.binance_realtime:BinanceRealtimeSource",
5+
"binanceProvider": {
6+
"apiKey": "",
7+
"apiSecret": "",
8+
"enableWebsocket": true
9+
},
10+
"symbols": [
11+
"BTCUSDT"
12+
],
13+
"timeSpan": 1
14+
},
15+
"processor": {
16+
"__class__": "pipeline.processors.technicals:TechnicalsProcessor",
17+
"next_processor": {
18+
"__class__": "pipeline.processors.candle_cache:CandleCache",
19+
"next_processor": {
20+
"__class__": "pipeline.processors.storage_provider_sink:StorageSinkProcessor",
21+
"storage_provider": {
22+
"__class__": "storage.mongodb_storage:MongoDBStorage",
23+
"host": "localhost",
24+
"port": 27017,
25+
"database": "algo-trader",
26+
"username": "root",
27+
"password": "root"
28+
}
29+
}
30+
}
31+
},
32+
"terminator": null
33+
}

src/logger/__init__.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
import logging
2+
import os
23
import pathlib
34
import time
5+
from rich.logging import RichHandler
46

57

68
def setup_logger():
79
log_dir = pathlib.Path(__file__).parent.parent.joinpath('logs').resolve()
10+
level = logging.DEBUG if os.environ.get('DEBUG') else logging.INFO
811

912
if not pathlib.Path.exists(log_dir):
1013
pathlib.Path.mkdir(log_dir)
@@ -16,11 +19,11 @@ def setup_logger():
1619
logging.basicConfig(
1720
filename=f'{log_dir}/{time.strftime("algo-trader.%y%m%d_%H%M%S.log")}',
1821
filemode="w",
19-
level=logging.INFO,
22+
level=level,
2023
format=recfmt, datefmt=timefmt)
2124
logger = logging.getLogger()
22-
console = logging.StreamHandler()
23-
console.setLevel(logging.INFO)
25+
console = RichHandler()
26+
console.setLevel(level)
2427
logger.addHandler(console)
2528

2629
logging.getLogger('ibapi.wrapper').setLevel(logging.WARNING)

src/main.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ def save_pipeline_spec(filename: str, pipeline: Pipeline):
2525

2626

2727
def generate_example_templates():
28+
save_pipeline_spec('build_realtime_binance.json', LoadersPipelines.build_realtime_binance())
2829
save_pipeline_spec('build_daily_binance_loader.json', LoadersPipelines.build_daily_binance_loader())
2930
save_pipeline_spec('build_daily_yahoo_loader.json', LoadersPipelines.build_daily_yahoo_loader())
3031
save_pipeline_spec('backtest_mongo_source_rsi_strategy.json', BacktestPipelines.build_mongodb_backtester())

src/pipeline/builders/loaders.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from pipeline.reverse_source import ReverseSource
1717
from pipeline.source import Source
1818
from pipeline.sources.binance_history import BinanceHistorySource
19+
from pipeline.sources.binance_realtime import BinanceRealtimeSource
1920
from pipeline.sources.ib_history import IBHistorySource
2021
from pipeline.sources.mongodb_source import MongoDBSource
2122
from pipeline.sources.yahoo_finance_history import YahooFinanceHistorySource
@@ -75,6 +76,21 @@ def build_daily_binance_loader(days_back: int = DEFAULT_DAYS_BACK) -> Pipeline:
7576

7677
return Pipeline(source, processor)
7778

79+
@staticmethod
80+
def build_realtime_binance() -> Pipeline:
81+
mongodb_storage = MongoDBStorage()
82+
83+
symbols = AssetsProvider.get_crypto_symbols()
84+
85+
provider = BinanceProvider(enable_websocket=True)
86+
source = BinanceRealtimeSource(provider, symbols, TimeSpan.Second)
87+
88+
sink = StorageSinkProcessor(mongodb_storage)
89+
cache_processor = CandleCache(sink)
90+
processor = TechnicalsProcessor(cache_processor)
91+
92+
return Pipeline(source, processor)
93+
7894
@staticmethod
7995
def build_returns_calculator(days_back: int = DEFAULT_DAYS_BACK) -> Pipeline:
8096
mongodb_storage = MongoDBStorage()

src/pipeline/pipeline.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import logging
22
from typing import Optional, Dict
33

4+
from rich.progress import Progress, TextColumn, BarColumn
5+
46
from entities.serializable import Serializable, Deserializable
57
from pipeline.processor import Processor
68
from pipeline.shared_context import SharedContext
@@ -34,8 +36,14 @@ def deserialize(cls, data: Dict):
3436

3537
def run(self, context: SharedContext) -> None:
3638
self.logger.info('Starting pipeline...')
37-
for candle in self.source.read():
38-
self.processor.process(context, candle)
39+
40+
with Progress(TextColumn('{task.completed} Candle(s) processed'), BarColumn()) as progress:
41+
processing_task = progress.add_task("Processing", total=None)
42+
43+
for candle in self.source.read():
44+
self.logger.debug('Processing candle: %s\r', candle.serialize())
45+
self.processor.process(context, candle)
46+
progress.update(processing_task, advance=1)
3947

4048
if self.terminator:
4149
self.terminator.terminate(context)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
from queue import Queue
2+
from typing import List, Dict, Iterator
3+
4+
from entities.candle import Candle
5+
from entities.timespan import TimeSpan
6+
from pipeline.source import Source
7+
from providers.binance import BinanceProvider
8+
9+
10+
class BinanceRealtimeSource(Source):
11+
12+
def __init__(self, binance_provider: BinanceProvider, symbols: List[str], time_span: TimeSpan):
13+
self.binance_provider = binance_provider
14+
self.symbols = symbols
15+
self.time_span = time_span
16+
self.queue = Queue()
17+
18+
def read(self) -> Iterator[Candle]:
19+
for symbol in self.symbols:
20+
self.binance_provider.start_kline_socket(symbol, self.time_span, self._on_candle)
21+
22+
while self.binance_provider.is_socket_alive():
23+
yield self.queue.get()
24+
25+
def _on_candle(self, candle: Candle):
26+
self.queue.put(candle)
27+
28+
def serialize(self) -> Dict:
29+
obj = super().serialize()
30+
obj.update({
31+
'binanceProvider': self.binance_provider.serialize(),
32+
'symbols': self.symbols,
33+
'timeSpan': self.time_span.value,
34+
})
35+
return obj
36+
37+
@classmethod
38+
def deserialize(cls, data: Dict):
39+
provider = BinanceProvider.deserialize(data.get('binanceProvider'))
40+
return cls(provider, data.get('symbols'), TimeSpan(data.get('timeSpan')))

src/providers/binance.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@ def __init__(self, api_key: Optional[str] = '', api_secret: Optional[str] = '',
2727
self.wsManager.start()
2828

2929
def stop(self):
30-
if self.wsManager.isAlive():
30+
if self.is_socket_alive():
3131
self.logger.info('Stopping websocket manager...')
3232
self.wsManager.stop()
3333

34+
def is_socket_alive(self) -> bool:
35+
return self.wsManager.is_alive()
36+
3437
def start_kline_socket(self, symbol: str, interval: TimeSpan, handler: StreamedCandleCallback):
3538
self.logger.info(f'Starting kline socket for {symbol}...')
3639
self.wsManager.kline(symbol=symbol, id=1,

0 commit comments

Comments
 (0)