Skip to content

Commit 0de16c6

Browse files
authored
fix: deserialization error of AdditionalStatsMeta.location (#18618)
* Fix deserialization error of AdditionalStatsMeta.location * fix * fix * fix test
1 parent 3e86955 commit 0de16c6

File tree

11 files changed

+37
-45
lines changed

11 files changed

+37
-45
lines changed

src/query/ee/src/storages/fuse/operations/table_index.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,8 @@ pub async fn do_refresh_table_index(
145145
put_cache: false,
146146
})
147147
.await?;
148-
let stats = match &segment_info.summary.additional_stats_meta {
149-
Some(meta) if meta.location.is_some() => {
150-
Some(read_segment_stats(operator.clone(), meta.location.clone().unwrap()).await?)
151-
}
148+
let stats = match segment_info.summary.additional_stats_loc() {
149+
Some(loc) => Some(read_segment_stats(operator.clone(), loc).await?),
152150
_ => None,
153151
};
154152

src/query/ee/src/storages/fuse/operations/virtual_columns.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -136,10 +136,8 @@ pub async fn do_refresh_virtual_column(
136136
put_cache: false,
137137
})
138138
.await?;
139-
let stats = match &segment_info.summary.additional_stats_meta {
140-
Some(meta) if meta.location.is_some() => {
141-
Some(read_segment_stats(operator.clone(), meta.location.clone().unwrap()).await?)
142-
}
139+
let stats = match segment_info.summary.additional_stats_loc() {
140+
Some(loc) => Some(read_segment_stats(operator.clone(), loc).await?),
143141
_ => None,
144142
};
145143

src/query/service/src/test_kits/fuse.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ pub async fn generate_segments_v2(
130130
TableMetaLocationGenerator::gen_segment_stats_location_from_segment_location(&location);
131131
let additional_stats_meta = AdditionalStatsMeta {
132132
size: stats.len() as u64,
133-
location: Some((stats_location.clone(), SegmentStatistics::VERSION)),
133+
location: (stats_location.clone(), SegmentStatistics::VERSION),
134134
..Default::default()
135135
};
136136
dal.write(&stats_location, stats).await?;
@@ -173,7 +173,7 @@ pub async fn generate_segments(
173173
TableMetaLocationGenerator::gen_segment_stats_location_from_segment_location(&location);
174174
let additional_stats_meta = AdditionalStatsMeta {
175175
size: stats.len() as u64,
176-
location: Some((stats_location.clone(), SegmentStatistics::VERSION)),
176+
location: (stats_location.clone(), SegmentStatistics::VERSION),
177177
..Default::default()
178178
};
179179
dal.write(&stats_location, stats).await?;

src/query/storages/common/table_meta/src/meta/v2/statistics.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,14 +76,19 @@ pub struct AdditionalStatsMeta {
7676
/// The size of the stats data in bytes.
7777
pub size: u64,
7878
/// The file location of the stats data.
79-
pub location: Option<Location>,
79+
#[serde(default = "default_location")]
80+
pub location: Location,
8081
/// An optional HyperLogLog data structure.
8182
pub hll: Option<RawBlockHLL>,
8283
/// The count of the stats rows.
8384
#[serde(default)]
8485
pub row_count: u64,
8586
}
8687

88+
fn default_location() -> Location {
89+
("".to_string(), 0)
90+
}
91+
8792
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone, PartialEq, Eq, Default)]
8893
pub struct Statistics {
8994
pub row_count: u64,
@@ -273,6 +278,13 @@ impl Statistics {
273278
additional_stats_meta: None,
274279
}
275280
}
281+
282+
pub fn additional_stats_loc(&self) -> Option<Location> {
283+
match &self.additional_stats_meta {
284+
Some(meta) if !meta.location.0.is_empty() => Some(meta.location.clone()),
285+
_ => None,
286+
}
287+
}
276288
}
277289

278290
/// Serializes a `Scalar` value by first converting it to `IndexScalar`.

