11 changes: 1 addition & 10 deletions src/adapter/src/catalog/open/builtin_schema_migration.rs
@@ -987,16 +987,7 @@ where

         let entries: Vec<_> = updates
             .into_iter()
-            .filter_map(|(data, _, _)| {
-                if let (Ok(key), Ok(val)) = data {
-                    Some((key, val))
-                } else {
-                    // If we can't decode the data, it has likely been written by a newer version, so
-                    // we ignore it.
-                    info!("skipping unreadable migration shard entry: {data:?}");
-                    None
-                }
-            })
+            .map(|(data, _, _)| data)
             .filter(move |(key, _)| predicate(key))
             .collect();

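The filter_map deleted above was the defensive decode pattern this PR removes everywhere: persist reads used to yield ((Result<K, String>, Result<V, String>), T, D) tuples, and they now yield ((K, V), T, D). A minimal, self-contained sketch of the before/after calling shapes; the concrete update values are invented for illustration:

    fn main() {
        // Before: each update carried decode Results the caller had to unwrap
        // or filter out (e.g. data written by a newer version).
        let old: Vec<((Result<String, String>, Result<(), String>), u64, i64)> =
            vec![((Ok("key".into()), Ok(())), 1, 1)];
        let filtered: Vec<((String, ()), u64, i64)> = old
            .into_iter()
            .filter_map(|((k, v), t, d)| match (k, v) {
                (Ok(k), Ok(v)) => Some(((k, v), t, d)),
                _ => None, // formerly logged and skipped
            })
            .collect();

        // After: persist hands back decoded (K, V) pairs directly.
        let new: Vec<((String, ()), u64, i64)> = vec![(("key".into(), ()), 1, 1)];
        assert_eq!(filtered, new);
    }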
3 changes: 1 addition & 2 deletions src/adapter/src/coord.rs
@@ -2552,8 +2552,7 @@ impl Coordinator {

         // Retract the current contents, spilling into our builder.
         while let Some(values) = snapshot_cursor.next().await {
-            for ((key, _val), _t, d) in values {
-                let key = key.expect("builtin table had errors");
+            for (key, _t, d) in values {
                 let d_invert = d.neg();
                 batch.add(&key, &(), &d_invert).await;
             }
4 changes: 1 addition & 3 deletions src/adapter/src/coord/peek.rs
@@ -996,9 +996,7 @@ impl crate::coord::Coordinator {
         let mut current_batch_size: usize = 0;

         'outer: while let Some(rows) = row_cursor.next().await {
-            for ((key, _val), _ts, diff) in rows {
-                let source_data = key.expect("decoding error");
-
+            for ((source_data, _val), _ts, diff) in rows {
                 let row = source_data
                     .0
                     .expect("we are not sending errors on this code path");
12 changes: 2 additions & 10 deletions src/catalog/src/durable/objects/state_update.rs
@@ -394,11 +394,7 @@ impl StateUpdateKindJson {
 }

 /// Version of [`StateUpdateKind`] that is stored directly in persist.
-type PersistStateUpdate = (
-    (Result<SourceData, String>, Result<(), String>),
-    Timestamp,
-    StorageDiff,
-);
+type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);

 impl TryFrom<&StateUpdate<StateUpdateKind>> for Option<memory::objects::StateUpdate> {
     type Error = DurableCatalogError;
@@ -1017,11 +1013,7 @@ impl RustType<proto::StateUpdateKind> for StateUpdateKind {
 /// diff)` tuple/update we store in persist.
 impl From<PersistStateUpdate> for StateUpdate<StateUpdateKindJson> {
     fn from(kvtd: PersistStateUpdate) -> Self {
-        let ((key, val), ts, diff) = kvtd;
-        let (key, ()) = (
-            key.expect("persist decoding error"),
-            val.expect("persist decoding error"),
-        );
+        let ((key, ()), ts, diff) = kvtd;
         StateUpdate {
             kind: StateUpdateKindJson::from(key),
             ts,
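With the new PersistStateUpdate alias, the From impl above reduces to a single destructuring. A self-contained sketch of that shape; SourceData is stubbed here, since its real definition lives outside this diff:

    type Timestamp = u64;
    type StorageDiff = i64;

    #[derive(Debug)]
    struct SourceData(Option<String>); // stand-in for the real mz type

    type PersistStateUpdate = ((SourceData, ()), Timestamp, StorageDiff);

    fn split(kvtd: PersistStateUpdate) -> (SourceData, Timestamp, StorageDiff) {
        // One irrefutable pattern replaces the two .expect("persist decoding error") calls.
        let ((key, ()), ts, diff) = kvtd;
        (key, ts, diff)
    }

    fn main() {
        let (key, ts, diff) = split(((SourceData(Some("row".into())), ()), 42, 1));
        println!("{key:?} @ {ts} x{diff}");
    }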
4 changes: 2 additions & 2 deletions src/durable-cache/src/lib.rs
@@ -169,8 +169,8 @@ impl<C: DurableCacheCodec> DurableCache<C> {
             consolidate_updates(&mut updates);
             updates.sort_by(|(_, _, d1), (_, _, d2)| d1.cmp(d2));
             for ((k, v), t, d) in updates {
-                let encoded_key = k.unwrap();
-                let encoded_val = v.unwrap();
+                let encoded_key = k;
+                let encoded_val = v;
                 let (decoded_key, decoded_val) =
                     C::decode(&encoded_key, &encoded_val);
                 let val = LocalVal {
41 changes: 5 additions & 36 deletions src/persist-cli/src/maelstrom/txn_list_append_single.rs
@@ -112,11 +112,7 @@ pub struct Transactor {
     // Keep a long-lived listen, which is incrementally read as we go. Then
     // assert that it has the same data as the short-lived snapshot+listen in
     // `read`. This hopefully stresses slightly different parts of the system.
-    long_lived_updates: Vec<(
-        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-        u64,
-        i64,
-    )>,
+    long_lived_updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
     long_lived_listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
 }

@@ -241,11 +237,7 @@ impl Transactor {
         &mut self,
     ) -> Result<
         (
-            Vec<(
-                (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-                u64,
-                i64,
-            )>,
+            Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
             Antichain<u64>,
         ),
         MaelstromError,
@@ -382,14 +374,7 @@ impl Transactor {
     async fn listen_through(
         mut listen: Listen<MaelstromKey, MaelstromVal, u64, i64>,
         frontier: &Antichain<u64>,
-    ) -> Result<
-        Vec<(
-            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-            u64,
-            i64,
-        )>,
-        ExternalError,
-    > {
+    ) -> Result<Vec<((MaelstromKey, MaelstromVal), u64, i64)>, ExternalError> {
         let mut ret = Vec::new();
         loop {
             for event in listen.fetch_next().await {
@@ -415,11 +400,7 @@ impl Transactor {
     async fn read_long_lived(
         &mut self,
         as_of: &Antichain<u64>,
-    ) -> Vec<(
-        (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-        u64,
-        i64,
-    )> {
+    ) -> Vec<((MaelstromKey, MaelstromVal), u64, i64)> {
         while PartialOrder::less_equal(self.long_lived_listen.frontier(), as_of) {
             for event in self.long_lived_listen.fetch_next().await {
                 match event {
@@ -450,11 +431,7 @@ impl Transactor {

     fn extract_state_map(
         read_ts: u64,
-        updates: Vec<(
-            (Result<MaelstromKey, String>, Result<MaelstromVal, String>),
-            u64,
-            i64,
-        )>,
+        updates: Vec<((MaelstromKey, MaelstromVal), u64, i64)>,
     ) -> Result<BTreeMap<MaelstromKey, MaelstromVal>, MaelstromError> {
         let mut ret = BTreeMap::new();
         for ((k, v), _, d) in updates {
@@ -464,14 +441,6 @@ impl Transactor {
                     text: format!("invalid read at time {}", read_ts),
                 });
             }
-            let k = k.map_err(|err| MaelstromError {
-                code: ErrorCode::Crash,
-                text: format!("invalid key {}", err),
-            })?;
-            let v = v.map_err(|err| MaelstromError {
-                code: ErrorCode::Crash,
-                text: format!("invalid val {}", err),
-            })?;
             if ret.contains_key(&k) {
                 return Err(MaelstromError {
                     code: ErrorCode::Crash,
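After this change the two deleted map_err arms above have nothing left to convert, and extract_state_map is just the diff and duplicate checks. A compact standalone sketch of that remaining logic, with stand-in key/value types and a plain String error instead of MaelstromError:

    use std::collections::BTreeMap;

    type Key = u64; // stand-ins for MaelstromKey / MaelstromVal
    type Val = Vec<u64>;

    fn extract_state_map(updates: Vec<((Key, Val), u64, i64)>) -> Result<BTreeMap<Key, Val>, String> {
        let mut ret = BTreeMap::new();
        for ((k, v), _, d) in updates {
            if d != 1 {
                // Unconsolidated or retracted data means the read was invalid.
                return Err("invalid read".to_string());
            }
            if ret.insert(k, v).is_some() {
                return Err("duplicate key".to_string());
            }
        }
        Ok(ret)
    }

    fn main() {
        let state = extract_state_map(vec![((1, vec![7]), 10, 1)]).unwrap();
        assert_eq!(state.get(&1), Some(&vec![7]));
    }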
2 changes: 1 addition & 1 deletion src/persist-client/src/batch.rs
@@ -1738,7 +1738,7 @@ mod tests {
             .await;

         let (actual, _) = read.expect_listen(0).await.read_until(&3).await;
-        let expected = vec![(((Ok("foo".to_owned())), Ok(())), 2, 1)];
+        let expected = vec![((("foo".to_owned()), ()), 2, 1)];
         assert_eq!(actual, expected);
     }

22 changes: 9 additions & 13 deletions src/persist-client/src/fetch.rs
@@ -805,7 +805,7 @@ pub struct FetchedPart<K: Codec, V: Codec, T, D> {
     diffs: Int64Array,
     migration: PartMigration<K, V>,
     filter_pushdown_audit: Option<LazyPartStats>,
-    peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
+    peek_stash: Option<((K, V), T, D)>,
     part_cursor: usize,
     key_storage: Option<K::Storage>,
     val_storage: Option<V::Storage>,
@@ -996,7 +996,7 @@
         &mut self,
         key: &mut Option<K>,
         val: &mut Option<V>,
-    ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
+    ) -> Option<((K, V), T, D)> {
         let mut consolidated = self.peek_stash.take();
         loop {
             // Fetch and decode the next tuple in the sequence. (Or break if there is none.)
@@ -1041,18 +1041,13 @@
         Some((kv, t, d))
     }

-    fn decode_kv(
-        &mut self,
-        index: usize,
-        key: &mut Option<K>,
-        val: &mut Option<V>,
-    ) -> (Result<K, String>, Result<V, String>) {
+    fn decode_kv(&mut self, index: usize, key: &mut Option<K>, val: &mut Option<V>) -> (K, V) {
         let decoded = self
             .part
             .as_ref()
             .map_left(|codec| {
                 let ((ck, cv), _, _) = codec.get(index).expect("valid index");
-                Self::decode_codec(
+                let (k, v) = Self::decode_codec(
                     &*self.metrics,
                     self.migration.codec_read(),
                     ck,
@@ -1061,7 +1056,8 @@ where
                     val,
                     &mut self.key_storage,
                     &mut self.val_storage,
-                )
+                );
+                (k.expect("valid legacy key"), v.expect("valid legacy value"))
             })
             .map_right(|(structured_key, structured_val)| {
                 self.decode_structured(index, structured_key, structured_val, key, val)
@@ -1135,14 +1131,14 @@ where
         vals: &<V::Schema as Schema<V>>::Decoder,
         key: &mut Option<K>,
         val: &mut Option<V>,
-    ) -> (Result<K, String>, Result<V, String>) {
+    ) -> (K, V) {
         let mut key = key.take().unwrap_or_default();
         keys.decode(idx, &mut key);

         let mut val = val.take().unwrap_or_default();
         vals.decode(idx, &mut val);

-        (Ok(key), Ok(val))
+        (key, val)
     }
 }

@@ -1153,7 +1149,7 @@
     T: Timestamp + Lattice + Codec64,
     D: Monoid + Codec64 + Send + Sync,
 {
-    type Item = ((Result<K, String>, Result<V, String>), T, D);
+    type Item = ((K, V), T, D);

     fn next(&mut self) -> Option<Self::Item> {
         self.next_with_storage(&mut None, &mut None)
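With FetchedPart's Iterator item now ((K, V), T, D), downstream code can destructure updates without unwrapping Results. A small illustrative consumer over that item shape; the function and its bounds are invented for this sketch, not part of the PR:

    // Sums the diffs of all updates for a given key.
    fn diff_sum_for_key<K: PartialEq, V, T>(
        updates: impl Iterator<Item = ((K, V), T, i64)>,
        needle: &K,
    ) -> i64 {
        updates
            .filter(|((k, _v), _t, _d)| k == needle)
            .map(|(_kv, _t, d)| d)
            .sum()
    }

    fn main() {
        let updates = vec![(("a", 1), 0u64, 1i64), (("b", 2), 0, 1), (("a", 1), 1, -1)];
        // The addition at time 0 and retraction at time 1 cancel out.
        assert_eq!(diff_sum_for_key(updates.into_iter(), &"a"), 0);
    }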
2 changes: 1 addition & 1 deletion src/persist-client/src/internal/machine.rs
@@ -2298,7 +2298,7 @@ pub mod datadriven {
             match event {
                 ListenEvent::Updates(x) => {
                     for ((k, _v), t, d) in x.iter() {
-                        write!(s, "{} {} {}\n", k.as_ref().unwrap(), t, d);
+                        write!(s, "{} {} {}\n", k, t, d);
                     }
                 }
                 ListenEvent::Progress(x) => {
39 changes: 12 additions & 27 deletions src/persist-client/src/lib.rs
@@ -970,10 +970,7 @@ mod tests {
             .expect("client construction failed")
     }

-    pub fn all_ok<'a, K, V, T, D, I>(
-        iter: I,
-        as_of: T,
-    ) -> Vec<((Result<K, String>, Result<V, String>), T, D)>
+    pub fn all_ok<'a, K, V, T, D, I>(iter: I, as_of: T) -> Vec<((K, V), T, D)>
     where
         K: Ord + Clone + 'a,
         V: Ord + Clone + 'a,
@@ -987,7 +984,7 @@
             .map(|((k, v), t, d)| {
                 let mut t = t.clone();
                 t.advance_by(as_of.borrow());
-                ((Ok(k.clone()), Ok(v.clone())), t, d.clone())
+                ((k.clone(), v.clone()), t, d.clone())
             })
             .collect();
         consolidate_updates(&mut ret);
@@ -999,13 +996,10 @@
         key: &BlobKey,
         metrics: &Metrics,
         read_schemas: &Schemas<K, V>,
-    ) -> (
-        BlobTraceBatchPart<T>,
-        Vec<((Result<K, String>, Result<V, String>), T, D)>,
-    )
+    ) -> (BlobTraceBatchPart<T>, Vec<((K, V), T, D)>)
     where
-        K: Codec,
-        V: Codec,
+        K: Codec + Clone,
+        V: Codec + Clone,
         T: Timestamp + Codec64,
         D: Codec64,
     {
@@ -1016,22 +1010,13 @@
             .expect("missing part");
         let mut part =
             BlobTraceBatchPart::decode(&value, &metrics.columnar).expect("failed to decode part");
-        // Ensure codec data is present even if it was not generated at write time.
-        let _ = part
+        let structured = part
             .updates
-            .get_or_make_codec::<K, V>(&read_schemas.key, &read_schemas.val);
-        let mut updates = Vec::new();
-        // TODO(bkirwi): switch to structured data in tests
-        for ((k, v), t, d) in part.updates.records().expect("codec data").iter() {
-            updates.push((
-                (
-                    K::decode(k, &read_schemas.key),
-                    V::decode(v, &read_schemas.val),
-                ),
-                T::decode(t),
-                D::decode(d),
-            ));
-        }
+            .into_part::<K, V>(&*read_schemas.key, &*read_schemas.val);
+        let updates = structured
+            .decode_iter::<K, V, T, D>(&*read_schemas.key, &*read_schemas.val)
+            .expect("structured data")
+            .collect();
         (part, updates)
     }

@@ -1973,7 +1958,7 @@
         assert_eq!(
             listen_next.await,
             vec![
-                ListenEvent::Updates(vec![((Ok("2".to_owned()), Ok("two".to_owned())), 2, 1)]),
+                ListenEvent::Updates(vec![(("2".to_owned(), "two".to_owned()), 2, 1)]),
                 ListenEvent::Progress(Antichain::from_elem(3)),
             ]
         );
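For reference, a self-contained approximation of what the slimmed-down all_ok test helper above does. The real code uses Lattice::advance_by and consolidate_updates; this sketch inlines both over a totally ordered u64 timestamp, so it is an assumption-laden illustration, not the library implementation:

    // Advance each update's timestamp to at least `as_of`, then consolidate
    // identical (key, value, time) rows by summing diffs and dropping zeros.
    fn all_ok(
        updates: &[((String, String), u64, i64)],
        as_of: u64,
    ) -> Vec<((String, String), u64, i64)> {
        let mut ret: Vec<_> = updates
            .iter()
            .map(|((k, v), t, d)| ((k.clone(), v.clone()), (*t).max(as_of), *d))
            .collect();
        ret.sort();
        ret.dedup_by(|((k2, v2), t2, d2), ((k1, v1), t1, d1)| {
            // dedup_by sees (current, previous); fold the current diff into the
            // previous entry when key, value, and time all match.
            if k1 == k2 && v1 == v2 && t1 == t2 {
                *d1 += *d2;
                true
            } else {
                false
            }
        });
        ret.retain(|(_, _, d)| *d != 0);
        ret
    }

    fn main() {
        let updates = vec![(("k".to_owned(), "v".to_owned()), 0, 1)];
        assert_eq!(all_ok(&updates, 2), vec![(("k".to_owned(), "v".to_owned()), 2, 1)]);
    }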