Skip to content

Commit 9b1946e

Browse files
authored
Merge pull request #33299 from petrosagg/refactor-upsert-value
upsert: refactor `UpsertValue` struct and box error to reduce memory usage
2 parents 2e9c047 + c3ef955 commit 9b1946e

File tree

5 files changed

+150
-186
lines changed

5 files changed

+150
-186
lines changed

src/storage/src/render/sources.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::decode::{render_decode_cdcv2, render_decode_delimited};
4343
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
4444
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
4545
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
46-
use crate::upsert::UpsertKey;
46+
use crate::upsert::{UpsertKey, UpsertValue};
4747

4848
/// _Renders_ complete _differential_ [`Collection`]s
4949
/// that represent the final source and its errors
@@ -479,7 +479,7 @@ fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
479479
fn upsert_commands<G: Scope, FromTime: Timestamp>(
480480
input: Collection<G, DecodeResult<FromTime>, Diff>,
481481
upsert_envelope: UpsertEnvelope,
482-
) -> Collection<G, (UpsertKey, Option<Result<Row, UpsertError>>, FromTime), Diff> {
482+
) -> Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
483483
let mut row_buf = Row::default();
484484
input.map(move |result| {
485485
let from_time = result.from_time;
@@ -493,9 +493,15 @@ fn upsert_commands<G: Scope, FromTime: Timestamp>(
493493
// If we have a well-formed key we can continue, otherwise we're upserting an error
494494
let key = match key {
495495
Ok(key) => key,
496-
err @ Err(_) => match result.value {
497-
Some(_) => return (UpsertKey::from_key(err.as_ref()), Some(err), from_time),
498-
None => return (UpsertKey::from_key(err.as_ref()), None, from_time),
496+
Err(err) => match result.value {
497+
Some(_) => {
498+
return (
499+
UpsertKey::from_key(Err(&err)),
500+
Some(Err(Box::new(err))),
501+
from_time,
502+
);
503+
}
504+
None => return (UpsertKey::from_key(Err(&err)), None, from_time),
499505
},
500506
};
501507

@@ -584,10 +590,10 @@ fn upsert_commands<G: Scope, FromTime: Timestamp>(
584590
packer.extend_by_row(&metadata);
585591
Some(Ok(row_buf.clone()))
586592
}
587-
_ => Some(Err(UpsertError::Value(UpsertValueError {
593+
_ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
588594
for_key: key_row,
589595
inner,
590-
}))),
596+
})))),
591597
}
592598
}
593599
None => None,

src/storage/src/upsert.rs

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ use crate::metrics::upsert::UpsertMetrics;
4848
use crate::storage_state::StorageInstanceContext;
4949
use crate::upsert_continual_feedback;
5050
use types::{
51-
BincodeOpts, StateValue, UpsertState, UpsertStateBackend, Value, consolidating_merge_function,
51+
BincodeOpts, StateValue, UpsertState, UpsertStateBackend, consolidating_merge_function,
5252
upsert_bincode_opts,
5353
};
5454

@@ -58,7 +58,7 @@ pub(crate) mod rocksdb;
5858
// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
5959
pub(crate) mod types;
6060

61-
pub type UpsertValue = Result<Row, UpsertError>;
61+
pub type UpsertValue = Result<Row, Box<UpsertError>>;
6262

6363
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
6464
pub struct UpsertKey([u8; 32]);
@@ -508,7 +508,7 @@ enum DrainStyle<'a, T> {
508508
async fn drain_staged_input<S, G, T, FromTime, E>(
509509
stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
510510
commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
511-
output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
511+
output_updates: &mut Vec<(UpsertValue, T, Diff)>,
512512
multi_get_scratch: &mut Vec<UpsertKey>,
513513
drain_style: DrainStyle<'_, T>,
514514
error_emitter: &mut E,
@@ -607,15 +607,15 @@ async fn drain_staged_input<S, G, T, FromTime, E>(
607607
if let Some(old_value) =
608608
existing_value.replace(StateValue::finalized_value(value.clone()))
609609
{
610-
if let Value::FinalizedValue(old_value) = old_value.into_decoded() {
610+
if let Some(old_value) = old_value.into_decoded().finalized {
611611
output_updates.push((old_value, ts.clone(), Diff::MINUS_ONE));
612612
}
613613
}
614614
output_updates.push((value, ts, Diff::ONE));
615615
}
616616
None => {
617617
if let Some(old_value) = existing_value.take() {
618-
if let Value::FinalizedValue(old_value) = old_value.into_decoded() {
618+
if let Some(old_value) = old_value.into_decoded().finalized {
619619
output_updates.push((old_value, ts, Diff::MINUS_ONE));
620620
}
621621
}
@@ -691,12 +691,16 @@ where
691691
let value = match result {
692692
Ok(ok) => Ok(ok),
693693
Err(DataflowError::EnvelopeError(err)) => match *err {
694-
EnvelopeError::Upsert(err) => Err(err),
694+
EnvelopeError::Upsert(err) => Err(Box::new(err)),
695695
_ => return None,
696696
},
697697
Err(_) => return None,
698698
};
699-
Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
699+
let value_ref = match value {
700+
Ok(ref row) => Ok(row),
701+
Err(ref err) => Err(&**err),
702+
};
703+
Some((UpsertKey::from_value(value_ref, &key_indices), value))
700704
});
701705
let (output_handle, output) = builder.new_output();
702706

@@ -931,7 +935,7 @@ where
931935
(
932936
output.as_collection().map(|result| match result {
933937
Ok(ok) => Ok(ok),
934-
Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
938+
Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
935939
}),
936940
health_stream,
937941
snapshot_stream,

src/storage/src/upsert/memory.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use std::collections::HashMap;
1818

1919
use itertools::Itertools;
20+
use mz_ore::cast::CastFrom;
2021

2122
use super::UpsertKey;
2223
use super::types::{
@@ -93,7 +94,7 @@ where
9394
stats.processed_gets += 1;
9495
let value = self.state.get(&key).cloned();
9596
let metadata = value.as_ref().map(|v| ValueMetadata {
96-
size: v.memory_size(),
97+
size: u64::cast_from(v.memory_size()),
9798
is_tombstone: v.is_tombstone(),
9899
});
99100
stats.processed_gets_size += metadata.map_or(0, |m| m.size);

0 commit comments

Comments
 (0)