Skip to content

Commit 498a84a

Browse files
authored
Merge pull request ClickHouse#79262 from ClickHouse/divanik/addBucketPartitionTransform
Support for Iceberg partition pruning bucket transform
2 parents ecca30e + 33b24dd commit 498a84a

File tree

7 files changed

+719
-168
lines changed

7 files changed

+719
-168
lines changed

docs/en/engines/table-engines/integrations/iceberg.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ To read a table where the schema has changed after its creation with dynamic sch
8585

8686
## Partition Pruning {#partition-pruning}
8787

88-
ClickHouse supports partition pruning during SELECT queries for Iceberg tables, which helps optimize query performance by skipping irrelevant data files. Now it works with only identity transforms and time-based transforms (hour, day, month, year). To enable partition pruning, set `use_iceberg_partition_pruning = 1`.
88+
ClickHouse supports partition pruning during SELECT queries for Iceberg tables, which helps optimize query performance by skipping irrelevant data files. To enable partition pruning, set `use_iceberg_partition_pruning = 1`. For more information about iceberg partition pruning address https://iceberg.apache.org/spec/#partitioning
8989

9090

9191
## Time Travel {#time-travel}

docs/en/sql-reference/table-functions/iceberg.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ Currently, it is not possible to change nested structures or the types of elemen
7878

7979
## Partition Pruning {#partition-pruning}
8080

81-
ClickHouse supports partition pruning during SELECT queries for Iceberg tables, which helps optimize query performance by skipping irrelevant data files. Now it works with only identity transforms and time-based transforms (hour, day, month, year). To enable partition pruning, set `use_iceberg_partition_pruning = 1`.
81+
ClickHouse supports partition pruning during SELECT queries for Iceberg tables, which helps optimize query performance by skipping irrelevant data files. To enable partition pruning, set `use_iceberg_partition_pruning = 1`. For more information about iceberg partition pruning address https://iceberg.apache.org/spec/#partitioning
8282

8383

