Skip to content

Commit b4a42fd

Browse files
committed
refactor(hummock): add owned from_persisted_protobuf to reduce memory amplification
- Add From<PbHummockVersion> for HummockVersionCommon<T> (owned) to move data instead of cloning during deserialization - Add from_persisted_protobuf_owned and from_protobuf_owned methods for HummockVersion, HummockVersionDelta, and HummockVersionCheckpoint - Add TableChangeLogCommon::from_protobuf_owned for owned change log conversion - Update all callers that pass owned/temporary protobuf values to use the new owned variants, eliminating unnecessary clones of key_range, table_ids, and stale_objects across thousands of SstableInfos
1 parent 842fdcc commit b4a42fd

File tree

11 files changed

+112
-27
lines changed

11 files changed

+112
-27
lines changed

src/meta/src/backup_restore/meta_snapshot_builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ impl MetaSnapshotV2Builder {
9090
.map_err(map_db_err)?
9191
.into_iter()
9292
.map_into::<PbHummockVersionDelta>()
93-
.map(|pb_delta| HummockVersionDelta::from_persisted_protobuf(&pb_delta));
93+
.map(HummockVersionDelta::from_persisted_protobuf_owned);
9494
let hummock_version = {
9595
let mut redo_state = hummock_version;
9696
let mut max_log_id = None;

src/meta/src/hummock/manager/checkpoint.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ impl HummockVersionCheckpoint {
5757
}
5858
}
5959

60+
/// Convert an owned `PbHummockVersionCheckpoint` to `HummockVersionCheckpoint`,
61+
/// moving data instead of cloning for better performance on large checkpoints.
62+
pub fn from_protobuf_owned(checkpoint: PbHummockVersionCheckpoint) -> Self {
63+
Self {
64+
version: HummockVersion::from_persisted_protobuf_owned(checkpoint.version.unwrap()),
65+
stale_objects: checkpoint.stale_objects,
66+
}
67+
}
68+
6069
pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
6170
PbHummockVersionCheckpoint {
6271
version: Some(PbHummockVersion::from(&self.version)),
@@ -89,7 +98,7 @@ impl HummockManager {
8998
}
9099
};
91100
let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
92-
Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
101+
Ok(Some(HummockVersionCheckpoint::from_protobuf_owned(ckpt)))
93102
}
94103

