Skip to content

Commit 6b94ea6

Browse files
committed
feat(storage): add variant shredding for parquet
fix(storage): stabilize variant shredding read path
1 parent 3d6337e commit 6b94ea6

File tree

65 files changed

+4367
-464
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

65 files changed

+4367
-464
lines changed

Cargo.lock

Lines changed: 87 additions & 38 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -307,7 +307,7 @@ aws-smithy-runtime = "1.7.8"
307307
aws-smithy-runtime-api = "1.7.3"
308308
aws-smithy-types = "1.2.13"
309309

310-
indexmap = "2.0.0"
310+
indexmap = "2.10.0"
311311
indicatif = "0.17.5"
312312
itertools = "0.13.0"
313313
jaq-core = "2.2.1"
@@ -377,7 +377,8 @@ orc-rust = "0.6.0"
377377
ordered-float = { version = "5.1.0", default-features = false }
378378
p256 = "0.13"
379379
parking_lot = "0.12.1"
380-
parquet = { version = "56", features = ["async"] }
380+
parquet = { version = "56", features = ["async", "variant_experimental"] }
381+
parquet-variant = "0.1.0"
381382
passwords = { version = "3.1.16" }
382383
paste = "1.0.15"
383384
percent-encoding = "2.3.1"

src/common/metrics/src/metrics/storage.rs

Lines changed: 32 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -117,6 +117,18 @@ static MERGE_INTO_MATCHED_OPERATION_MILLISECONDS: LazyLock<Histogram> = LazyLock
117117
register_histogram_in_milliseconds("merge_into_matched_operation_milliseconds")
118118
});
119119

