Skip to content

Commit 62f0cdd

Browse files
committed
feat(storage): add variant shredding for parquet
fix(storage): stabilize variant shredding read path
1 parent 3d6337e commit 62f0cdd

File tree

64 files changed

+4345
-453
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+4345
-453
lines changed

Cargo.lock

Lines changed: 87 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -307,7 +307,7 @@ aws-smithy-runtime = "1.7.8"
307307
aws-smithy-runtime-api = "1.7.3"
308308
aws-smithy-types = "1.2.13"
309309

310-
indexmap = "2.0.0"
310+
indexmap = "2.10.0"
311311
indicatif = "0.17.5"
312312
itertools = "0.13.0"
313313
jaq-core = "2.2.1"
@@ -377,7 +377,8 @@ orc-rust = "0.6.0"
377377
ordered-float = { version = "5.1.0", default-features = false }
378378
p256 = "0.13"
379379
parking_lot = "0.12.1"
380-
parquet = { version = "56", features = ["async"] }
380+
parquet = { version = "56", features = ["async", "variant_experimental"] }
381+
parquet-variant = "0.1.0"
381382
passwords = { version = "3.1.16" }
382383
paste = "1.0.15"
383384
percent-encoding = "2.3.1"

src/common/metrics/src/metrics/storage.rs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,18 @@ static MERGE_INTO_MATCHED_OPERATION_MILLISECONDS: LazyLock<Histogram> = LazyLock
117117
register_histogram_in_milliseconds("merge_into_matched_operation_milliseconds")
118118
});
119119