95104
pub(super) async fn write_checkpoint(

src/meta/src/hummock/manager/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,7 @@ impl HummockManager {
417417
.map(|m| {
418418
(
419419
m.id,
420-
HummockVersionDelta::from_persisted_protobuf(&m.into()),
420+
HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
421421
)
422422
})
423423
.collect();

src/meta/src/hummock/manager/time_travel.rs

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,9 @@ impl HummockManager {
5454
.order_by_desc(hummock_time_travel_version::Column::VersionId)
5555
.one(&sql_store.conn)
5656
.await?
57-
.map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
57+
.map(|v| {
58+
IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
59+
})
5860
else {
5961
return Ok(());
6062
};
@@ -123,7 +125,9 @@ impl HummockManager {
123125
.order_by_desc(hummock_time_travel_version::Column::VersionId)
124126
.one(&txn)
125127
.await?
126-
.map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
128+
.map(|m| {
129+
IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
130+
});
127131
let Some(latest_valid_version) = latest_valid_version else {
128132
txn.commit().await?;
129133
return Ok(());
@@ -197,8 +201,8 @@ impl HummockManager {
197201
delta_id_to_delete
198202
)))
199203
})?;
200-
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
201-
&delta_to_delete.version_delta.to_protobuf(),
204+
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
205+
delta_to_delete.version_delta.to_protobuf(),
202206
);
203207
let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
204208
// The SST ids added and then deleted by compaction between the 2 versions.
@@ -226,8 +230,8 @@ impl HummockManager {
226230
prev_version_id
227231
)))
228232
})?;
229-
IncompleteHummockVersion::from_persisted_protobuf(
230-
&prev_version.version.to_protobuf(),
233+
IncompleteHummockVersion::from_persisted_protobuf_owned(
234+
prev_version.version.to_protobuf(),
231235
)
232236
};
233237
let sst_ids = prev_version.get_sst_ids(true);
@@ -352,7 +356,7 @@ impl HummockManager {
352356
let mut next_prev_version_id = None;
353357
while let Some(model) = version_stream.try_next().await? {
354358
let version =
355-
HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
359+
HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
356360
for object_id in version.get_object_ids(true) {
357361
result.remove(&object_id);
358362
}
@@ -388,8 +392,8 @@ impl HummockManager {
388392
.await?;
389393
let mut next_prev_version_id = None;
390394
while let Some(model) = version_stream.try_next().await? {
391-
let version_delta = HummockVersionDelta::from_persisted_protobuf(
392-
&model.version_delta.to_protobuf(),
395+
let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
396+
model.version_delta.to_protobuf(),
393397
);
394398
// set exclude_table_change_log to true because in time travel delta we ignore the table change log
395399
for object_id in version_delta.newly_added_object_ids(true) {
@@ -692,9 +696,9 @@ fn replay_archive(
692696
) -> HummockVersion {
693697
// The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
694698
// Using HummockVersion make it easier for `refill_version` later.
695-
let mut last_version = HummockVersion::from_persisted_protobuf(&version);
699+
let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
696700
for d in deltas {
697-
let d = HummockVersionDelta::from_persisted_protobuf(&d);
701+
let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
698702
debug_assert!(
699703
!should_mark_next_time_travel_version_snapshot(&d),
700704
"unexpected time travel delta {:?}",

src/storage/backup/src/meta_snapshot_v1.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ impl ClusterMetadata {
170170
.zip_eq_fast(default_cf_values.into_iter())
171171
.collect();
172172
let hummock_version =
173-
HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?);
173+
HummockVersion::from_persisted_protobuf_owned(Self::decode_prost_message(&mut buf)?);
174174
let version_stats = Self::decode_prost_message(&mut buf)?;
175175
let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
176176
let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;

src/storage/backup/src/meta_snapshot_v2.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ macro_rules! define_decode_metadata {
119119
let mut _idx = 0;
120120
$(
121121
if _idx == 1 {
122-
metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?);
122+
metadata.hummock_version = HummockVersion::from_persisted_protobuf_owned(get_1(&mut buf)?);
123123
}
124124
metadata.$name = get_n(&mut buf)?;
125125
_idx += 1;

src/storage/benches/bench_table_watermarks.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ fn gen_version(
117117
vnode_part_count,
118118
));
119119
let committed_epoch = test_epoch(new_epoch_idx as _);
120-
let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
120+
let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
121121
id: (new_epoch_idx as u64).into(),
122122
..Default::default()
123123
});

src/storage/hummock_sdk/src/change_log.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,15 @@ where
270270
}
271271
}
272272

273+
impl<T> TableChangeLogCommon<T>
274+
where
275+
T: From<PbSstableInfo>,
276+
{
277+
pub fn from_protobuf_owned(val: PbTableChangeLog) -> Self {
278+
Self(val.change_logs.into_iter().map(|a| a.into()).collect())
279+
}
280+
}
281+
273282
pub fn build_table_change_log_delta<'a>(
274283
old_value_ssts: impl Iterator<Item = SstableInfo>,
275284
new_value_ssts: impl Iterator<Item = &'a SstableInfo>,

src/storage/hummock_sdk/src/version.rs

Lines changed: 71 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,18 @@ where
259259
}
260260
}
261261

262+
impl<T> HummockVersionCommon<T>
263+
where
264+
T: From<PbSstableInfo>,
265+
PbSstableInfo: for<'a> From<&'a T>,
266+
{
267+
/// Convert an owned `PbHummockVersion` deserialized from persisted state to `HummockVersion`,
268+
/// moving data instead of cloning for better performance on large checkpoints.
269+
pub fn from_persisted_protobuf_owned(pb_version: PbHummockVersion) -> Self {
270+
pb_version.into()
271+
}
272+
}
273+
262274
impl HummockVersion {
263275
pub fn estimated_encode_len(&self) -> usize {
264276
self.levels.len() * size_of::<CompactionGroupId>()
@@ -331,6 +343,49 @@ where
331343
}
332344
}
333345

346+
impl<T> From<PbHummockVersion> for HummockVersionCommon<T>
347+
where
348+
T: From<PbSstableInfo>,
349+
{
350+
fn from(pb_version: PbHummockVersion) -> Self {
351+
#[expect(deprecated)]
352+
Self {
353+
id: pb_version.id,
354+
levels: pb_version
355+
.levels
356+
.into_iter()
357+
.map(|(group_id, levels)| (group_id, LevelsCommon::from(levels)))
358+
.collect(),
359+
max_committed_epoch: pb_version.max_committed_epoch,
360+
table_watermarks: pb_version
361+
.table_watermarks
362+
.into_iter()
363+
.map(|(table_id, table_watermark)| {
364+
(table_id, Arc::new(TableWatermarks::from(table_watermark)))
365+
})
366+
.collect(),
367+
table_change_log: pb_version
368+
.table_change_logs
369+
.into_iter()
370+
.map(|(table_id, change_log)| {
371+
(
372+
table_id,
373+
TableChangeLogCommon::from_protobuf_owned(change_log),
374+
)
375+
})
376+
.collect(),
377+
state_table_info: HummockVersionStateTableInfo::from_protobuf(
378+
&pb_version.state_table_info,
379+
),
380+
vector_indexes: pb_version
381+
.vector_indexes
382+
.into_iter()
383+
.map(|(table_id, index)| (table_id, index.into()))
384+
.collect(),
385+
}
386+
}
387+
}
388+
334389
impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
335390
where
336391
PbSstableInfo: for<'a> From<&'a T>,
@@ -561,6 +616,18 @@ where
561616
}
562617
}
563618

