Skip to content

Commit 89310df

Browse files
authored
Replace kill process for propper shutdown (#868)
## πŸ“ Summary The bidding service client used to trigger an ugly kill process, now it just triggers the proper shutdown. ## βœ… I have completed the following steps: * [X] Run `make lint` * [X] Run `make test` * [ ] Added tests (if applicable)
1 parent 924b662 commit 89310df

File tree

3 files changed

+30
-58
lines changed

3 files changed

+30
-58
lines changed

β€Žcrates/rbuilder-operator/src/bidding_service_wrapper/client/bidding_service_client_adapter.rsβ€Ž

Lines changed: 29 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
use futures_util::FutureExt;
22
use hyper_util::rt::TokioIo;
33
use parking_lot::Mutex;
4-
use rbuilder::live_builder::{
5-
block_output::bidding_service_interface::{
6-
BiddingService, BlockSealInterfaceForSlotBidder, LandedBlockInfo as RealLandedBlockInfo,
7-
RelaySet, ScrapedRelayBlockBidWithStats, SlotBidder, SlotBlockId,
8-
},
9-
process_killer::ProcessKiller,
4+
use rbuilder::live_builder::block_output::bidding_service_interface::{
5+
BiddingService, BlockSealInterfaceForSlotBidder, LandedBlockInfo as RealLandedBlockInfo,
6+
RelaySet, ScrapedRelayBlockBidWithStats, SlotBidder, SlotBlockId,
107
};
118
use rbuilder_utils::build_info::Version;
129
use std::{
@@ -110,15 +107,14 @@ impl BiddingServiceClientAdapter {
110107
landed_blocks_history: &[RealLandedBlockInfo],
111108
all_relay_ids: RelaySet,
112109
cancellation_token: CancellationToken,
113-
process_killer: ProcessKiller,
114110
) -> Result<Self> {
115111
let session_id_to_slot_bidder = Arc::new(Mutex::new(HashMap::new()));
116112
let (commands_sender, relay_sets) = Self::init_sender_task(
117113
uds_path,
118114
landed_blocks_history,
119115
all_relay_ids,
120116
session_id_to_slot_bidder.clone(),
121-
process_killer,
117+
cancellation_token.clone(),
122118
)
123119
.await?;
124120
spawn_slot_bidder_seal_bid_command_subscriber(
@@ -141,15 +137,16 @@ impl BiddingServiceClientAdapter {
141137
self.last_session_id.fetch_add(1, Ordering::Relaxed)
142138
}
143139

144-
// returns the commands_sender to send commands to the bidding service and the relay_sets that it got on the initialize call.
140+
/// returns the commands_sender to send commands to the bidding service and the relay_sets that it got on the initialize call.
141+
/// cancellation_token is signaled on protocol error so rbuilder restarts.
145142
async fn init_sender_task(
146143
uds_path: &str,
147144
landed_blocks_history: &[RealLandedBlockInfo],
148145
all_relay_ids: RelaySet,
149146
session_id_to_slot_bidder: Arc<
150147
Mutex<HashMap<u64, Arc<dyn BlockSealInterfaceForSlotBidder + Send + Sync>>>,
151148
>,
152-
process_killer: ProcessKiller,
149+
cancellation_token: CancellationToken,
153150
) -> Result<(
154151
mpsc::UnboundedSender<BiddingServiceClientCommand>,
155152
Vec<RelaySet>,
@@ -189,7 +186,7 @@ impl BiddingServiceClientAdapter {
189186
build_time_utc: bidding_service_version.build_time_utc,
190187
});
191188
let (commands_sender, rx) = mpsc::unbounded_channel::<BiddingServiceClientCommand>();
192-
Self::spawn_sender_loop_task(rx, client, session_id_to_slot_bidder, process_killer);
189+
Self::spawn_sender_loop_task(rx, client, session_id_to_slot_bidder, cancellation_token);
193190
Ok((commands_sender, relay_sets))
194191
}
195192

@@ -201,50 +198,44 @@ impl BiddingServiceClientAdapter {
201198
session_id_to_slot_bidder: Arc<
202199
Mutex<HashMap<u64, Arc<dyn BlockSealInterfaceForSlotBidder + Send + Sync>>>,
203200
>,
204-
process_killer: ProcessKiller,
201+
cancellation_token: CancellationToken,
205202
) {
206203
// Spawn a task to execute received futures
207204
tokio::spawn(async move {
208205
while let Some(command) = rx.recv().await {
209-
match command {
206+
let rpc_result = match command {
210207
BiddingServiceClientCommand::CreateSlotBidder(create_slot_data) => {
211208
Self::create_slot_bidder(
212209
&mut client,
213210
create_slot_data,
214211
session_id_to_slot_bidder.clone(),
215-
&process_killer,
216212
)
217-
.await;
213+
.await
218214
}
219215
BiddingServiceClientCommand::MustWinBlock(must_win_block_params) => {
220-
Self::handle_error(
221-
client.must_win_block(must_win_block_params).await,
222-
&process_killer,
223-
);
216+
client.must_win_block(must_win_block_params).await
224217
}
225218
BiddingServiceClientCommand::UpdateNewLandedBlocksDetected(params) => {
226-
Self::handle_error(
227-
client.update_new_landed_blocks_detected(params).await,
228-
&process_killer,
229-
);
219+
client.update_new_landed_blocks_detected(params).await
230220
}
231221
BiddingServiceClientCommand::UpdateFailedReadingNewLandedBlocks => {
232-
Self::handle_error(
233-
client
234-
.update_failed_reading_new_landed_blocks(Empty {})
235-
.await,
236-
&process_killer,
237-
);
222+
client
223+
.update_failed_reading_new_landed_blocks(Empty {})
224+
.await
238225
}
239226
BiddingServiceClientCommand::DestroySlotBidder(destroy_slot_bidder_params) => {
240-
Self::handle_error(
241-
client.destroy_slot_bidder(destroy_slot_bidder_params).await,
242-
&process_killer,
243-
);
227+
let rpc_result =
228+
client.destroy_slot_bidder(destroy_slot_bidder_params).await;
244229
session_id_to_slot_bidder
245230
.lock()
246231
.remove(&destroy_slot_bidder_params.session_id);
232+
rpc_result
247233
}
234+
};
235+
if let Err(error) = &rpc_result {
236+
error!(error=?error,"RPC call error, cancelling so process shutdowns and reconnects");
237+
cancellation_token.cancel();
238+
return;
248239
}
249240
}
250241
});
@@ -257,34 +248,18 @@ impl BiddingServiceClientAdapter {
257248
session_id_to_slot_bidder: Arc<
258249
Mutex<HashMap<u64, Arc<dyn BlockSealInterfaceForSlotBidder + Send + Sync>>>,
259250
>,
260-
process_killer: &ProcessKiller,
261-
) {
251+
) -> tonic::Result<tonic::Response<Empty>> {
262252
let session_id = create_slot_bidder_data.params.session_id;
263253
session_id_to_slot_bidder
264254
.lock()
265255
.insert(session_id, create_slot_bidder_data.block_seal_handle.into());
266-
if let Err(err) = client
256+
let rpc_result = client
267257
.create_slot_bidder(create_slot_bidder_data.params)
268-
.await
269-
{
258+
.await;
259+
if rpc_result.is_err() {
270260
session_id_to_slot_bidder.lock().remove(&session_id);
271-
Self::handle_error(Err(err), process_killer);
272-
};
273-
}
274-
275-
/// If error logs it.
276-
/// return result is error
277-
fn handle_error(
278-
result: tonic::Result<tonic::Response<Empty>>,
279-
process_killer: &ProcessKiller,
280-
) -> bool {
281-
if let Err(error) = &result {
282-
error!(error=?error,"RPC call error, killing process so it reconnects");
283-
process_killer.kill("RPC call error");
284-
true
285-
} else {
286-
false
287261
}
262+
rpc_result
288263
}
289264

290265
pub async fn must_win_block(&self, block: u64) {

β€Žcrates/rbuilder-operator/src/bidding_service_wrapper/fast_streams/helpers.rsβ€Ž

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,7 @@ fn init_slot_bidder_seal_bid_command_subscriber() -> Result<
425425

426426
/// Spawns a thread that subscribes to the SlotBidderSealBidCommandRPC and forwards them to registered BlockSealInterfaceForSlotBidder in session_id_to_slot_bidder.
427427
/// Result tells if the init stage was successful and the thread was able to start polling.
428+
/// cancellation_token is used to detect shutdown.
428429
pub fn spawn_slot_bidder_seal_bid_command_subscriber(
429430
session_id_to_slot_bidder: Arc<
430431
Mutex<HashMap<u64, Arc<dyn BlockSealInterfaceForSlotBidder + Send + Sync>>>,

β€Žcrates/rbuilder-operator/src/flashbots_config.rsβ€Ž

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ use rbuilder::{
2626
SpecificBuilderConfig,
2727
},
2828
payload_events::MevBoostSlotData,
29-
process_killer::ProcessKiller,
3029
LiveBuilder,
3130
},
3231
provider::StateProviderFactory,
@@ -169,7 +168,6 @@ impl LiveBuilderConfig for FlashbotsConfig {
169168
&landed_blocks,
170169
self.l1_config.relays_ids(),
171170
cancellation_token.clone(),
172-
ProcessKiller::new(cancellation_token.clone()),
173171
)
174172
.await?;
175173

@@ -293,14 +291,12 @@ impl FlashbotsConfig {
293291
landed_blocks_history: &[LandedBlockInfo],
294292
all_relay_ids: RelaySet,
295293
cancellation_token: CancellationToken,
296-
process_killer: ProcessKiller,
297294
) -> eyre::Result<Arc<BiddingServiceClientAdapter>> {
298295
let bidding_service_client = BiddingServiceClientAdapter::new(
299296
&self.bidding_service_ipc_path,
300297
landed_blocks_history,
301298
all_relay_ids,
302299
cancellation_token,
303-
process_killer,
304300
)
305301
.await
306302
.map_err(|e| eyre::Report::new(e).wrap_err("Unable to connect to remote bidder"))?;

0 commit comments

Comments
Β (0)