diff --git a/src/adapter/src/catalog/open/builtin_schema_migration.rs b/src/adapter/src/catalog/open/builtin_schema_migration.rs
index ba01f1a9269f7..6b4cdfe5360bd 100644
--- a/src/adapter/src/catalog/open/builtin_schema_migration.rs
+++ b/src/adapter/src/catalog/open/builtin_schema_migration.rs
@@ -987,16 +987,7 @@ where
        let entries: Vec<_> = updates
            .into_iter()
-            .filter_map(|(data, _, _)| {
-                if let (Ok(key), Ok(val)) = data {
-                    Some((key, val))
-                } else {
-                    // If we can't decode the data, it has likely been written by a newer version, so
-                    // we ignore it.
-                    info!("skipping unreadable migration shard entry: {data:?}");
-                    None
-                }
-            })
+            .map(|(data, _, _)| data)
            .filter(move |(key, _)| predicate(key))
            .collect();
diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs
index d3636dce3d9f3..284727c7bfe65 100644
--- a/src/adapter/src/coord.rs
+++ b/src/adapter/src/coord.rs
@@ -2552,8 +2552,7 @@ impl Coordinator {
        // Retract the current contents, spilling into our builder.
        while let Some(values) = snapshot_cursor.next().await {
-            for ((key, _val), _t, d) in values {
-                let key = key.expect("builtin table had errors");
+            for (key, _t, d) in values {
                let d_invert = d.neg();
                batch.add(&key, &(), &d_invert).await;
            }
diff --git a/src/adapter/src/coord/peek.rs b/src/adapter/src/coord/peek.rs
index ae26ae6db18b4..c9e40b97b367c 100644
--- a/src/adapter/src/coord/peek.rs
+++ b/src/adapter/src/coord/peek.rs
@@ -996,9 +996,7 @@ impl crate::coord::Coordinator {
        let mut current_batch_size: usize = 0;
        'outer: while let Some(rows) = row_cursor.next().await {
-            for ((key, _val), _ts, diff) in rows {
-                let source_data = key.expect("decoding error");
-
+            for ((source_data, _val), _ts, diff) in rows {
                let row = source_data
                    .0
                    .expect("we are not sending errors on this code path");
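The adapter loops above now receive already-decoded data, while the retraction pattern itself is unchanged: every snapshot entry is re-emitted with a negated diff. A minimal, self-contained sketch of that pattern, using plain `String`/`i64` stand-ins rather than the Coordinator's real types and batch builder:

```rust
// Model of the retraction loop in coord.rs: each snapshot entry is re-emitted
// with a negated diff so the shard's contents sum back to zero.
#[derive(Debug, PartialEq)]
struct Update {
    key: String,
    diff: i64,
}

fn retract_all(snapshot: Vec<(String, u64, i64)>) -> Vec<Update> {
    snapshot
        .into_iter()
        .map(|(key, _t, d)| Update { key, diff: -d })
        .collect()
}

fn main() {
    let snapshot = vec![("a".to_owned(), 5, 1), ("b".to_owned(), 5, 2)];
    let retractions = retract_all(snapshot);
    assert_eq!(retractions[0], Update { key: "a".to_owned(), diff: -1 });
    assert_eq!(retractions[1], Update { key: "b".to_owned(), diff: -2 });
    println!("{retractions:?}");
}
```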
diff --git a/src/catalog/src/durable/objects/state_update.rs b/src/catalog/src/durable/objects/state_update.rs
index 5a8e12538d079..823a1f989515b 100644
--- a/src/catalog/src/durable/objects/state_update.rs
+++ b/src/catalog/src/durable/objects/state_update.rs
@@ -394,11 +394,7 @@ impl StateUpdateKindJson {
 }
 
 /// Version of [`StateUpdateKind`] that is stored directly in persist.
-type PersistStateUpdate = (
-    (Result<SourceData, String>, Result<(), String>),
-    Timestamp,
-    StorageDiff,
-);
+type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);
 
 impl TryFrom<&StateUpdate> for Option<memory::objects::StateUpdate> {
     type Error = DurableCatalogError;
@@ -1017,11 +1013,7 @@ impl RustType<proto::StateUpdateKind> for StateUpdateKind {
 /// diff)` tuple/update we store in persist.
 impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
     fn from(kvtd: PersistStateUpdate) -> Self {
-        let ((key, val), ts, diff) = kvtd;
-        let (key, ()) = (
-            key.expect("persist decoding error"),
-            val.expect("persist decoding error"),
-        );
+        let ((key, ()), ts, diff) = kvtd;
         StateUpdate {
             kind: StateUpdateKindJson::from(key),
             ts,
diff --git a/src/durable-cache/src/lib.rs b/src/durable-cache/src/lib.rs
index 333e5f588ef47..c2ecc99b119e6 100644
--- a/src/durable-cache/src/lib.rs
+++ b/src/durable-cache/src/lib.rs
@@ -169,8 +169,8 @@ impl<C: DurableCacheCodec> DurableCache<C> {
         consolidate_updates(&mut updates);
         updates.sort_by(|(_, _, d1), (_, _, d2)| d1.cmp(d2));
         for ((k, v), t, d) in updates {
-            let encoded_key = k.unwrap();
-            let encoded_val = v.unwrap();
+            let encoded_key = k;
+            let encoded_val = v;
             let (decoded_key, decoded_val) = C::decode(&encoded_key, &encoded_val);
 
             let val = LocalVal {
diff --git a/src/persist-cli/src/maelstrom/txn_list_append_single.rs b/src/persist-cli/src/maelstrom/txn_list_append_single.rs
index cad0eb33a6da9..1a9980008f282 100644
--- a/src/persist-cli/src/maelstrom/txn_list_append_single.rs
+++ b/src/persist-cli/src/maelstrom/txn_list_append_single.rs
@@ -112,11 +112,7 @@ pub struct Transactor {
     // Keep a long-lived listen, which is incrementally read as we go. Then
     // assert that it has the same data as the short-lived snapshot+listen in
     // `read`. This hopefully stresses slightly different parts of the system.
-    long_lived_updates: Vec<(
-        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-        u64,
-        i64,
-    )>,
+    long_lived_updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
     long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
 }
@@ -241,11 +237,7 @@ impl Transactor {
         &mut self,
     ) -> Result<
         (
-            Vec<(
-                (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-                u64,
-                i64,
-            )>,
+            Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
             Antichain<u64>,
         ),
         MaelstromError,
     >
@@ -382,14 +374,7 @@ impl Transactor {
     async fn listen_through(
         mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
         frontier: &Antichain<u64>,
-    ) -> Result<
-        Vec<(
-            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-            u64,
-            i64,
-        )>,
-        ExternalError,
-    > {
+    ) -> Result<Vec<((MaelstromKey, MaelstromVal), u64, i64)>, ExternalError> {
         let mut ret = Vec::new();
         loop {
             for event in listen.fetch_next().await {
@@ -415,11 +400,7 @@ impl Transactor {
     async fn read_long_lived(
         &mut self,
         as_of: &Antichain<u64>,
-    ) -> Vec<(
-        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-        u64,
-        i64,
-    )> {
+    ) -> Vec<((MaelstromKey, MaelstromVal), u64, i64)> {
         while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
             for event in self.long_lived_listen.fetch_next().await {
                 match event {
@@ -450,11 +431,7 @@ impl Transactor {
     fn extract_state_map(
         read_ts: u64,
-        updates: Vec<(
-            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-            u64,
-            i64,
-        )>,
+        updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
     ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
         let mut ret = BTreeMap::new();
         for ((k, v), _, d) in updates {
@@ -464,14 +441,6 @@ impl Transactor {
                     text: format!("invalid read at time {}", read_ts),
                 });
             }
-            let k = k.map_err(|err| MaelstromError {
-                code: ErrorCode::Crash,
-                text: format!("invalid key {}", err),
-            })?;
-            let v = v.map_err(|err| MaelstromError {
-                code: ErrorCode::Crash,
-                text: format!("invalid val {}", err),
-            })?;
             if ret.contains_key(&k) {
                 return Err(MaelstromError {
                     code: ErrorCode::Crash,
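The `From<PersistStateUpdate>` change above is representative of the whole PR: with infallible decoding, the unit value can be matched inline in the tuple pattern instead of being unwrapped. A sketch of that before/after, with simple aliases standing in for the real catalog types:

```rust
// Aliases standing in for the catalog's SourceData/Timestamp/StorageDiff.
type Key = String;
type Timestamp = u64;
type Diff = i64;
type PersistStateUpdate = ((Key, ()), Timestamp, Diff);

struct StateUpdate {
    kind: Key,
    ts: Timestamp,
    diff: Diff,
}

impl From<PersistStateUpdate> for StateUpdate {
    fn from(kvtd: PersistStateUpdate) -> Self {
        // Before: `let ((key, val), ts, diff) = kvtd;` followed by two
        // `.expect("persist decoding error")` calls. After: one pattern.
        let ((key, ()), ts, diff) = kvtd;
        StateUpdate { kind: key, ts, diff }
    }
}

fn main() {
    let update = StateUpdate::from((("k1".to_owned(), ()), 42, 1));
    assert_eq!(update.kind, "k1");
    assert_eq!((update.ts, update.diff), (42, 1));
}
```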
diff --git a/src/persist-client/src/batch.rs b/src/persist-client/src/batch.rs
index b16df1451b29f..94803f4993e2b 100644
--- a/src/persist-client/src/batch.rs
+++ b/src/persist-client/src/batch.rs
@@ -1738,7 +1738,7 @@ mod tests {
             .await;
 
         let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
-        let expected = vec![(((Ok("foo".to_owned())), Ok(())), 2, 1)];
+        let expected = vec![((("foo".to_owned()), ()), 2, 1)];
         assert_eq!(actual, expected);
     }
diff --git a/src/persist-client/src/fetch.rs b/src/persist-client/src/fetch.rs
index 069a9ae2390de..4316e11cc055b 100644
--- a/src/persist-client/src/fetch.rs
+++ b/src/persist-client/src/fetch.rs
@@ -805,7 +805,7 @@ pub struct FetchedPart<K: Codec, V: Codec, T, D> {
     diffs: Int64Array,
     migration: PartMigration<K, V>,
     filter_pushdown_audit: Option<LazyPartStats>,
-    peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
+    peek_stash: Option<((K, V), T, D)>,
     part_cursor: usize,
     key_storage: Option<K::Storage>,
     val_storage: Option<V::Storage>,
@@ -996,7 +996,7 @@ where
         &mut self,
         key: &mut Option<K>,
         val: &mut Option<V>,
-    ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
+    ) -> Option<((K, V), T, D)> {
         let mut consolidated = self.peek_stash.take();
         loop {
             // Fetch and decode the next tuple in the sequence. (Or break if there is none.)
@@ -1041,18 +1041,13 @@ where
         Some((kv, t, d))
     }
 
-    fn decode_kv(
-        &mut self,
-        index: usize,
-        key: &mut Option<K>,
-        val: &mut Option<V>,
-    ) -> (Result<K, String>, Result<V, String>) {
+    fn decode_kv(&mut self, index: usize, key: &mut Option<K>, val: &mut Option<V>) -> (K, V) {
         let decoded = self
             .part
             .as_ref()
             .map_left(|codec| {
                 let ((ck, cv), _, _) = codec.get(index).expect("valid index");
-                Self::decode_codec(
+                let (k, v) = Self::decode_codec(
                     &*self.metrics,
                     self.migration.codec_read(),
                     ck,
@@ -1061,7 +1056,8 @@ where
                     val,
                     &mut self.key_storage,
                     &mut self.val_storage,
-                )
+                );
+                (k.expect("valid legacy key"), v.expect("valid legacy value"))
             })
             .map_right(|(structured_key, structured_val)| {
                 self.decode_structured(index, structured_key, structured_val, key, val)
@@ -1135,14 +1131,14 @@ where
         vals: &<V::Schema as Schema<V>>::Decoder,
         key: &mut Option<K>,
         val: &mut Option<V>,
-    ) -> (Result<K, String>, Result<V, String>) {
+    ) -> (K, V) {
         let mut key = key.take().unwrap_or_default();
         keys.decode(idx, &mut key);
 
         let mut val = val.take().unwrap_or_default();
         vals.decode(idx, &mut val);
 
-        (Ok(key), Ok(val))
+        (key, val)
     }
 }
@@ -1153,7 +1149,7 @@ where
     T: Timestamp + Lattice + Codec64,
     D: Monoid + Codec64 + Send + Sync,
 {
-    type Item = ((Result<K, String>, Result<V, String>), T, D);
+    type Item = ((K, V), T, D);
 
     fn next(&mut self) -> Option<Self::Item> {
         self.next_with_storage(&mut None, &mut None)
diff --git a/src/persist-client/src/internal/machine.rs b/src/persist-client/src/internal/machine.rs
index 0fb41b5e131ca..a940ac4a0dca4 100644
--- a/src/persist-client/src/internal/machine.rs
+++ b/src/persist-client/src/internal/machine.rs
@@ -2298,7 +2298,7 @@
             match event {
                 ListenEvent::Updates(x) => {
                     for ((k, _v), t, d) in x.iter() {
-                        write!(s, "{} {} {}\n", k.as_ref().unwrap(), t, d);
+                        write!(s, "{} {} {}\n", k, t, d);
                     }
                 }
                 ListenEvent::Progress(x) => {
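The `peek_stash` in `FetchedPart` above implements streaming consolidation: hold one decoded tuple back, and while the next tuple has the same key, value, and time, fold its diff in before yielding. A standalone illustration of that idea, assuming sorted input so equal updates are adjacent (the types are simplified stand-ins, not persist's):

```rust
// One-tuple stash consolidation over a sorted stream of ((K, V), T, diff).
fn consolidate<K, V, T>(sorted: Vec<((K, V), T, i64)>) -> Vec<((K, V), T, i64)>
where
    K: PartialEq,
    V: PartialEq,
    T: PartialEq,
{
    let mut out: Vec<((K, V), T, i64)> = Vec::new();
    let mut stash: Option<((K, V), T, i64)> = None;
    for (kv, t, d) in sorted {
        match &mut stash {
            // Same key/value/time as the stashed tuple: merge the diffs.
            Some((skv, st, sd)) if *skv == kv && *st == t => *sd += d,
            // Different tuple: emit the stash and stash the new one.
            _ => {
                if let Some(prev) = stash.replace((kv, t, d)) {
                    out.push(prev);
                }
            }
        }
    }
    out.extend(stash);
    // Entries whose diffs cancelled out entirely are dropped.
    out.retain(|(_, _, d)| *d != 0);
    out
}

fn main() {
    let updates = vec![(("k", "v"), 1u64, 1), (("k", "v"), 1, 2), (("k", "w"), 2, 1)];
    assert_eq!(consolidate(updates), vec![(("k", "v"), 1, 3), (("k", "w"), 2, 1)]);
}
```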
diff --git a/src/persist-client/src/lib.rs b/src/persist-client/src/lib.rs
index 9335679e29387..16d880fa2bc55 100644
--- a/src/persist-client/src/lib.rs
+++ b/src/persist-client/src/lib.rs
@@ -970,10 +970,7 @@ mod tests {
             .expect("client construction failed")
     }
 
-    pub fn all_ok<'a, K, V, T, D, I>(
-        iter: I,
-        as_of: T,
-    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
+    pub fn all_ok<'a, K, V, T, D, I>(iter: I, as_of: T) -> Vec<((K, V), T, D)>
     where
         K: Ord + Clone + 'a,
         V: Ord + Clone + 'a,
@@ -987,7 +984,7 @@ mod tests {
             .map(|((k, v), t, d)| {
                 let mut t = t.clone();
                 t.advance_by(as_of.borrow());
-                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
+                ((k.clone(), v.clone()), t, d.clone())
             })
             .collect();
         consolidate_updates(&mut ret);
@@ -999,13 +996,10 @@ mod tests {
         key: &BlobKey,
         metrics: &Metrics,
         read_schemas: &Schemas<K, V>,
-    ) -> (
-        BlobTraceBatchPart<T>,
-        Vec<((Result<K, String>, Result<V, String>), T, D)>,
-    )
+    ) -> (BlobTraceBatchPart<T>, Vec<((K, V), T, D)>)
     where
-        K: Codec,
-        V: Codec,
+        K: Codec + Clone,
+        V: Codec + Clone,
         T: Timestamp + Codec64,
         D: Codec64,
     {
@@ -1016,22 +1010,13 @@ mod tests {
             .expect("missing part");
         let mut part =
             BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
-        // Ensure codec data is present even if it was not generated at write time.
-        let _ = part
+        let structured = part
             .updates
-            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
-        let mut updates = Vec::new();
-        // TODO(bkirwi): switch to structured data in tests
-        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
-            updates.push((
-                (
-                    K::decode(k, &read_schemas.key),
-                    V::decode(v, &read_schemas.val),
-                ),
-                T::decode(t),
-                D::decode(d),
-            ));
-        }
+            .into_part::<K, V>(&*read_schemas.key, &*read_schemas.val);
+        let updates = structured
+            .decode_iter::<K, V, T, D>(&*read_schemas.key, &*read_schemas.val)
+            .expect("structured data")
+            .collect();
         (part, updates)
     }
@@ -1973,7 +1958,7 @@ mod tests {
         assert_eq!(
             listen_next.await,
             vec![
-                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
+                ListenEvent::Updates(vec![(("2".to_owned(), "two".to_owned()), 2, 1)]),
                 ListenEvent::Progress(Antichain::from_elem(3)),
             ]
         );
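After this change, the `all_ok` test helper is reduced to its essence: advance each update's timestamp to the as-of and consolidate. A simplified standalone version under stated assumptions — the real code uses `Lattice::advance_by` and `consolidate_updates`, for which `max` over totally ordered `u64` times and a `BTreeMap` stand in here:

```rust
use std::collections::BTreeMap;

// Advance every update to at least `as_of`, then sum diffs per (key, value, time).
fn all_ok(updates: &[((&str, &str), u64, i64)], as_of: u64) -> Vec<((String, String), u64, i64)> {
    let mut consolidated: BTreeMap<((String, String), u64), i64> = BTreeMap::new();
    for ((k, v), t, d) in updates {
        // For totally ordered times, advancing by the as-of frontier is a max.
        let t = (*t).max(as_of);
        *consolidated.entry(((k.to_string(), v.to_string()), t)).or_insert(0) += d;
    }
    consolidated
        .into_iter()
        .filter(|(_, d)| *d != 0)
        .map(|((kv, t), d)| (kv, t, d))
        .collect()
}

fn main() {
    let updates = [(("k", "v"), 1, 1), (("k", "v"), 2, 1), (("k", "v"), 5, 1)];
    // Times 1 and 2 both advance to 3 and consolidate into a single diff of 2.
    assert_eq!(
        all_ok(&updates, 3),
        vec![
            (("k".to_owned(), "v".to_owned()), 3, 2),
            (("k".to_owned(), "v".to_owned()), 5, 1),
        ]
    );
}
```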
diff --git a/src/persist-client/src/read.rs b/src/persist-client/src/read.rs
index b486c64b3c9b2..301b5a687f436 100644
--- a/src/persist-client/src/read.rs
+++ b/src/persist-client/src/read.rs
@@ -159,9 +159,7 @@ where
    /// Equivalent to `next`, but rather than returning a [`LeasedBatchPart`],
    /// fetches and returns the data from within it.
    #[instrument(level = "debug", fields(shard = %self.listen.handle.machine.shard_id()))]
-    pub async fn fetch_next(
-        &mut self,
-    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
+    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
        let events = self.next(None).await;
        let new_len = events
            .iter()
@@ -411,9 +409,7 @@ where
    /// If you have a use for consolidated listen output, given that snapshots can't be
    /// consolidated, come talk to us!
    #[instrument(level = "debug", name = "listen::next", fields(shard = %self.handle.machine.shard_id()))]
-    pub async fn fetch_next(
-        &mut self,
-    ) -> Vec<ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
+    pub async fn fetch_next(&mut self) -> Vec<ListenEvent<T, ((K, V), T, D)>> {
        let (parts, progress) = self.next(None).await;
        let mut ret = Vec::with_capacity(parts.len() + 1);
        for part in parts {
@@ -428,9 +424,7 @@ where
    }

    /// Convert listener into futures::Stream
-    pub fn into_stream(
-        mut self,
-    ) -> impl Stream<Item = ListenEvent<T, ((Result<K, String>, Result<V, String>), T, D)>> {
+    pub fn into_stream(mut self) -> impl Stream<Item = ListenEvent<T, ((K, V), T, D)>> {
        async_stream::stream!({
            loop {
                for msg in self.fetch_next().await {
@@ -445,13 +439,7 @@ where
    /// return the final progress info.
    #[cfg(test)]
    #[track_caller]
-    pub async fn read_until(
-        &mut self,
-        ts: &T,
-    ) -> (
-        Vec<((Result<K, String>, Result<V, String>), T, D)>,
-        Antichain<T>,
-    ) {
+    pub async fn read_until(&mut self, ts: &T) -> (Vec<((K, V), T, D)>, Antichain<T>) {
        let mut updates = Vec::new();
        let mut frontier = Antichain::from_elem(T::minimum());
        while self.frontier.less_than(ts) {
@@ -920,9 +908,7 @@ where
    D: Monoid + Ord + Codec64 + Send + Sync,
 {
    /// Grab the next batch of consolidated data.
-    pub async fn next(
-        &mut self,
-    ) -> Option<impl Iterator<Item = ((Result<K, String>, Result<V, String>), T, D)> + '_> {
+    pub async fn next(&mut self) -> Option<impl Iterator<Item = ((K, V), T, D)> + '_> {
        let Self {
            consolidator,
            max_len,
@@ -952,7 +938,7 @@ where
                val_decoder.decode(i, &mut v);
                let t = T::decode(part.time.value(i).to_le_bytes());
                let d = D::decode(part.diff.value(i).to_le_bytes());
-                ((Ok(k), Ok(v)), t, d)
+                ((k, v), t, d)
            });

            Some(iter)
@@ -982,7 +968,7 @@ where
    pub async fn snapshot_and_fetch(
        &mut self,
        as_of: Antichain<T>,
-    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>> {
+    ) -> Result<Vec<((K, V), T, D)>, Since<T>> {
        let mut cursor = self.snapshot_cursor(as_of, |_| true).await?;
        let mut contents = Vec::new();
        while let Some(iter) = cursor.next().await {
@@ -1170,10 +1156,7 @@ where
    pub async fn snapshot_and_stream(
        &mut self,
        as_of: Antichain<T>,
-    ) -> Result<
-        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
-        Since<T>,
-    > {
+    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>> {
        let snap = self.snapshot(as_of).await?;

        let blob = Arc::clone(&self.blob);
@@ -1220,10 +1203,7 @@ where
    /// succeed, process its batches, and then return its data sorted.
    #[cfg(test)]
    #[track_caller]
-    pub async fn expect_snapshot_and_fetch(
-        &mut self,
-        as_of: T,
-    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)> {
+    pub async fn expect_snapshot_and_fetch(&mut self, as_of: T) -> Vec<((K, V), T, D)> {
        let mut ret = self
            .snapshot_and_fetch(Antichain::from_elem(as_of))
            .await
@@ -1365,10 +1345,7 @@ mod tests {
            }
        }
    }
-    assert_eq!(
-        updates,
-        &[((Ok("k".to_owned()), Ok("v".to_owned())), 4u64, 3i64)],
-    )
+    assert_eq!(updates, &[(("k".to_owned(), "v".to_owned()), 4u64, 3i64)],)
 }
@@ -1399,7 +1376,7 @@ mod tests {
        let mut snapshot_rows = vec![];
        while let Some(((k, v), t, d)) = snapshot.next().await {
-            snapshot_rows.push(((k.expect("valid key"), v.expect("valid key")), t, d));
+            snapshot_rows.push(((k, v), t, d));
        }

        for ((_k, _v), t, _d) in data.as_mut_slice() {
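The `into_stream` adapter above keeps its shape: repeatedly await a batch and yield its events, now without per-row `Result`s. A runnable sketch of that shape against a canned data source — it assumes the `async-stream`, `futures`, and `tokio` crates, and `Listen`/`fetch_next` here are simplified stand-ins for persist's:

```rust
use futures::stream::{Stream, StreamExt};

struct Listen {
    batches: Vec<Vec<((String, ()), u64, i64)>>,
}

impl Listen {
    async fn fetch_next(&mut self) -> Vec<((String, ()), u64, i64)> {
        // The real implementation awaits new persist batches; we pop canned data.
        self.batches.pop().unwrap_or_default()
    }

    fn into_stream(mut self) -> impl Stream<Item = ((String, ()), u64, i64)> {
        async_stream::stream!({
            loop {
                let batch = self.fetch_next().await;
                if batch.is_empty() {
                    break; // A real listen never ends; canned data does.
                }
                for msg in batch {
                    yield msg;
                }
            }
        })
    }
}

#[tokio::main]
async fn main() {
    let listen = Listen {
        batches: vec![vec![(("a".to_owned(), ()), 1, 1)]],
    };
    let collected: Vec<_> = listen.into_stream().collect().await;
    assert_eq!(collected, vec![(("a".to_owned(), ()), 1, 1)]);
}
```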
diff --git a/src/persist-client/src/schema.rs b/src/persist-client/src/schema.rs
index 9b23a291301b2..71a4792afd39c 100644
--- a/src/persist-client/src/schema.rs
+++ b/src/persist-client/src/schema.rs
@@ -679,9 +679,9 @@ mod tests {
        assert_eq!(write1.write_schemas.id.unwrap(), SchemaId(1));
    }

-    fn strings(xs: &[((Result<Strings, String>, Result<(), String>), u64, i64)]) -> Vec<Vec<&str>> {
+    fn strings(xs: &[((Strings, ()), u64, i64)]) -> Vec<Vec<&str>> {
        xs.iter()
-            .map(|((k, _), _, _)| k.as_ref().unwrap().0.iter().map(|x| x.as_str()).collect())
+            .map(|((k, _), _, _)| k.0.iter().map(|x| x.as_str()).collect())
            .collect()
    }
@@ -691,7 +691,7 @@ mod tests {
    async fn snap_streaming(
        as_of: u64,
        read: &mut ReadHandle<Strings, (), u64, i64>,
-    ) -> Vec<((Result<Strings, String>, Result<(), String>), u64, i64)> {
+    ) -> Vec<((Strings, ()), u64, i64)> {
        // NB: We test with both snapshot_and_fetch and snapshot_and_stream
        // because one uses the consolidating iter and one doesn't.
        let mut ret = read
diff --git a/src/persist-client/src/write.rs b/src/persist-client/src/write.rs
index 8a4e1561c9a77..e78ac55f18326 100644
--- a/src/persist-client/src/write.rs
+++ b/src/persist-client/src/write.rs
@@ -664,12 +664,7 @@ where
            fetched_part.next_with_storage(&mut key_storage, &mut val_storage)
        {
            builder
-                .add(
-                    &k.expect("decoded just-encoded key data"),
-                    &v.expect("decoded just-encoded val data"),
-                    &t,
-                    &d,
-                )
+                .add(&k, &v, &t, &d)
                .await
                .expect("re-encoding just-decoded data");
        }
diff --git a/src/storage-client/src/storage_collections.rs b/src/storage-client/src/storage_collections.rs
index 76ee8970723af..e232f4825d884 100644
--- a/src/storage-client/src/storage_collections.rs
+++ b/src/storage-client/src/storage_collections.rs
@@ -363,17 +363,9 @@ pub struct SnapshotCursor<T: Codec64 + Timestamp + Lattice> {
 impl<T: Codec64 + Timestamp + Lattice> SnapshotCursor<T> {
    pub async fn next(
        &mut self,
-    ) -> Option<
-        impl Iterator<
-                Item = (
-                    (Result<SourceData, String>, Result<(), String>),
-                    T,
-                    StorageDiff,
-                ),
-            > + Sized
-            + '_,
-    > {
-        self.cursor.next().await
+    ) -> Option<impl Iterator<Item = (SourceData, T, StorageDiff)> + Sized + '_> {
+        let iter = self.cursor.next().await?;
+        Some(iter.map(|((k, ()), t, d)| (k, t, d)))
    }
 }
@@ -1132,8 +1124,8 @@ where
                    let mut snapshot = Vec::with_capacity(contents.len());
                    for ((data, _), _, diff) in contents {
                        // TODO(petrosagg): We should accumulate the errors too and let the user
-                        // interprret the result
-                        let row = data.expect("invalid protobuf data").0?;
+                        // interpret the result
+                        let row = data.0?;
                        snapshot.push((row, diff));
                    }
                    Ok(snapshot)
@@ -1188,14 +1180,7 @@ where
            };

            // Map our stream, unwrapping Persist internal errors.
-            let stream = stream
-                .map(|((k, _v), t, d)| {
-                    // TODO(parkmycar): We should accumulate the errors and pass them on to the
-                    // caller.
-                    let data = k.expect("error while streaming from Persist");
-                    (data, t, d)
-                })
-                .boxed();
+            let stream = stream.map(|((data, _v), t, d)| (data, t, d)).boxed();
            Ok(stream)
        }
        .boxed()
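`SnapshotCursor::next` above now peels the unit value off inside the cursor, so callers iterate plain `(data, time, diff)` triples (this is what lets the `coord.rs` loop earlier bind `(key, _t, d)` directly). A small model of that adapter, with a plain iterator standing in for the persist cursor:

```rust
// The unit value is stripped inside the cursor; callers never see it.
struct SnapshotCursor {
    inner: std::vec::IntoIter<Vec<((String, ()), u64, i64)>>,
}

impl SnapshotCursor {
    fn next(&mut self) -> Option<impl Iterator<Item = (String, u64, i64)>> {
        let iter = self.inner.next()?;
        Some(iter.into_iter().map(|((k, ()), t, d)| (k, t, d)))
    }
}

fn main() {
    let mut cursor = SnapshotCursor {
        inner: vec![vec![(("row".to_owned(), ()), 3, 1)]].into_iter(),
    };
    while let Some(chunk) = cursor.next() {
        for (data, _t, diff) in chunk {
            println!("{data} {diff}");
        }
    }
}
```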
diff --git a/src/storage-controller/src/collection_mgmt.rs b/src/storage-controller/src/collection_mgmt.rs
index 24d76ef03363b..336b91c847c21 100644
--- a/src/storage-controller/src/collection_mgmt.rs
+++ b/src/storage-controller/src/collection_mgmt.rs
@@ -1024,7 +1024,7 @@ where
            Ok(contents) => {
                let mut snapshot = Vec::with_capacity(contents.len());
                for ((data, _), _, diff) in contents {
-                    let row = data.expect("invalid protobuf data").0.unwrap();
+                    let row = data.0.unwrap();
                    snapshot.push((row, -Diff::from(diff)));
                }
                snapshot
@@ -1546,8 +1546,7 @@ where
    // Produce retractions by inverting diffs of rows we want to delete.
    let mut builder = write_handle.builder(Antichain::from_elem(old_upper_ts.clone()));
    while let Some(chunk) = rows.next().await {
-        for ((key, _v), _t, diff) in chunk {
-            let data = key.map_err(|e| anyhow!("decoding error in metrics snapshot: {e}"))?;
+        for (data, _t, diff) in chunk {
            let Ok(row) = &data.0 else { continue };
            let datums = row.unpack();
            let occurred_at = datums[occurred_at_col].unwrap_timestamptz();
@@ -1664,8 +1663,7 @@ where
    > = BTreeMap::new();

    while let Some(chunk) = rows.next().await {
-        for ((key, _v), _t, diff) in chunk {
-            let data = key.expect("successful decode");
+        for (data, _t, diff) in chunk {
            let Ok(row) = &data.0 else { continue };

            let (key, timestamp) = handle_row(row, diff);
@@ -1705,8 +1703,7 @@ where

    // Mark any row outside the retention window for deletion
    while let Some(chunk) = rows.next().await {
-        for ((key, _v), _t, diff) in chunk {
-            let data = key.expect("successful decode");
+        for (data, _t, diff) in chunk {
            let Ok(row) = &data.0 else { continue };

            let (_, timestamp) = handle_row(row, diff);
diff --git a/src/storage-controller/src/rtr.rs b/src/storage-controller/src/rtr.rs
index 531b34e3828a6..9a7d4f405531d 100644
--- a/src/storage-controller/src/rtr.rs
+++ b/src/storage-controller/src/rtr.rs
@@ -152,8 +152,8 @@ async fn decode_remap_data_until_geq_external_frontier<
        match event {
            ListenEvent::Updates(updates) => {
                for ((k, v), into_ts, diff) in updates {
-                    let row: Row = k.expect("invalid binding").0.expect("invalid binding");
-                    let _v: () = v.expect("invalid binding");
+                    let row: Row = k.0.expect("invalid binding");
+                    let _v: () = v;
                    let from_ts: FromTime = SourceTimestamp::decode_row(&row);

                    pending_remap.push(Reverse((into_ts, from_ts, diff)));
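Note the distinction these loops preserve: persist *decoding* can no longer fail, but `SourceData` itself still wraps `Result<Row, _>`, so error rows are skipped with `let`-`else`. A self-contained sketch of that pattern, with a simplified stand-in for `SourceData`:

```rust
// Stand-in: the inner Result models SourceData's Result<Row, DataflowError>.
struct SourceData(Result<String, String>);

fn good_rows(chunk: Vec<(SourceData, u64, i64)>) -> Vec<String> {
    let mut out = Vec::new();
    for (data, _t, _diff) in chunk {
        // let-else keeps the happy path flat; error rows are simply ignored.
        let Ok(row) = &data.0 else { continue };
        out.push(row.clone());
    }
    out
}

fn main() {
    let chunk = vec![
        (SourceData(Ok("good".to_owned())), 1, 1),
        (SourceData(Err("bad".to_owned())), 1, 1),
    ];
    assert_eq!(good_rows(chunk), vec!["good".to_owned()]);
}
```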
This mfp call
@@ -660,16 +660,12 @@ impl PendingWork {
                            *work += 1;
                        }
                    }
-                    (Ok(SourceData(Err(err))), Ok(())) => {
+                    (SourceData(Err(err)), ()) => {
                        let mut emit_time = *self.capability.time();
                        emit_time.0 = time;
                        session.give((Err(err), emit_time, diff.into()));
                        *work += 1;
                    }
-                    // TODO(petrosagg): error handling
-                    (Err(_), Ok(_)) | (Ok(_), Err(_)) | (Err(_), Err(_)) => {
-                        panic!("decoding failed")
-                    }
                }
                if yield_fn(start_time, *work) {
                    return false;
diff --git a/src/storage-operators/src/stats.rs b/src/storage-operators/src/stats.rs
index f9d95889badbc..6d45a75341963 100644
--- a/src/storage-operators/src/stats.rs
+++ b/src/storage-operators/src/stats.rs
@@ -93,19 +93,13 @@ impl StatsCursor {
    ) -> Option<impl Iterator<Item = (Result<Row, DataflowError>, Timestamp, StorageDiff)> + '_> {
        fn expect_decode(
-            raw: impl Iterator<
-                Item = (
-                    (Result<SourceData, String>, Result<(), String>),
-                    Timestamp,
-                    StorageDiff,
-                ),
-            >,
+            raw: impl Iterator<Item = ((SourceData, ()), Timestamp, StorageDiff)>,
            is_err: bool,
        ) -> impl Iterator<Item = (Result<Row, DataflowError>, Timestamp, StorageDiff)> {
            raw.map(|((k, v), t, d)| {
                // NB: this matches the decode behaviour in sources
-                let SourceData(row) = k.expect("decode error");
-                let () = v.expect("decode error");
+                let SourceData(row) = k;
+                let () = v;
                (row, t, d)
            })
            .filter(move |(r, _, _)| if is_err { r.is_err() } else { r.is_ok() })
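With decode errors gone from the type, the match in `persist_source` becomes exhaustive with just the Ok-row and Err-row arms — the deleted `panic!("decoding failed")` arm has no pattern left to cover, and the compiler enforces that. A minimal demonstration with a stand-in `SourceData`:

```rust
// Stand-in for SourceData's Result<Row, DataflowError> payload.
struct SourceData(Result<&'static str, &'static str>);

fn handle(key: SourceData, val: ()) -> String {
    match (key, val) {
        (SourceData(Ok(row)), ()) => format!("row: {row}"),
        (SourceData(Err(err)), ()) => format!("error: {err}"),
        // No third arm: the compiler proves these two cover everything.
    }
}

fn main() {
    assert_eq!(handle(SourceData(Ok("data")), ()), "row: data");
    assert_eq!(handle(SourceData(Err("oops")), ()), "error: oops");
}
```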
diff --git a/src/storage/src/source/reclock/compat.rs b/src/storage/src/source/reclock/compat.rs
index 4f03cd77d15f4..200ce3fca03f2 100644
--- a/src/storage/src/source/reclock/compat.rs
+++ b/src/storage/src/source/reclock/compat.rs
@@ -40,17 +40,8 @@ use tokio::sync::watch;

 /// A handle to a persist shard that stores remap bindings
 pub struct PersistHandle<FromTime: SourceTimestamp, IntoTime: Timestamp + Lattice + Codec64> {
-    events: LocalBoxStream<
-        'static,
-        ListenEvent<
-            IntoTime,
-            (
-                (Result<SourceData, String>, Result<(), String>),
-                IntoTime,
-                StorageDiff,
-            ),
-        >,
-    >,
+    events:
+        LocalBoxStream<'static, ListenEvent<IntoTime, ((SourceData, ()), IntoTime, StorageDiff)>>,
    write_handle: WriteHandle<SourceData, (), IntoTime, StorageDiff>,
    /// Whether or not this handle is in read-only mode.
    read_only_rx: watch::Receiver<bool>,
@@ -223,9 +214,7 @@ where
            }
            ListenEvent::Updates(msgs) => {
                for ((update, _), into_ts, diff) in msgs {
-                    let from_ts = FromTime::decode_row(
-                        &update.expect("invalid row").0.expect("invalid row"),
-                    );
+                    let from_ts = FromTime::decode_row(&update.0.expect("invalid row"));
                    self.pending_batch.push((from_ts, into_ts, diff.into()));
                }
            }
diff --git a/src/storage/src/storage_state/async_storage_worker.rs b/src/storage/src/storage_state/async_storage_worker.rs
index 66151d92b1f14..f50c130834f4d 100644
--- a/src/storage/src/storage_state/async_storage_worker.rs
+++ b/src/storage/src/storage_state/async_storage_worker.rs
@@ -156,8 +156,8 @@ where
        match event {
            ListenEvent::Updates(updates) => {
                for ((k, v), t, d) in updates {
-                    let row: Row = k.expect("invalid binding").0.expect("invalid binding");
-                    let _v: () = v.expect("invalid binding");
+                    let row: Row = k.0.expect("invalid binding");
+                    let _v: () = v;
                    let from_ts = C::Time::decode_row(&row);
                    remap_updates.push((from_ts, t, d));
                }
diff --git a/src/txn-wal/src/lib.rs b/src/txn-wal/src/lib.rs
index aca9c18dd216b..07b864db53fe3 100644
--- a/src/txn-wal/src/lib.rs
+++ b/src/txn-wal/src/lib.rs
@@ -744,7 +744,7 @@ mod tests {
            TxnsCache::open(&self.client, self.txns_id, Some(data_id)).await;
        let _ = cache.update_gt(&as_of).await;
        let snapshot = cache.data_snapshot(data_id, as_of);
-        let mut data_read = self
+        let mut data_read: ReadHandle<String, (), u64, i64> = self
            .client
            .open_leased_reader(
                data_id,
@@ -762,10 +762,7 @@ mod tests {
        data_read.expire().await;
        let snapshot: Vec<_> = snapshot
            .into_iter()
-            .map(|((k, v), t, d)| {
-                let (k, ()) = (k.unwrap(), v.unwrap());
-                (k, t, d)
-            })
+            .map(|((k, ()), t, d)| (k, t, d))
            .collect();

        // Check that a subscribe would produce the same result.
diff --git a/src/txn-wal/src/operator.rs b/src/txn-wal/src/operator.rs
index 3705f30ebeee9..b32b2fc7b35be 100644
--- a/src/txn-wal/src/operator.rs
+++ b/src/txn-wal/src/operator.rs
@@ -548,10 +548,7 @@ impl DataSubscribe {
        let (data, txns) = (ProbeHandle::new(), ProbeHandle::new());
        let data_stream = data_stream.flat_map(|part| {
            let part = part.parse();
-            part.part.map(|((k, v), t, d)| {
-                let (k, ()) = (k.unwrap(), v.unwrap());
-                (k, t, d)
-            })
+            part.part.map(|((k, ()), t, d)| (k, t, d))
        });
        let data_stream = data_stream.probe_with(&data);
        let (data_stream, mut txns_progress_token) =
diff --git a/src/txn-wal/src/txn_cache.rs b/src/txn-wal/src/txn_cache.rs
index a9da3a2b2936f..12d844bd5538b 100644
--- a/src/txn-wal/src/txn_cache.rs
+++ b/src/txn-wal/src/txn_cache.rs
@@ -951,10 +951,7 @@ where
                continue;
            }
            let part_updates = txns_subscribe.fetch_batch_part(part).await;
-            let part_updates = part_updates.map(|((k, v), t, d)| {
-                let (k, v) = (k.expect("valid key"), v.expect("valid val"));
-                (C::decode(k, v), t, d)
-            });
+            let part_updates = part_updates.map(|((k, v), t, d)| (C::decode(k, v), t, d));
            if let Some(only_data_id) = only_data_id.as_ref() {
                updates.extend(part_updates.filter(|(x, _, _)| x.data_id() == only_data_id));
            } else {
@@ -1163,10 +1160,7 @@ mod tests {
            snapshot.sort();
            snapshot
                .into_iter()
-                .flat_map(|((k, v), _t, d)| {
-                    let (k, ()) = (k.unwrap(), v.unwrap());
-                    std::iter::repeat(k).take(usize::try_from(d).unwrap())
-                })
+                .flat_map(|((k, ()), _t, d)| std::iter::repeat(k).take(usize::try_from(d).unwrap()))
                .collect()
        }
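The txn-wal test helper just above expands consolidated updates back into individual rows: a diff of `d` becomes `d` copies of the key. A standalone version of that `flat_map`, with `String` keys standing in for the real shard data:

```rust
// Expand (key, (), time, diff) updates into `diff` copies of each key.
fn expand(snapshot: Vec<((String, ()), u64, i64)>) -> Vec<String> {
    snapshot
        .into_iter()
        .flat_map(|((k, ()), _t, d)| std::iter::repeat(k).take(usize::try_from(d).unwrap()))
        .collect()
}

fn main() {
    let snapshot = vec![(("a".to_owned(), ()), 1, 2), (("b".to_owned(), ()), 1, 1)];
    assert_eq!(expand(snapshot), vec!["a", "a", "b"]);
}
```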
diff --git a/src/txn-wal/src/txn_read.rs b/src/txn-wal/src/txn_read.rs
index e3cc7cac9ea98..63e468a11f91b 100644
--- a/src/txn-wal/src/txn_read.rs
+++ b/src/txn-wal/src/txn_read.rs
@@ -141,7 +141,7 @@ impl<T: Timestamp + Lattice + TotalOrder + StepForward + Codec64> DataSnapshot<T> {
    pub async fn snapshot_and_fetch<K, V, D>(
        &self,
        data_read: &mut ReadHandle<K, V, T, D>,
-    ) -> Result<Vec<((Result<K, String>, Result<V, String>), T, D)>, Since<T>>
+    ) -> Result<Vec<((K, V), T, D)>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
@@ -176,10 +176,7 @@ impl<T: Timestamp + Lattice + TotalOrder + StepForward + Codec64> DataSnapshot<T> {
    pub async fn snapshot_and_stream<K, V, D>(
        &self,
        data_read: &mut ReadHandle<K, V, T, D>,
-    ) -> Result<
-        impl Stream<Item = ((Result<K, String>, Result<V, String>), T, D)> + use<K, V, T, D>,
-        Since<T>,
-    >
+    ) -> Result<impl Stream<Item = ((K, V), T, D)> + use<K, V, T, D>, Since<T>>
    where
        K: Debug + Codec + Ord,
        V: Debug + Codec + Ord,
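The `+ use<K, V, T, D>` in the signatures above (and in `snapshot_and_stream` in read.rs) is Rust's precise-capturing syntax, stabilized in Rust 1.82: the returned `impl Trait` captures only the listed generics, not the `&mut` handle's lifetime, so the stream can outlive the borrow. A minimal standalone demonstration of the mechanism, unrelated to persist's actual types:

```rust
// Without `use<T>`, Rust 2024 return-position impl Trait would capture the
// elided lifetime of `&mut Vec<T>`; listing only `T` opts out of that.
fn numbers<T: Clone>(source: &mut Vec<T>) -> impl Iterator<Item = T> + use<T> {
    // Clone the data out so the iterator no longer borrows `source`.
    source.clone().into_iter()
}

fn main() {
    let mut data = vec![1, 2, 3];
    let iter = numbers(&mut data);
    data.push(4); // Fine: `iter` did not capture the `&mut data` borrow.
    assert_eq!(iter.collect::<Vec<_>>(), vec![1, 2, 3]);
}
```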