Skip to content

Commit f0958d7

Browse files
authored
Simplify storage of BlobStates (#4053)
## Motivation * Improve code readability * Fix a particular failure path for clients ## Proposal * Remove the mysterious `overwrite` argument to `write_blob_states` * Do not record last_used_by in the blob state if the certificate is not yet in storage * Removed unused APIs ## Test Plan CI ## Release Plan - Nothing to do / These changes follow the usual release cycle.
1 parent cf90768 commit f0958d7

File tree

8 files changed

+43
-82
lines changed

8 files changed

+43
-82
lines changed

linera-chain/src/block.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -139,12 +139,12 @@ impl ConfirmedBlock {
139139
}
140140

141141
/// Returns a blob state that applies to all blobs used by this block.
142-
pub fn to_blob_state(&self) -> BlobState {
142+
pub fn to_blob_state(&self, is_stored_block: bool) -> BlobState {
143143
BlobState {
144-
last_used_by: self.0.hash(),
144+
last_used_by: is_stored_block.then_some(self.0.hash()),
145145
chain_id: self.chain_id(),
146146
block_height: self.height(),
147-
epoch: self.epoch(),
147+
epoch: is_stored_block.then_some(self.epoch()),
148148
}
149149
}
150150
}

linera-core/src/chain_worker/state/attempted_changes.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -340,12 +340,11 @@ where
340340
}
341341

342342
// Update the blob state with last used certificate hash.
343-
let blob_state = certificate.value().to_blob_state();
344-
let overwrite = blobs_result.is_ok(); // Overwrite only if we wrote the certificate.
343+
let blob_state = certificate.value().to_blob_state(blobs_result.is_ok());
345344
let blob_ids = required_blob_ids.into_iter().collect::<Vec<_>>();
346345
self.state
347346
.storage
348-
.maybe_write_blob_states(&blob_ids, blob_state, overwrite)
347+
.maybe_write_blob_states(&blob_ids, blob_state)
349348
.await?;
350349
let mut blobs = blobs_result?
351350
.into_iter()

linera-core/src/unit_tests/test_utils.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,9 @@ where
596596
Err(err) => Err(err),
597597
Ok(blob_state) => match blob_state {
598598
None => Err(NodeError::BlobsNotFound(vec![blob_id])),
599-
Some(blob_state) => Ok(blob_state.last_used_by),
599+
Some(blob_state) => blob_state
600+
.last_used_by
601+
.ok_or_else(|| NodeError::BlobsNotFound(vec![blob_id])),
600602
},
601603
};
602604

linera-execution/src/lib.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1283,14 +1283,16 @@ impl From<Vec<u8>> for QueryResponse {
12831283
/// The state of a blob of binary data.
12841284
#[derive(Eq, PartialEq, Debug, Hash, Clone, Serialize, Deserialize)]
12851285
pub struct BlobState {
1286-
/// Hash of the last `Certificate` that published or used this blob.
1287-
pub last_used_by: CryptoHash,
1286+
/// Hash of the last `Certificate` that published or used this blob. If empty, the
1287+
/// blob is known to be published by a confirmed certificate but we may not have fully
1288+
/// processed this certificate just yet.
1289+
pub last_used_by: Option<CryptoHash>,
12881290
/// The `ChainId` of the chain that published the change
12891291
pub chain_id: ChainId,
12901292
/// The `BlockHeight` of the chain that published the change
12911293
pub block_height: BlockHeight,
1292-
/// Epoch of the `last_used_by` certificate.
1293-
pub epoch: Epoch,
1294+
/// Epoch of the `last_used_by` certificate (if any).
1295+
pub epoch: Option<Epoch>,
12941296
}
12951297

12961298
/// The runtime to use for running the application.

linera-service/src/proxy/grpc.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -632,7 +632,10 @@ where
632632
.map_err(Self::error_to_status)?;
633633
let blob_state =
634634
blob_state.ok_or_else(|| Status::not_found(format!("Blob not found {}", blob_id)))?;
635-
Ok(Response::new(blob_state.last_used_by.into()))
635+
let last_used_by = blob_state
636+
.last_used_by
637+
.ok_or_else(|| Status::not_found(format!("Blob not found {}", blob_id)))?;
638+
Ok(Response::new(last_used_by.into()))
636639
}
637640

638641
#[instrument(skip_all, err(level = Level::WARN))]

