Skip to content

Commit 9997d9b

Browse files
authored
Don't do heavy syncing of sender chains unless requested. (#4706) (#4720)
Backport of #4706. Merge conflicts: * `linera-client/src/benchmark.rs` * `linera-service/src/cli_wrappers/wallet.rs` ## Motivation `find_received_certificates_from_validator` and `find_received_certificates` can take a lot of time (and memory) if there are a lot of incoming messages. ## Proposal Only do them as part of `synchronize_from_validators`. On notification, `prepare_chain` and when we're about to propose a block, only download what's necessary. Download and process sender certificates in batches, to avoid the high memory usage. Download them in a background task in the node service, with some sleep time in between, to limit interference with the other node service tasks. (Mostly written with Claude Sonnet 4.5.) ## Test Plan CI A new test was added for the updated `prepare_chain`. `test_sparse_sender_chain` already exists, and tests notification handling with a sparse chain. ## Release Plan - These changes should be released in a new SDK. ## Links - PR to main: #4706 - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 5b3242c commit 9997d9b

File tree

19 files changed

+746
-211
lines changed

19 files changed

+746
-211
lines changed

CLI.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -729,6 +729,9 @@ Run a GraphQL service to explore and extend the chains of the wallet
729729

730730
Default value: `0`
731731
* `--port <PORT>` — The port on which to run the server
732+
* `--sync-sleep-ms <SYNC_SLEEP_MS>` — Milliseconds to sleep between batches during background certificate synchronization
733+
734+
Default value: `500`
732735

733736

734737

linera-chain/src/chain.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -539,6 +539,33 @@ where
539539
Ok(())
540540
}
541541

