Skip to content

Commit 70dab61

Browse files
committed
clean up exchange streams part I
1 parent e2bda9d commit 70dab61

File tree

7 files changed

+277
-172
lines changed

7 files changed

+277
-172
lines changed

Cargo.lock

Lines changed: 138 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ members = [
2222
"objs/internal-objects",
2323
"objs/market-objects",
2424
"objs/errors",
25-
"objs/config",
25+
"objs/config", "orderbook-sync",
2626
]
2727
exclude = [
2828
"market-maker",

exchange/src/exchange.rs

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,21 @@
1-
use std::pin::Pin;
2-
use std::time::Duration;
3-
41
use futures::stream::SplitSink;
5-
use futures_util::{try_join, SinkExt};
2+
use futures_util::SinkExt;
63

7-
use futures::future::join_all;
84
use tokio::net::TcpStream;
9-
use tokio::sync::watch::{
10-
channel as watchChannel, Receiver as watchReceiver, Sender as watchSender,
11-
};
5+
use tokio::sync::watch::Receiver as watchReceiver;
126
use tokio_tungstenite::{tungstenite::protocol::Message, MaybeTlsStream, WebSocketStream};
13-
use tracing::info;
147

158
use crossbeam_channel::Sender;
169

10+
use tracing::{error, info};
11+
1712
use crate::stream::ExchangeStream;
1813
use config::ExchangeConfig;
1914
use market_objects::DepthUpdate;
2015
use quoter_errors::{ErrorHotPath, ErrorInitialState};
2116

17+
const SUBSCRIBE: &'static str = "SUBSCRIBE";
18+
2219
pub struct Exchange {
2320
pub inner: ExchangeStream,
2421
pub ws_sink: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, Message>>,
@@ -31,20 +28,19 @@ impl Exchange {
3128
exchange_config: &ExchangeConfig,
3229
depth_producer: Sender<DepthUpdate>,
3330
watch_trigger: watchReceiver<()>,
34-
) -> Self {
31+
) -> Result<Exchange, ErrorInitialState> {
3532
let inner = ExchangeStream::new(
3633
exchange_config,
3734
depth_producer.clone(),
3835
watch_trigger,
3936
exchange_config.http_client,
40-
)
41-
.unwrap();
42-
Exchange {
43-
inner: inner,
37+
)?;
38+
Ok(Exchange {
39+
inner,
4440
ws_sink: None,
4541
websocket_uri: exchange_config.ws_uri.clone(),
4642
watched_pair: exchange_config.watched_pair.clone(),
47-
}
43+
})
4844
}
4945
pub async fn start(&mut self) -> Result<(), ErrorInitialState> {
5046
let ws_sink = self.inner.start().await?;
@@ -58,7 +54,7 @@ impl Exchange {
5854
self.websocket_uri
5955
);
6056
let json_obj_binance = serde_json::json!({
61-
"method": "SUBSCRIBE",
57+
"method": SUBSCRIBE,
6258
"params": [
6359
"btcusdt@depth5",
6460
],
@@ -67,16 +63,16 @@ impl Exchange {
6763
let exchange_response = self
6864
.ws_sink
6965
.as_mut()
70-
.unwrap()
66+
.ok_or(ErrorInitialState::ExchangeController)?
7167
.send(Message::Text(json_obj_binance.to_string()))
7268
.await;
7369
// TODO: handle this differently;
7470
match exchange_response {
7571
Ok(response) => {
76-
print!("subscription success: {:?}", response);
72+
info!("subscription success: {:?}", response);
7773
}
7874
Err(error) => {
79-
print!("error {}", error)
75+
error!("error {}", error)
8076
}
8177
}
8278
Ok(())
@@ -103,7 +99,11 @@ impl Exchange {
10399
Ok(())
104100
}
105101

106-
pub async fn close(&mut self) {
107-
let _ = self.ws_sink.as_mut().unwrap().send(Message::Close(None));
102+
pub async fn close(&mut self) -> Result<(), ErrorInitialState> {
103+
self.ws_sink
104+
.as_mut()
105+
.ok_or(ErrorInitialState::ExchangeController)?
106+
.send(Message::Close(None));
107+
Ok(())
108108
}
109109
}

0 commit comments

Comments
 (0)