120+
// Variant shredding metrics.
121+
static VARIANT_SHREDDING_INLINE_COLUMNS: LazyLock<Counter> =
122+
LazyLock::new(|| register_counter("variant_shredding_inline_columns"));
123+
static VARIANT_SHREDDING_INLINE_VALUE_ALL_NULL_COLUMNS: LazyLock<Counter> =
124+
LazyLock::new(|| register_counter("variant_shredding_inline_value_all_null_columns"));
125+
static VARIANT_SHREDDING_READ_TYPED_VALUE_HITS: LazyLock<Counter> =
126+
LazyLock::new(|| register_counter("variant_shredding_read_typed_value_hits"));
127+
static VARIANT_SHREDDING_READ_TYPED_VALUE_MISSES: LazyLock<Counter> =
128+
LazyLock::new(|| register_counter("variant_shredding_read_typed_value_misses"));
129+
static VARIANT_SHREDDING_UNSHRED_MILLISECONDS: LazyLock<Histogram> =
130+
LazyLock::new(|| register_histogram_in_milliseconds("variant_shredding_unshred_milliseconds"));
131+
120132
// Fuse engine metrics.
121133
static COMMIT_MUTATION_UNRESOLVABLE_CONFLICT: LazyLock<Counter> =
122134
LazyLock::new(|| register_counter("fuse_commit_mutation_unresolvable_conflict"));
@@ -937,3 +949,23 @@ pub fn metrics_inc_block_virtual_column_write_bytes(c: u64) {
937949
pub fn metrics_inc_block_virtual_column_write_milliseconds(c: u64) {
938950
BLOCK_VIRTUAL_COLUMN_WRITE_MILLISECONDS.observe(c as f64);
939951
}
952+
953+
pub fn metrics_inc_variant_shredding_inline_columns(c: u64) {
954+
VARIANT_SHREDDING_INLINE_COLUMNS.inc_by(c);
955+
}
956+
957+
pub fn metrics_inc_variant_shredding_inline_value_all_null_columns(c: u64) {
958+
VARIANT_SHREDDING_INLINE_VALUE_ALL_NULL_COLUMNS.inc_by(c);
959+
}
960+
961+
pub fn metrics_inc_variant_shredding_read_typed_value_hits(c: u64) {
962+
VARIANT_SHREDDING_READ_TYPED_VALUE_HITS.inc_by(c);
963+
}
964+
965+
pub fn metrics_inc_variant_shredding_read_typed_value_misses(c: u64) {
966+
VARIANT_SHREDDING_READ_TYPED_VALUE_MISSES.inc_by(c);
967+
}
968+
969+
pub fn metrics_inc_variant_shredding_unshred_milliseconds(c: u64) {
970+
VARIANT_SHREDDING_UNSHRED_MILLISECONDS.observe(c as f64);
971+
}

src/query/ee/tests/it/storages/fuse/operations/virtual_column_pruner_reader.rs

Lines changed: 26 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -132,49 +132,44 @@ async fn test_virtual_column_pruner_reader() -> anyhow::Result<()> {
132132
.await?
133133
.expect("virtual block meta index");
134134

135-
let mut plan_kinds = HashSet::new();
136-
for plans in virtual_block_meta_index.virtual_column_read_plan.values() {
137-
for plan in plans {
138-
collect_plan_kinds(plan, &mut plan_kinds);
139-
}
140-
}
141-
assert!(plan_kinds.contains("Direct"));
142-
assert!(plan_kinds.contains("FromParent"));
143-
assert!(plan_kinds.contains("Shared"));
144-
assert!(plan_kinds.contains("Object"));
135+
let id_column_id = column_ids[0];
136+
let text_column_id = column_ids[1];
137+
let id_plans = virtual_block_meta_index
138+
.virtual_column_read_plan
139+
.get(&id_column_id)
140+
.expect("id plans");
141+
assert!(
142+
id_plans
143+
.iter()
144+
.any(|plan| matches!(plan, VirtualColumnReadPlan::Direct { .. }))
145+
);
146+
let text_plans = virtual_block_meta_index
147+
.virtual_column_read_plan
148+
.get(&text_column_id)
149+
.expect("text plans");
150+
assert!(
151+
text_plans
152+
.iter()
153+
.any(|plan| matches!(plan, VirtualColumnReadPlan::Direct { .. }))
154+
);
145155

146156
let info_column_id = column_ids[2];
147157
let tags0_column_id = column_ids[3];
148158
let extra_column_id = column_ids[4];
149-
150159
let info_plans = virtual_block_meta_index
151160
.virtual_column_read_plan
152-
.get(&info_column_id)
153-
.expect("user.info plans");
154-
assert!(
155-
info_plans
156-
.iter()
157-
.any(|plan| matches!(plan, VirtualColumnReadPlan::Object { .. }))
158-
);
161+
.get(&info_column_id);
162+
assert!(info_plans.is_none());
159163

160164
let tags0_plans = virtual_block_meta_index
161165
.virtual_column_read_plan
162-
.get(&tags0_column_id)
163-
.expect("tags[0] plans");
164-
assert!(tags0_plans.iter().any(|plan| matches!(
165-
plan,
166-
VirtualColumnReadPlan::FromParent { suffix_path, .. } if suffix_path == "{0}"
167-
)));
166+
.get(&tags0_column_id);
167+
assert!(tags0_plans.is_none());
168168

169169
let extra_plans = virtual_block_meta_index
170170
.virtual_column_read_plan
171-
.get(&extra_column_id)
172-
.expect("user.extra plans");
173-
assert!(
174-
extra_plans
175-
.iter()
176-
.any(|plan| matches!(plan, VirtualColumnReadPlan::Shared { .. }))
177-
);
171+
.get(&extra_column_id);
172+
assert!(extra_plans.is_none());
178173

179174
let plan = table
180175
.read_plan(ctx.clone(), Some(push_down), None, false, true)
@@ -273,27 +268,6 @@ fn format_virtual_column_name(source: &str, key_paths: &OwnedKeyPaths) -> String
273268
name
274269
}
275270

276-
fn collect_plan_kinds(plan: &VirtualColumnReadPlan, kinds: &mut HashSet<&'static str>) {
277-
match plan {
278-
VirtualColumnReadPlan::Direct { .. } => {
279-
kinds.insert("Direct");
280-
}
281-
VirtualColumnReadPlan::FromParent { parent, .. } => {
282-
kinds.insert("FromParent");
283-
collect_plan_kinds(parent, kinds);
284-
}
285-
VirtualColumnReadPlan::Shared { .. } => {
286-
kinds.insert("Shared");
287-
}
288-
VirtualColumnReadPlan::Object { entries } => {
289-
kinds.insert("Object");
290-
for (_, child) in entries {
291-
collect_plan_kinds(child, kinds);
292-
}
293-
}
294-
}
295-
}
296-
297271
fn assert_variant_column(column: &Column, expected: &[Option<&str>]) {
298272
assert_eq!(column.len(), expected.len());
299273
for (idx, expected_json) in expected.iter().enumerate() {

src/query/expression/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ async-backtrace = { workspace = true }
1919
base64 = { workspace = true }
2020
borsh = { workspace = true }
2121
bumpalo = { workspace = true }
22+
chrono = { workspace = true }
2223
comfy-table = { workspace = true }
2324
databend-common-ast = { workspace = true }
2425
databend-common-base = { workspace = true }
@@ -46,10 +47,11 @@ jsonb = { workspace = true }
4647
lexical-core = { workspace = true }
4748
log = { workspace = true }
4849
match-template = { workspace = true }
49-
memchr = { workspace = true, default-features = false }
50+
memchr = { default-features = false, workspace = true }
5051
micromarshal = { workspace = true }
5152
num-bigint = { workspace = true }
5253
num-traits = { workspace = true }
54+
parquet-variant = { workspace = true }
5355
rand = { workspace = true }
5456
rand_distr = { workspace = true }
5557
recursive = { workspace = true }

0 commit comments

Comments
 (0)