542+
/// Collects all missing sender blocks from removed bundles across all inboxes.
543+
/// Returns a map of origin chain IDs to their respective missing block heights.
544+
#[instrument(target = "telemetry_only", skip_all, fields(
545+
chain_id = %self.chain_id()
546+
))]
547+
pub async fn collect_missing_sender_blocks(
548+
&self,
549+
) -> Result<BTreeMap<ChainId, Vec<BlockHeight>>, ChainError> {
550+
let pairs = self.inboxes.try_load_all_entries().await?;
551+
let max_stream_queries = self.context().store().max_stream_queries();
552+
let stream = stream::iter(pairs)
553+
.map(|(origin, inbox)| async move {
554+
let mut missing_heights = Vec::new();
555+
let bundles = inbox.removed_bundles.elements().await?;
556+
for bundle in bundles {
557+
missing_heights.push(bundle.height);
558+
}
559+
Ok::<(ChainId, Vec<BlockHeight>), ChainError>((origin, missing_heights))
560+
})
561+
.buffer_unordered(max_stream_queries);
562+
let results: Vec<(ChainId, Vec<BlockHeight>)> = stream.try_collect().await?;
563+
Ok(results
564+
.into_iter()
565+
.filter(|(_, heights)| !heights.is_empty())
566+
.collect())
567+
}
568+
542569
pub async fn next_block_height_to_receive(
543570
&self,
544571
origin: &ChainId,

linera-chain/src/lib.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ pub enum ChainError {
5656
height: BlockHeight,
5757
},
5858
#[error(
59-
"Message in block proposed to {chain_id:?} does not match the previously received messages from \
59+
"Message in block proposed to {chain_id} does not match the previously received messages from \
6060
origin {origin:?}: was {bundle:?} instead of {previous_bundle:?}"
6161
)]
6262
UnexpectedMessage {
@@ -66,7 +66,7 @@ pub enum ChainError {
6666
previous_bundle: Box<MessageBundle>,
6767
},
6868
#[error(
69-
"Message in block proposed to {chain_id:?} is out of order compared to previous messages \
69+
"Message in block proposed to {chain_id} is out of order compared to previous messages \
7070
from origin {origin:?}: {bundle:?}. Block and height should be at least: \
7171
{next_height}, {next_index}"
7272
)]
@@ -78,7 +78,7 @@ pub enum ChainError {
7878
next_index: u32,
7979
},
8080
#[error(
81-
"Block proposed to {chain_id:?} is attempting to reject protected message \
81+
"Block proposed to {chain_id} is attempting to reject protected message \
8282
{posted_message:?}"
8383
)]
8484
CannotRejectMessage {
@@ -87,7 +87,7 @@ pub enum ChainError {
8787
posted_message: Box<PostedMessage>,
8888
},
8989
#[error(
90-
"Block proposed to {chain_id:?} is attempting to skip a message bundle \
90+
"Block proposed to {chain_id} is attempting to skip a message bundle \
9191
that cannot be skipped: {bundle:?}"
9292
)]
9393
CannotSkipMessage {
@@ -96,7 +96,7 @@ pub enum ChainError {
9696
bundle: Box<MessageBundle>,
9797
},
9898
#[error(
99-
"Incoming message bundle in block proposed to {chain_id:?} has timestamp \
99+
"Incoming message bundle in block proposed to {chain_id} has timestamp \
100100
{bundle_timestamp:}, which is later than the block timestamp {block_timestamp:}."
101101
)]
102102
IncorrectBundleTimestamp {

linera-client/src/benchmark.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,9 @@ impl<Env: Environment> Benchmark<Env> {
126126
let notifier = Arc::new(Notify::new());
127127
let barrier = Arc::new(Barrier::new(num_chains + 1));
128128

129-
let chain_listener_result = chain_listener.run().await;
129+
let chain_listener_result = chain_listener
130+
.run(Some(500)) // Default sync_sleep_ms for benchmarks
131+
.await;
130132

131133
let chain_listener_handle =
132134
tokio::spawn(async move { chain_listener_result?.await }.in_current_span());

linera-client/src/chain_listener.rs

Lines changed: 70 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ pub struct ChainListener<C: ClientContext> {
182182
cancellation_token: CancellationToken,
183183
}
184184

185-
impl<C: ClientContext> ChainListener<C> {
185+
impl<C: ClientContext + 'static> ChainListener<C> {
186186
/// Creates a new chain listener given client chains.
187187
pub fn new(
188188
config: ChainListenerConfig,
@@ -202,13 +202,16 @@ impl<C: ClientContext> ChainListener<C> {
202202

203203
/// Runs the chain listener.
204204
#[instrument(skip(self))]
205-
pub async fn run(mut self) -> Result<impl Future<Output = Result<(), Error>>, Error> {
205+
pub async fn run(
206+
mut self,
207+
sync_sleep_ms: Option<u64>,
208+
) -> Result<impl Future<Output = Result<(), Error>>, Error> {
206209
let chain_ids = {
207210
let guard = self.context.lock().await;
208211
let admin_chain_id = guard.wallet().genesis_admin_chain();
209212
guard
210213
.make_chain_client(admin_chain_id)
211-
.synchronize_from_validators()
214+
.synchronize_chain_state(admin_chain_id)
212215
.await?;
213216
BTreeSet::from_iter(
214217
guard
@@ -219,6 +222,30 @@ impl<C: ClientContext> ChainListener<C> {
219222
)
220223
};
221224

225+
// Start background tasks to sync received certificates for each chain,
226+
// if enabled (sync_sleep_ms is Some).
227+
if let Some(sync_sleep_ms) = sync_sleep_ms {
228+
let context = Arc::clone(&self.context);
229+
let cancellation_token = self.cancellation_token.clone();
230+
for chain_id in &chain_ids {
231+
let context = Arc::clone(&context);
232+
let cancellation_token = cancellation_token.clone();
233+
let chain_id = *chain_id;
234+
drop(linera_base::task::spawn(async move {
235+
if let Err(e) = Self::background_sync_received_certificates(
236+
context,
237+
sync_sleep_ms,
238+
chain_id,
239+
cancellation_token,
240+
)
241+
.await
242+
{
243+
warn!("Background sync failed for chain {chain_id}: {e}");
244+
}
245+
}));
246+
}
247+
}
248+
222249
Ok(async {
223250
self.listen_recursively(chain_ids).await?;
224251
loop {
@@ -333,6 +360,46 @@ impl<C: ClientContext> ChainListener<C> {
333360
Ok(())
334361
}
335362

363+
/// Background task that syncs received certificates in small batches.
364+
/// This discovers unacknowledged sender blocks gradually without overwhelming the system.
365+
#[instrument(skip(context, cancellation_token))]
366+
async fn background_sync_received_certificates(
367+
context: Arc<Mutex<C>>,
368+
sync_sleep_ms: u64,
369+
chain_id: ChainId,
370+
cancellation_token: CancellationToken,
371+
) -> Result<(), Error> {
372+
info!("Starting background certificate sync for chain {chain_id}");
373+
let client = context.lock().await.make_chain_client(chain_id);
374+
375+
loop {
376+
// Check if we should stop.
377+
if cancellation_token.is_cancelled() {
378+
info!("Background sync cancelled for chain {chain_id}");
379+
break;
380+
}
381+
382+
// Sleep to avoid overwhelming the system.
383+
Self::sleep(sync_sleep_ms).await;
384+
385+
// Sync one batch.
386+
match client.sync_received_certificates_batch().await {
387+
Ok(has_more) => {
388+
if !has_more {
389+
info!("Background sync completed for chain {chain_id}");
390+
break;
391+
}
392+
}
393+
Err(e) => {
394+
warn!("Error syncing batch for chain {chain_id}: {e}. Will retry.");
395+
// Continue trying despite errors - validators might be temporarily unavailable.
396+
}
397+
}
398+
}
399+
400+
Ok(())
401+
}
402+
336403
/// Starts listening for notifications about the given chain.
337404
///
338405
/// Returns all publishing chains, that we also need to listen to.

linera-client/src/unit_tests/chain_listener.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ async fn test_chain_listener() -> anyhow::Result<()> {
148148
let cancellation_token = CancellationToken::new();
149149
let child_token = cancellation_token.child_token();
150150
let chain_listener = ChainListener::new(config, context, storage, child_token)
151-
.run()
151+
.run(None) // Unit test doesn't need background sync
152152
.await
153153
.unwrap();
154154

@@ -210,7 +210,7 @@ async fn test_chain_listener_admin_chain() -> anyhow::Result<()> {
210210
let cancellation_token = CancellationToken::new();
211211
let child_token = cancellation_token.child_token();
212212
let chain_listener = ChainListener::new(config, context, storage.clone(), child_token)
213-
.run()
213+
.run(None) // Unit test doesn't need background sync
214214
.await
215215
.unwrap();
216216

0 commit comments

Comments
 (0)