src/query/storages/fuse/src/operations/analyze.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -667,13 +667,12 @@ impl Processor for AnalyzeCollectNDVSource {
667667
let compact_segment_info = self.segment_reader.read(&load_param).await?;
668668

669669
let block_count = compact_segment_info.summary.block_count as usize;
670-
let block_hlls = match &compact_segment_info.summary.additional_stats_meta {
671-
Some(meta) if meta.location.is_some() => {
672-
let (path, ver) = meta.location.as_ref().unwrap();
670+
let block_hlls = match compact_segment_info.summary.additional_stats_loc() {
671+
Some((path, ver)) => {
673672
let load_param = LoadParams {
674-
location: path.clone(),
673+
location: path,
675674
len_hint: None,
676-
ver: *ver,
675+
ver,
677676
put_cache: true,
678677
};
679678
let stats = self.stats_reader.read(&load_param).await?;
@@ -744,7 +743,7 @@ impl Processor for AnalyzeCollectNDVSource {
744743
);
745744
let additional_stats_meta = AdditionalStatsMeta {
746745
size: data.len() as u64,
747-
location: Some((segment_stats_location.clone(), SegmentStatistics::VERSION)),
746+
location: (segment_stats_location.clone(), SegmentStatistics::VERSION),
748747
..Default::default()
749748
};
750749
self.dal.write(&segment_stats_location, data).await?;

src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -503,10 +503,8 @@ impl TableMutationAggregator {
503503
SegmentsIO::read_compact_segment(op.clone(), loc, schema, false).await?;
504504
let mut segment_info = SegmentInfo::try_from(compact_segment_info)?;
505505

506-
let stats = match &segment_info.summary.additional_stats_meta {
507-
Some(meta) if meta.location.is_some() => Some(
508-
read_segment_stats(op.clone(), meta.location.clone().unwrap()).await?,
509-
),
506+
let stats = match segment_info.summary.additional_stats_loc() {
507+
Some(loc) => Some(read_segment_stats(op.clone(), loc).await?),
510508
_ => None,
511509
};
512510

@@ -865,7 +863,7 @@ async fn write_segment(
865863
);
866864
let additional_stats_meta = AdditionalStatsMeta {
867865
size: stats.len() as u64,
868-
location: Some((segment_stats_location.clone(), SegmentStatistics::VERSION)),
866+
location: (segment_stats_location.clone(), SegmentStatistics::VERSION),
869867
..Default::default()
870868
};
871869
dal.write(&segment_stats_location, stats).await?;

src/query/storages/fuse/src/operations/common/processors/transform_serialize_segment.rs

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,7 @@ impl<B: SegmentBuilder> Processor for TransformSerializeSegment<B> {
282282
let stats_summary = self.hll_accumulator.take_summary();
283283
additional_stats_meta = Some(AdditionalStatsMeta {
284284
size: stats_data.len() as u64,
285-
location: Some((
286-
segment_stats_location.clone(),
287-
SegmentStatistics::VERSION,
288-
)),
285+
location: (segment_stats_location.clone(), SegmentStatistics::VERSION),
289286
..Default::default()
290287
});
291288
stats = Some((segment_stats_location, stats_data, stats_summary));

src/query/storages/fuse/src/operations/gc.rs

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -847,10 +847,8 @@ impl TryFrom<Arc<CompactSegmentInfo>> for LocationTuple {
847847
bloom_location.insert(bloom_loc.0.clone());
848848
}
849849
}
850-
if let Some(meta) = &value.as_ref().summary.additional_stats_meta {
851-
if let Some(loc) = &meta.location {
852-
hll_location.insert(loc.0.clone());
853-
}
850+
if let Some(loc) = value.as_ref().summary.additional_stats_loc() {
851+
hll_location.insert(loc.0);
854852
}
855853
Ok(Self {
856854
block_location,
@@ -884,10 +882,8 @@ impl TryFrom<Arc<ColumnOrientedSegment>> for LocationTuple {
884882
}
885883
}
886884

887-
if let Some(meta) = &value.as_ref().summary.additional_stats_meta {
888-
if let Some(loc) = &meta.location {
889-
hll_location.insert(loc.0.clone());
890-
}
885+
if let Some(loc) = value.as_ref().summary.additional_stats_loc() {
886+
hll_location.insert(loc.0);
891887
}
892888
Ok(Self {
893889
block_location,

src/query/storages/fuse/src/operations/mutation/mutator/block_compact_mutator.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -570,10 +570,8 @@ impl CompactTaskBuilder {
570570
let permit = acquire_task_permit(semaphore.clone()).await?;
571571
let op = self.dal.clone();
572572
let handler = runtime.spawn(async move {
573-
let stats = match &segment.summary.additional_stats_meta {
574-
Some(meta) if meta.location.is_some() => {
575-
Some(read_segment_stats(op.clone(), meta.location.clone().unwrap()).await?)
576-
}
573+
let stats = match segment.summary.additional_stats_loc() {
574+
Some(loc) => Some(read_segment_stats(op.clone(), loc).await?),
577575
_ => None,
578576
};
579577
let blocks = segment.block_metas()?;

src/query/storages/fuse/src/operations/mutation/mutator/recluster_mutator.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ impl ReclusterMutator {
188188
info.summary
189189
.additional_stats_meta
190190
.as_ref()
191-
.and_then(|v| v.location.clone()),
191+
.map(|v| v.location.clone()),
192192
));
193193
(loc.segment_idx, info)
194194
})

0 commit comments

Comments
 (0)