linera-service/src/proxy/main.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,8 +370,11 @@ where
370370
BlobLastUsedBy(blob_id) => {
371371
let blob_state = self.storage.read_blob_state(*blob_id).await?;
372372
let blob_state = blob_state.ok_or_else(|| anyhow!("Blob not found {}", blob_id))?;
373+
let last_used_by = blob_state
374+
.last_used_by
375+
.ok_or_else(|| anyhow!("Blob not found {}", blob_id))?;
373376
Ok(Some(RpcMessage::BlobLastUsedByResponse(Box::new(
374-
blob_state.last_used_by,
377+
last_used_by,
375378
))))
376379
}
377380
MissingBlobIds(blob_ids) => Ok(Some(RpcMessage::MissingBlobIdsResponse(

linera-storage/src/db_storage.rs

Lines changed: 18 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use dashmap::DashMap;
99
use linera_base::prometheus_util::MeasureLatency as _;
1010
use linera_base::{
1111
crypto::CryptoHash,
12-
data_types::{Blob, Epoch, NetworkDescription, TimeDelta, Timestamp},
12+
data_types::{Blob, NetworkDescription, TimeDelta, Timestamp},
1313
identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
1414
};
1515
use linera_chain::{
@@ -696,35 +696,13 @@ where
696696
Ok(())
697697
}
698698

699-
async fn maybe_write_blob_state(
700-
&self,
701-
blob_id: BlobId,
702-
blob_state: BlobState,
703-
) -> Result<Epoch, ViewError> {
704-
let current_blob_state = self.read_blob_state(blob_id).await?;
705-
let (should_write, latest_epoch) = match current_blob_state {
706-
Some(current_blob_state) => (
707-
current_blob_state.epoch < blob_state.epoch,
708-
current_blob_state.epoch.max(blob_state.epoch),
709-
),
710-
None => (true, blob_state.epoch),
711-
};
712-
713-
if should_write {
714-
self.write_blob_state(blob_id, &blob_state).await?;
715-
}
716-
717-
Ok(latest_epoch)
718-
}
719-
720699
async fn maybe_write_blob_states(
721700
&self,
722701
blob_ids: &[BlobId],
723702
blob_state: BlobState,
724-
overwrite: bool,
725-
) -> Result<Vec<Epoch>, ViewError> {
703+
) -> Result<(), ViewError> {
726704
if blob_ids.is_empty() {
727-
return Ok(Vec::new());
705+
return Ok(());
728706
}
729707
let blob_state_keys = blob_ids
730708
.iter()
@@ -734,36 +712,22 @@ where
734712
.store
735713
.read_multi_values::<BlobState>(blob_state_keys)
736714
.await?;
737-
let mut latest_epochs = Vec::new();
738715
let mut batch = Batch::new();
739-
let mut need_write = false;
740716
for (maybe_blob_state, blob_id) in maybe_blob_states.iter().zip(blob_ids) {
741-
let (should_write, latest_epoch) = match maybe_blob_state {
742-
None => (true, blob_state.epoch),
743-
Some(current_blob_state) => (
744-
overwrite && current_blob_state.epoch < blob_state.epoch,
745-
current_blob_state.epoch.max(blob_state.epoch),
746-
),
747-
};
748-
if should_write {
749-
batch.add_blob_state(*blob_id, &blob_state)?;
750-
need_write = true;
717+
match maybe_blob_state {
718+
None => {
719+
batch.add_blob_state(*blob_id, &blob_state)?;
720+
}
721+
Some(state) => {
722+
if state.epoch < blob_state.epoch {
723+
batch.add_blob_state(*blob_id, &blob_state)?;
724+
}
725+
}
751726
}
752-
latest_epochs.push(latest_epoch);
753-
}
754-
if need_write {
755-
self.write_batch(batch).await?;
756727
}
757-
Ok(latest_epochs)
758-
}
759-
760-
async fn write_blob_state(
761-
&self,
762-
blob_id: BlobId,
763-
blob_state: &BlobState,
764-
) -> Result<(), ViewError> {
765-
let mut batch = Batch::new();
766-
batch.add_blob_state(blob_id, blob_state)?;
728+
// We tolerate race conditions because two active chains are likely to
729+
// be both from the latest epoch, and otherwise failing to pick the
730+
// more recent blob state has limited impact.
767731
self.write_batch(batch).await?;
768732
Ok(())
769733
}
@@ -997,6 +961,9 @@ where
997961
}
998962

999963
async fn write_batch(&self, batch: Batch) -> Result<(), ViewError> {
964+
if batch.key_value_bytes.is_empty() {
965+
return Ok(());
966+
}
1000967
let mut futures = Vec::new();
1001968
for (key, bytes) in batch.key_value_bytes.into_iter() {
1002969
let store = self.store.clone();

linera-storage/src/lib.rs

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ use dashmap::{mapref::entry::Entry, DashMap};
1414
use linera_base::{
1515
crypto::CryptoHash,
1616
data_types::{
17-
ApplicationDescription, Blob, ChainDescription, CompressedBytecode, Epoch,
18-
NetworkDescription, TimeDelta, Timestamp,
17+
ApplicationDescription, Blob, ChainDescription, CompressedBytecode, NetworkDescription,
18+
TimeDelta, Timestamp,
1919
},
2020
identifiers::{ApplicationId, BlobId, ChainId, EventId, IndexAndEvent, StreamId},
2121
vm::VmRuntime,
@@ -115,31 +115,16 @@ pub trait Storage: Sized {
115115
certificate: &ConfirmedBlockCertificate,
116116
) -> Result<(), ViewError>;
117117

118-
/// Writes the given blob state.
119-
async fn write_blob_state(
120-
&self,
121-
blob_id: BlobId,
122-
blob_state: &BlobState,
123-
) -> Result<(), ViewError>;
124-
125118
/// Writes the given blobs, but only if they already have a blob state. Returns `true` for the
126119
/// blobs that were written.
127120
async fn maybe_write_blobs(&self, blobs: &[Blob]) -> Result<Vec<bool>, ViewError>;
128121

129-
/// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
130-
async fn maybe_write_blob_state(
131-
&self,
132-
blob_id: BlobId,
133-
blob_state: BlobState,
134-
) -> Result<Epoch, ViewError>;
135-
136122
/// Attempts to write the given blob state. Returns the latest `Epoch` to have used this blob.
137123
async fn maybe_write_blob_states(
138124
&self,
139125
blob_ids: &[BlobId],
140126
blob_state: BlobState,
141-
overwrite: bool,
142-
) -> Result<Vec<Epoch>, ViewError>;
127+
) -> Result<(), ViewError>;
143128

144129
/// Writes several blobs.
145130
async fn write_blobs(&self, blobs: &[Blob]) -> Result<(), ViewError>;

0 commit comments

Comments
 (0)