From a55caae279eac70c0437ee1140f7c69e7f7a9a25 Mon Sep 17 00:00:00 2001 From: Ben Kirwin Date: Tue, 2 Dec 2025 17:47:58 -0500 Subject: [PATCH] Remove the Result wrapping from the legacy codec The old, non-columnar Persist encoding returned row-by-row errors, but the expectation is that the schema checking for columnar data happens all at once and up front. Not only does this error path only matter for the increasingly small fraction of legacy data, but all callers were effectively unwrapping errors anyways. This commit switches to passing around data direcly instead of wrapping each key and value in a Result. --- .../catalog/open/builtin_schema_migration.rs | 11 +---- src/adapter/src/coord.rs | 3 +- src/adapter/src/coord/peek.rs | 4 +- .../src/durable/objects/state_update.rs | 12 +---- src/durable-cache/src/lib.rs | 4 +- .../src/maelstrom/txn_list_append_single.rs | 41 +++-------------- src/persist-client/src/batch.rs | 2 +- src/persist-client/src/fetch.rs | 22 ++++----- src/persist-client/src/internal/machine.rs | 2 +- src/persist-client/src/lib.rs | 39 +++++----------- src/persist-client/src/read.rs | 45 +++++-------------- src/persist-client/src/schema.rs | 6 +-- src/persist-client/src/write.rs | 7 +-- src/storage-client/src/storage_collections.rs | 27 +++-------- src/storage-controller/src/collection_mgmt.rs | 11 ++--- src/storage-controller/src/rtr.rs | 4 +- src/storage-operators/src/persist_source.rs | 8 +--- src/storage-operators/src/stats.rs | 12 ++--- src/storage/src/source/reclock/compat.rs | 17 ++----- .../src/storage_state/async_storage_worker.rs | 4 +- src/txn-wal/src/lib.rs | 7 +-- src/txn-wal/src/operator.rs | 5 +-- src/txn-wal/src/txn_cache.rs | 10 +---- src/txn-wal/src/txn_read.rs | 7 +-- 24 files changed, 79 insertions(+), 231 deletions(-) diff --git a/src/adapter/src/catalog/open/builtin_schema_migration.rs b/src/adapter/src/catalog/open/builtin_schema_migration.rs index ba01f1a9269f7..6b4cdfe5360bd 100644 --- a/src/adapter/src/catalog/open/builtin_schema_migration.rs +++ b/src/adapter/src/catalog/open/builtin_schema_migration.rs @@ -987,16 +987,7 @@ where let entries: Vec<_> = updates .into_iter() - .filter_map(|(data, _, _)| { - if let (Ok(key), Ok(val)) = data { - Some((key, val)) - } else { - // If we can't decode the data, it has likely been written by a newer version, so - // we ignore it. - info!("skipping unreadable migration shard entry: {data:?}"); - None - } - }) + .map(|(data, _, _)| data) .filter(move |(key, _)| predicate(key)) .collect(); diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index d3636dce3d9f3..284727c7bfe65 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -2552,8 +2552,7 @@ impl Coordinator { // Retract the current contents, spilling into our builder. while let Some(values) = snapshot_cursor.next().await { - for ((key, _val), _t, d) in values { - let key = key.expect("builtin table had errors"); + for (key, _t, d) in values { let d_invert = d.neg(); batch.add(&key, &(), &d_invert).await; } diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs index ae26ae6db18b4..c9e40b97b367c 100644 --- a/src/adapter/src/coord/peek.rs +++ b/src/adapter/src/coord/peek.rs @@ -996,9 +996,7 @@ impl crate::coord::Coordinator { let mut current_batch_size: usize = 0; 'outer: while let Some(rows) = row_cursor.next().await { - for ((key, _val), _ts, diff) in rows { - let source_data = key.expect("decoding error"); - + for ((source_data, _val), _ts, diff) in rows { let row = source_data .0 .expect("we are not sending errors on this code path"); diff --git a/src/catalog/src/durable/objects/state_update.rs b/src/catalog/src/durable/objects/state_update.rs index 5a8e12538d079..823a1f989515b 100644 --- a/src/catalog/src/durable/objects/state_update.rs +++ b/src/catalog/src/durable/objects/state_update.rs @@ -394,11 +394,7 @@ impl StateUpdateKindJson { } /// Version of [`StateUpdateKind`] that is stored directly in persist. -type PersistStateUpdate = ( - (Result, Result<(), String>), - Timestamp, - StorageDiff, -); +type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff); impl TryFrom<&StateUpdate> for Option { type Error = DurableCatalogError; @@ -1017,11 +1013,7 @@ impl RustType for StateUpdateKind { /// diff)` tuple/update we store in persist. impl From for StateUpdate { fn from(kvtd: PersistStateUpdate) -> Self { - let ((key, val), ts, diff) = kvtd; - let (key, ()) = ( - key.expect("persist decoding error"), - val.expect("persist decoding error"), - ); + let ((key, ()), ts, diff) = kvtd; StateUpdate { kind: StateUpdateKindJson::from(key), ts, diff --git a/src/durable-cache/src/lib.rs b/src/durable-cache/src/lib.rs index 333e5f588ef47..c2ecc99b119e6 100644 --- a/src/durable-cache/src/lib.rs +++ b/src/durable-cache/src/lib.rs @@ -169,8 +169,8 @@ impl DurableCache { consolidate_updates(&mut updates); updates.sort_by(|(_, _, d1), (_, _, d2)| d1.cmp(d2)); for ((k, v), t, d) in updates { - let encoded_key = k.unwrap(); - let encoded_val = v.unwrap(); + let encoded_key = k; + let encoded_val = v; let (decoded_key, decoded_val) = C::decode(&encoded_key, &encoded_val); let val = LocalVal { diff --git a/src/persist-cli/src/maelstrom/txn_list_append_single.rs b/src/persist-cli/src/maelstrom/txn_list_append_single.rs index cad0eb33a6da9..1a9980008f282 100644 --- a/src/persist-cli/src/maelstrom/txn_list_append_single.rs +++ b/src/persist-cli/src/maelstrom/txn_list_append_single.rs @@ -112,11 +112,7 @@ pub struct Transactor { // Keep a long-lived listen, which is incrementally read as we go. Then // assert that it has the same data as the short-lived snapshot+listen in // `read`. This hopefully stresses slightly different parts of the system. - long_lived_updates: Vec<( - (Result, Result), - u64, - i64, - )>, + long_lived_updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>, long_lived_listen: Listen, } @@ -241,11 +237,7 @@ impl Transactor { &mut self, ) -> Result< ( - Vec<( - (Result, Result), - u64, - i64, - )>, + Vec<((MaelstromKey, MaelstromVal), u64, i64)>, Antichain, ), MaelstromError, @@ -382,14 +374,7 @@ impl Transactor { async fn listen_through( mut listen: Listen, frontier: &Antichain, - ) -> Result< - Vec<( - (Result, Result), - u64, - i64, - )>, - ExternalError, - > { + ) -> Result, ExternalError> { let mut ret = Vec::new(); loop { for event in listen.fetch_next().await { @@ -415,11 +400,7 @@ impl Transactor { async fn read_long_lived( &mut self, as_of: &Antichain, - ) -> Vec<( - (Result, Result), - u64, - i64, - )> { + ) -> Vec<((MaelstromKey, MaelstromVal), u64, i64)> { while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) { for event in self.long_lived_listen.fetch_next().await { match event { @@ -450,11 +431,7 @@ impl Transactor { fn extract_state_map( read_ts: u64, - updates: Vec<( - (Result, Result), - u64, - i64, - )>, + updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>, ) -> Result, MaelstromError> { let mut ret = BTreeMap::new(); for ((k, v), _, d) in updates { @@ -464,14 +441,6 @@ impl Transactor { text: format!("invalid read at time {}", read_ts), }); } - let k = k.map_err(|err| MaelstromError { - code: ErrorCode::Crash, - text: format!("invalid key {}", err), - })?; - let v = v.map_err(|err| MaelstromError { - code: ErrorCode::Crash, - text: format!("invalid val {}", err), - })?; if ret.contains_key(&k) { return Err(MaelstromError { code: ErrorCode::Crash, diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs index b16df1451b29f..94803f4993e2b 100644 --- a/src/persist-client/src/batch.rs +++ b/src/persist-client/src/batch.rs @@ -1738,7 +1738,7 @@ mod tests { .await; let (actual, _) = read.expect_listen(0).await.read_until(&3).await; - let expected = vec![(((Ok("foo".to_owned())), Ok(())), 2, 1)]; + let expected = vec![((("foo".to_owned()), ()), 2, 1)]; assert_eq!(actual, expected); } diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs index 069a9ae2390de..4316e11cc055b 100644 --- a/src/persist-client/src/fetch.rs +++ b/src/persist-client/src/fetch.rs @@ -805,7 +805,7 @@ pub struct FetchedPart { diffs: Int64Array, migration: PartMigration, filter_pushdown_audit: Option, - peek_stash: Option<((Result, Result), T, D)>, + peek_stash: Option<((K, V), T, D)>, part_cursor: usize, key_storage: Option, val_storage: Option, @@ -996,7 +996,7 @@ where &mut self, key: &mut Option, val: &mut Option, - ) -> Option<((Result, Result), T, D)> { + ) -> Option<((K, V), T, D)> { let mut consolidated = self.peek_stash.take(); loop { // Fetch and decode the next tuple in the sequence. (Or break if there is none.) @@ -1041,18 +1041,13 @@ where Some((kv, t, d)) } - fn decode_kv( - &mut self, - index: usize, - key: &mut Option, - val: &mut Option, - ) -> (Result, Result) { + fn decode_kv(&mut self, index: usize, key: &mut Option, val: &mut Option) -> (K, V) { let decoded = self .part .as_ref() .map_left(|codec| { let ((ck, cv), _, _) = codec.get(index).expect("valid index"); - Self::decode_codec( + let (k, v) = Self::decode_codec( &*self.metrics, self.migration.codec_read(), ck, @@ -1061,7 +1056,8 @@ where val, &mut self.key_storage, &mut self.val_storage, - ) + ); + (k.expect("valid legacy key"), v.expect("valid legacy value")) }) .map_right(|(structured_key, structured_val)| { self.decode_structured(index, structured_key, structured_val, key, val) @@ -1135,14 +1131,14 @@ where vals: &>::Decoder, key: &mut Option, val: &mut Option, - ) -> (Result, Result) { + ) -> (K, V) { let mut key = key.take().unwrap_or_default(); keys.decode(idx, &mut key); let mut val = val.take().unwrap_or_default(); vals.decode(idx, &mut val); - (Ok(key), Ok(val)) + (key, val) } } @@ -1153,7 +1149,7 @@ where T: Timestamp + Lattice + Codec64, D: Monoid + Codec64 + Send + Sync, { - type Item = ((Result, Result), T, D); + type Item = ((K, V), T, D); fn next(&mut self) -> Option { self.next_with_storage(&mut None, &mut None) diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs index 0fb41b5e131ca..a940ac4a0dca4 100644 --- a/src/persist-client/src/internal/machine.rs +++ b/src/persist-client/src/internal/machine.rs @@ -2298,7 +2298,7 @@ pub mod datadriven { match event { ListenEvent::Updates(x) => { for ((k, _v), t, d) in x.iter() { - write!(s, "{} {} {}\n", k.as_ref().unwrap(), t, d); + write!(s, "{} {} {}\n", k, t, d); } } ListenEvent::Progress(x) => { diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs index 9335679e29387..16d880fa2bc55 100644 --- a/src/persist-client/src/lib.rs +++ b/src/persist-client/src/lib.rs @@ -970,10 +970,7 @@ mod tests { .expect("client construction failed") } - pub fn all_ok<'a, K, V, T, D, I>( - iter: I, - as_of: T, - ) -> Vec<((Result, Result), T, D)> + pub fn all_ok<'a, K, V, T, D, I>(iter: I, as_of: T) -> Vec<((K, V), T, D)> where K: Ord + Clone + 'a, V: Ord + Clone + 'a, @@ -987,7 +984,7 @@ mod tests { .map(|((k, v), t, d)| { let mut t = t.clone(); t.advance_by(as_of.borrow()); - ((Ok(k.clone()), Ok(v.clone())), t, d.clone()) + ((k.clone(), v.clone()), t, d.clone()) }) .collect(); consolidate_updates(&mut ret); @@ -999,13 +996,10 @@ mod tests { key: &BlobKey, metrics: &Metrics, read_schemas: &Schemas, - ) -> ( - BlobTraceBatchPart, - Vec<((Result, Result), T, D)>, - ) + ) -> (BlobTraceBatchPart, Vec<((K, V), T, D)>) where - K: Codec, - V: Codec, + K: Codec + Clone, + V: Codec + Clone, T: Timestamp + Codec64, D: Codec64, { @@ -1016,22 +1010,13 @@ mod tests { .expect("missing part"); let mut part = BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part"); - // Ensure codec data is present even if it was not generated at write time. - let _ = part + let structured = part .updates - .get_or_make_codec::(&read_schemas.key, &read_schemas.val); - let mut updates = Vec::new(); - // TODO(bkirwi): switch to structured data in tests - for ((k, v), t, d) in part.updates.records().expect("codec data").iter() { - updates.push(( - ( - K::decode(k, &read_schemas.key), - V::decode(v, &read_schemas.val), - ), - T::decode(t), - D::decode(d), - )); - } + .into_part::(&*read_schemas.key, &*read_schemas.val); + let updates = structured + .decode_iter::(&*read_schemas.key, &*read_schemas.val) + .expect("structured data") + .collect(); (part, updates) } @@ -1973,7 +1958,7 @@ mod tests { assert_eq!( listen_next.await, vec![ - ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]), + ListenEvent::Updates(vec![(("2".to_owned(), "two".to_owned()), 2, 1)]), ListenEvent::Progress(Antichain::from_elem(3)), ] ); diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs index b486c64b3c9b2..301b5a687f436 100644 --- a/src/persist-client/src/read.rs +++ b/src/persist-client/src/read.rs @@ -159,9 +159,7 @@ where /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`], /// fetches and returns the data from within it. #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))] - pub async fn fetch_next( - &mut self, - ) -> Vec, Result), T, D)>> { + pub async fn fetch_next(&mut self) -> Vec> { let events = self.next(None).await; let new_len = events .iter() @@ -411,9 +409,7 @@ where /// If you have a use for consolidated listen output, given that snapshots can't be /// consolidated, come talk to us! #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))] - pub async fn fetch_next( - &mut self, - ) -> Vec, Result), T, D)>> { + pub async fn fetch_next(&mut self) -> Vec> { let (parts, progress) = self.next(None).await; let mut ret = Vec::with_capacity(parts.len() + 1); for part in parts { @@ -428,9 +424,7 @@ where } /// Convert listener into futures::Stream - pub fn into_stream( - mut self, - ) -> impl Stream, Result), T, D)>> { + pub fn into_stream(mut self) -> impl Stream> { async_stream::stream!({ loop { for msg in self.fetch_next().await { @@ -445,13 +439,7 @@ where /// return the final progress info. #[cfg(test)] #[track_caller] - pub async fn read_until( - &mut self, - ts: &T, - ) -> ( - Vec<((Result, Result), T, D)>, - Antichain, - ) { + pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain) { let mut updates = Vec::new(); let mut frontier = Antichain::from_elem(T::minimum()); while self.frontier.less_than(ts) { @@ -920,9 +908,7 @@ where D: Monoid + Ord + Codec64 + Send + Sync, { /// Grab the next batch of consolidated data. - pub async fn next( - &mut self, - ) -> Option, Result), T, D)> + '_> { + pub async fn next(&mut self) -> Option + '_> { let Self { consolidator, max_len, @@ -952,7 +938,7 @@ where val_decoder.decode(i, &mut v); let t = T::decode(part.time.value(i).to_le_bytes()); let d = D::decode(part.diff.value(i).to_le_bytes()); - ((Ok(k), Ok(v)), t, d) + ((k, v), t, d) }); Some(iter) @@ -982,7 +968,7 @@ where pub async fn snapshot_and_fetch( &mut self, as_of: Antichain, - ) -> Result, Result), T, D)>, Since> { + ) -> Result, Since> { let mut cursor = self.snapshot_cursor(as_of, |_| true).await?; let mut contents = Vec::new(); while let Some(iter) = cursor.next().await { @@ -1170,10 +1156,7 @@ where pub async fn snapshot_and_stream( &mut self, as_of: Antichain, - ) -> Result< - impl Stream, Result), T, D)> + use, - Since, - > { + ) -> Result + use, Since> { let snap = self.snapshot(as_of).await?; let blob = Arc::clone(&self.blob); @@ -1220,10 +1203,7 @@ where /// succeed, process its batches, and then return its data sorted. #[cfg(test)] #[track_caller] - pub async fn expect_snapshot_and_fetch( - &mut self, - as_of: T, - ) -> Vec<((Result, Result), T, D)> { + pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> { let mut ret = self .snapshot_and_fetch(Antichain::from_elem(as_of)) .await @@ -1365,10 +1345,7 @@ mod tests { } } } - assert_eq!( - updates, - &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)], - ) + assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)],) } #[mz_persist_proc::test(tokio::test)] @@ -1399,7 +1376,7 @@ mod tests { let mut snapshot_rows = vec![]; while let Some(((k, v), t, d)) = snapshot.next().await { - snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d)); + snapshot_rows.push(((k, v), t, d)); } for ((_k, _v), t, _d) in data.as_mut_slice() { diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs index 9b23a291301b2..71a4792afd39c 100644 --- a/src/persist-client/src/schema.rs +++ b/src/persist-client/src/schema.rs @@ -679,9 +679,9 @@ mod tests { assert_eq!(write1.write_schemas.id.unwrap(), SchemaId(1)); } - fn strings(xs: &[((Result, Result<(), String>), u64, i64)]) -> Vec> { + fn strings(xs: &[((Strings, ()), u64, i64)]) -> Vec> { xs.iter() - .map(|((k, _), _, _)| k.as_ref().unwrap().0.iter().map(|x| x.as_str()).collect()) + .map(|((k, _), _, _)| k.0.iter().map(|x| x.as_str()).collect()) .collect() } @@ -691,7 +691,7 @@ mod tests { async fn snap_streaming( as_of: u64, read: &mut ReadHandle, - ) -> Vec<((Result, Result<(), String>), u64, i64)> { + ) -> Vec<((Strings, ()), u64, i64)> { // NB: We test with both snapshot_and_fetch and snapshot_and_stream // because one uses the consolidating iter and one doesn't. let mut ret = read diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs index 8a4e1561c9a77..e78ac55f18326 100644 --- a/src/persist-client/src/write.rs +++ b/src/persist-client/src/write.rs @@ -664,12 +664,7 @@ where fetched_part.next_with_storage(&mut key_storage, &mut val_storage) { builder - .add( - &k.expect("decoded just-encoded key data"), - &v.expect("decoded just-encoded val data"), - &t, - &d, - ) + .add(&k, &v, &t, &d) .await .expect("re-encoding just-decoded data"); } diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs index 76ee8970723af..e232f4825d884 100644 --- a/src/storage-client/src/storage_collections.rs +++ b/src/storage-client/src/storage_collections.rs @@ -363,17 +363,9 @@ pub struct SnapshotCursor { impl SnapshotCursor { pub async fn next( &mut self, - ) -> Option< - impl Iterator< - Item = ( - (Result, Result<(), String>), - T, - StorageDiff, - ), - > + Sized - + '_, - > { - self.cursor.next().await + ) -> Option + Sized + '_> { + let iter = self.cursor.next().await?; + Some(iter.map(|((k, ()), t, d)| (k, t, d))) } } @@ -1132,8 +1124,8 @@ where let mut snapshot = Vec::with_capacity(contents.len()); for ((data, _), _, diff) in contents { // TODO(petrosagg): We should accumulate the errors too and let the user - // interprret the result - let row = data.expect("invalid protobuf data").0?; + // interpret the result + let row = data.0?; snapshot.push((row, diff)); } Ok(snapshot) @@ -1188,14 +1180,7 @@ where }; // Map our stream, unwrapping Persist internal errors. - let stream = stream - .map(|((k, _v), t, d)| { - // TODO(parkmycar): We should accumulate the errors and pass them on to the - // caller. - let data = k.expect("error while streaming from Persist"); - (data, t, d) - }) - .boxed(); + let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed(); Ok(stream) } .boxed() diff --git a/src/storage-controller/src/collection_mgmt.rs b/src/storage-controller/src/collection_mgmt.rs index 24d76ef03363b..336b91c847c21 100644 --- a/src/storage-controller/src/collection_mgmt.rs +++ b/src/storage-controller/src/collection_mgmt.rs @@ -1024,7 +1024,7 @@ where Ok(contents) => { let mut snapshot = Vec::with_capacity(contents.len()); for ((data, _), _, diff) in contents { - let row = data.expect("invalid protobuf data").0.unwrap(); + let row = data.0.unwrap(); snapshot.push((row, -Diff::from(diff))); } snapshot @@ -1546,8 +1546,7 @@ where // Produce retractions by inverting diffs of rows we want to delete. let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts.clone())); while let Some(chunk) = rows.next().await { - for ((key, _v), _t, diff) in chunk { - let data = key.map_err(|e| anyhow!("decoding error in metrics snapshot: {e}"))?; + for (data, _t, diff) in chunk { let Ok(row) = &data.0 else { continue }; let datums = row.unpack(); let occurred_at = datums[occurred_at_col].unwrap_timestamptz(); @@ -1664,8 +1663,7 @@ where > = BTreeMap::new(); while let Some(chunk) = rows.next().await { - for ((key, _v), _t, diff) in chunk { - let data = key.expect("successful decode"); + for (data, _t, diff) in chunk { let Ok(row) = &data.0 else { continue }; let (key, timestamp) = handle_row(row, diff); @@ -1705,8 +1703,7 @@ where // Mark any row outside the retention window for deletion while let Some(chunk) = rows.next().await { - for ((key, _v), _t, diff) in chunk { - let data = key.expect("successful decode"); + for (data, _t, diff) in chunk { let Ok(row) = &data.0 else { continue }; let (_, timestamp) = handle_row(row, diff); diff --git a/src/storage-controller/src/rtr.rs b/src/storage-controller/src/rtr.rs index 531b34e3828a6..9a7d4f405531d 100644 --- a/src/storage-controller/src/rtr.rs +++ b/src/storage-controller/src/rtr.rs @@ -152,8 +152,8 @@ async fn decode_remap_data_until_geq_external_frontier< match event { ListenEvent::Updates(updates) => { for ((k, v), into_ts, diff) in updates { - let row: Row = k.expect("invalid binding").0.expect("invalid binding"); - let _v: () = v.expect("invalid binding"); + let row: Row = k.0.expect("invalid binding"); + let _v: () = v; let from_ts: FromTime = SourceTimestamp::decode_row(&row); pending_remap.push(Reverse((into_ts, from_ts, diff))); diff --git a/src/storage-operators/src/persist_source.rs b/src/storage-operators/src/persist_source.rs index 5c52913a9451e..08200fd7a3689 100644 --- a/src/storage-operators/src/persist_source.rs +++ b/src/storage-operators/src/persist_source.rs @@ -577,7 +577,7 @@ impl PendingWork { continue; } match (key, val) { - (Ok(SourceData(Ok(row))), Ok(())) => { + (SourceData(Ok(row)), ()) => { if let Some(mfp) = map_filter_project { // We originally accounted work as the number of outputs, to give downstream // operators a chance to reduce down anything we've emitted. This mfp call @@ -660,16 +660,12 @@ impl PendingWork { *work += 1; } } - (Ok(SourceData(Err(err))), Ok(())) => { + (SourceData(Err(err)), ()) => { let mut emit_time = *self.capability.time(); emit_time.0 = time; session.give((Err(err), emit_time, diff.into())); *work += 1; } - // TODO(petrosagg): error handling - (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => { - panic!("decoding failed") - } } if yield_fn(start_time, *work) { return false; diff --git a/src/storage-operators/src/stats.rs b/src/storage-operators/src/stats.rs index f9d95889badbc..6d45a75341963 100644 --- a/src/storage-operators/src/stats.rs +++ b/src/storage-operators/src/stats.rs @@ -93,19 +93,13 @@ impl StatsCursor { ) -> Option, Timestamp, StorageDiff)> + '_> { fn expect_decode( - raw: impl Iterator< - Item = ( - (Result, Result<(), String>), - Timestamp, - StorageDiff, - ), - >, + raw: impl Iterator, is_err: bool, ) -> impl Iterator, Timestamp, StorageDiff)> { raw.map(|((k, v), t, d)| { // NB: this matches the decode behaviour in sources - let SourceData(row) = k.expect("decode error"); - let () = v.expect("decode error"); + let SourceData(row) = k; + let () = v; (row, t, d) }) .filter(move |(r, _, _)| if is_err { r.is_err() } else { r.is_ok() }) diff --git a/src/storage/src/source/reclock/compat.rs b/src/storage/src/source/reclock/compat.rs index 4f03cd77d15f4..200ce3fca03f2 100644 --- a/src/storage/src/source/reclock/compat.rs +++ b/src/storage/src/source/reclock/compat.rs @@ -40,17 +40,8 @@ use tokio::sync::watch; /// A handle to a persist shard that stores remap bindings pub struct PersistHandle { - events: LocalBoxStream< - 'static, - ListenEvent< - IntoTime, - ( - (Result, Result<(), String>), - IntoTime, - StorageDiff, - ), - >, - >, + events: + LocalBoxStream<'static, ListenEvent>, write_handle: WriteHandle, /// Whether or not this handle is in read-only mode. read_only_rx: watch::Receiver, @@ -223,9 +214,7 @@ where } ListenEvent::Updates(msgs) => { for ((update, _), into_ts, diff) in msgs { - let from_ts = FromTime::decode_row( - &update.expect("invalid row").0.expect("invalid row"), - ); + let from_ts = FromTime::decode_row(&update.0.expect("invalid row")); self.pending_batch.push((from_ts, into_ts, diff.into())); } } diff --git a/src/storage/src/storage_state/async_storage_worker.rs b/src/storage/src/storage_state/async_storage_worker.rs index 66151d92b1f14..f50c130834f4d 100644 --- a/src/storage/src/storage_state/async_storage_worker.rs +++ b/src/storage/src/storage_state/async_storage_worker.rs @@ -156,8 +156,8 @@ where match event { ListenEvent::Updates(updates) => { for ((k, v), t, d) in updates { - let row: Row = k.expect("invalid binding").0.expect("invalid binding"); - let _v: () = v.expect("invalid binding"); + let row: Row = k.0.expect("invalid binding"); + let _v: () = v; let from_ts = C::Time::decode_row(&row); remap_updates.push((from_ts, t, d)); } diff --git a/src/txn-wal/src/lib.rs b/src/txn-wal/src/lib.rs index aca9c18dd216b..07b864db53fe3 100644 --- a/src/txn-wal/src/lib.rs +++ b/src/txn-wal/src/lib.rs @@ -744,7 +744,7 @@ mod tests { TxnsCache::open(&self.client, self.txns_id, Some(data_id)).await; let _ = cache.update_gt(&as_of).await; let snapshot = cache.data_snapshot(data_id, as_of); - let mut data_read = self + let mut data_read: ReadHandle = self .client .open_leased_reader( data_id, @@ -762,10 +762,7 @@ mod tests { data_read.expire().await; let snapshot: Vec<_> = snapshot .into_iter() - .map(|((k, v), t, d)| { - let (k, ()) = (k.unwrap(), v.unwrap()); - (k, t, d) - }) + .map(|((k, ()), t, d)| (k, t, d)) .collect(); // Check that a subscribe would produce the same result. diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs index 3705f30ebeee9..b32b2fc7b35be 100644 --- a/src/txn-wal/src/operator.rs +++ b/src/txn-wal/src/operator.rs @@ -548,10 +548,7 @@ impl DataSubscribe { let (data, txns) = (ProbeHandle::new(), ProbeHandle::new()); let data_stream = data_stream.flat_map(|part| { let part = part.parse(); - part.part.map(|((k, v), t, d)| { - let (k, ()) = (k.unwrap(), v.unwrap()); - (k, t, d) - }) + part.part.map(|((k, ()), t, d)| (k, t, d)) }); let data_stream = data_stream.probe_with(&data); let (data_stream, mut txns_progress_token) = diff --git a/src/txn-wal/src/txn_cache.rs b/src/txn-wal/src/txn_cache.rs index a9da3a2b2936f..12d844bd5538b 100644 --- a/src/txn-wal/src/txn_cache.rs +++ b/src/txn-wal/src/txn_cache.rs @@ -951,10 +951,7 @@ where continue; } let part_updates = txns_subscribe.fetch_batch_part(part).await; - let part_updates = part_updates.map(|((k, v), t, d)| { - let (k, v) = (k.expect("valid key"), v.expect("valid val")); - (C::decode(k, v), t, d) - }); + let part_updates = part_updates.map(|((k, v), t, d)| (C::decode(k, v), t, d)); if let Some(only_data_id) = only_data_id.as_ref() { updates.extend(part_updates.filter(|(x, _, _)| x.data_id() == only_data_id)); } else { @@ -1163,10 +1160,7 @@ mod tests { snapshot.sort(); snapshot .into_iter() - .flat_map(|((k, v), _t, d)| { - let (k, ()) = (k.unwrap(), v.unwrap()); - std::iter::repeat(k).take(usize::try_from(d).unwrap()) - }) + .flat_map(|((k, ()), _t, d)| std::iter::repeat(k).take(usize::try_from(d).unwrap())) .collect() } diff --git a/src/txn-wal/src/txn_read.rs b/src/txn-wal/src/txn_read.rs index e3cc7cac9ea98..63e468a11f91b 100644 --- a/src/txn-wal/src/txn_read.rs +++ b/src/txn-wal/src/txn_read.rs @@ -141,7 +141,7 @@ impl DataSnapshot { pub async fn snapshot_and_fetch( &self, data_read: &mut ReadHandle, - ) -> Result, Result), T, D)>, Since> + ) -> Result, Since> where K: Debug + Codec + Ord, V: Debug + Codec + Ord, @@ -176,10 +176,7 @@ impl DataSnapshot { pub async fn snapshot_and_stream( &self, data_read: &mut ReadHandle, - ) -> Result< - impl Stream, Result), T, D)> + use, - Since, - > + ) -> Result + use, Since> where K: Debug + Codec + Ord, V: Debug + Codec + Ord,