8484
## Time Travel {#time-travel}
Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
#include <cstddef>
2+
#include <memory>
3+
#include <string>
4+
#include <Columns/ColumnString.h>
5+
#include <Columns/IColumn.h>
6+
#include <DataTypes/DataTypeString.h>
7+
#include <DataTypes/DataTypesDecimal.h>
8+
#include <DataTypes/DataTypesNumber.h>
9+
#include <Functions/FunctionFactory.h>
10+
#include <Functions/FunctionsHashing.h>
11+
#include <Functions/IFunction.h>
12+
#include <Interpreters/Context.h>
13+
#include <Poco/Logger.h>
14+
#include "Columns/ColumnsDateTime.h"
15+
#include "Core/ColumnWithTypeAndName.h"
16+
#include "Core/Field.h"
17+
#include "Core/Types.h"
18+
#include "DataTypes/DataTypeDateTime64.h"
19+
#include "base/Decimal.h"
20+
#include "base/types.h"
21+
22+
namespace DB
23+
{
24+
25+
namespace ErrorCodes
26+
{
27+
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
28+
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
29+
extern const int BAD_ARGUMENTS;
30+
}
31+
32+
/// This function specification https://iceberg.apache.org/spec/#truncate-transform-details
33+
class FunctionIcebergHash : public IFunction
34+
{
35+
36+
public:
37+
static inline const char * name = "icebergHash";
38+
39+
explicit FunctionIcebergHash(ContextPtr)
40+
{
41+
}
42+
43+
static FunctionPtr create(ContextPtr context_)
44+
{
45+
return std::make_shared<FunctionIcebergHash>(context_);
46+
}
47+
48+
String getName() const override
49+
{
50+
return name;
51+
}
52+
53+
size_t getNumberOfArguments() const override
54+
{
55+
return 1;
56+
}
57+
58+
DataTypePtr getReturnTypeImpl(const DataTypes &) const override
59+
{
60+
return std::make_shared<DataTypeInt32>();
61+
}
62+
63+
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & /* result_type */, size_t input_rows_count) const override
64+
{
65+
if (arguments.size() != 1)
66+
throw Exception(
67+
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
68+
"Incorrect number of arguments for function icebergHash: expected 1 argument");
69+
auto context = Context::getGlobalContextInstance();
70+
71+
const auto & column = arguments[0].column;
72+
const auto & type = arguments[0].type;
73+
74+
auto result_column = ColumnInt32::create(input_rows_count);
75+
auto & result_data = result_column->getData();
76+
77+
WhichDataType which(type);
78+
79+
if (isBool(type) || which.isInteger() || which.isDate())
80+
{
81+
for (size_t i = 0; i < input_rows_count; ++i)
82+
{
83+
auto value = column->getInt(i);
84+
result_data[i] = hashLong(value);
85+
}
86+
}
87+
else if (which.isFloat())
88+
{
89+
for (size_t i = 0; i < input_rows_count; ++i)
90+
{
91+
auto value = column->getFloat64(i);
92+
result_data[i] = hashLong(doubleToLongBits(value));
93+
}
94+
}
95+
else if (which.isStringOrFixedString())
96+
{
97+
auto murmur_result = FunctionFactory::instance()
98+
.get("murmurHash3_32", context)
99+
->build(arguments)
100+
->execute(arguments, std::make_shared<DataTypeUInt32>(), input_rows_count, false);
101+
for (size_t i = 0; i < input_rows_count; ++i)
102+
{
103+
result_data[i] = murmur_result->getUInt(i);
104+
}
105+
}
106+
else if (which.isUUID())
107+
{
108+
// Function toUnderType: UUID => toUInt128 doesn't work for some reason so we need to use toUInt128 clickhouse implementation
109+
ColumnPtr intermediate_representation = FunctionFactory::instance()
110+
.get("toUInt128", context)
111+
->build(arguments)
112+
->execute(arguments, std::make_shared<DataTypeUInt128>(), input_rows_count, false);
113+
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(intermediate_representation.get());
114+
const IColumn & wrapper_column = const_column ? const_column->getDataColumn() : *intermediate_representation.get();
115+
const ColumnVector<UInt128> & uuid_column = checkAndGetColumn<const ColumnVector<UInt128> &>(wrapper_column);
116+
for (size_t i = 0; i < input_rows_count; ++i)
117+
{
118+
UInt128 value = uuid_column.getData()[i];
119+
result_data[i] = hashUnderlyingIntBigEndian(value, /*reduce_two_complement*/ false);
120+
}
121+
}
122+
else if (which.isDateTime64())
123+
{
124+
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(arguments[0].column.get());
125+
const IColumn & wrapper_column = const_column ? const_column->getDataColumn() : *arguments[0].column.get();
126+
const auto & source_col = checkAndGetColumn<DataTypeDateTime64::ColumnType>(wrapper_column);
127+
const ColumnDateTime64 * decimal_column = &source_col;
128+
assert(decimal_column != nullptr);
129+
UInt32 scale = decimal_column->getScale();
130+
if ((scale != 6) && (scale != 9))
131+
{
132+
throw Exception(
133+
ErrorCodes::BAD_ARGUMENTS,
134+
"Unsupported scale for DateTime64 in IcebergHash function. Supports only microseconds and nanoseconds.");
135+
}
136+
for (size_t i = 0; i < input_rows_count; ++i)
137+
{
138+
DateTime64 value = decimal_column->getElement(i);
139+
Int64 value_int = value.convertTo<Int64>();
140+
if (scale == 9)
141+
{
142+
value_int = value_int / 1000;
143+
}
144+
result_data[i] = hashLong(value_int);
145+
}
146+
}
147+
else if (which.isDecimal())
148+
{
149+
const ColumnConst * const_column = checkAndGetColumn<ColumnConst>(arguments[0].column.get());
150+
const IColumn & wrapper_column = const_column ? const_column->getDataColumn() : *arguments[0].column.get();
151+
for (size_t i = 0; i < input_rows_count; ++i)
152+
{
153+
UInt128 value;
154+
if (which.isDecimal32())
155+
{
156+
const ColumnDecimal<Decimal32> * decimal_column = typeid_cast<const ColumnDecimal<Decimal32> *>(&wrapper_column);
157+
value = decimal_column->getElement(i).value;
158+
}
159+
else if (which.isDecimal64())
160+
{
161+
const ColumnDecimal<Decimal64> * decimal_column = typeid_cast<const ColumnDecimal<Decimal64> *>(&wrapper_column);
162+
value = decimal_column->getElement(i).value;
163+
}
164+
else if (which.isDecimal128())
165+
{
166+
const ColumnDecimal<Decimal128> * decimal_column = typeid_cast<const ColumnDecimal<Decimal128> *>(&wrapper_column);
167+
value = decimal_column->getElement(i).value;
168+
}
169+
else if (which.isDecimal256())
170+
{
171+
const ColumnDecimal<Decimal256> * decimal_column = typeid_cast<const ColumnDecimal<Decimal256> *>(&wrapper_column);
172+
value = decimal_column->getElement(i).value;
173+
}
174+
else
175+
{
176+
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Unsupported data type `{}` for icebergHash", type->getName());
177+
}
178+
result_data[i] = hashUnderlyingIntBigEndian(value, /*reduce_two_complement*/ true);
179+
}
180+
}
181+
else
182+
{
183+
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Unsupported data type `{}` for icebergHash", type->getName());
184+
}
185+
return result_column;
186+
}
187+
188+
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
189+
190+
private:
191+
static Int32 hashLong(Int64 value)
192+
{
193+
std::array<char, 8> little_endian_representation;
194+
for (char & i : little_endian_representation)
195+
{
196+
i = static_cast<unsigned char>(value & 0xFF);
197+
value >>= 8;
198+
}
199+
return MurmurHash3Impl32::apply(little_endian_representation.data(), 8);
200+
}
201+
202+
static Int32 hashUnderlyingIntBigEndian(UInt128 value, bool reduce_two_complement)
203+
{
204+
std::array<char, 16> big_endian_representation;
205+
size_t taken = 1;
206+
signed char prev = 0;
207+
for (size_t i = 0; i < 16; ++i)
208+
{
209+
signed char c = static_cast<signed char>(value & 0xFF);
210+
big_endian_representation[i] = c;
211+
value >>= 8;
212+
// Take minimum number of bytes to represent the value and sign bit
213+
if ((i == 0) || ((c != 0) && (c != -1)) || ((c & 0x80) != (prev & 0x80)))
214+
{
215+
taken = i + 1;
216+
}
217+
prev = c;
218+
}
219+
// Take all bytes
220+
if (!reduce_two_complement)
221+
{
222+
taken = 16;
223+
}
224+
std::reverse(big_endian_representation.begin(), big_endian_representation.begin() + taken);
225+
return MurmurHash3Impl32::apply(big_endian_representation.data(), taken);
226+
}
227+
228+
static UInt64 doubleToLongBits(Float64 value)
229+
{
230+
if (std::isnan(value))
231+
{
232+
// Return a canonical NaN representation
233+
return 0x7ff8000000000000ULL;
234+
}
235+
236+
// For other values, use a union to perform the bit-level conversion
237+
union
238+
{
239+
Float64 d;
240+
UInt64 bits;
241+
} converter;
242+
243+
converter.d = value;
244+
if (converter.bits == 0x8000000000000000ULL)
245+
{
246+
// Handle -0.0 case
247+
return 0x0000000000000000ULL;
248+
}
249+
return converter.bits;
250+
}
251+
};
252+
253+
REGISTER_FUNCTION(IcebergHash)
254+
{
255+
FunctionDocumentation::Description description = R"(Implements logic of iceberg hashing transform: https://iceberg.apache.org/spec/#appendix-b-32-bit-hash-requirements.)";
256+
FunctionDocumentation::Syntax syntax = "icebergHash(N, value)";
257+
FunctionDocumentation::Arguments arguments
258+
= {{"value", "Integer, bool, decimal, float, string, fixed_string, uuid, date, time, datetime."}};
259+
FunctionDocumentation::ReturnedValue returned_value = "Int32";
260+
FunctionDocumentation::Examples examples = {{"Example", "SELECT icebergHash(1.0 :: Float32)", "-142385009"}};
261+
FunctionDocumentation::Category category = FunctionDocumentation::Category::Other;
262+
FunctionDocumentation::IntroducedIn introduced_in = {25, 5};
263+
264+
factory.registerFunction<FunctionIcebergHash>({description, syntax, arguments, returned_value, examples, introduced_in, category});
265+
}
266+
267+
class FunctionIcebergBucket : public IFunction
268+
{
269+
270+
public:
271+
static inline const char * name = "icebergBucket";
272+
273+
explicit FunctionIcebergBucket(ContextPtr)
274+
{
275+
}
276+
277+
static FunctionPtr create(ContextPtr context_)
278+
{
279+
return std::make_shared<FunctionIcebergBucket>(context_);
280+
}
281+
282+
String getName() const override
283+
{
284+
return name;
285+
}
286+
287+
size_t getNumberOfArguments() const override
288+
{
289+
return 2;
290+
}
291+
292+
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {0}; }
293+
294+
DataTypePtr getReturnTypeImpl(const DataTypes &) const override
295+
{
296+
return std::make_shared<DataTypeUInt32>();
297+
}
298+
299+
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & /* result_type */, size_t input_rows_count) const override
300+
{
301+
if (arguments.size() != 2)
302+
throw Exception(
303+
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
304+
"Incorrect number of arguments for function icebergBucket: expected 2 arguments");
305+
auto value = (*arguments[0].column)[0].safeGet<Int64>();
306+
if (value <= 0 || value > std::numeric_limits<Int32>::max())
307+
throw Exception(
308+
ErrorCodes::BAD_ARGUMENTS, "Function IcebergBucket accepts only positive bucket size which is suitable to Int32");
309+
310+
auto context = Context::getGlobalContextInstance();
311+
312+
auto iceberg_hash_arguments = {arguments[1]};
313+
auto iceberg_hash_func = FunctionFactory::instance().get("icebergHash", context)->build(iceberg_hash_arguments);
314+
auto iceberg_hash_result_type = iceberg_hash_func->getResultType();
315+
auto iceberg_hash_result = iceberg_hash_func->execute(iceberg_hash_arguments, iceberg_hash_result_type, input_rows_count, false);
316+
317+
auto iceberg_hash_result_with_type = ColumnWithTypeAndName(iceberg_hash_result, std::make_shared<DataTypeInt32>(), "");
318+
auto max_int_with_type = ColumnWithTypeAndName(
319+
std::make_shared<DataTypeInt32>()->createColumnConst(input_rows_count, std::numeric_limits<Int32>::max()),
320+
std::make_shared<DataTypeInt32>(),
321+
"");
322+
auto bitand_result_type = std::make_shared<DataTypeInt32>();
323+
auto bitand_result = FunctionFactory::instance().get("bitAnd", context)->build({iceberg_hash_result_with_type, max_int_with_type})->execute({iceberg_hash_result_with_type, max_int_with_type}, bitand_result_type, input_rows_count, false);
324+
325+
ColumnWithTypeAndName bitand_result_with_type(bitand_result, bitand_result_type, "");
326+
auto modulo_column = ColumnWithTypeAndName(
327+
std::make_shared<DataTypeUInt32>()->createColumnConst(input_rows_count, static_cast<UInt32>(value)),
328+
std::make_shared<DataTypeUInt32>(),
329+
"");
330+
ColumnsWithTypeAndName modulo_arguments = {bitand_result_with_type, modulo_column};
331+
auto modulo_func = FunctionFactory::instance().get("positiveModulo", context)->build(modulo_arguments);
332+
return modulo_func->execute(modulo_arguments, std::make_shared<DataTypeUInt32>(), input_rows_count, false);
333+
}
334+
335+
bool useDefaultImplementationForConstants() const override { return true; }
336+
337+
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
338+
};
339+
340+
REGISTER_FUNCTION(IcebergBucket)
341+
{
342+
FunctionDocumentation::Description description
343+
= R"(Implements logic of iceberg bucket transform: https://iceberg.apache.org/spec/#bucket-transform-details.)";
344+
FunctionDocumentation::Syntax syntax = "icebergBucket(N, value)";
345+
FunctionDocumentation::Arguments arguments
346+
= {{"N", "modulo, positive integer, always constant."},
347+
{"value", "Integer, bool, decimal, float, string, fixed_string, uuid, date, time or datetime value."}};
348+
FunctionDocumentation::ReturnedValue returned_value = "Int32";
349+
FunctionDocumentation::Examples examples = {{"Example", "SELECT icebergBucket(5, 1.0 :: Float32)", "4"}};
350+
FunctionDocumentation::IntroducedIn introduced_in = {25, 5};
351+
FunctionDocumentation::Category category = FunctionDocumentation::Category::Other;
352+
353+
354+
factory.registerFunction<FunctionIcebergBucket>({description, syntax, arguments, returned_value, examples, introduced_in, category});
355+
}
356+
357+
}

0 commit comments

Comments
 (0)