619+
impl<T> HummockVersionDeltaCommon<T>
620+
where
621+
T: From<PbSstableInfo>,
622+
PbSstableInfo: for<'a> From<&'a T>,
623+
{
624+
/// Convert an owned `PbHummockVersionDelta` deserialized from persisted state to
625+
/// `HummockVersionDelta`, moving data instead of cloning.
626+
pub fn from_persisted_protobuf_owned(delta: PbHummockVersionDelta) -> Self {
627+
delta.into()
628+
}
629+
}
630+
564631
pub trait SstableIdReader {
565632
fn sst_id(&self) -> HummockSstableId;
566633
}
@@ -807,22 +874,18 @@ where
807874
removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
808875
change_log_delta: pb_version_delta
809876
.change_log_delta
810-
.iter()
877+
.into_iter()
811878
.map(|(table_id, log_delta)| {
812879
(
813-
*table_id,
880+
table_id,
814881
ChangeLogDeltaCommon {
815-
new_log: log_delta.new_log.clone().unwrap().into(),
882+
new_log: log_delta.new_log.unwrap().into(),
816883
truncate_epoch: log_delta.truncate_epoch,
817884
},
818885
)
819886
})
820887
.collect(),
821-
state_table_info_delta: pb_version_delta
822-
.state_table_info_delta
823-
.iter()
824-
.map(|(table_id, delta)| (*table_id, *delta))
825-
.collect(),
888+
state_table_info_delta: pb_version_delta.state_table_info_delta,
826889
vector_index_delta: pb_version_delta
827890
.vector_index_delta
828891
.into_iter()

src/storage/src/hummock/event_handler/uploader/test_utils.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ impl HummockUploader {
9292
}
9393

9494
pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion {
95-
let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
95+
let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
9696
id: epoch.into(),
9797
..Default::default()
9898
});

0 commit comments

Comments
 (0)