
Commit 9807568

extra log

1 parent a0ab755

4 files changed: +100 -0 lines changed


Cargo.lock

Lines changed: 1 addition & 0 deletions (generated file; diff not rendered by default)

src/query/storages/common/blocks/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -12,6 +12,7 @@ databend-common-expression = { workspace = true }
 databend-storages-common-table-meta = { workspace = true }
 parking_lot = { workspace = true }
 parquet = { workspace = true }
+log = "0.4.27"
 
 [build-dependencies]
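Aside (not part of the commit): a minimal sketch of how the new log facade dependency is typically used, including the log_enabled! guard that the block_builder.rs change below relies on to avoid formatting work when Info logging is filtered out; the message text is illustrative only.

use log::{info, log_enabled, Level};

fn emit_example() {
    // Only format and emit when the active logger accepts Info-level records;
    // this mirrors the guard used in log_first_chunk_delta_inputs below.
    if log_enabled!(Level::Info) {
        info!("illustrative message only");
    }
}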

src/query/storages/common/blocks/src/parquet_rs.rs

Lines changed: 2 additions & 0 deletions
@@ -35,6 +35,7 @@ use parquet::file::properties::WriterProperties;
 use parquet::file::properties::WriterVersion;
 use parquet::format::FileMetaData;
 use parquet::schema::types::ColumnPath;
+use log::info;
 
 /// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold.
 const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;
@@ -182,6 +183,7 @@ pub fn build_parquet_writer_properties(
     };
     if should_apply_int32_delta(stats, ndv, num_rows) {
         if let Some(path) = column_paths.get(&column_id) {
+            info!("applying DBP to column {}", path.display());
            builder = builder
                .set_column_dictionary_enabled(path.clone(), false)
                .set_column_encoding(path.clone(), Encoding::DELTA_BINARY_PACKED);
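Aside (not part of the commit): a minimal standalone sketch of the per-column override pattern this hunk uses, via the parquet crate's WriterProperties builder. The column name "user_id" is a hypothetical placeholder; in the commit the path is looked up from column_paths.

use parquet::basic::Encoding;
use parquet::file::properties::WriterProperties;
use parquet::schema::types::ColumnPath;

fn delta_packed_props() -> WriterProperties {
    // Hypothetical single-component column path for illustration.
    let path = ColumnPath::from("user_id");
    WriterProperties::builder()
        // Turn dictionary encoding off for this column only ...
        .set_column_dictionary_enabled(path.clone(), false)
        // ... and request DELTA_BINARY_PACKED (DBP) for it instead.
        .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
        .build()
}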

src/query/storages/fuse/src/io/write/stream/block_builder.rs

Lines changed: 96 additions & 0 deletions
@@ -31,10 +31,14 @@ use databend_common_expression::ColumnId;
 use databend_common_expression::ComputedExpr;
 use databend_common_expression::DataBlock;
 use databend_common_expression::FieldIndex;
+use databend_common_expression::Scalar;
+use databend_common_expression::TableDataType;
 use databend_common_expression::TableField;
 use databend_common_expression::TableSchema;
 use databend_common_expression::TableSchemaRef;
 use databend_common_expression::types::DataType;
+use databend_common_expression::types::number::NumberScalar;
+use databend_common_expression::types::NumberDataType;
 use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
 use databend_common_meta_app::schema::TableIndex;
 use databend_common_native::write::NativeWriter;
@@ -407,6 +411,12 @@ impl StreamBlockBuilder {
            let mut cols_ndv = self.column_stats_state.peek_cols_ndv();
            cols_ndv.extend(self.block_stats_builder.peek_cols_ndv());
            let cols_stats = self.column_stats_state.peek_column_stats()?;
+            log_first_chunk_delta_inputs(
+                &self.properties,
+                block.num_rows(),
+                &cols_ndv,
+                &cols_stats,
+            );
            self.block_writer
                .start(ColumnsNdvInfo::new(block.num_rows(), cols_ndv, cols_stats))?;
        }
@@ -641,3 +651,89 @@ impl StreamBlockProperties {
        }))
    }
 }
+
+fn log_first_chunk_delta_inputs(
+    properties: &StreamBlockProperties,
+    num_rows: usize,
+    cols_ndv: &HashMap<ColumnId, usize>,
+    cols_stats: &StatisticsOfColumns,
+) {
+    if !properties
+        .write_settings
+        .enable_parquet_int32_delta_encoding
+        || num_rows == 0
+        || !log::log_enabled!(log::Level::Info)
+    {
+        return;
+    }
+
+    let mut entries = Vec::new();
+    for field in properties.source_schema.leaf_fields() {
+        if !matches!(
+            field.data_type().remove_nullable(),
+            TableDataType::Number(NumberDataType::Int32)
+        ) {
+            continue;
+        }
+
+        let column_id = field.column_id();
+        let Some(ndv) = cols_ndv.get(&column_id).copied() else {
+            continue;
+        };
+        let Some(stats) = cols_stats.get(&column_id) else {
+            continue;
+        };
+
+        let ndv_ratio = (ndv as f64) / (num_rows as f64);
+        let mut entry = format!(
+            "{}: rows={} first_chunk_ndv={} ndv_ratio={:.4}",
+            field.name(),
+            num_rows,
+            ndv,
+            ndv_ratio
+        );
+
+        if let (Some(min), Some(max)) = (
+            scalar_to_i64(&stats.min),
+            scalar_to_i64(&stats.max),
+        ) {
+            if max >= min && ndv > 0 {
+                let span = max - min + 1;
+                let contiguous_ratio = span as f64 / (ndv as f64);
+                entry.push_str(&format!(
+                    " min={} max={} span={} contiguous_ratio={:.4}",
+                    min, max, span, contiguous_ratio
+                ));
+            }
+        } else {
+            entry.push_str(&format!(" min={:?} max={:?}", stats.min, stats.max));
+        }
+
+        entries.push(entry);
+    }
+
+    if !entries.is_empty() {
+        log::info!(
+            "parquet delta heuristics (first chunk) rows={} columns=[{}]",
+            num_rows,
+            entries.join("; ")
+        );
+    }
+}
+
+fn scalar_to_i64(val: &Scalar) -> Option<i64> {
+    match val {
+        Scalar::Number(num) => match num {
+            NumberScalar::Int8(v) => Some(*v as i64),
+            NumberScalar::Int16(v) => Some(*v as i64),
+            NumberScalar::Int32(v) => Some(*v as i64),
+            NumberScalar::Int64(v) => Some(*v),
+            NumberScalar::UInt8(v) => Some(*v as i64),
+            NumberScalar::UInt16(v) => Some(*v as i64),
+            NumberScalar::UInt32(v) => Some(*v as i64),
+            NumberScalar::UInt64(v) => i64::try_from(*v).ok(),
+            _ => None,
+        },
+        _ => None,
+    }
+}
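Aside (not part of the commit): a small worked example of the two ratios the new log line reports, using made-up numbers for a single Int32 column's first chunk.

fn main() {
    // Illustrative values only.
    let num_rows = 100_000usize;
    let ndv = 95_000usize;               // distinct values in the first chunk
    let (min, max) = (1i64, 100_000i64);

    let ndv_ratio = ndv as f64 / num_rows as f64;    // 0.95, above the 0.1 threshold,
                                                     // so dictionary encoding is disabled
    let span = max - min + 1;                        // 100_000
    let contiguous_ratio = span as f64 / ndv as f64; // ~1.0526: values are nearly contiguous

    println!("ndv_ratio={ndv_ratio:.4} span={span} contiguous_ratio={contiguous_ratio:.4}");
}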
