Skip to content

Commit c3ef955

Browse files
committed
upsert: store errors on the heap
`DataflowError` instances are heap allocated since these values are rarer and we don't want to be paying the cost of storing them for each value. The upsert operator was previously unpacking the general `DataflowError` into a local `UpsertError` variant that was not boxed, leading to large structs. This commit re-boxes the error variant bringing the size of `UpsertValue` down significantly. Fixes MaterializeInc/database-issues#9567 Signed-off-by: Petros Angelatos <[email protected]>
1 parent 35406f3 commit c3ef955

File tree

4 files changed

+41
-25
lines changed

4 files changed

+41
-25
lines changed

src/storage/src/render/sources.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ use crate::decode::{render_decode_cdcv2, render_decode_delimited};
4343
use crate::healthcheck::{HealthStatusMessage, StatusNamespace};
4444
use crate::source::types::{DecodeResult, SourceOutput, SourceRender};
4545
use crate::source::{self, RawSourceCreationConfig, SourceExportCreationConfig};
46-
use crate::upsert::UpsertKey;
46+
use crate::upsert::{UpsertKey, UpsertValue};
4747

4848
/// _Renders_ complete _differential_ [`Collection`]s
4949
/// that represent the final source and its errors
@@ -479,7 +479,7 @@ fn append_metadata_to_value<G: Scope, FromTime: Timestamp>(
479479
fn upsert_commands<G: Scope, FromTime: Timestamp>(
480480
input: Collection<G, DecodeResult<FromTime>, Diff>,
481481
upsert_envelope: UpsertEnvelope,
482-
) -> Collection<G, (UpsertKey, Option<Result<Row, UpsertError>>, FromTime), Diff> {
482+
) -> Collection<G, (UpsertKey, Option<UpsertValue>, FromTime), Diff> {
483483
let mut row_buf = Row::default();
484484
input.map(move |result| {
485485
let from_time = result.from_time;
@@ -493,9 +493,15 @@ fn upsert_commands<G: Scope, FromTime: Timestamp>(
493493
// If we have a well-formed key we can continue, otherwise we're upserting an error
494494
let key = match key {
495495
Ok(key) => key,
496-
err @ Err(_) => match result.value {
497-
Some(_) => return (UpsertKey::from_key(err.as_ref()), Some(err), from_time),
498-
None => return (UpsertKey::from_key(err.as_ref()), None, from_time),
496+
Err(err) => match result.value {
497+
Some(_) => {
498+
return (
499+
UpsertKey::from_key(Err(&err)),
500+
Some(Err(Box::new(err))),
501+
from_time,
502+
);
503+
}
504+
None => return (UpsertKey::from_key(Err(&err)), None, from_time),
499505
},
500506
};
501507

@@ -584,10 +590,10 @@ fn upsert_commands<G: Scope, FromTime: Timestamp>(
584590
packer.extend_by_row(&metadata);
585591
Some(Ok(row_buf.clone()))
586592
}
587-
_ => Some(Err(UpsertError::Value(UpsertValueError {
593+
_ => Some(Err(Box::new(UpsertError::Value(UpsertValueError {
588594
for_key: key_row,
589595
inner,
590-
}))),
596+
})))),
591597
}
592598
}
593599
None => None,

src/storage/src/upsert.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ pub(crate) mod rocksdb;
5858
// TODO(aljoscha): Move next to upsert module, rename to upsert_types.
5959
pub(crate) mod types;
6060

61-
pub type UpsertValue = Result<Row, UpsertError>;
61+
pub type UpsertValue = Result<Row, Box<UpsertError>>;
6262

6363
#[derive(Copy, Clone, Hash, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
6464
pub struct UpsertKey([u8; 32]);
@@ -508,7 +508,7 @@ enum DrainStyle<'a, T> {
508508
async fn drain_staged_input<S, G, T, FromTime, E>(
509509
stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
510510
commands_state: &mut indexmap::IndexMap<UpsertKey, types::UpsertValueAndSize<T, FromTime>>,
511-
output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
511+
output_updates: &mut Vec<(UpsertValue, T, Diff)>,
512512
multi_get_scratch: &mut Vec<UpsertKey>,
513513
drain_style: DrainStyle<'_, T>,
514514
error_emitter: &mut E,
@@ -691,12 +691,16 @@ where
691691
let value = match result {
692692
Ok(ok) => Ok(ok),
693693
Err(DataflowError::EnvelopeError(err)) => match *err {
694-
EnvelopeError::Upsert(err) => Err(err),
694+
EnvelopeError::Upsert(err) => Err(Box::new(err)),
695695
_ => return None,
696696
},
697697
Err(_) => return None,
698698
};
699-
Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
699+
let value_ref = match value {
700+
Ok(ref row) => Ok(row),
701+
Err(ref err) => Err(&**err),
702+
};
703+
Some((UpsertKey::from_value(value_ref, &key_indices), value))
700704
});
701705
let (output_handle, output) = builder.new_output();
702706

@@ -931,7 +935,7 @@ where
931935
(
932936
output.as_collection().map(|result| match result {
933937
Ok(ok) => Ok(ok),
934-
Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
938+
Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
935939
}),
936940
health_stream,
937941
snapshot_stream,

src/storage/src/upsert/types.rs

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -304,15 +304,17 @@ impl<T, O> StateValue<T, O> {
304304
/// Other implementations may use more accurate accounting.
305305
#[cfg(test)]
306306
pub fn memory_size(&self) -> usize {
307-
use std::mem::size_of_val;
307+
use mz_repr::Row;
308+
use std::mem::size_of;
309+
308310
let heap_size = match self {
309311
Self::Consolidating(Consolidating { value_xor, .. }) => value_xor.len(),
310312
Self::Value(value) => {
311313
let finalized_heap_size = match value.finalized {
312314
Some(Ok(ref row)) => {
313315
// `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
314316
// subtract it.
315-
row.byte_len() - size_of_val(&row)
317+
row.byte_len() - size_of::<Row>()
316318
}
317319
// Assume errors are rare enough to not move the needle.
318320
_ => 0,
@@ -322,7 +324,7 @@ impl<T, O> StateValue<T, O> {
322324
Some(Ok(ref row)) => {
323325
// `Row::byte_len` includes the size of `Row`, which is also in `Self`, so we
324326
// subtract it.
325-
row.byte_len() - size_of_val(&row)
327+
row.byte_len() - size_of::<Row>()
326328
}
327329
// Assume errors are rare enough to not move the needle.
328330
_ => 0,
@@ -332,7 +334,7 @@ impl<T, O> StateValue<T, O> {
332334
finalized_heap_size + provisional_heap_size
333335
}
334336
};
335-
heap_size + size_of_val(self)
337+
heap_size + size_of::<Self>()
336338
}
337339
}
338340

@@ -1366,23 +1368,23 @@ mod tests {
13661368
fn test_memory_size() {
13671369
let finalized_value: StateValue<(), ()> = StateValue::finalized_value(Ok(Row::default()));
13681370
assert!(
1369-
finalized_value.memory_size() <= 144,
1371+
finalized_value.memory_size() <= 64,
13701372
"memory size is {}",
13711373
finalized_value.memory_size(),
13721374
);
13731375

13741376
let provisional_value_with_finalized_value: StateValue<(), ()> =
13751377
finalized_value.into_provisional_value(Ok(Row::default()), (), ());
13761378
assert!(
1377-
provisional_value_with_finalized_value.memory_size() <= 168,
1379+
provisional_value_with_finalized_value.memory_size() <= 64,
13781380
"memory size is {}",
13791381
provisional_value_with_finalized_value.memory_size(),
13801382
);
13811383

13821384
let provisional_value_without_finalized_value: StateValue<(), ()> =
13831385
StateValue::new_provisional_value(Ok(Row::default()), (), ());
13841386
assert!(
1385-
provisional_value_without_finalized_value.memory_size() <= 144,
1387+
provisional_value_without_finalized_value.memory_size() <= 64,
13861388
"memory size is {}",
13871389
provisional_value_without_finalized_value.memory_size(),
13881390
);
@@ -1395,7 +1397,7 @@ mod tests {
13951397
&mut Vec::new(),
13961398
);
13971399
assert!(
1398-
consolidating_value.memory_size() <= 146,
1400+
consolidating_value.memory_size() <= 66,
13991401
"memory size is {}",
14001402
consolidating_value.memory_size(),
14011403
);

src/storage/src/upsert_continual_feedback.rs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use indexmap::map::Entry;
2020
use itertools::Itertools;
2121
use mz_ore::vec::VecExt;
2222
use mz_repr::{Diff, GlobalId, Row};
23-
use mz_storage_types::errors::{DataflowError, EnvelopeError, UpsertError};
23+
use mz_storage_types::errors::{DataflowError, EnvelopeError};
2424
use mz_timely_util::builder_async::{
2525
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
2626
};
@@ -132,12 +132,16 @@ where
132132
let value = match result {
133133
Ok(ok) => Ok(ok),
134134
Err(DataflowError::EnvelopeError(err)) => match *err {
135-
EnvelopeError::Upsert(err) => Err(err),
135+
EnvelopeError::Upsert(err) => Err(Box::new(err)),
136136
_ => return None,
137137
},
138138
Err(_) => return None,
139139
};
140-
Some((UpsertKey::from_value(value.as_ref(), &key_indices), value))
140+
let value_ref = match value {
141+
Ok(ref row) => Ok(row),
142+
Err(ref err) => Err(&**err),
143+
};
144+
Some((UpsertKey::from_value(value_ref, &key_indices), value))
141145
});
142146
let (output_handle, output) = builder.new_output();
143147

@@ -536,7 +540,7 @@ where
536540
(
537541
output.as_collection().map(|result| match result {
538542
Ok(ok) => Ok(ok),
539-
Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(err))),
543+
Err(err) => Err(DataflowError::from(EnvelopeError::Upsert(*err))),
540544
}),
541545
health_stream,
542546
snapshot_stream,
@@ -599,7 +603,7 @@ enum DrainStyle<'a, T> {
599603
async fn drain_staged_input<S, G, T, FromTime, E>(
600604
stash: &mut Vec<(T, UpsertKey, Reverse<FromTime>, Option<UpsertValue>)>,
601605
commands_state: &mut indexmap::IndexMap<UpsertKey, UpsertValueAndSize<T, FromTime>>,
602-
output_updates: &mut Vec<(Result<Row, UpsertError>, T, Diff)>,
606+
output_updates: &mut Vec<(UpsertValue, T, Diff)>,
603607
multi_get_scratch: &mut Vec<UpsertKey>,
604608
drain_style: DrainStyle<'_, T>,
605609
error_emitter: &mut E,

0 commit comments

Comments
 (0)