Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/meta/src/backup_restore/meta_snapshot_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl MetaSnapshotV2Builder {
.map_err(map_db_err)?
.into_iter()
.map_into::<PbHummockVersionDelta>()
.map(|pb_delta| HummockVersionDelta::from_persisted_protobuf(&pb_delta));
.map(HummockVersionDelta::from_persisted_protobuf_owned);
let hummock_version = {
let mut redo_state = hummock_version;
let mut max_log_id = None;
Expand Down
11 changes: 10 additions & 1 deletion src/meta/src/hummock/manager/checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ impl HummockVersionCheckpoint {
}
}

/// Convert an owned `PbHummockVersionCheckpoint` to `HummockVersionCheckpoint`,
/// moving data instead of cloning for better performance on large checkpoints.
pub fn from_protobuf_owned(checkpoint: PbHummockVersionCheckpoint) -> Self {
Self {
version: HummockVersion::from_persisted_protobuf_owned(checkpoint.version.unwrap()),
stale_objects: checkpoint.stale_objects,
}
}

pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint {
PbHummockVersionCheckpoint {
version: Some(PbHummockVersion::from(&self.version)),
Expand Down Expand Up @@ -89,7 +98,7 @@ impl HummockManager {
}
};
let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?;
Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt)))
Ok(Some(HummockVersionCheckpoint::from_protobuf_owned(ckpt)))
}

pub(super) async fn write_checkpoint(
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,7 @@ impl HummockManager {
.map(|m| {
(
m.id,
HummockVersionDelta::from_persisted_protobuf(&m.into()),
HummockVersionDelta::from_persisted_protobuf_owned(m.into()),
)
})
.collect();
Expand Down
26 changes: 15 additions & 11 deletions src/meta/src/hummock/manager/time_travel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ impl HummockManager {
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.one(&sql_store.conn)
.await?
.map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf()))
.map(|v| {
IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf())
})
else {
return Ok(());
};
Expand Down Expand Up @@ -123,7 +125,9 @@ impl HummockManager {
.order_by_desc(hummock_time_travel_version::Column::VersionId)
.one(&txn)
.await?
.map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf()));
.map(|m| {
IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf())
});
let Some(latest_valid_version) = latest_valid_version else {
txn.commit().await?;
return Ok(());
Expand Down Expand Up @@ -197,8 +201,8 @@ impl HummockManager {
delta_id_to_delete
)))
})?;
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf(
&delta_to_delete.version_delta.to_protobuf(),
let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned(
delta_to_delete.version_delta.to_protobuf(),
);
let new_sst_ids = delta_to_delete.newly_added_sst_ids(true);
// The SST ids added and then deleted by compaction between the 2 versions.
Expand Down Expand Up @@ -226,8 +230,8 @@ impl HummockManager {
prev_version_id
)))
})?;
IncompleteHummockVersion::from_persisted_protobuf(
&prev_version.version.to_protobuf(),
IncompleteHummockVersion::from_persisted_protobuf_owned(
prev_version.version.to_protobuf(),
)
};
let sst_ids = prev_version.get_sst_ids(true);
Expand Down Expand Up @@ -352,7 +356,7 @@ impl HummockManager {
let mut next_prev_version_id = None;
while let Some(model) = version_stream.try_next().await? {
let version =
HummockVersion::from_persisted_protobuf(&model.version.to_protobuf());
HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf());
for object_id in version.get_object_ids(true) {
result.remove(&object_id);
}
Expand Down Expand Up @@ -388,8 +392,8 @@ impl HummockManager {
.await?;
let mut next_prev_version_id = None;
while let Some(model) = version_stream.try_next().await? {
let version_delta = HummockVersionDelta::from_persisted_protobuf(
&model.version_delta.to_protobuf(),
let version_delta = HummockVersionDelta::from_persisted_protobuf_owned(
model.version_delta.to_protobuf(),
);
// set exclude_table_change_log to true because in time travel delta we ignore the table change log
for object_id in version_delta.newly_added_object_ids(true) {
Expand Down Expand Up @@ -692,9 +696,9 @@ fn replay_archive(
) -> HummockVersion {
// The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively.
// Using HummockVersion make it easier for `refill_version` later.
let mut last_version = HummockVersion::from_persisted_protobuf(&version);
let mut last_version = HummockVersion::from_persisted_protobuf_owned(version);
for d in deltas {
let d = HummockVersionDelta::from_persisted_protobuf(&d);
let d = HummockVersionDelta::from_persisted_protobuf_owned(d);
debug_assert!(
!should_mark_next_time_travel_version_snapshot(&d),
"unexpected time travel delta {:?}",
Expand Down
2 changes: 1 addition & 1 deletion src/storage/backup/src/meta_snapshot_v1.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl ClusterMetadata {
.zip_eq_fast(default_cf_values.into_iter())
.collect();
let hummock_version =
HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?);
HummockVersion::from_persisted_protobuf_owned(Self::decode_prost_message(&mut buf)?);
let version_stats = Self::decode_prost_message(&mut buf)?;
let compaction_groups: Vec<CompactionGroup> = Self::decode_prost_message_list(&mut buf)?;
let table_fragments: Vec<TableFragments> = Self::decode_prost_message_list(&mut buf)?;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/backup/src/meta_snapshot_v2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ macro_rules! define_decode_metadata {
let mut _idx = 0;
$(
if _idx == 1 {
metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?);
metadata.hummock_version = HummockVersion::from_persisted_protobuf_owned(get_1(&mut buf)?);
}
metadata.$name = get_n(&mut buf)?;
_idx += 1;
Expand Down
2 changes: 1 addition & 1 deletion src/storage/benches/bench_table_watermarks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ fn gen_version(
vnode_part_count,
));
let committed_epoch = test_epoch(new_epoch_idx as _);
let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
id: (new_epoch_idx as u64).into(),
..Default::default()
});
Expand Down
9 changes: 9 additions & 0 deletions src/storage/hummock_sdk/src/change_log.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,15 @@ where
}
}

impl<T> TableChangeLogCommon<T>
where
T: From<PbSstableInfo>,
{
pub fn from_protobuf_owned(val: PbTableChangeLog) -> Self {
Self(val.change_logs.into_iter().map(|a| a.into()).collect())
}
}

pub fn build_table_change_log_delta<'a>(
old_value_ssts: impl Iterator<Item = SstableInfo>,
new_value_ssts: impl Iterator<Item = &'a SstableInfo>,
Expand Down
79 changes: 71 additions & 8 deletions src/storage/hummock_sdk/src/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,18 @@ where
}
}

impl<T> HummockVersionCommon<T>
where
T: From<PbSstableInfo>,
PbSstableInfo: for<'a> From<&'a T>,
{
/// Convert an owned `PbHummockVersion` deserialized from persisted state to `HummockVersion`,
/// moving data instead of cloning for better performance on large checkpoints.
pub fn from_persisted_protobuf_owned(pb_version: PbHummockVersion) -> Self {
pb_version.into()
}
}

impl HummockVersion {
pub fn estimated_encode_len(&self) -> usize {
self.levels.len() * size_of::<CompactionGroupId>()
Expand Down Expand Up @@ -331,6 +343,49 @@ where
}
}

impl<T> From<PbHummockVersion> for HummockVersionCommon<T>
where
T: From<PbSstableInfo>,
{
fn from(pb_version: PbHummockVersion) -> Self {
#[expect(deprecated)]
Self {
id: pb_version.id,
levels: pb_version
.levels
.into_iter()
.map(|(group_id, levels)| (group_id, LevelsCommon::from(levels)))
.collect(),
max_committed_epoch: pb_version.max_committed_epoch,
table_watermarks: pb_version
.table_watermarks
.into_iter()
.map(|(table_id, table_watermark)| {
(table_id, Arc::new(TableWatermarks::from(table_watermark)))
})
.collect(),
table_change_log: pb_version
.table_change_logs
.into_iter()
.map(|(table_id, change_log)| {
(
table_id,
TableChangeLogCommon::from_protobuf_owned(change_log),
)
})
.collect(),
state_table_info: HummockVersionStateTableInfo::from_protobuf(
&pb_version.state_table_info,
),
vector_indexes: pb_version
.vector_indexes
.into_iter()
.map(|(table_id, index)| (table_id, index.into()))
.collect(),
}
}
}

impl<T> From<&HummockVersionCommon<T>> for PbHummockVersion
where
PbSstableInfo: for<'a> From<&'a T>,
Expand Down Expand Up @@ -561,6 +616,18 @@ where
}
}