120+
// Variant shredding metrics.
121+
static VARIANT_SHREDDING_INLINE_COLUMNS: LazyLock<Counter> =
122+
LazyLock::new(|| register_counter("variant_shredding_inline_columns"));
123+
static VARIANT_SHREDDING_INLINE_VALUE_ALL_NULL_COLUMNS: LazyLock<Counter> =
124+
LazyLock::new(|| register_counter("variant_shredding_inline_value_all_null_columns"));
125+
static VARIANT_SHREDDING_READ_TYPED_VALUE_HITS: LazyLock<Counter> =
126+
LazyLock::new(|| register_counter("variant_shredding_read_typed_value_hits"));
127+
static VARIANT_SHREDDING_READ_TYPED_VALUE_MISSES: LazyLock<Counter> =
128+
LazyLock::new(|| register_counter("variant_shredding_read_typed_value_misses"));
129+
static VARIANT_SHREDDING_UNSHRED_MILLISECONDS: LazyLock<Histogram> =
130+
LazyLock::new(|| register_histogram_in_milliseconds("variant_shredding_unshred_milliseconds"));
131+
120132
// Fuse engine metrics.
121133
static COMMIT_MUTATION_UNRESOLVABLE_CONFLICT: LazyLock<Counter> =
122134
LazyLock::new(|| register_counter("fuse_commit_mutation_unresolvable_conflict"));
@@ -937,3 +949,23 @@ pub fn metrics_inc_block_virtual_column_write_bytes(c: u64) {
937949
pub fn metrics_inc_block_virtual_column_write_milliseconds(c: u64) {
938950
BLOCK_VIRTUAL_COLUMN_WRITE_MILLISECONDS.observe(c as f64);
939951
}
952+
953+
pub fn metrics_inc_variant_shredding_inline_columns(c: u64) {
954+
VARIANT_SHREDDING_INLINE_COLUMNS.inc_by(c);
955+
}
956+
957+
pub fn metrics_inc_variant_shredding_inline_value_all_null_columns(c: u64) {
958+
VARIANT_SHREDDING_INLINE_VALUE_ALL_NULL_COLUMNS.inc_by(c);
959+
}
960+
961+
pub fn metrics_inc_variant_shredding_read_typed_value_hits(c: u64) {
962+
VARIANT_SHREDDING_READ_TYPED_VALUE_HITS.inc_by(c);
963+
}
964+
965+
pub fn metrics_inc_variant_shredding_read_typed_value_misses(c: u64) {
966+
VARIANT_SHREDDING_READ_TYPED_VALUE_MISSES.inc_by(c);
967+
}
968+
969+
pub fn metrics_inc_variant_shredding_unshred_milliseconds(c: u64) {
970+
VARIANT_SHREDDING_UNSHRED_MILLISECONDS.observe(c as f64);
971+
}

src/query/ee/tests/it/storages/fuse/operations/virtual_column_pruner_reader.rs

Lines changed: 26 additions & 52 deletions
Original file line number | Diff line number | Diff line change
@@ -132,49 +132,44 @@ async fn test_virtual_column_pruner_reader() -> anyhow::Result<()> {
132132
.await?
133133
.expect("virtual block meta index");
134134

135-
let mut plan_kinds = HashSet::new();
136-
for plans in virtual_block_meta_index.virtual_column_read_plan.values() {
137-
for plan in plans {
138-
collect_plan_kinds(plan, &mut plan_kinds);
139-
}
140-
}
141-
assert!(plan_kinds.contains("Direct"));
142-
assert!(plan_kinds.contains("FromParent"));
143-
assert!(plan_kinds.contains("Shared"));
144-
assert!(plan_kinds.contains("Object"));
135+
let id_column_id = column_ids[0];
136+
let text_column_id = column_ids[1];
137+
let id_plans = virtual_block_meta_index
138+
.virtual_column_read_plan
139+
.get(&id_column_id)
140+
.expect("id plans");
141+
assert!(
142+
id_plans
143+
.iter()
144+
.any(|plan| matches!(plan, VirtualColumnReadPlan::Direct { .. }))
145+
);
146+
let text_plans = virtual_block_meta_index
147+
.virtual_column_read_plan
148+
.get(&text_column_id)
149+
.expect("text plans");
150+
assert!(
151+
text_plans
152+
.iter()
153+
.any(|plan| matches!(plan, VirtualColumnReadPlan::Direct { .. }))
154+
);
145155

146156
let info_column_id = column_ids[2];
147157
let tags0_column_id = column_ids[3];
148158
let extra_column_id = column_ids[4];
149-
150159
let info_plans = virtual_block_meta_index
151160
.virtual_column_read_plan
152-
.get(&info_column_id)
153-
.expect("user.info plans");
154-
assert!(
155-
info_plans
156-
.iter()
157-
.any(|plan| matches!(plan, VirtualColumnReadPlan::Object { .. }))
158-
);
161+
.get(&info_column_id);
162+
assert!(info_plans.is_none());
159163

160164
let tags0_plans = virtual_block_meta_index
161165
.virtual_column_read_plan
162-
.get(&tags0_column_id)
163-
.expect("tags[0] plans");
164-
assert!(tags0_plans.iter().any(|plan| matches!(
165-
plan,
166-
VirtualColumnReadPlan::FromParent { suffix_path, .. } if suffix_path == "{0}"
167-
)));
166+
.get(&tags0_column_id);
167+
assert!(tags0_plans.is_none());
168168

169169
let extra_plans = virtual_block_meta_index
170170
.virtual_column_read_plan
171-
.get(&extra_column_id)
172-
.expect("user.extra plans");
173-
assert!(
174-
extra_plans
175-
.iter()
176-
.any(|plan| matches!(plan, VirtualColumnReadPlan::Shared { .. }))
177-
);
171+
.get(&extra_column_id);
172+
assert!(extra_plans.is_none());
178173

179174
let plan = table
180175
.read_plan(ctx.clone(), Some(push_down), None, false, true)
@@ -273,27 +268,6 @@ fn format_virtual_column_name(source: &str, key_paths: &OwnedKeyPaths) -> String
273268
name
274269
}
275270

276-
fn collect_plan_kinds(plan: &VirtualColumnReadPlan, kinds: &mut HashSet<&'static str>) {
277-
match plan {
278-
VirtualColumnReadPlan::Direct { .. } => {
279-
kinds.insert("Direct");
280-
}
281-
VirtualColumnReadPlan::FromParent { parent, .. } => {
282-
kinds.insert("FromParent");
283-
collect_plan_kinds(parent, kinds);
284-
}
285-
VirtualColumnReadPlan::Shared { .. } => {
286-
kinds.insert("Shared");
287-
}
288-
VirtualColumnReadPlan::Object { entries } => {
289-
kinds.insert("Object");
290-
for (_, child) in entries {
291-
collect_plan_kinds(child, kinds);
292-
}
293-
}
294-
}
295-
}
296-
297271
fn assert_variant_column(column: &Column, expected: &[Option<&str>]) {
298272
assert_eq!(column.len(), expected.len());
299273
for (idx, expected_json) in expected.iter().enumerate() {

src/query/ee/tests/it/storages/fuse/operations/virtual_columns_builder.rs

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -86,8 +86,8 @@ async fn test_virtual_column_builder() -> anyhow::Result<()> {
8686
assert!(!result.data.is_empty());
8787
assert_eq!(
8888
result.draft_virtual_block_meta.virtual_column_metas.len(),
89-
4
90-
); // Expect a, b.c, b.d, e
89+
5
90+
); // Expect a, b.c, b.d, e[0].f, e[1]
9191

9292
// Check v['a']
9393
let meta_a = find_virtual_col(
@@ -122,16 +122,27 @@ async fn test_virtual_column_builder() -> anyhow::Result<()> {
122122
assert_eq!(meta_bd.name, "['b']['d']");
123123
assert_eq!(meta_bd.data_type, VariantDataType::Boolean);
124124

125-
// Check v['e']
126-
let meta_bd = find_virtual_col(
125+
// Check v['e'][0]['f']
126+
let meta_e0f = find_virtual_col(
127127
&result.draft_virtual_block_meta.virtual_column_metas,
128128
1,
129-
"['e']",
129+
"['e'][0]['f']",
130130
)
131-
.expect("Virtual column ['e'] not found");
132-
assert_eq!(meta_bd.source_column_id, 1);
133-
assert_eq!(meta_bd.name, "['e']");
134-
assert_eq!(meta_bd.data_type, VariantDataType::Jsonb);
131+
.expect("Virtual column ['e'][0]['f'] not found");
132+
assert_eq!(meta_e0f.source_column_id, 1);
133+
assert_eq!(meta_e0f.name, "['e'][0]['f']");
134+
assert_eq!(meta_e0f.data_type, VariantDataType::UInt64);
135+
136+
// Check v['e'][1]
137+
let meta_e1 = find_virtual_col(
138+
&result.draft_virtual_block_meta.virtual_column_metas,
139+
1,
140+
"['e'][1]",
141+
)
142+
.expect("Virtual column ['e'][1] not found");
143+
assert_eq!(meta_e1.source_column_id, 1);
144+
assert_eq!(meta_e1.name, "['e'][1]");
145+
assert_eq!(meta_e1.data_type, VariantDataType::UInt64);
135146

136147
let block = DataBlock::new(
137148
vec![
@@ -293,8 +304,8 @@ async fn test_virtual_column_builder() -> anyhow::Result<()> {
293304
builder.add_block(&block)?;
294305
let result = builder.finalize(&write_settings, &location)?;
295306

296-
// all columns should be add to shared_column due to > 70% nulls
297-
assert!(!result.data.is_empty());
307+
// Paths with > 70% nulls are discarded.
308+
assert!(result.data.is_empty());
298309
assert!(
299310
result
300311
.draft_virtual_block_meta

src/query/expression/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@ async-backtrace = { workspace = true }
1919
base64 = { workspace = true }
2020
borsh = { workspace = true }
2121
bumpalo = { workspace = true }
22+
chrono = { workspace = true }
2223
comfy-table = { workspace = true }
2324
databend-common-ast = { workspace = true }
2425
databend-common-base = { workspace = true }
@@ -46,10 +47,11 @@ jsonb = { workspace = true }
4647
lexical-core = { workspace = true }
4748
log = { workspace = true }
4849
match-template = { workspace = true }
49-
memchr = { workspace = true, default-features = false }
50+
memchr = { default-features = false, workspace = true }
5051
micromarshal = { workspace = true }
5152
num-bigint = { workspace = true }
5253
num-traits = { workspace = true }
54+
parquet-variant = { workspace = true }
5355
rand = { workspace = true }
5456
rand_distr = { workspace = true }
5557
recursive = { workspace = true }

0 commit comments

Comments
 (0)