Skip to content

Commit fec6a17

Browse files
authored
Two-phase chain listener (#4436)
## Motivation Currently we don't synchronize the chains when starting up the client. Now that we also don't pessimistically synchronize chains before performing operations, we sometimes run into the situation where we want to do something with a chain whose blocks haven't been seen yet (e.g. because it came from the faucet), which leads to an error. ## Proposal Ideally when we find a block from the future we should synchronize the offending chain. But for now it should be sufficient to make sure to synchronize the initial chains at startup. Split `ChainListener::run` into two futures: the latter is the full chain listener loop, as before, but first there's an initialization step that synchronizes all the initial chains, and the caller can wait on this in serial to ensure the chains are synchronized before performing operations on the client. ## Test Plan CI should check for regressions; it would be nice to add a test for this case, perhaps after testnet release. ## Release Plan - These changes should be backported to the latest `devnet` branch - These changes should be backported to the latest `testnet` branch, then - be released in a new SDK, ## Links - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 552e63d commit fec6a17

File tree

6 files changed

+71
-54
lines changed

6 files changed

+71
-54
lines changed

linera-client/src/benchmark.rs

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -130,14 +130,10 @@ impl<Env: Environment> Benchmark<Env> {
130130
let notifier = Arc::new(Notify::new());
131131
let barrier = Arc::new(Barrier::new(num_chains + 1));
132132

133-
let chain_listener_handle = tokio::spawn(
134-
async move {
135-
if let Err(e) = chain_listener.run().await {
136-
warn!("Chain listener error: {}", e);
137-
}
138-
}
139-
.instrument(tracing::info_span!("chain_listener")),
140-
);
133+
let chain_listener_result = chain_listener.run().await;
134+
135+
let chain_listener_handle =
136+
tokio::spawn(async move { chain_listener_result?.await }.in_current_span());
141137

142138
let bps_control_task = Self::bps_control_task(
143139
&barrier,
@@ -216,7 +212,10 @@ impl<Env: Environment> Benchmark<Env> {
216212
if let Some(runtime_control_task) = runtime_control_task {
217213
runtime_control_task.await?;
218214
}
219-
chain_listener_handle.await?;
215+
216+
if let Err(e) = chain_listener_handle.await? {
217+
tracing::error!("chain listener error: {e}");
218+
}
220219

221220
Ok(())
222221
}

linera-client/src/chain_listener.rs

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
use futures::{
1111
future::{join_all, select_all},
1212
lock::Mutex,
13-
FutureExt as _, StreamExt,
13+
Future, FutureExt as _, StreamExt,
1414
};
1515
use linera_base::{
1616
crypto::{CryptoHash, Signer},
@@ -202,25 +202,37 @@ impl<C: ClientContext> ChainListener<C> {
202202

203203
/// Runs the chain listener.
204204
#[instrument(skip(self))]
205-
pub async fn run(mut self) -> Result<(), Error> {
205+
pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
206206
let chain_ids = {
207207
let guard = self.context.lock().await;
208-
let mut chain_ids = BTreeSet::from_iter(guard.wallet().chain_ids());
209-
chain_ids.insert(guard.wallet().genesis_admin_chain());
210-
chain_ids
208+
let admin_chain_id = guard.wallet().genesis_admin_chain();
209+
guard
210+
.make_chain_client(admin_chain_id)
211+
.synchronize_from_validators()
212+
.await?;
213+
BTreeSet::from_iter(
214+
guard
215+
.wallet()
216+
.chain_ids()
217+
.into_iter()
218+
.chain([admin_chain_id]),
219+
)
211220
};
212-
self.listen_recursively(chain_ids).await?;
213-
loop {
214-
match self.next_action().await? {
215-
Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
216-
Action::Notification(notification) => {
217-
self.process_notification(notification).await?
221+
222+
Ok(async {
223+
self.listen_recursively(chain_ids).await?;
224+
loop {
225+
match self.next_action().await? {
226+
Action::ProcessInbox(chain_id) => self.maybe_process_inbox(chain_id).await?,
227+
Action::Notification(notification) => {
228+
self.process_notification(notification).await?
229+
}
230+
Action::Stop => break,
218231
}
219-
Action::Stop => break,
220232
}
221-
}
222-
join_all(self.listening.into_values().map(|client| client.stop())).await;
223-
Ok(())
233+
join_all(self.listening.into_values().map(|client| client.stop())).await;
234+
Ok(())
235+
})
224236
}
225237

226238
/// Processes a notification, updating local chains and validators as needed.
@@ -317,6 +329,7 @@ impl<C: ClientContext> ChainListener<C> {
317329
while let Some(chain_id) = chain_ids.pop_first() {
318330
chain_ids.extend(self.listen(chain_id).await?);
319331
}
332+
320333
Ok(())
321334
}
322335

linera-client/src/unit_tests/chain_listener.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -148,12 +148,12 @@ async fn test_chain_listener() -> anyhow::Result<()> {
148148
let context = Arc::new(Mutex::new(context));
149149
let cancellation_token = CancellationToken::new();
150150
let child_token = cancellation_token.child_token();
151-
let handle = linera_base::task::spawn(async move {
152-
ChainListener::new(config, context, storage, child_token)
153-
.run()
154-
.await
155-
.unwrap()
156-
});
151+
let chain_listener = ChainListener::new(config, context, storage, child_token)
152+
.run()
153+
.await
154+
.unwrap();
155+
156+
let handle = linera_base::task::spawn(async move { chain_listener.await.unwrap() });
157157
// Transfer one token to chain 0. The listener should eventually become leader and receive
158158
// the message.
159159
let recipient0 = Account::chain(chain_id0);
@@ -209,15 +209,12 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
209209
let context = Arc::new(Mutex::new(context));
210210
let cancellation_token = CancellationToken::new();
211211
let child_token = cancellation_token.child_token();
212-
let handle = linera_base::task::spawn({
213-
let storage = storage.clone();
214-
async move {
215-
ChainListener::new(config, context, storage, child_token)
216-
.run()
217-
.await
218-
.unwrap()
219-
}
220-
});
212+
let chain_listener = ChainListener::new(config, context, storage.clone(), child_token)
213+
.run()
214+
.await
215+
.unwrap();
216+
217+
let handle = linera_base::task::spawn(async move { chain_listener.await.unwrap() });
221218
// Burn one token.
222219
let certificate = client0
223220
.burn(AccountOwner::CHAIN, Amount::ONE)

linera-faucet/server/src/lib.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -682,7 +682,8 @@ where
682682
self.storage,
683683
cancellation_token.clone(),
684684
)
685-
.run();
685+
.run()
686+
.await?;
686687
let batch_processor_task = batch_processor.run(cancellation_token.clone());
687688
let tcp_listener =
688689
tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;

linera-service/src/node_service.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -895,7 +895,9 @@ where
895895
let storage = self.context.lock().await.storage().clone();
896896

897897
let chain_listener =
898-
ChainListener::new(self.config, self.context, storage, cancellation_token).run();
898+
ChainListener::new(self.config, self.context, storage, cancellation_token)
899+
.run()
900+
.await?;
899901
let mut chain_listener = Box::pin(chain_listener).fuse();
900902
let tcp_listener =
901903
tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await?;

linera-web/src/lib.rs

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ This module defines the client API for the Web extension.
88
// We sometimes need functions in this module to be async in order to
99
// ensure the generated code will return a `Promise`.
1010
#![allow(clippy::unused_async)]
11+
#![recursion_limit = "256"]
1112

1213
pub mod signer;
1314

@@ -211,20 +212,24 @@ impl Client {
211212
signer,
212213
)));
213214
let client_context_clone = client_context.clone();
214-
wasm_bindgen_futures::spawn_local(async move {
215-
if let Err(error) = ChainListener::new(
216-
ChainListenerConfig::default(),
217-
client_context_clone,
218-
storage,
219-
tokio_util::sync::CancellationToken::new(),
220-
)
221-
.run()
222-
.boxed_local()
223-
.await
224-
{
225-
tracing::error!("ChainListener error: {error:?}");
215+
let chain_listener = ChainListener::new(
216+
ChainListenerConfig::default(),
217+
client_context_clone,
218+
storage,
219+
tokio_util::sync::CancellationToken::new(),
220+
)
221+
.run()
222+
.boxed_local()
223+
.await?
224+
.boxed_local();
225+
wasm_bindgen_futures::spawn_local(
226+
async move {
227+
if let Err(error) = chain_listener.await {
228+
tracing::error!("ChainListener error: {error:?}");
229+
}
226230
}
227-
});
231+
.boxed_local(),
232+
);
228233
log::info!("Linera Web client successfully initialized");
229234
Ok(Self { client_context })
230235
}

0 commit comments

Comments
 (0)