diff --git a/src/meta/src/backup_restore/meta_snapshot_builder.rs b/src/meta/src/backup_restore/meta_snapshot_builder.rs index 977fff21fa36b..5e17e23f7f37b 100644 --- a/src/meta/src/backup_restore/meta_snapshot_builder.rs +++ b/src/meta/src/backup_restore/meta_snapshot_builder.rs @@ -90,7 +90,7 @@ impl MetaSnapshotV2Builder { .map_err(map_db_err)? .into_iter() .map_into::() - .map(|pb_delta| HummockVersionDelta::from_persisted_protobuf(&pb_delta)); + .map(HummockVersionDelta::from_persisted_protobuf_owned); let hummock_version = { let mut redo_state = hummock_version; let mut max_log_id = None; diff --git a/src/meta/src/hummock/manager/checkpoint.rs b/src/meta/src/hummock/manager/checkpoint.rs index b759f009387be..bf0d54542f15f 100644 --- a/src/meta/src/hummock/manager/checkpoint.rs +++ b/src/meta/src/hummock/manager/checkpoint.rs @@ -57,6 +57,15 @@ impl HummockVersionCheckpoint { } } + /// Convert an owned `PbHummockVersionCheckpoint` to `HummockVersionCheckpoint`, + /// moving data instead of cloning for better performance on large checkpoints. + pub fn from_protobuf_owned(checkpoint: PbHummockVersionCheckpoint) -> Self { + Self { + version: HummockVersion::from_persisted_protobuf_owned(checkpoint.version.unwrap()), + stale_objects: checkpoint.stale_objects, + } + } + pub fn to_protobuf(&self) -> PbHummockVersionCheckpoint { PbHummockVersionCheckpoint { version: Some(PbHummockVersion::from(&self.version)), @@ -89,7 +98,7 @@ impl HummockManager { } }; let ckpt = PbHummockVersionCheckpoint::decode(data).map_err(|e| anyhow::anyhow!(e))?; - Ok(Some(HummockVersionCheckpoint::from_protobuf(&ckpt))) + Ok(Some(HummockVersionCheckpoint::from_protobuf_owned(ckpt))) } pub(super) async fn write_checkpoint( diff --git a/src/meta/src/hummock/manager/mod.rs b/src/meta/src/hummock/manager/mod.rs index cb03cd0d96482..fbaf04e9d8b58 100644 --- a/src/meta/src/hummock/manager/mod.rs +++ b/src/meta/src/hummock/manager/mod.rs @@ -417,7 +417,7 @@ impl HummockManager { .map(|m| { ( m.id, - HummockVersionDelta::from_persisted_protobuf(&m.into()), + HummockVersionDelta::from_persisted_protobuf_owned(m.into()), ) }) .collect(); diff --git a/src/meta/src/hummock/manager/time_travel.rs b/src/meta/src/hummock/manager/time_travel.rs index 1dc0eb1c49c17..bc232fe7a7433 100644 --- a/src/meta/src/hummock/manager/time_travel.rs +++ b/src/meta/src/hummock/manager/time_travel.rs @@ -54,7 +54,9 @@ impl HummockManager { .order_by_desc(hummock_time_travel_version::Column::VersionId) .one(&sql_store.conn) .await? - .map(|v| IncompleteHummockVersion::from_persisted_protobuf(&v.version.to_protobuf())) + .map(|v| { + IncompleteHummockVersion::from_persisted_protobuf_owned(v.version.to_protobuf()) + }) else { return Ok(()); }; @@ -123,7 +125,9 @@ impl HummockManager { .order_by_desc(hummock_time_travel_version::Column::VersionId) .one(&txn) .await? - .map(|m| IncompleteHummockVersion::from_persisted_protobuf(&m.version.to_protobuf())); + .map(|m| { + IncompleteHummockVersion::from_persisted_protobuf_owned(m.version.to_protobuf()) + }); let Some(latest_valid_version) = latest_valid_version else { txn.commit().await?; return Ok(()); @@ -197,8 +201,8 @@ impl HummockManager { delta_id_to_delete ))) })?; - let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf( - &delta_to_delete.version_delta.to_protobuf(), + let delta_to_delete = IncompleteHummockVersionDelta::from_persisted_protobuf_owned( + delta_to_delete.version_delta.to_protobuf(), ); let new_sst_ids = delta_to_delete.newly_added_sst_ids(true); // The SST ids added and then deleted by compaction between the 2 versions. @@ -226,8 +230,8 @@ impl HummockManager { prev_version_id ))) })?; - IncompleteHummockVersion::from_persisted_protobuf( - &prev_version.version.to_protobuf(), + IncompleteHummockVersion::from_persisted_protobuf_owned( + prev_version.version.to_protobuf(), ) }; let sst_ids = prev_version.get_sst_ids(true); @@ -352,7 +356,7 @@ impl HummockManager { let mut next_prev_version_id = None; while let Some(model) = version_stream.try_next().await? { let version = - HummockVersion::from_persisted_protobuf(&model.version.to_protobuf()); + HummockVersion::from_persisted_protobuf_owned(model.version.to_protobuf()); for object_id in version.get_object_ids(true) { result.remove(&object_id); } @@ -388,8 +392,8 @@ impl HummockManager { .await?; let mut next_prev_version_id = None; while let Some(model) = version_stream.try_next().await? { - let version_delta = HummockVersionDelta::from_persisted_protobuf( - &model.version_delta.to_protobuf(), + let version_delta = HummockVersionDelta::from_persisted_protobuf_owned( + model.version_delta.to_protobuf(), ); // set exclude_table_change_log to true because in time travel delta we ignore the table change log for object_id in version_delta.newly_added_object_ids(true) { @@ -692,9 +696,9 @@ fn replay_archive( ) -> HummockVersion { // The pb version ann pb version delta are actually written by InHummockVersion and InHummockVersionDelta, respectively. // Using HummockVersion make it easier for `refill_version` later. - let mut last_version = HummockVersion::from_persisted_protobuf(&version); + let mut last_version = HummockVersion::from_persisted_protobuf_owned(version); for d in deltas { - let d = HummockVersionDelta::from_persisted_protobuf(&d); + let d = HummockVersionDelta::from_persisted_protobuf_owned(d); debug_assert!( !should_mark_next_time_travel_version_snapshot(&d), "unexpected time travel delta {:?}", diff --git a/src/storage/backup/src/meta_snapshot_v1.rs b/src/storage/backup/src/meta_snapshot_v1.rs index 6e2494e19c540..fd43e50160e6b 100644 --- a/src/storage/backup/src/meta_snapshot_v1.rs +++ b/src/storage/backup/src/meta_snapshot_v1.rs @@ -170,7 +170,7 @@ impl ClusterMetadata { .zip_eq_fast(default_cf_values.into_iter()) .collect(); let hummock_version = - HummockVersion::from_persisted_protobuf(&Self::decode_prost_message(&mut buf)?); + HummockVersion::from_persisted_protobuf_owned(Self::decode_prost_message(&mut buf)?); let version_stats = Self::decode_prost_message(&mut buf)?; let compaction_groups: Vec = Self::decode_prost_message_list(&mut buf)?; let table_fragments: Vec = Self::decode_prost_message_list(&mut buf)?; diff --git a/src/storage/backup/src/meta_snapshot_v2.rs b/src/storage/backup/src/meta_snapshot_v2.rs index f2cdc30369943..cc767162ef351 100644 --- a/src/storage/backup/src/meta_snapshot_v2.rs +++ b/src/storage/backup/src/meta_snapshot_v2.rs @@ -119,7 +119,7 @@ macro_rules! define_decode_metadata { let mut _idx = 0; $( if _idx == 1 { - metadata.hummock_version = HummockVersion::from_persisted_protobuf(&get_1(&mut buf)?); + metadata.hummock_version = HummockVersion::from_persisted_protobuf_owned(get_1(&mut buf)?); } metadata.$name = get_n(&mut buf)?; _idx += 1; diff --git a/src/storage/benches/bench_table_watermarks.rs b/src/storage/benches/bench_table_watermarks.rs index f77b542e37206..1fb5b42e3c2ee 100644 --- a/src/storage/benches/bench_table_watermarks.rs +++ b/src/storage/benches/bench_table_watermarks.rs @@ -117,7 +117,7 @@ fn gen_version( vnode_part_count, )); let committed_epoch = test_epoch(new_epoch_idx as _); - let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { + let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion { id: (new_epoch_idx as u64).into(), ..Default::default() }); diff --git a/src/storage/hummock_sdk/src/change_log.rs b/src/storage/hummock_sdk/src/change_log.rs index 627288e88638a..d9fb09c51a309 100644 --- a/src/storage/hummock_sdk/src/change_log.rs +++ b/src/storage/hummock_sdk/src/change_log.rs @@ -270,6 +270,15 @@ where } } +impl TableChangeLogCommon +where + T: From, +{ + pub fn from_protobuf_owned(val: PbTableChangeLog) -> Self { + Self(val.change_logs.into_iter().map(|a| a.into()).collect()) + } +} + pub fn build_table_change_log_delta<'a>( old_value_ssts: impl Iterator, new_value_ssts: impl Iterator, diff --git a/src/storage/hummock_sdk/src/version.rs b/src/storage/hummock_sdk/src/version.rs index fb444a164bab4..665096a747c90 100644 --- a/src/storage/hummock_sdk/src/version.rs +++ b/src/storage/hummock_sdk/src/version.rs @@ -259,6 +259,18 @@ where } } +impl HummockVersionCommon +where + T: From, + PbSstableInfo: for<'a> From<&'a T>, +{ + /// Convert an owned `PbHummockVersion` deserialized from persisted state to `HummockVersion`, + /// moving data instead of cloning for better performance on large checkpoints. + pub fn from_persisted_protobuf_owned(pb_version: PbHummockVersion) -> Self { + pb_version.into() + } +} + impl HummockVersion { pub fn estimated_encode_len(&self) -> usize { self.levels.len() * size_of::() @@ -331,6 +343,49 @@ where } } +impl From for HummockVersionCommon +where + T: From, +{ + fn from(pb_version: PbHummockVersion) -> Self { + #[expect(deprecated)] + Self { + id: pb_version.id, + levels: pb_version + .levels + .into_iter() + .map(|(group_id, levels)| (group_id, LevelsCommon::from(levels))) + .collect(), + max_committed_epoch: pb_version.max_committed_epoch, + table_watermarks: pb_version + .table_watermarks + .into_iter() + .map(|(table_id, table_watermark)| { + (table_id, Arc::new(TableWatermarks::from(table_watermark))) + }) + .collect(), + table_change_log: pb_version + .table_change_logs + .into_iter() + .map(|(table_id, change_log)| { + ( + table_id, + TableChangeLogCommon::from_protobuf_owned(change_log), + ) + }) + .collect(), + state_table_info: HummockVersionStateTableInfo::from_protobuf( + &pb_version.state_table_info, + ), + vector_indexes: pb_version + .vector_indexes + .into_iter() + .map(|(table_id, index)| (table_id, index.into())) + .collect(), + } + } +} + impl From<&HummockVersionCommon> for PbHummockVersion where PbSstableInfo: for<'a> From<&'a T>, @@ -561,6 +616,18 @@ where } } +impl HummockVersionDeltaCommon +where + T: From, + PbSstableInfo: for<'a> From<&'a T>, +{ + /// Convert an owned `PbHummockVersionDelta` deserialized from persisted state to + /// `HummockVersionDelta`, moving data instead of cloning. + pub fn from_persisted_protobuf_owned(delta: PbHummockVersionDelta) -> Self { + delta.into() + } +} + pub trait SstableIdReader { fn sst_id(&self) -> HummockSstableId; } @@ -807,22 +874,18 @@ where removed_table_ids: pb_version_delta.removed_table_ids.into_iter().collect(), change_log_delta: pb_version_delta .change_log_delta - .iter() + .into_iter() .map(|(table_id, log_delta)| { ( - *table_id, + table_id, ChangeLogDeltaCommon { - new_log: log_delta.new_log.clone().unwrap().into(), + new_log: log_delta.new_log.unwrap().into(), truncate_epoch: log_delta.truncate_epoch, }, ) }) .collect(), - state_table_info_delta: pb_version_delta - .state_table_info_delta - .iter() - .map(|(table_id, delta)| (*table_id, *delta)) - .collect(), + state_table_info_delta: pb_version_delta.state_table_info_delta, vector_index_delta: pb_version_delta .vector_index_delta .into_iter() diff --git a/src/storage/src/hummock/event_handler/uploader/test_utils.rs b/src/storage/src/hummock/event_handler/uploader/test_utils.rs index afb377f6a1d86..e19fbf57ed390 100644 --- a/src/storage/src/hummock/event_handler/uploader/test_utils.rs +++ b/src/storage/src/hummock/event_handler/uploader/test_utils.rs @@ -92,7 +92,7 @@ impl HummockUploader { } pub(super) fn test_hummock_version(epoch: HummockEpoch) -> HummockVersion { - let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { + let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion { id: epoch.into(), ..Default::default() }); diff --git a/src/storage/src/hummock/store/version.rs b/src/storage/src/hummock/store/version.rs index 809ccef76c3f4..c2b7a6d685ecf 100644 --- a/src/storage/src/hummock/store/version.rs +++ b/src/storage/src/hummock/store/version.rs @@ -1387,7 +1387,7 @@ mod tests { compaction_group_version_id: 0, }; - let mut version = HummockVersion::from_persisted_protobuf(&PbHummockVersion { + let mut version = HummockVersion::from_persisted_protobuf_owned(PbHummockVersion { id: 1u64.into(), ..Default::default() });