Skip to content

Commit 37289c8

Browse files
authored
Stop client gracefully in validator (solana-labs#6341)
Stop tpu-client-next clients in validator gracefully.
1 parent dedf09a commit 37289c8

File tree

5 files changed

+27
-6
lines changed

5 files changed

+27
-6
lines changed

core/src/forwarding_stage.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ mod packet_container;
5858
/// * [`TpuClientNextClient`]: Relies on the `tpu-client-next` crate.
5959
pub enum ForwardingClientOption<'a> {
6060
ConnectionCache(Arc<ConnectionCache>),
61-
TpuClientNext((&'a Keypair, UdpSocket, RuntimeHandle)),
61+
TpuClientNext((&'a Keypair, UdpSocket, RuntimeHandle, CancellationToken)),
6262
}
6363

6464
/// Value chosen because it was used historically, at some point
@@ -153,12 +153,14 @@ pub(crate) fn spawn_forwarding_stage(
153153
stake_identity,
154154
tpu_client_socket,
155155
runtime_handle,
156+
cancel,
156157
)) => {
157158
let non_vote_client = TpuClientNextClient::new(
158159
runtime_handle,
159160
forward_address_getter,
160161
Some(stake_identity),
161162
tpu_client_socket,
163+
cancel,
162164
);
163165
let forwarding_stage = ForwardingStage::new(
164166
receiver,
@@ -550,10 +552,10 @@ impl TpuClientNextClient {
550552
forward_address_getter: ForwardAddressGetter,
551553
stake_identity: Option<&Keypair>,
552554
bind_socket: UdpSocket,
555+
cancel: CancellationToken,
553556
) -> Self {
554557
// For now use large channel, the more suitable size to be found later.
555558
let (sender, receiver) = mpsc::channel(128);
556-
let cancel = CancellationToken::new();
557559
let leader_updater = forward_address_getter.clone();
558560

559561
let config = Self::create_config(bind_socket, stake_identity);

core/src/validator.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ use {
146146
strum_macros::{Display, EnumCount, EnumIter, EnumString, EnumVariantNames, IntoStaticStr},
147147
thiserror::Error,
148148
tokio::runtime::Runtime as TokioRuntime,
149+
tokio_util::sync::CancellationToken,
149150
};
150151

151152
const MAX_COMPLETED_DATA_SETS_IN_CHANNEL: usize = 100_000;
@@ -731,13 +732,21 @@ impl Validator {
731732
}
732733
}
733734

735+
// token used to cancel tpu-client-next.
736+
let cancel_tpu_client_next = CancellationToken::new();
734737
{
735738
let exit = exit.clone();
736739
config
737740
.validator_exit
738741
.write()
739742
.unwrap()
740743
.register_exit(Box::new(move || exit.store(true, Ordering::Relaxed)));
744+
let cancel_tpu_client_next = cancel_tpu_client_next.clone();
745+
config
746+
.validator_exit
747+
.write()
748+
.unwrap()
749+
.register_exit(Box::new(move || cancel_tpu_client_next.cancel()));
741750
}
742751

743752
let accounts_update_notifier = geyser_plugin_service
@@ -1177,6 +1186,7 @@ impl Validator {
11771186
Arc::as_ref(&identity_keypair),
11781187
node.sockets.rpc_sts_client,
11791188
runtime_handle.clone(),
1189+
cancel_tpu_client_next.clone(),
11801190
)
11811191
} else {
11821192
let Some(connection_cache) = &connection_cache else {
@@ -1595,6 +1605,7 @@ impl Validator {
15951605
.take()
15961606
.expect("Socket should exist."),
15971607
runtime_handle.clone(),
1608+
cancel_tpu_client_next,
15981609
))
15991610
};
16001611
let tpu = Tpu::new_with_client(

rpc/src/rpc_service.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ use {
6161
tokio_util::{
6262
bytes::Bytes,
6363
codec::{BytesCodec, FramedRead},
64+
sync::CancellationToken,
6465
},
6566
};
6667

@@ -496,7 +497,7 @@ pub struct JsonRpcServiceConfig<'a> {
496497
/// requires a reference to a [`Keypair`].
497498
pub enum ClientOption<'a> {
498499
ConnectionCache(Arc<ConnectionCache>),
499-
TpuClientNext(&'a Keypair, UdpSocket, RuntimeHandle),
500+
TpuClientNext(&'a Keypair, UdpSocket, RuntimeHandle, CancellationToken),
500501
}
501502

502503
impl JsonRpcService {
@@ -553,7 +554,12 @@ impl JsonRpcService {
553554
)?;
554555
Ok(json_rpc_service)
555556
}
556-
ClientOption::TpuClientNext(identity_keypair, tpu_client_socket, client_runtime) => {
557+
ClientOption::TpuClientNext(
558+
identity_keypair,
559+
tpu_client_socket,
560+
client_runtime,
561+
cancel,
562+
) => {
557563
let my_tpu_address = config
558564
.cluster_info
559565
.my_contact_info()
@@ -570,6 +576,7 @@ impl JsonRpcService {
570576
config.send_transaction_service_config.leader_forward_count,
571577
Some(identity_keypair),
572578
tpu_client_socket,
579+
cancel,
573580
);
574581

575582
let json_rpc_service = Self::new_with_client(

send-transaction-service/src/test_utils.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use {
1111
solana_client::connection_cache::ConnectionCache,
1212
std::{net::SocketAddr, sync::Arc},
1313
tokio::runtime::Handle,
14+
tokio_util::sync::CancellationToken,
1415
};
1516

1617
// `maybe_runtime` argument is introduced to be able to use runtime from test
@@ -62,6 +63,7 @@ impl CreateClient for TpuClientNextClient {
6263
leader_forward_count,
6364
None,
6465
bind_socket,
66+
CancellationToken::new(),
6567
)
6668
}
6769
}

send-transaction-service/src/transaction_client.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -246,6 +246,7 @@ impl TpuClientNextClient {
246246
leader_forward_count: u64,
247247
identity: Option<&Keypair>,
248248
bind_socket: UdpSocket,
249+
cancel: CancellationToken,
249250
) -> Self
250251
where
251252
T: TpuInfoWithSendStatic + Clone,
@@ -256,8 +257,6 @@ impl TpuClientNextClient {
256257

257258
let (update_certificate_sender, update_certificate_receiver) = watch::channel(None);
258259

259-
let cancel = CancellationToken::new();
260-
261260
let leader_info_provider = CurrentLeaderInfo::new(leader_info);
262261
let leader_updater: SendTransactionServiceLeaderUpdater<T> =
263262
SendTransactionServiceLeaderUpdater {

0 commit comments

Comments
 (0)