Skip to content

Commit c299089

Browse files
committed
add noise for hilbert recluster
1 parent 4e1ed5d commit c299089

File tree

3 files changed

+104
-4
lines changed

3 files changed

+104
-4
lines changed

src/query/functions/src/scalars/hilbert.rs

Lines changed: 93 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,23 +21,32 @@ use databend_common_expression::types::BinaryType;
2121
use databend_common_expression::types::DataType;
2222
use databend_common_expression::types::GenericType;
2323
use databend_common_expression::types::NullableType;
24+
use databend_common_expression::types::NumberDataType;
2425
use databend_common_expression::types::NumberType;
2526
use databend_common_expression::types::ReturnType;
27+
use databend_common_expression::types::StringType;
2628
use databend_common_expression::types::ValueType;
29+
use databend_common_expression::types::ALL_NUMERICS_TYPES;
30+
use databend_common_expression::vectorize_with_builder_1_arg;
2731
use databend_common_expression::vectorize_with_builder_2_arg;
32+
use databend_common_expression::with_number_mapped_type;
2833
use databend_common_expression::Column;
2934
use databend_common_expression::FixedLengthEncoding;
3035
use databend_common_expression::Function;
3136
use databend_common_expression::FunctionDomain;
3237
use databend_common_expression::FunctionEval;
38+
use databend_common_expression::FunctionProperty;
3339
use databend_common_expression::FunctionRegistry;
3440
use databend_common_expression::FunctionSignature;
3541
use databend_common_expression::ScalarRef;
3642
use databend_common_expression::Value;
43+
use rand::rngs::SmallRng;
44+
use rand::Rng;
45+
use rand::SeedableRng;
3746

3847
/// Registers Hilbert curve related functions with the function registry.
3948
pub fn register(registry: &mut FunctionRegistry) {
40-
// Register the hilbert_range_index function that calculates Hilbert indices for multi-dimensional data
49+
// Register the hilbert_range_index function that calculates Hilbert indices for multidimensional data
4150
registry.register_function_factory("hilbert_range_index", |_, args_type| {
4251
let args_num = args_type.len();
4352
// The function supports 2, 3, 4, or 5 dimensions (each dimension requires 2 arguments)
@@ -96,7 +105,7 @@ pub fn register(registry: &mut FunctionRegistry) {
96105
points.push(key);
97106
}
98107

99-
// Convert the multi-dimensional point to a Hilbert index
108+
// Convert the multidimensional point to a Hilbert index
100109
// This maps the n-dimensional point to a 1-dimensional value
101110
let points = points
102111
.iter()
@@ -151,6 +160,88 @@ pub fn register(registry: &mut FunctionRegistry) {
151160
builder.push(id);
152161
}),
153162
);
163+
164+
// We add randomness by appending a random u8 value (drawn from an entropy-seeded `SmallRng`), or `level` such bytes in the two-argument form, at the end of the binary key.
165+
// This introduces noise to break tie cases in clustering keys that are not uniformly distributed.
166+
// Although this may slightly affect the accuracy of range_bound estimation,
167+
// it ensures that Hilbert index + scatter will no longer suffer from data skew.
168+
// Moreover, since the noise is added at the tail, the original order of distinct fixed-width keys is preserved (a string key that is a prefix of another key may be reordered).
169+
registry.properties.insert(
170+
"add_noise".to_string(),
171+
FunctionProperty::default().non_deterministic(),
172+
);
173+
174+
registry.register_passthrough_nullable_1_arg::<StringType, BinaryType, _, _>(
175+
"add_noise",
176+
|_, _| FunctionDomain::Full,
177+
vectorize_with_builder_1_arg::<StringType, BinaryType>(|val, builder, _| {
178+
let mut bytes = val.as_bytes().to_vec();
179+
let mut rng = SmallRng::from_entropy();
180+
bytes.push(rng.gen::<u8>());
181+
builder.put_slice(&bytes);
182+
builder.commit_row();
183+
}),
184+
);
185+
186+
for ty in ALL_NUMERICS_TYPES {
187+
with_number_mapped_type!(|NUM_TYPE| match ty {
188+
NumberDataType::NUM_TYPE => {
189+
registry
190+
.register_passthrough_nullable_1_arg::<NumberType<NUM_TYPE>, BinaryType, _, _>(
191+
"add_noise",
192+
|_, _| FunctionDomain::Full,
193+
vectorize_with_builder_1_arg::<NumberType<NUM_TYPE>, BinaryType>(
194+
|val, builder, _| {
195+
let mut encoded = val.encode().to_vec();
196+
let mut rng = SmallRng::from_entropy();
197+
encoded.push(rng.gen::<u8>());
198+
builder.put_slice(&encoded);
199+
builder.commit_row();
200+
},
201+
),
202+
);
203+
}
204+
})
205+
}
206+
207+
registry.register_passthrough_nullable_2_arg::<StringType, NumberType<u64>, BinaryType, _, _>(
208+
"add_noise",
209+
|_, _, _| FunctionDomain::Full,
210+
vectorize_with_builder_2_arg::<StringType, NumberType<u64>, BinaryType>(
211+
|val, level, builder, _| {
212+
let mut bytes = val.as_bytes().to_vec();
213+
let mut rng = SmallRng::from_entropy();
214+
for _ in 0..level {
215+
bytes.push(rng.gen::<u8>());
216+
}
217+
builder.put_slice(&bytes);
218+
builder.commit_row();
219+
},
220+
),
221+
);
222+
223+
for ty in ALL_NUMERICS_TYPES {
224+
with_number_mapped_type!(|NUM_TYPE| match ty {
225+
NumberDataType::NUM_TYPE => {
226+
registry
227+
.register_passthrough_nullable_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType, _, _>(
228+
"add_noise",
229+
|_, _, _| FunctionDomain::Full,
230+
vectorize_with_builder_2_arg::<NumberType<NUM_TYPE>, NumberType<u64>, BinaryType>(
231+
|val, level, builder, _| {
232+
let mut encoded = val.encode().to_vec();
233+
let mut rng = SmallRng::from_entropy();
234+
for _ in 0..level {
235+
encoded.push(rng.gen::<u8>());
236+
}
237+
builder.put_slice(&encoded);
238+
builder.commit_row();
239+
},
240+
),
241+
);
242+
}
243+
})
244+
}
154245
}
155246

