@@ -93,14 +93,17 @@ fn orderbook_quoter_server(config: Config) -> Result<(), Box<dyn Error>> {
9393 let ( mut orderbook, depth_producer) =
9494 OrderBook :: new ( quotes_producer. clone ( ) , & config. orderbook ) ;
9595 orderbook. send_deals = true ; // TODO: this will be removed in the future, a hack.
96+
97+ // todo: update orderbook so bids is in its own thread and asks is in its own thread
98+ // exchange stream(s) should possibly just be in each ask and bid thread as well
9699 let orderbook = Box :: new ( orderbook) ;
97100
98101 let orderbook_depth_processor_core = core_ids[ 0 ] ;
99102 let mut orderbook_clone = orderbook. clone ( ) ;
100103 let t1 = thread:: spawn ( move || {
101104 info ! ( "starting orderbook, depth processor, writer thread" ) ;
102- _ = orderbook_clone. process_all_depths ( & process_depths_ctx) ;
103- let _ = core_affinity:: set_for_current ( orderbook_depth_processor_core) ;
105+ orderbook_clone. process_all_depths ( & process_depths_ctx) ;
106+ core_affinity:: set_for_current ( orderbook_depth_processor_core) ;
104107 } ) ;
105108
106109 let mut orderbook_clone = orderbook. clone ( ) ;
@@ -132,15 +135,14 @@ fn orderbook_quoter_server(config: Config) -> Result<(), Box<dyn Error>> {
132135 . worker_threads ( 1 )
133136 . build ( )
134137 . unwrap ( ) ;
135- let _ = core_affinity:: set_for_current ( io_grpc_core) ;
138+ core_affinity:: set_for_current ( io_grpc_core) ;
136139 let grpc_io_handler = async_grpc_io_rt. handle ( ) ;
137140 let grpc_server = grpc_io_handler. spawn ( async move {
138141 run_server ( & config_clone, quotes_producer) . await ;
139142 info ! ( "shutting down" ) ;
140143 } ) ;
141144 async_grpc_io_rt. block_on ( grpc_server) ;
142145 } ) ;
143-
144146 let io_ws_core = core_ids[ 2 ] ;
145147 let depth_producer = depth_producer. clone ( ) ;
146148 let snapshot_depth_consumer = snapshot_depth_consumer. clone ( ) ;
@@ -155,7 +157,7 @@ fn orderbook_quoter_server(config: Config) -> Result<(), Box<dyn Error>> {
155157 . build ( )
156158 . unwrap ( ) ;
157159 let local = LocalSet :: new ( ) ;
158- let _ = core_affinity:: set_for_current ( io_ws_core) ;
160+ core_affinity:: set_for_current ( io_ws_core) ;
159161 local. spawn_local ( async move {
160162 let mut depth_driver = DepthDriver :: new (
161163 & config_clone. exchanges ,
@@ -170,9 +172,9 @@ fn orderbook_quoter_server(config: Config) -> Result<(), Box<dyn Error>> {
170172 panic ! ( "failed to stream exchanges: {:?}" , stream_error) ;
171173 }
172174 }
173- let _ = depth_driver. subscribe_depths ( ) . await ;
174- let _ = depth_driver. build_orderbook ( ) . await ;
175- let stream_result = depth_driver. run_streams ( & mut async_ctx) . await ;
175+ depth_driver. subscribe_depths ( ) . await ;
176+ depth_driver. build_orderbook ( ) . await ;
177+ if let match stream_result = depth_driver. run_streams ( & mut async_ctx) . await ;
176178 match stream_result {
177179 Ok ( _) => depth_driver. close_exchanges ( ) . await ,
178180 Err ( stream_error) => {
0 commit comments