Skip to content

Commit 2a1b2bd

Browse files
committed
cleans up run streams and removes unused code
1 parent 70dab61 commit 2a1b2bd

File tree

6 files changed

+58
-66
lines changed

6 files changed

+58
-66
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bin/orderbook-quoter-server/src/main.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,6 @@ fn main() {
6060
.takes_value(true),
6161
)
6262
.get_matches();
63-
6463
let config_path = PathBuf::from(matches.value_of("config").unwrap_or("/etc/config.yaml"));
6564
info!("config path given: {:?}", config_path);
6665
let file = PathBuf::from(config_path);

depth-driver/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@ config.workspace = true
1616
exchange.workspace = true
1717
market-objects.workspace = true
1818
quoter-errors.workspace = true
19+
anyhow = "1.0.99"

depth-driver/src/lib.rs

Lines changed: 48 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,11 @@ use std::cell::RefCell;
22
use std::rc::Rc;
33

44
use crossbeam_channel::Sender;
5+
use tracing::warn;
56

6-
use tokio::sync::watch::{
7-
channel as watchChannel, Receiver as watchReceiver, Sender as watchSender,
8-
};
7+
use tokio::sync::watch::{channel as watchChannel, Receiver as watchReceiver};
8+
use tokio::task;
99
use tokio_context::context::Context;
10-
use tracing::info;
11-
12-
use futures::pin_mut;
13-
use futures::stream::FuturesOrdered;
14-
use futures::StreamExt;
1510

1611
use config::ExchangeConfig;
1712
use exchange::exchange::Exchange;
@@ -20,30 +15,31 @@ use quoter_errors::{ErrorHotPath, ErrorInitialState};
2015

2116
pub struct DepthDriver {
2217
exchanges: Vec<Rc<RefCell<Exchange>>>,
23-
orderbook_snapshot_trigger: watchReceiver<()>,
24-
exchange_snapshot_trigger: watchSender<()>,
18+
// todo: implement syncs
19+
// orderbook_snapshot_sync: watchReceiver<()>,
20+
// exchange_snapshot_sync: watchSender<()>,
2521
}
2622

