Skip to content

Commit 27923c7

Browse files
authored
Upload blobs for confirmed certificates separately. (#3108)
## Motivation Including blobs with the gRPC message that contains a block proposal or certificate severely limits the total size of the blobs. (See #3048.) ## Proposal As a first step, remove the blobs from the `handle_confirmed_certificate` functions and messages. Instead, when a validator sees a fully signed confirmed block it creates the blob states in its local storage even if it doesn't have the blobs yet. The client can then upload the blobs one by one, and the validator will accept them. Finally, the client can retry sending the certificate. We don't do this for block proposals or validated blocks yet: These will need to be handled differently, because in these cases the blob has not necessarily been successfully published yet, so we should _not_ create a blob state. Instead, we will put these blobs into a temporary cache. ## Test Plan The existing tests are now using the new flow for confirmed block certificates. ## Release Plan - Nothing to do / These changes follow the usual release cycle. ## Links - Part of #3048 - [reviewer checklist](https://github.com/linera-io/linera-protocol/blob/main/CONTRIBUTING.md#reviewer-checklist)
1 parent cde2c97 commit 27923c7

File tree

30 files changed

+269
-119
lines changed

30 files changed

+269
-119
lines changed

linera-core/src/chain_worker/actor.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ where
114114
/// Process a confirmed block (a commit).
115115
ProcessConfirmedBlock {
116116
certificate: ConfirmedBlockCertificate,
117-
blobs: Vec<Blob>,
118117
#[debug(with = "elide_option")]
119118
notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
120119
#[debug(skip)]
@@ -299,15 +298,13 @@ where
299298
.is_ok(),
300299
ChainWorkerRequest::ProcessConfirmedBlock {
301300
certificate,
302-
blobs,
303301
notify_when_messages_are_delivered,
304302
callback,
305303
} => callback
306304
.send(
307305
self.worker
308306
.process_confirmed_block(
309307
certificate,
310-
&blobs,
311308
notify_when_messages_are_delivered,
312309
)
313310
.await,

linera-core/src/chain_worker/state/attempted_changes.rs

Lines changed: 16 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -239,7 +239,6 @@ where
239239
pub(super) async fn process_confirmed_block(
240240
&mut self,
241241
certificate: ConfirmedBlockCertificate,
242-
blobs: &[Blob],
243242
notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
244243
) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
245244
let executed_block = certificate.executed_block();
@@ -298,33 +297,33 @@ where
298297
);
299298

300299
let required_blob_ids = executed_block.required_blob_ids();
301-
// Verify that no unrelated blobs were provided.
302-
self.state
303-
.check_for_unneeded_blobs(&required_blob_ids, blobs)?;
304-
let blobs = self.state.get_required_blobs(executed_block, blobs).await?;
305-
let blobs = blobs.into_values().collect::<Vec<_>>();
306-
307-
let certificate_hash = certificate.hash();
300+
let blobs_result = self
301+
.state
302+
.get_required_blobs(executed_block, &[])
303+
.await
304+
.map(|blobs| blobs.into_values().collect::<Vec<_>>());
308305

309-
self.state
310-
.storage
311-
.write_blobs_and_certificate(&blobs, &certificate)
312-
.await?;
306+
if let Ok(blobs) = &blobs_result {
307+
self.state
308+
.storage
309+
.write_blobs_and_certificate(blobs, &certificate)
310+
.await?;
311+
}
313312

314313
// Update the blob state with last used certificate hash.
315314
let blob_state = BlobState {
316-
last_used_by: certificate_hash,
315+
last_used_by: certificate.hash(),
317316
chain_id: block.chain_id,
318317
block_height,
319318
epoch: block.epoch,
320319
};
320+
let overwrite = blobs_result.is_ok(); // Overwrite only if we wrote the certificate.
321+
let blob_ids = required_blob_ids.into_iter().collect::<Vec<_>>();
321322
self.state
322323
.storage
323-
.maybe_write_blob_states(
324-
&required_blob_ids.into_iter().collect::<Vec<_>>(),
325-
blob_state,
326-
)
324+
.maybe_write_blob_states(&blob_ids, blob_state, overwrite)
327325
.await?;
326+
blobs_result?;
328327

329328
// Execute the block and update inboxes.
330329
self.state.chain.remove_bundles_from_inboxes(block).await?;

linera-core/src/chain_worker/state/mod.rs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -237,12 +237,11 @@ where
237237
pub(super) async fn process_confirmed_block(
238238
&mut self,
239239
certificate: ConfirmedBlockCertificate,
240-
blobs: &[Blob],
241240
notify_when_messages_are_delivered: Option<oneshot::Sender<()>>,
242241
) -> Result<(ChainInfoResponse, NetworkActions), WorkerError> {
243242
ChainWorkerStateWithAttemptedChanges::new(self)
244243
.await
245-
.process_confirmed_block(certificate, blobs, notify_when_messages_are_delivered)
244+
.process_confirmed_block(certificate, notify_when_messages_are_delivered)
246245
.await
247246
}
248247

linera-core/src/client/mod.rs

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,9 @@ use linera_chain::{
4545
MessageAction,
4646
},
4747
types::{
48-
CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate, GenericCertificate,
49-
LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock, ValidatedBlockCertificate,
48+
CertificateKind, CertificateValue, ConfirmedBlock, ConfirmedBlockCertificate,
49+
GenericCertificate, LiteCertificate, Timeout, TimeoutCertificate, ValidatedBlock,
50+
ValidatedBlockCertificate,
5051
},
5152
ChainError, ChainExecutionContext, ChainStateView,
5253
};
@@ -395,9 +396,24 @@ where
395396
certificate: GenericCertificate<T>,
396397
blobs: Vec<Blob>,
397398
) -> Result<ChainInfoResponse, LocalNodeError> {
398-
self.local_node
399-
.handle_certificate(certificate, blobs, &self.notifier)
400-
.await
399+
if T::KIND == CertificateKind::Confirmed {
400+
let result = self
401+
.local_node
402+
.handle_certificate(certificate.clone(), vec![], &self.notifier)
403+
.await;
404+
if let Err(LocalNodeError::BlobsNotFound(_)) = &result {
405+
self.local_node.store_blobs(&blobs).await?;
406+
return self
407+
.local_node
408+
.handle_certificate(certificate, vec![], &self.notifier)
409+
.await;
410+
}
411+
result
412+
} else {
413+
self.local_node
414+
.handle_certificate(certificate, blobs, &self.notifier)
415+
.await
416+
}
401417
}
402418
}
403419

