Skip to content

Commit 51705f0

Browse files
authored
Synchronize chain state after notification stream is established. (#4361)
## Motivation `test_wasm_end_to_end_social_event_streams::remote_net_grpc` is failing on `devnet-2025-08-15`. The problem seems to be that the relevant block on the publisher chain arrives _after_ we synchronize that chain explicitly but _before_ the notification streams from the validators are established, so we miss that block. It's difficult to reproduce this issue in another way; locally, the tests all pass. ## Proposal Synchronize after establishing the notification stream, for each validator. ## Test Plan For me, this fixes the test. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 4253666 commit 51705f0

File tree

3 files changed

+28
-19
lines changed

3 files changed

+28
-19
lines changed

linera-core/src/client/mod.rs

Lines changed: 25 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1854,10 +1854,16 @@ impl<Env: Environment> ChainClient<Env> {
18541854

18551855
/// Obtains the committee for the current epoch of the local chain.
18561856
#[instrument(level = "trace")]
1857-
pub async fn local_committee(&self) -> Result<Committee, LocalNodeError> {
1858-
self.chain_info_with_committees()
1859-
.await?
1860-
.into_current_committee()
1857+
pub async fn local_committee(&self) -> Result<Committee, ChainClientError> {
1858+
let info = match self.chain_info_with_committees().await {
1859+
Ok(info) => info,
1860+
Err(LocalNodeError::BlobsNotFound(_)) => {
1861+
self.synchronize_chain_state(self.chain_id).await?;
1862+
self.chain_info_with_committees().await?
1863+
}
1864+
Err(err) => return Err(err.into()),
1865+
};
1866+
Ok(info.into_current_committee()?)
18611867
}
18621868

18631869
/// Obtains the committee for the latest epoch on the admin chain.
@@ -3707,14 +3713,6 @@ impl<Env: Environment> ChainClient<Env> {
37073713
let mut senders = HashMap::new(); // Senders to cancel notification streams.
37083714
let notifications = self.subscribe()?;
37093715
let (abortable_notifications, abort) = stream::abortable(self.subscribe()?);
3710-
let sync_result = if self.is_tracked() {
3711-
self.synchronize_from_validators().await
3712-
} else {
3713-
self.synchronize_chain_state(self.chain_id).await
3714-
};
3715-
if let Err(error) = sync_result {
3716-
error!("Failed to synchronize from validators: {}", error);
3717-
}
37183716

37193717
// Beware: if this future ceases to make progress, notification processing will
37203718
// deadlock, because of the issue described in
@@ -3738,10 +3736,10 @@ impl<Env: Environment> ChainClient<Env> {
37383736
.await
37393737
{
37403738
if let Reason::NewBlock { .. } = notification.reason {
3741-
match await_while_polling(
3739+
match Box::pin(await_while_polling(
37423740
this.update_notification_streams(&mut senders).fuse(),
37433741
&mut process_notifications,
3744-
)
3742+
))
37453743
.await
37463744
{
37473745
Ok(handler) => process_notifications.push(handler),
@@ -3766,14 +3764,14 @@ impl<Env: Environment> ChainClient<Env> {
37663764
&self,
37673765
senders: &mut HashMap<ValidatorPublicKey, AbortHandle>,
37683766
) -> Result<impl Future<Output = ()>, ChainClientError> {
3769-
let (chain_id, nodes, local_node) = {
3767+
let (nodes, local_node) = {
37703768
let committee = self.local_committee().await?;
37713769
let nodes: HashMap<_, _> = self
37723770
.client
37733771
.validator_node_provider()
37743772
.make_nodes(&committee)?
37753773
.collect();
3776-
(self.chain_id, nodes, self.client.local_node.clone())
3774+
(nodes, self.client.local_node.clone())
37773775
};
37783776
// Drop removed validators.
37793777
senders.retain(|validator, abort| {
@@ -3788,9 +3786,19 @@ impl<Env: Environment> ChainClient<Env> {
37883786
let hash_map::Entry::Vacant(entry) = senders.entry(public_key) else {
37893787
continue;
37903788
};
3789+
let this = self.clone();
37913790
let stream = stream::once({
37923791
let node = node.clone();
3793-
async move { node.subscribe(vec![chain_id]).await }
3792+
async move {
3793+
let stream = node.subscribe(vec![this.chain_id]).await?;
3794+
// Only now the notification stream is established. We may have missed
3795+
// notifications since the last time we synchronized.
3796+
let remote_node = RemoteNode { public_key, node };
3797+
this.client
3798+
.synchronize_chain_state_from(&remote_node, this.chain_id)
3799+
.await?;
3800+
Ok::<_, ChainClientError>(stream)
3801+
}
37943802
})
37953803
.filter_map(move |result| async move {
37963804
if let Err(error) = &result {

linera-service/tests/linera_net_tests.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1873,7 +1873,6 @@ async fn test_wasm_end_to_end_counter_publish_create(config: impl LineraNetConfi
18731873
#[cfg_attr(feature = "kubernetes", test_case(SharedLocalKubernetesNetTestingConfig::new(Network::Grpc, BuildArg::Build) ; "kubernetes_grpc"))]
18741874
#[cfg_attr(feature = "remote-net", test_case(RemoteNetTestingConfig::new(None) ; "remote_net_grpc"))]
18751875
#[test_log::test(tokio::test)]
1876-
#[ignore] // TODO(#4363)
18771876
async fn test_wasm_end_to_end_social_event_streams(config: impl LineraNetConfig) -> Result<()> {
18781877
use linera_base::time::Instant;
18791878
use social::SocialAbi;

linera-service/tests/local_net_tests.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,9 @@ async fn test_end_to_end_reconfiguration(config: LocalNetConfig) -> Result<()> {
145145
client.query_validators(Some(chain_1)).await?;
146146

147147
if matches!(network, Network::Grpc) {
148-
assert_eq!(faucet.current_validators().await?.len(), 5);
148+
assert!(
149+
eventually(|| async { faucet.current_validators().await.unwrap().len() == 5 }).await
150+
);
149151
}
150152

151153
// Add 6th validator

0 commit comments

Comments
 (0)