2723
impl DepthDriver {
2824
pub fn new(
2925
exchange_configs: &Vec<ExchangeConfig>,
3026
depths_producer: Sender<DepthUpdate>,
31-
orderbook_snapshot_trigger: watchReceiver<()>,
27+
_: watchReceiver<()>,
3228
) -> Result<DepthDriver, ErrorInitialState> {
3329
let mut exchanges: Vec<Rc<RefCell<Exchange>>> = Vec::new();
34-
let (snapshot_trigger, inner_snapshot_consumer) = watchChannel(());
30+
let (_, inner_snapshot_consumer) = watchChannel(());
3531
for exchange_config in exchange_configs {
3632
let exchange = Exchange::new(
3733
exchange_config,
3834
depths_producer.clone(),
3935
inner_snapshot_consumer.clone(),
4036
);
41-
exchanges.push(Rc::new(RefCell::new(exchange)));
37+
exchanges.push(Rc::new(RefCell::new(exchange?)));
4238
}
4339
Ok(DepthDriver {
4440
exchanges,
45-
orderbook_snapshot_trigger,
46-
exchange_snapshot_trigger: snapshot_trigger,
41+
// orderbook_snapshot_trigger,
42+
// exchange_snapshot_trigger: snapshot_trigger,
4743
})
4844
}
4945

@@ -65,6 +61,7 @@ impl DepthDriver {
6561
Ok(())
6662
}
6763

64+
// todo: this can be done on multiplie threads?
6865
pub async fn build_orderbook(&mut self) -> Result<(), ErrorInitialState> {
6966
for exchange in self.exchanges.iter_mut() {
7067
exchange.as_ref().borrow_mut().run_snapshot().await?;
@@ -79,50 +76,47 @@ impl DepthDriver {
7976
Ok(())
8077
}
8178

82-
// run_streams is the main driver the entire program that continously polls websocket depths from
83-
// N exchanges and also handles orderbook rebuilds
84-
pub async fn run_streams(&mut self, ctx: &mut Context) -> Result<(), ErrorHotPath> {
85-
info!("running streams");
86-
let inner_trigger = &mut self.orderbook_snapshot_trigger;
87-
let mut exchanges: Vec<*mut Exchange> = vec![];
88-
// let mut streams = FuturesOrdered::new();
89-
for exchange in &mut self.exchanges {
90-
exchanges.push(exchange.as_ref().as_ptr())
91-
}
92-
93-
/*
94-
unsafe {
95-
for stream in exchanges {
96-
streams.push_back(stream.as_mut().unwrap().stream_depths())
97-
}
98-
}
99-
*/
100-
101-
// pin_mut!(streams);
102-
103-
loop {
104-
info!("waiting for future");
105-
tokio::select! {
106-
// TODO: This may not rebuild orderbook correctly but the trigger currently is not
107-
// implemented within the orderbook. This future handles orderbook rebuilds.
108-
// _ = inner_trigger.changed()=> {
109-
// send snapshot trigger to our N exchange streams
110-
// this sets a streams run function to grab a http depth load from multiplie exchanges, and buffers
111-
// the websocket depths.
112-
// let _ = self.exchange_snapshot_trigger.send(());
113-
_ = unsafe {exchanges[0].as_mut().unwrap().stream_depths() } => {}
114-
_ = unsafe {exchanges[1].as_mut().unwrap().stream_depths() } => {}
115-
_ = ctx.done() => {
116-
return Ok(());
79+
// todo: think about implementing context for graceful shutdowns
80+
pub async fn run_streams(&mut self, _: &mut Context) -> Result<(), ErrorHotPath> {
81+
let local = task::LocalSet::new();
82+
local
83+
.run_until(async move {
84+
for (idx, exchange) in self.exchanges.iter_mut().enumerate() {
85+
let exchange = exchange.clone();
86+
tokio::task::spawn_local(async move {
87+
loop {
88+
match exchange.as_ref().borrow_mut().stream_depths().await {
89+
Ok(_) => continue,
90+
Err(err) => {
91+
warn!(
92+
"received error {} for exchange when streaming depths{}",
93+
err, idx
94+
);
95+
// todo: don't return but reconnect - we don't want to end the
96+
// entire system if one websocket fails
97+
return Err(ErrorHotPath::OrderBookDealSendFail);
98+
}
99+
}
117100
}
118-
}
119-
}
101+
// even though this code is unreachable we need to infer
102+
// the return type
103+
Ok::<(), ErrorHotPath>(())
104+
});
105+
}
106+
})
107+
.await;
108+
Ok(())
120109
}
121110

122-
pub async fn close_exchanges(&mut self) {
111+
// todo: update the error here
112+
pub async fn close_exchanges(&mut self) -> Result<(), ErrorHotPath> {
123113
for exchange in &mut self.exchanges {
124114
let mut exchange = exchange.borrow_mut();
125-
exchange.close().await
115+
exchange
116+
.close()
117+
.await
118+
.map_err(|_| ErrorHotPath::OrderBookDealSendFail)?;
126119
}
120+
Ok(())
127121
}
128122
}

exchange/src/exchange.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,7 @@ impl Exchange {
8989

9090
pub async fn stream_depths(&mut self) -> Result<(), ErrorHotPath> {
9191
info!("streaming depths");
92-
self.inner.run().await;
92+
self.inner.run().await?;
9393
Ok(())
9494
}
9595

exchange/src/stream.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use std::error::Error;
2-
use std::fmt;
32
use std::{pin::Pin, task::Poll};
43

54
use serde_json::from_str;
@@ -135,9 +134,9 @@ impl ExchangeStream {
135134
Ok(mut depths) => {
136135
while let Some(depth) = depths.next() {
137136
self.buffer.push(depth);
138-
// we must keep processing snapshot depths and depths from the websocket
137+
// we must keep processing snapshot depths and depths from the websocket
139138
// but this time the websocket depths are stored in their own buffer
140-
// to be sequenced
139+
// to be sequenced aftr snapshot depths are processed
141140
self.next().await;
142141
}
143142
success = true;
@@ -174,9 +173,6 @@ impl ExchangeStream {
174173
Ok(mut depths) => {
175174
while let Some(depth) = depths.next() {
176175
self.buffer.push(depth);
177-
// we must keep processing snapshot depths and depths from the websocket
178-
// but this time the websocket depths are stored in their own buffer
179-
// to be sequenced
180176
self.next().await;
181177
}
182178
break
@@ -249,7 +245,7 @@ impl ExchangeStream {
249245
impl Iterator<Item = DepthUpdate>,
250246
impl Iterator<Item = DepthUpdate>,
251247
),
252-
Box<dyn Error + Sync + Send + 'static>,
248+
orderbook_snapshot_error,
253249
> {
254250
let req_builder = self
255251
.http_client
@@ -328,6 +324,8 @@ impl ExchangeStream {
328324

329325
#[derive(Debug)]
330326
pub enum WSStreamState {
327+
// todo: collapse these errors into 1 Faulty state or do something differently
328+
// -- this is our async state not a error
331329
WSError(tokio_tungstenite::tungstenite::Error),
332330
SenderError,
333331
FailedStream,
@@ -459,7 +457,6 @@ impl Stream for ExchangeStream {
459457
}
460458
}
461459
}
462-
info!("streaming -- here 1");
463460
return Poll::Ready(Some(WSStreamState::WaitingForDepth));
464461
}
465462
}

0 commit comments

Comments
 (0)