impl<T> HummockVersionDeltaCommon<T>
where
T: From<PbSstableInfo>,
PbSstableInfo: for<'a> From<&'a T>,
{
/// Convert an owned `PbHummockVersionDelta` deserialized from persisted state to
/// `HummockVersionDelta`, moving data instead of cloning.
pub fn from_persisted_protobuf_owned(delta: PbHummockVersionDelta) -> Self {
delta.into()
}
Comment on lines +624 to +628
Copy link

Copilot AI Mar 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

from_persisted_protobuf_owned currently just calls delta.into(), but the underlying From<PbHummockVersionDelta> conversion still clones change_log_delta (log_delta.new_log.clone() + iterating by reference). That means the new owned API won’t fully eliminate the intended cloning/memory amplification for deltas. Consider changing the From<PbHummockVersionDelta> impl to consume pb_version_delta.change_log_delta via into_iter() and convert each PbChangeLogDelta with its owned From<PbChangeLogDelta> impl (in change_log.rs), avoiding the clone() entirely.

Copilot uses AI. Check for mistakes.
}

pub trait SstableIdReader {
fn sst_id(&self) -> HummockSstableId;
}
Expand Down Expand Up @@ -807,22 +874,18 @@ where
removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(),
change_log_delta: pb_version_delta
.change_log_delta
.iter()
.into_iter()
.map(|(table_id, log_delta)| {
(
*table_id,
table_id,
ChangeLogDeltaCommon {
new_log: log_delta.new_log.clone().unwrap().into(),
new_log: log_delta.new_log.unwrap().into(),
truncate_epoch: log_delta.truncate_epoch,
},
)
})
.collect(),
state_table_info_delta: pb_version_delta
.state_table_info_delta
.iter()
.map(|(table_id, delta)| (*table_id, *delta))
.collect(),
state_table_info_delta: pb_version_delta.state_table_info_delta,
vector_index_delta: pb_version_delta
.vector_index_delta
.into_iter()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ impl HummockUploader {
}

pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion {
let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
id: epoch.into(),
..Default::default()
});
Expand Down
2 changes: 1 addition & 1 deletion src/storage/src/hummock/store/version.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1387,7 +1387,7 @@ mod tests {
compaction_group_version_id: 0,
};

let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion {
let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion {
id: 1u64.into(),
..Default::default()
});
Expand Down