156247
/// Calculates the partition ID for a value based on range boundaries.

src/query/service/src/interpreters/interpreter_table_recluster.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -652,7 +652,7 @@ impl ReclusterTableInterpreter {
652652
"range_bound(1000, {sample_size})({cluster_key_str})"
653653
));
654654

655-
hilbert_keys.push(format!("{table}.{cluster_key_str}, []"));
655+
hilbert_keys.push(format!("{cluster_key_str}, []"));
656656
}
657657
let hilbert_keys_str = hilbert_keys.join(", ");
658658

src/query/sql/src/planner/binder/ddl/table.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ use crate::plans::VacuumTemporaryFilesPlan;
148148
use crate::BindContext;
149149
use crate::DefaultExprBinder;
150150
use crate::Planner;
151+
use crate::ScalarExpr;
151152
use crate::SelectBuilder;
152153

153154
pub(in crate::planner::binder) struct AnalyzeCreateTableResult {
@@ -1799,14 +1800,22 @@ impl Binder {
17991800

18001801
let mut cluster_keys = Vec::with_capacity(expr_len);
18011802
for cluster_expr in cluster_exprs.iter() {
1802-
let (cluster_key, _) = scalar_binder.bind(cluster_expr)?;
1803+
let (mut cluster_key, _) = scalar_binder.bind(cluster_expr)?;
18031804
if cluster_key.used_columns().len() != 1 || !cluster_key.evaluable() {
18041805
return Err(ErrorCode::InvalidClusterKeys(format!(
18051806
"Cluster by expression `{:#}` is invalid",
18061807
cluster_expr
18071808
)));
18081809
}
18091810

1811+
if let ScalarExpr::FunctionCall(func) = &cluster_key {
1812+
if func.func_name == "add_noise" && matches!(cluster_type, AstClusterType::Hilbert)
1813+
{
1814+
debug_assert!(func.arguments.len() == 1);
1815+
cluster_key = func.arguments[0].clone();
1816+
}
1817+
}
1818+
18101819
let expr = cluster_key.as_expr()?;
18111820
if !expr.is_deterministic(&BUILTIN_FUNCTIONS) {
18121821
return Err(ErrorCode::InvalidClusterKeys(format!(

0 commit comments

Comments
 (0)