@@ -1751,7 +1767,7 @@ where
17511767
if let Some(cert) = info.manager.requested_locked {
17521768
let hash = cert.hash();
17531769
let blobs = info.manager.locked_blobs.clone();
1754-
if let Err(err) = self.client.handle_certificate(*cert.clone(), blobs).await {
1770+
if let Err(err) = self.process_certificate(*cert.clone(), blobs).await {
17551771
warn!(
17561772
"Skipping certificate {hash} from validator {}: {err}",
17571773
remote_node.name

linera-core/src/local_node.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,13 @@ where
210210
.collect())
211211
}
212212

213+
/// Writes the given blobs to storage if there is an appropriate blob state.
214+
pub async fn store_blobs(&self, blobs: &[Blob]) -> Result<(), LocalNodeError> {
215+
let storage = self.storage_client();
216+
storage.maybe_write_blobs(blobs).await?;
217+
Ok(())
218+
}
219+
213220
/// Returns a read-only view of the [`ChainStateView`] of a chain referenced by its
214221
/// [`ChainId`].
215222
///

linera-core/src/node.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ pub trait ValidatorNode {
7171
async fn handle_confirmed_certificate(
7272
&self,
7373
certificate: GenericCertificate<ConfirmedBlock>,
74-
blobs: Vec<Blob>,
7574
delivery: CrossChainMessageDelivery,
7675
) -> Result<ChainInfoResponse, NodeError>;
7776

@@ -103,6 +102,10 @@ pub trait ValidatorNode {
103102
/// Subscribes to receiving notifications for a collection of chains.
104103
async fn subscribe(&self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
105104

105+
// Uploads a blob content. Returns an error if the validator has not seen a
106+
// certificate using this blob.
107+
async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError>;
108+
106109
async fn download_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, NodeError>;
107110

108111
async fn download_certificate(

linera-core/src/remote_node.rs

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use std::collections::HashSet;
55

66
use custom_debug_derive::Debug;
7-
use futures::{stream::FuturesUnordered, StreamExt};
7+
use futures::{future::try_join_all, stream::FuturesUnordered, StreamExt};
88
use linera_base::{
99
crypto::CryptoHash,
1010
data_types::{Blob, BlockHeight},
@@ -68,13 +68,12 @@ impl<N: ValidatorNode> RemoteNode<N> {
6868
pub(crate) async fn handle_confirmed_certificate(
6969
&self,
7070
certificate: ConfirmedBlockCertificate,
71-
blobs: Vec<Blob>,
7271
delivery: CrossChainMessageDelivery,
7372
) -> Result<Box<ChainInfo>, NodeError> {
7473
let chain_id = certificate.inner().chain_id();
7574
let response = self
7675
.node
77-
.handle_confirmed_certificate(certificate, blobs, delivery)
76+
.handle_confirmed_certificate(certificate, delivery)
7877
.await?;
7978
self.check_and_return_info(response, chain_id)
8079
}
@@ -148,7 +147,7 @@ impl<N: ValidatorNode> RemoteNode<N> {
148147
_ => return result,
149148
}
150149
}
151-
self.handle_confirmed_certificate(certificate.clone(), vec![], delivery)
150+
self.handle_confirmed_certificate(certificate.clone(), delivery)
152151
.await
153152
}
154153

@@ -217,6 +216,16 @@ impl<N: ValidatorNode> RemoteNode<N> {
217216
Ok(certificate)
218217
}
219218

219+
/// Uploads the blobs to the validator.
220+
#[instrument(level = "trace")]
221+
pub(crate) async fn upload_blobs(&self, blobs: Vec<Blob>) -> Result<(), NodeError> {
222+
let tasks = blobs
223+
.into_iter()
224+
.map(|blob| self.node.upload_blob(blob.into()));
225+
try_join_all(tasks).await?;
226+
Ok(())
227+
}
228+
220229
/// Tries to download the given blobs from this node. Returns `None` if not all could be found.
221230
#[instrument(level = "trace")]
222231
pub(crate) async fn try_download_blobs(&self, blob_ids: &[BlobId]) -> Option<Vec<Blob>> {

linera-core/src/unit_tests/test_utils.rs

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -148,11 +148,10 @@ where
148148
async fn handle_confirmed_certificate(
149149
&self,
150150
certificate: GenericCertificate<ConfirmedBlock>,
151-
blobs: Vec<Blob>,
152151
_delivery: CrossChainMessageDelivery,
153152
) -> Result<ChainInfoResponse, NodeError> {
154153
self.spawn_and_receive(move |validator, sender| {
155-
validator.do_handle_certificate(certificate, blobs, sender)
154+
validator.do_handle_certificate(certificate, vec![], sender)
156155
})
157156
.await
158157
}
@@ -180,6 +179,11 @@ where
180179
Ok(CryptoHash::test_hash("genesis config"))
181180
}
182181

182+
async fn upload_blob(&self, content: BlobContent) -> Result<BlobId, NodeError> {
183+
self.spawn_and_receive(move |validator, sender| validator.do_upload_blob(content, sender))
184+
.await
185+
}
186+
183187
async fn download_blob_content(&self, blob_id: BlobId) -> Result<BlobContent, NodeError> {
184188
self.spawn_and_receive(move |validator, sender| {
185189
validator.do_download_blob_content(blob_id, sender)
@@ -456,6 +460,23 @@ where
456460
sender.send(Ok(stream))
457461
}
458462

463+
async fn do_upload_blob(
464+
self,
465+
content: BlobContent,
466+
sender: oneshot::Sender<Result<BlobId, NodeError>>,
467+
) -> Result<(), Result<BlobId, NodeError>> {
468+
let validator = self.client.lock().await;
469+
let blob = Blob::new(content);
470+
let id = blob.id();
471+
let storage = validator.state.storage_client();
472+
let result = match storage.maybe_write_blobs(&[blob]).await {
473+
Ok(has_state) if has_state.first() == Some(&true) => Ok(id),
474+
Ok(_) => Err(NodeError::BlobsNotFound(vec![id])),
475+
Err(error) => Err(error.into()),
476+
};
477+
sender.send(result)
478+
}
479+
459480
async fn do_download_blob_content(
460481
self,
461482
blob_id: BlobId,

linera-core/src/unit_tests/wasm_worker_tests.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
use std::collections::BTreeSet;
1414

15+
use assert_matches::assert_matches;
1516
use linera_base::{
1617
crypto::KeyPair,
1718
data_types::{
@@ -46,6 +47,7 @@ use linera_views::{memory::MemoryStore, views::CryptoHashView};
4647
use test_case::test_case;
4748

4849
use super::{init_worker_with_chains, make_certificate};
50+
use crate::worker::WorkerError;
4951

5052
#[cfg_attr(feature = "wasmer", test_case(WasmRuntime::Wasmer ; "wasmer"))]
5153
#[cfg_attr(feature = "wasmtime", test_case(WasmRuntime::Wasmtime ; "wasmtime"))]
@@ -103,7 +105,7 @@ where
103105
let creator_key_pair = KeyPair::generate();
104106
let creator_chain = ChainDescription::Root(2);
105107
let (committee, worker) = init_worker_with_chains(
106-
storage,
108+
storage.clone(),
107109
vec![
108110
(publisher_chain, publisher_key_pair.public(), Amount::ZERO),
109111
(creator_chain, creator_key_pair.public(), Amount::ZERO),
@@ -153,12 +155,17 @@ where
153155
));
154156
let publish_certificate = make_certificate(&committee, &worker, publish_block_proposal);
155157

158+
assert_matches!(
159+
worker
160+
.fully_handle_certificate_with_notifications(publish_certificate.clone(), vec![], &())
161+
.await,
162+
Err(WorkerError::BlobsNotFound(_))
163+
);
164+
storage
165+
.write_blobs(&[contract_blob.clone(), service_blob.clone()])
166+
.await?;
156167
let info = worker
157-
.fully_handle_certificate_with_notifications(
158-
publish_certificate.clone(),
159-
vec![contract_blob.clone(), service_blob.clone()],
160-
&(),
161-
)
168+
.fully_handle_certificate_with_notifications(publish_certificate.clone(), vec![], &())
162169
.await
163170
.unwrap()
164171
.info;

linera-core/src/unit_tests/worker_tests.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -728,7 +728,7 @@ where
728728
drop(chain);
729729

730730
worker
731-
.handle_confirmed_certificate(certificate0, vec![], None)
731+
.handle_confirmed_certificate(certificate0, None)
732732
.await?;
733733
let chain = worker.chain_state_view(ChainId::root(1)).await?;
734734
drop(chain);
@@ -858,7 +858,7 @@ where
858858
// Missing earlier blocks
859859
assert_matches!(
860860
worker
861-
.handle_confirmed_certificate(certificate1.clone(), vec![], None)
861+
.handle_confirmed_certificate(certificate1.clone(), None)
862862
.await,
863863
Err(WorkerError::MissingEarlierBlocks { .. })
864864
);
@@ -1088,7 +1088,7 @@ where
10881088
)),
10891089
);
10901090
worker
1091-
.handle_confirmed_certificate(certificate.clone(), vec![], None)
1091+
.handle_confirmed_certificate(certificate.clone(), None)
10921092
.await?;
10931093

10941094
// Then receive the next two messages.
@@ -3922,7 +3922,7 @@ where
39223922
));
39233923
let certificate = make_certificate(&committee, &worker, value);
39243924
worker
3925-
.handle_confirmed_certificate(certificate, vec![], None)
3925+
.handle_confirmed_certificate(certificate, None)
39263926
.await?;
39273927

39283928
for query_context in query_contexts_after_new_block.clone() {

0 commit comments

Comments
 (0)