Skip to content

Commit a3adba8

Browse files
committed
Address more review comments
1 parent 115c8a0 commit a3adba8

File tree

6 files changed

+50
-92
lines changed

6 files changed

+50
-92
lines changed

crates/walrus-sdk/src/client.rs

Lines changed: 39 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,13 @@ struct SendBlobOptions<'a> {
206206
initial_completed_weight: Option<&'a HashMap<BlobId, usize>>,
207207
}
208208

209+
/// Pending upload state carried through a single reserve+store execution.
210+
#[derive(Debug, Default, Clone)]
211+
struct PendingUploadContext {
212+
initial_upload_weight: Option<HashMap<BlobId, usize>>,
213+
initial_failed_nodes: Option<HashMap<BlobId, Vec<NodeIndex>>>,
214+
}
215+
209216
struct PendingUploadHandle<'a> {
210217
cancel: CancellationToken,
211218
future: PendingUploadFuture<'a>,
@@ -1019,7 +1026,7 @@ impl WalrusNodeClient<SuiContractClient> {
10191026
return Ok(vec![]);
10201027
}
10211028

1022-
let mut store_args = self.pending_upload_store_args(store_args);
1029+
let store_args = self.pending_upload_store_args(store_args);
10231030

10241031
tracing::info!(
10251032
"writing {blobs_count} blob{} to Walrus",
@@ -1069,12 +1076,8 @@ impl WalrusNodeClient<SuiContractClient> {
10691076
tracing::debug!(?registered_blobs);
10701077
store_args.maybe_observe_store_operation(store_op_duration);
10711078

1072-
store_args = self.apply_pending_upload_outcome(
1073-
store_args,
1074-
pending_upload_result,
1075-
&pending_blobs,
1076-
&committees,
1077-
);
1079+
let pending_context =
1080+
self.apply_pending_upload_outcome(pending_upload_result, &pending_blobs, &committees);
10781081

10791082
let (mut final_result, blobs_awaiting_upload, mut blobs_pending_certify_and_extend) =
10801083
Self::partition_registered_blobs(registered_blobs, blobs_count);
@@ -1097,9 +1100,11 @@ impl WalrusNodeClient<SuiContractClient> {
10971100
// has changed in the meantime.
10981101
// This operation can be safely interrupted as it does not require a wallet.
10991102
blobs_with_certificates = self
1100-
.await_while_checking_notification(
1101-
self.get_all_blob_certificates(blobs_awaiting_upload, &store_args),
1102-
)
1103+
.await_while_checking_notification(self.get_all_blob_certificates(
1104+
blobs_awaiting_upload,
1105+
&store_args,
1106+
&pending_context,
1107+
))
11031108
.await?;
11041109

11051110
debug_assert_eq!(blobs_with_certificates.len(), num_to_be_certified);
@@ -1141,7 +1146,7 @@ impl WalrusNodeClient<SuiContractClient> {
11411146
encoded_blobs_with_status: &[WalrusStoreBlobMaybeFinished<BlobWithStatus>],
11421147
store_args: &StoreArgs,
11431148
) -> Vec<(VerifiedBlobMetadataWithId, Arc<Vec<SliverPair>>)> {
1144-
let optimistic_max_blob_bytes = self.optimistic_upload_max_blob_bytes();
1149+
let pending_upload_max_blob_bytes = self.pending_upload_max_blob_bytes();
11451150
let pending_uploads_enabled = self.config.communication_config.pending_uploads_enabled;
11461151

11471152
let pending: Vec<_> = encoded_blobs_with_status
@@ -1150,16 +1155,16 @@ impl WalrusNodeClient<SuiContractClient> {
11501155
blob.pending_upload_payload(
11511156
pending_uploads_enabled,
11521157
store_args,
1153-
optimistic_max_blob_bytes,
1158+
pending_upload_max_blob_bytes,
11541159
)
11551160
})
11561161
.collect();
11571162

11581163
tracing::debug!(
11591164
pending_candidates = pending.len(),
11601165
pending_enabled = pending_uploads_enabled,
1161-
optimistic_uploads = store_args.store_optimizations.optimistic_uploads_enabled(),
1162-
max_blob_bytes = optimistic_max_blob_bytes,
1166+
pending_uploads = store_args.store_optimizations.pending_uploads_enabled(),
1167+
max_blob_bytes = pending_upload_max_blob_bytes,
11631168
"computed pending upload candidates",
11641169
);
11651170

@@ -1250,11 +1255,11 @@ impl WalrusNodeClient<SuiContractClient> {
12501255

12511256
fn apply_pending_upload_outcome(
12521257
&self,
1253-
mut store_args: StoreArgs,
12541258
mut pending_upload_result: Option<RunOutput<Vec<BlobId>, StoreError>>,
12551259
pending_blobs: &[(VerifiedBlobMetadataWithId, Arc<Vec<SliverPair>>)],
12561260
committees: &ActiveCommittees,
1257-
) -> StoreArgs {
1261+
) -> PendingUploadContext {
1262+
let mut context = PendingUploadContext::default();
12581263
// Cancel any in-flight pending tail. Tail handle is returned during the upload with
12591264
// immediate intent later.
12601265
if let Some(ref mut pending_output) = pending_upload_result
@@ -1264,9 +1269,9 @@ impl WalrusNodeClient<SuiContractClient> {
12641269
}
12651270

12661271
if let Some(pending_output) = pending_upload_result {
1267-
let weight_map = Self::success_weight_by_blob(&pending_output.results, &HashSet::new());
1272+
let weight_map = Self::success_weight_by_blob(&pending_output.results);
12681273
if !weight_map.is_empty() {
1269-
store_args = store_args.with_initial_upload_weight(weight_map);
1274+
context.initial_upload_weight = Some(weight_map);
12701275
}
12711276
let mut failed_nodes = failed_node_indices(&pending_output.results);
12721277
// Add nodes that were not seen in the pending upload results to the failed nodes.
@@ -1286,11 +1291,11 @@ impl WalrusNodeClient<SuiContractClient> {
12861291
.iter()
12871292
.map(|(meta, _)| (*meta.blob_id(), failed_nodes.clone())),
12881293
);
1289-
store_args = store_args.with_initial_failed_nodes(failed_map);
1294+
context.initial_failed_nodes = Some(failed_map);
12901295
}
12911296
}
12921297

1293-
store_args
1298+
context
12941299
}
12951300

12961301
fn partition_registered_blobs(
@@ -1365,6 +1370,7 @@ impl WalrusNodeClient<SuiContractClient> {
13651370
&self,
13661371
blobs_to_be_certified: Vec<WalrusStoreBlobUnfinished<BlobAwaitingUpload>>,
13671372
store_args: &StoreArgs,
1373+
pending_context: &PendingUploadContext,
13681374
) -> ClientResult<Vec<WalrusStoreBlobMaybeFinished<BlobPendingCertifyAndExtend>>> {
13691375
if blobs_to_be_certified.is_empty() {
13701376
return Ok(vec![]);
@@ -1377,8 +1383,13 @@ impl WalrusNodeClient<SuiContractClient> {
13771383
|blob_to_be_certified| {
13781384
let multi_pb = Arc::clone(&multi_pb);
13791385
async move {
1380-
self.get_certificate(blob_to_be_certified, multi_pb.as_ref(), store_args)
1381-
.await
1386+
self.get_certificate(
1387+
blob_to_be_certified,
1388+
multi_pb.as_ref(),
1389+
store_args,
1390+
pending_context,
1391+
)
1392+
.await
13821393
}
13831394
},
13841395
))
@@ -1401,6 +1412,7 @@ impl WalrusNodeClient<SuiContractClient> {
14011412
blob_to_be_certified: WalrusStoreBlobUnfinished<BlobAwaitingUpload>,
14021413
multi_pb: &MultiProgress,
14031414
store_args: &StoreArgs,
1415+
pending_context: &PendingUploadContext,
14041416
) -> ClientResult<WalrusStoreBlobMaybeFinished<BlobPendingCertifyAndExtend>> {
14051417
let committees = self.get_committees().await?;
14061418

@@ -1447,6 +1459,7 @@ impl WalrusNodeClient<SuiContractClient> {
14471459
&blob_object.blob_persistence_type(),
14481460
Some(multi_pb),
14491461
store_args,
1462+
pending_context,
14501463
)
14511464
.await
14521465
}
@@ -1672,7 +1685,7 @@ impl<T> WalrusNodeClient<T> {
16721685
}
16731686
}
16741687

1675-
fn optimistic_upload_max_blob_bytes(&self) -> u64 {
1688+
fn pending_upload_max_blob_bytes(&self) -> u64 {
16761689
self.config
16771690
.communication_config
16781691
.optimistic_upload_max_blob_bytes
@@ -2028,9 +2041,10 @@ impl<T> WalrusNodeClient<T> {
20282041
blob_persistence_type: &BlobPersistenceType,
20292042
multi_pb: Option<&MultiProgress>,
20302043
store_args: &StoreArgs,
2044+
pending_context: &PendingUploadContext,
20312045
) -> ClientResult<ConfirmationCertificate> {
20322046
let blobs = vec![(metadata.clone(), pairs.clone())];
2033-
let target_nodes = if let Some(failed) = store_args
2047+
let target_nodes = if let Some(failed) = pending_context
20342048
.initial_failed_nodes
20352049
.as_ref()
20362050
.and_then(|m| m.get(metadata.blob_id()))
@@ -2048,7 +2062,7 @@ impl<T> WalrusNodeClient<T> {
20482062
quorum_forwarder: store_args.quorum_event_tx.clone(),
20492063
tail_handle_collector: store_args.tail_handle_collector.clone(),
20502064
target_nodes,
2051-
initial_completed_weight: store_args.initial_upload_weight.as_ref(),
2065+
initial_completed_weight: pending_context.initial_upload_weight.as_ref(),
20522066
};
20532067

20542068
let (confirmation_results, upload_result) = self
@@ -2340,15 +2354,11 @@ impl<T> WalrusNodeClient<T> {
23402354

23412355
fn success_weight_by_blob(
23422356
results: &[NodeResult<Vec<BlobId>, StoreError>],
2343-
excluded_nodes: &HashSet<NodeIndex>,
23442357
) -> HashMap<BlobId, usize> {
23452358
let mut totals: HashMap<BlobId, usize> = HashMap::new();
23462359
let mut seen: HashMap<BlobId, HashSet<NodeIndex>> = HashMap::new();
23472360

23482361
for res in results {
2349-
if excluded_nodes.contains(&res.node) {
2350-
continue;
2351-
}
23522362
if let Ok(blob_ids) = &res.result {
23532363
for blob_id in blob_ids {
23542364
let counted_nodes = seen.entry(*blob_id).or_default();

crates/walrus-sdk/src/client/client_types.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -698,19 +698,19 @@ impl WalrusStoreBlobMaybeFinished<BlobWithStatus> {
698698
Ok(self)
699699
}
700700

701-
/// Returns the metadata and sliver pairs needed for an optimistic upload if eligible.
701+
/// Returns the metadata and sliver pairs needed for a pending upload if eligible.
702702
pub fn pending_upload_payload(
703703
&self,
704704
pending_uploads_enabled: bool,
705705
store_args: &StoreArgs,
706-
optimistic_upload_max_blob_bytes: u64,
706+
pending_upload_max_blob_bytes: u64,
707707
) -> Option<(VerifiedBlobMetadataWithId, Arc<Vec<SliverPair>>)> {
708708
let WalrusStoreBlobState::Unfinished(blob_with_status) = &self.state else {
709709
return None;
710710
};
711711
if blob_with_status.status.is_registered()
712712
|| !pending_uploads_enabled
713-
|| !store_args.store_optimizations.optimistic_uploads_enabled()
713+
|| !store_args.store_optimizations.pending_uploads_enabled()
714714
{
715715
return None;
716716
}
@@ -719,7 +719,7 @@ impl WalrusStoreBlobMaybeFinished<BlobWithStatus> {
719719
return None;
720720
};
721721
let unencoded_len = encoded.metadata.metadata().unencoded_length();
722-
if unencoded_len > optimistic_upload_max_blob_bytes {
722+
if unencoded_len > pending_upload_max_blob_bytes {
723723
return None;
724724
}
725725

crates/walrus-sdk/src/client/store_args.rs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@ use tokio::{
1515
use walrus_core::{DEFAULT_ENCODING, EncodingType, EpochCount};
1616
use walrus_sui::client::{BlobPersistence, PostStoreAction};
1717

18-
use super::{
19-
communication::node::NodeIndex,
20-
metrics::ClientMetrics,
21-
upload_relay_client::UploadRelayClient,
22-
};
18+
use super::{metrics::ClientMetrics, upload_relay_client::UploadRelayClient};
2319
use crate::{
2420
client::upload_relay_client::UploadRelayClientError,
2521
store_optimizations::StoreOptimizations,
@@ -76,11 +72,6 @@ pub struct StoreArgs {
7672
pub tail_handle_collector: Option<Arc<Mutex<Vec<JoinHandle<()>>>>>,
7773
/// Optional channel to forward encoding progress events to.
7874
pub encoding_event_tx: Option<UnboundedSender<EncodingProgressEvent>>,
79-
/// Initial upload weight accumulated before certification (e.g., from pending uploads).
80-
pub initial_upload_weight: Option<std::collections::HashMap<walrus_core::BlobId, usize>>,
81-
/// Nodes that failed during pending uploads, keyed by blob ID, to scope immediate retries.
82-
pub initial_failed_nodes:
83-
Option<std::collections::HashMap<walrus_core::BlobId, Vec<NodeIndex>>>,
8475
}
8576

8677
impl StoreArgs {
@@ -120,8 +111,6 @@ impl StoreArgs {
120111
quorum_event_tx: None,
121112
tail_handle_collector: None,
122113
encoding_event_tx: None,
123-
initial_upload_weight: None,
124-
initial_failed_nodes: None,
125114
}
126115
}
127116

@@ -159,24 +148,6 @@ impl StoreArgs {
159148
self
160149
}
161150

162-
/// Marks blob IDs that already had optimistic uploads scheduled earlier.
163-
pub fn with_initial_upload_weight(
164-
mut self,
165-
weight: std::collections::HashMap<walrus_core::BlobId, usize>,
166-
) -> Self {
167-
self.initial_upload_weight = Some(weight);
168-
self
169-
}
170-
171-
/// Sets the initial failed nodes map.
172-
pub fn with_initial_failed_nodes(
173-
mut self,
174-
failed: std::collections::HashMap<walrus_core::BlobId, Vec<NodeIndex>>,
175-
) -> Self {
176-
self.initial_failed_nodes = Some(failed);
177-
self
178-
}
179-
180151
/// Returns a reference to the upload relay client if present.
181152
pub fn upload_relay_client_ref(&self) -> Option<&UploadRelayClient> {
182153
self.upload_relay_client

crates/walrus-sdk/src/error.rs

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -52,18 +52,6 @@ pub struct SliverStoreError {
5252
pub error: NodeError,
5353
}
5454

55-
impl SliverStoreError {
56-
/// Returns true if the underlying node rejected the write because its pending cache is full.
57-
pub fn is_cache_saturated(&self) -> bool {
58-
self.error.is_cache_saturated()
59-
}
60-
61-
/// Returns true if the underlying node reports the blob is not yet registered.
62-
pub fn is_not_registered(&self) -> bool {
63-
self.error.is_not_registered()
64-
}
65-
}
66-
6755
/// A helper type for the client to handle errors.
6856
pub type ClientResult<T> = Result<T, ClientError>;
6957

crates/walrus-sdk/src/store_optimizations.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,13 @@ impl StoreOptimizations {
7979
self.reuse_resources
8080
}
8181

82+
/// Returns true if pending (optimistic) uploads are enabled.
83+
pub fn pending_uploads_enabled(&self) -> bool {
84+
self.optimistic_uploads
85+
}
86+
8287
/// Returns true if optimistic buffering is enabled.
8388
pub fn optimistic_uploads_enabled(&self) -> bool {
84-
self.optimistic_uploads
89+
self.pending_uploads_enabled()
8590
}
8691
}

crates/walrus-storage-node-client/src/error.rs

Lines changed: 0 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -66,16 +66,6 @@ impl NodeError {
6666
.unwrap_or(false)
6767
}
6868

69-
/// Returns true if the error was caused by the pending upload cache being full.
70-
pub fn is_cache_saturated(&self) -> bool {
71-
self.has_reason("CACHE_SATURATED")
72-
}
73-
74-
/// Returns true if the error indicates the blob is not yet registered.
75-
pub fn is_not_registered(&self) -> bool {
76-
self.has_reason("NOT_REGISTERED")
77-
}
78-
7969
/// Returns true if the HTTP error status code associated with number 499
8070
pub fn is_expired(&self) -> bool {
8171
Some(StatusCode::from_u16(499).expect("status code is in a valid range"))
@@ -139,12 +129,6 @@ impl NodeError {
139129
None
140130
}
141131
}
142-
143-
fn has_reason(&self, reason: &str) -> bool {
144-
self.status()
145-
.map(|status| status.is_for_reason(reason, STORAGE_NODE_ERROR_DOMAIN))
146-
.unwrap_or(false)
147-
}
148132
}
149133

150134
/// Errors returned during the communication with a storage node.

0 commit comments

Comments
 (0)