Skip to content

Commit 8ed5ddb

Browse files
authored
Upload blobs for validated block certificates in separate messages. (#3153)
## Motivation Ultimately we want to transfer all blobs separately, rather than in a single message. (#3048) This PR is another step towards that goal: The blobs required by a validated block certificate are now uploaded separately, rather than in the same message as the certificate itself. ## Proposal Add a map of missing blobs for the highest-round validated block to the chain state, and a `HandlePendingBlob` endpoint to populate that map. ## Test Plan There are already tests covering different scenarios with validated blocks' blobs. Where necessary, these were updated. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - Closes #3152. - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent 1468fa5 commit 8ed5ddb

File tree

30 files changed

+553
-250
lines changed

30 files changed

+553
-250
lines changed

linera-chain/src/chain.rs

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,13 @@ use futures::stream::{self, StreamExt, TryStreamExt};
1313
use linera_base::{
1414
crypto::CryptoHash,
1515
data_types::{
16-
Amount, ArithmeticError, BlockHeight, OracleResponse, Timestamp, UserApplicationDescription,
16+
Amount, ArithmeticError, Blob, BlockHeight, OracleResponse, Timestamp,
17+
UserApplicationDescription,
1718
},
1819
ensure,
1920
identifiers::{
20-
ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner, StreamId,
21-
UserApplicationId,
21+
BlobId, ChainId, ChannelName, Destination, GenericApplicationId, MessageId, Owner,
22+
StreamId, UserApplicationId,
2223
},
2324
};
2425
use linera_execution::{
@@ -30,6 +31,7 @@ use linera_execution::{
3031
use linera_views::{
3132
context::Context,
3233
log_view::LogView,
34+
map_view::MapView,
3335
queue_view::QueueView,
3436
reentrant_collection_view::ReentrantCollectionView,
3537
register_view::RegisterView,
@@ -46,6 +48,7 @@ use crate::{
4648
inbox::{Cursor, InboxError, InboxStateView},
4749
manager::ChainManager,
4850
outbox::OutboxStateView,
51+
types::ValidatedBlockCertificate,
4952
ChainError, ChainExecutionContext, ExecutionResultExt,
5053
};
5154

@@ -196,6 +199,11 @@ where
196199

197200
/// Consensus state.
198201
pub manager: ChainManager<C>,
202+
/// Pending validated block that is still missing blobs.
203+
#[graphql(skip)]
204+
pub pending_validated_block: RegisterView<C, Option<ValidatedBlockCertificate>>,
205+
/// The incomplete set of blobs for the pending validated block.
206+
pub pending_validated_blobs: MapView<C, BlobId, Option<Blob>>,
199207

200208
/// Hashes of all certified blocks for this sender.
201209
/// This ends with `block_hash` and has length `usize::from(next_block_height)`.
@@ -873,6 +881,9 @@ where
873881
self.execution_state_hash.set(Some(state_hash));
874882
// Last, reset the consensus state based on the current ownership.
875883
let maybe_committee = self.execution_state.system.current_committee().into_iter();
884+
885+
self.pending_validated_blobs.clear();
886+
self.pending_validated_block.set(None);
876887
self.manager.reset(
877888
self.execution_state.system.ownership.get().clone(),
878889
block.height.try_add_one()?,

linera-client/src/client_context.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -930,9 +930,7 @@ where
930930
// Replay the certificates locally.
931931
for certificate in certificates {
932932
// No required certificates from other chains: This is only used with benchmark.
933-
node.handle_certificate(certificate, vec![], &())
934-
.await
935-
.unwrap();
933+
node.handle_certificate(certificate, &()).await.unwrap();
936934
}
937935
// Last update the wallet.
938936
for chain in self.wallet.as_mut().chains_mut() {

linera-core/src/chain_worker/actor.rs

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,6 @@ where
106106
/// Process a validated block issued for this multi-owner chain.
107107
ProcessValidatedBlock {
108108
certificate: ValidatedBlockCertificate,
109-
blobs: Vec<Blob>,
110109
#[debug(skip)]
111110
callback: oneshot::Sender<Result<(ChainInfoResponse, NetworkActions, bool), WorkerError>>,
112111
},
@@ -149,6 +148,13 @@ where
149148
callback: oneshot::Sender<Result<Blob, WorkerError>>,
150149
},
151150

151+
/// Handle a blob that belongs to a pending proposal or validated block certificate.
152+
HandlePendingBlob {
153+
blob: Blob,
154+
#[debug(skip)]
155+
callback: oneshot::Sender<Result<ChainInfoResponse, WorkerError>>,
156+
},
157+
152158
/// Update the received certificate trackers to at least the given values.
153159
UpdateReceivedCertificateTrackers {
154160
new_trackers: BTreeMap<ValidatorName, u64>,
@@ -294,14 +300,9 @@ where
294300
.is_ok(),
295301
ChainWorkerRequest::ProcessValidatedBlock {
296302
certificate,
297-
blobs,
298303
callback,
299304
} => callback
300-
.send(
301-
self.worker
302-
.process_validated_block(certificate, &blobs)
303-
.await,
304-
)
305+
.send(self.worker.process_validated_block(certificate).await)
305306
.is_ok(),
306307
ChainWorkerRequest::ProcessConfirmedBlock {
307308
certificate,
@@ -340,6 +341,9 @@ where
340341
ChainWorkerRequest::DownloadPendingBlob { blob_id, callback } => callback
341342
.send(self.worker.download_pending_blob(blob_id).await)
342343
.is_ok(),
344+
ChainWorkerRequest::HandlePendingBlob { blob, callback } => callback
345+
.send(self.worker.handle_pending_blob(blob).await)
346+
.is_ok(),
343347
ChainWorkerRequest::UpdateReceivedCertificateTrackers {
344348
new_trackers,
345349
callback,

linera-core/src/chain_worker/state/attempted_changes.rs

Lines changed: 42 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,6 @@ where
164164
pub(super) async fn process_validated_block(
165165
&mut self,
166166
certificate: ValidatedBlockCertificate,
167-
blobs: &[Blob],
168167
) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
169168
let executed_block = certificate.executed_block();
170169

@@ -209,11 +208,30 @@ where
209208
.executed_block_values
210209
.insert(Cow::Borrowed(certificate.inner().inner()))
211210
.await;
212-
let required_blob_ids = executed_block.required_blob_ids();
213-
// Verify that no unrelated blobs were provided.
214-
self.state
215-
.check_for_unneeded_blobs(&required_blob_ids, blobs)?;
216-
let blobs = self.state.get_required_blobs(executed_block, blobs).await?;
211+
let maybe_blobs = self
212+
.state
213+
.maybe_get_required_blobs(executed_block, &[])
214+
.await?;
215+
let missing_blob_ids = super::missing_blob_ids(&maybe_blobs);
216+
if !missing_blob_ids.is_empty() {
217+
let chain = &mut self.state.chain;
218+
let pending_validated_block = chain.pending_validated_block.get_mut();
219+
if !pending_validated_block
220+
.as_ref()
221+
.is_some_and(|existing_cert| existing_cert.round > certificate.round)
222+
{
223+
for (blob_id, maybe_blob) in maybe_blobs {
224+
chain.pending_validated_blobs.insert(&blob_id, maybe_blob)?;
225+
}
226+
*pending_validated_block = Some(certificate);
227+
self.save().await?;
228+
}
229+
return Err(WorkerError::BlobsNotFound(missing_blob_ids));
230+
}
231+
let blobs = maybe_blobs
232+
.into_iter()
233+
.filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
234+
.collect();
217235
let old_round = self.state.chain.manager.current_round();
218236
self.state.chain.manager.create_final_vote(
219237
certificate,
@@ -560,6 +578,24 @@ where
560578
Ok(())
561579
}
562580

581+
pub(super) async fn handle_pending_blob(
582+
&mut self,
583+
blob: Blob,
584+
) -> Result<ChainInfoResponse, WorkerError> {
585+
let chain = &mut self.state.chain;
586+
let blob_id = blob.id();
587+
if let Some(maybe_blob) = chain.pending_validated_blobs.get_mut(&blob_id).await? {
588+
if maybe_blob.is_none() {
589+
*maybe_blob = Some(blob);
590+
}
591+
}
592+
self.save().await?;
593+
Ok(ChainInfoResponse::new(
594+
&self.state.chain,
595+
self.state.config.key_pair(),
596+
))
597+
}
598+
563599
/// Stores the chain state in persistent storage.
564600
///
565601
/// Waits until the [`ChainStateView`] is no longer shared before persisting the changes.

linera-core/src/chain_worker/state/mod.rs

Lines changed: 57 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -225,11 +225,10 @@ where
225225
pub(super) async fn process_validated_block(
226226
&mut self,
227227
certificate: ValidatedBlockCertificate,
228-
blobs: &[Blob],
229228
) -> Result<(ChainInfoResponse, NetworkActions, bool), WorkerError> {
230229
ChainWorkerStateWithAttemptedChanges::new(self)
231230
.await
232-
.process_validated_block(certificate, blobs)
231+
.process_validated_block(certificate)
233232
.await
234233
}
235234

@@ -300,6 +299,17 @@ where
300299
maybe_blob.ok_or_else(|| WorkerError::BlobsNotFound(vec![blob_id]))
301300
}
302301

302+
/// Adds the blob to pending blocks or validated block certificates that are missing it.
303+
pub(super) async fn handle_pending_blob(
304+
&mut self,
305+
blob: Blob,
306+
) -> Result<ChainInfoResponse, WorkerError> {
307+
ChainWorkerStateWithAttemptedChanges::new(&mut *self)
308+
.await
309+
.handle_pending_blob(blob)
310+
.await
311+
}
312+
303313
/// Ensures that the current chain is active, returning an error otherwise.
304314
fn ensure_is_active(&mut self) -> Result<(), WorkerError> {
305315
if !self.knows_chain_is_active {
@@ -309,61 +319,57 @@ where
309319
Ok(())
310320
}
311321

312-
/// Returns an error if unrelated blobs were provided.
313-
fn check_for_unneeded_blobs(
314-
&self,
315-
required_blob_ids: &HashSet<BlobId>,
316-
blobs: &[Blob],
317-
) -> Result<(), WorkerError> {
318-
// Find all certificates containing blobs used when executing this block.
319-
for blob in blobs {
320-
let blob_id = blob.id();
321-
ensure!(
322-
required_blob_ids.contains(&blob_id),
323-
WorkerError::UnneededBlob { blob_id }
324-
);
325-
}
326-
327-
Ok(())
328-
}
329-
330322
/// Returns the blobs required by the given executed block. The ones that are not passed in
331323
/// are read from the chain manager or from storage.
332324
async fn get_required_blobs(
333325
&self,
334326
executed_block: &ExecutedBlock,
335327
blobs: &[Blob],
336328
) -> Result<BTreeMap<BlobId, Blob>, WorkerError> {
337-
let mut blob_ids = executed_block.required_blob_ids();
338-
let mut found_blobs = BTreeMap::new();
329+
let maybe_blobs = self.maybe_get_required_blobs(executed_block, blobs).await?;
330+
let not_found_blob_ids = missing_blob_ids(&maybe_blobs);
331+
ensure!(
332+
not_found_blob_ids.is_empty(),
333+
WorkerError::BlobsNotFound(not_found_blob_ids)
334+
);
335+
Ok(maybe_blobs
336+
.into_iter()
337+
.filter_map(|(blob_id, maybe_blob)| Some((blob_id, maybe_blob?)))
338+
.collect())
339+
}
339340

340-
for blob in blobs {
341-
if blob_ids.remove(&blob.id()) {
342-
found_blobs.insert(blob.id(), blob.clone());
341+
/// Returns the blobs required by the given executed block. The ones that are not passed in
342+
/// are read from the chain manager or from storage.
343+
async fn maybe_get_required_blobs(
344+
&self,
345+
executed_block: &ExecutedBlock,
346+
provided_blobs: &[Blob],
347+
) -> Result<BTreeMap<BlobId, Option<Blob>>, WorkerError> {
348+
let required_blob_ids = executed_block.required_blob_ids().into_iter();
349+
let mut maybe_blobs = BTreeMap::from_iter(required_blob_ids.map(|blob_id| (blob_id, None)));
350+
351+
for blob in provided_blobs {
352+
if let Some(maybe_blob) = maybe_blobs.get_mut(&blob.id()) {
353+
*maybe_blob = Some(blob.clone());
343354
}
344355
}
345-
let mut missing_blob_ids = Vec::new();
346-
for blob_id in blob_ids {
347-
if let Some(blob) = self.chain.manager.pending_blob(&blob_id).await? {
348-
found_blobs.insert(blob_id, blob);
349-
} else {
350-
missing_blob_ids.push(blob_id);
356+
for (blob_id, maybe_blob) in &mut maybe_blobs {
357+
if maybe_blob.is_some() {
358+
continue;
359+
}
360+
if let Some(blob) = self.chain.manager.pending_blob(blob_id).await? {
361+
*maybe_blob = Some(blob);
362+
} else if let Some(Some(blob)) = self.chain.pending_validated_blobs.get(blob_id).await?
363+
{
364+
*maybe_blob = Some(blob);
351365
}
352366
}
367+
let missing_blob_ids = missing_blob_ids(&maybe_blobs);
353368
let blobs_from_storage = self.storage.read_blobs(&missing_blob_ids).await?;
354-
let mut not_found_blob_ids = Vec::new();
355369
for (blob_id, maybe_blob) in missing_blob_ids.into_iter().zip(blobs_from_storage) {
356-
if let Some(blob) = maybe_blob {
357-
found_blobs.insert(blob_id, blob);
358-
} else {
359-
not_found_blob_ids.push(blob_id);
360-
}
370+
maybe_blobs.insert(blob_id, maybe_blob);
361371
}
362-
ensure!(
363-
not_found_blob_ids.is_empty(),
364-
WorkerError::BlobsNotFound(not_found_blob_ids)
365-
);
366-
Ok(found_blobs)
372+
Ok(maybe_blobs)
367373
}
368374

369375
/// Adds any newly created chains to the set of `tracked_chains`, if the parent chain is
@@ -542,6 +548,15 @@ where
542548
}
543549
}
544550

551+
/// Returns the keys whose value is `None`.
552+
fn missing_blob_ids(maybe_blobs: &BTreeMap<BlobId, Option<Blob>>) -> Vec<BlobId> {
553+
maybe_blobs
554+
.iter()
555+
.filter(|(_, maybe_blob)| maybe_blob.is_none())
556+
.map(|(chain_id, _)| *chain_id)
557+
.collect()
558+
}
559+
545560
/// Returns an error if the block is not at the expected epoch.
546561
fn check_block_epoch(chain_epoch: Epoch, block: &Block) -> Result<(), WorkerError> {
547562
ensure!(

linera-core/src/client/mod.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -397,24 +397,27 @@ where
397397
certificate: GenericCertificate<T>,
398398
blobs: Vec<Blob>,
399399
) -> Result<ChainInfoResponse, LocalNodeError> {
400-
if T::KIND == CertificateKind::Confirmed {
401-
let result = self
400+
let chain_id = certificate.inner().chain_id();
401+
let result = self
402+
.local_node
403+
.handle_certificate(certificate.clone(), &self.notifier)
404+
.await;
405+
if let Err(LocalNodeError::BlobsNotFound(_)) = &result {
406+
match T::KIND {
407+
CertificateKind::Confirmed => self.local_node.store_blobs(&blobs).await?,
408+
CertificateKind::Validated => {
409+
self.local_node
410+
.handle_pending_blobs(chain_id, blobs)
411+
.await?
412+
}
413+
CertificateKind::Timeout => return result,
414+
}
415+
return self
402416
.local_node
403-
.handle_certificate(certificate.clone(), vec![], &self.notifier)
417+
.handle_certificate(certificate, &self.notifier)
404418
.await;
405-
if let Err(LocalNodeError::BlobsNotFound(_)) = &result {
406-
self.local_node.store_blobs(&blobs).await?;
407-
return self
408-
.local_node
409-
.handle_certificate(certificate, vec![], &self.notifier)
410-
.await;
411-
}
412-
result
413-
} else {
414-
self.local_node
415-
.handle_certificate(certificate, blobs, &self.notifier)
416-
.await
417419
}
420+
result
418421
}
419422
}
420423

0 commit comments

Comments
 (0)