Skip to content

Commit 551500e

Browse files
nmahadevuni, ebyhr, and posulliv
authored and committed
Implement Iceberg system.bucket scalar function
Co-Authored-By: Yuya Ebihara <[email protected]> Co-Authored-By: Pádraig O'Sullivan <[email protected]>
1 parent 3257215 commit 551500e

File tree

4 files changed

+200
-0
lines changed

4 files changed

+200
-0
lines changed

presto-iceberg/src/main/java/com/facebook/presto/iceberg/IcebergConnector.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.bootstrap.LifeCycleManager;
1717
import com.facebook.presto.hive.HiveTransactionHandle;
18+
import com.facebook.presto.iceberg.function.IcebergBucketFunction;
1819
import com.facebook.presto.iceberg.function.changelog.ApplyChangelogFunction;
1920
import com.facebook.presto.spi.SystemTable;
2021
import com.facebook.presto.spi.classloader.ThreadContextClassLoader;
@@ -225,6 +226,8 @@ public Set<Class<?>> getSystemFunctions()
225226
{
226227
return ImmutableSet.<Class<?>>builder()
227228
.add(ApplyChangelogFunction.class)
229+
.add(IcebergBucketFunction.class)
230+
.add(IcebergBucketFunction.Bucket.class)
228231
.build();
229232
}
230233
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg.function;
15+
16+
import com.facebook.presto.common.type.StandardTypes;
17+
import com.facebook.presto.spi.function.LiteralParameter;
18+
import com.facebook.presto.spi.function.LiteralParameters;
19+
import com.facebook.presto.spi.function.ScalarFunction;
20+
import com.facebook.presto.spi.function.SqlType;
21+
import io.airlift.slice.Slice;
22+
import org.apache.iceberg.transforms.Transforms;
23+
import org.apache.iceberg.types.Types;
24+
25+
import java.math.BigDecimal;
26+
import java.math.MathContext;
27+
28+
import static com.facebook.presto.common.type.DateTimeEncoding.unpackMillisUtc;
29+
import static com.facebook.presto.common.type.Decimals.decodeUnscaledValue;
30+
import static com.facebook.presto.common.type.SqlTimestamp.MICROSECONDS_PER_MILLISECOND;
31+
32+
public final class IcebergBucketFunction
33+
{
34+
private IcebergBucketFunction() {}
35+
36+
@ScalarFunction("bucket")
37+
@SqlType(StandardTypes.BIGINT)
38+
public static long bucketInteger(@SqlType(StandardTypes.BIGINT) long value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
39+
{
40+
return Transforms.bucket((int) numberOfBuckets)
41+
.bind(Types.LongType.get())
42+
.apply(value);
43+
}
44+
45+
@ScalarFunction("bucket")
46+
@SqlType(StandardTypes.BIGINT)
47+
public static long bucketVarchar(@SqlType(StandardTypes.VARCHAR) Slice value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
48+
{
49+
return (long) Transforms.bucket((int) numberOfBuckets)
50+
.bind(Types.StringType.get())
51+
.apply(value.toStringUtf8());
52+
}
53+
54+
@ScalarFunction("bucket")
55+
@SqlType(StandardTypes.BIGINT)
56+
public static long bucketVarbinary(@SqlType(StandardTypes.VARBINARY) Slice value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
57+
{
58+
return (long) Transforms.bucket((int) numberOfBuckets)
59+
.bind(Types.BinaryType.get())
60+
.apply(value.toByteBuffer());
61+
}
62+
63+
@ScalarFunction("bucket")
64+
@SqlType(StandardTypes.BIGINT)
65+
public static long bucketDate(@SqlType(StandardTypes.DATE) long value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
66+
{
67+
return Transforms.bucket((int) numberOfBuckets)
68+
.bind(Types.DateType.get())
69+
.apply((int) value);
70+
}
71+
72+
@ScalarFunction("bucket")
73+
@SqlType(StandardTypes.BIGINT)
74+
public static long bucketTimestamp(@SqlType(StandardTypes.TIMESTAMP) long value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
75+
{
76+
return Transforms.bucket((int) numberOfBuckets)
77+
.bind(Types.TimestampType.withoutZone())
78+
.apply(value);
79+
}
80+
81+
@ScalarFunction("bucket")
82+
@SqlType(StandardTypes.BIGINT)
83+
public static long bucketTimestampWithTimeZone(@SqlType(StandardTypes.TIMESTAMP_WITH_TIME_ZONE) long value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
84+
{
85+
return Transforms.bucket((int) numberOfBuckets)
86+
.bind(Types.TimestampType.withZone())
87+
.apply(unpackMillisUtc(value) * MICROSECONDS_PER_MILLISECOND);
88+
}
89+
90+
@ScalarFunction("bucket")
91+
public static final class Bucket
92+
{
93+
@LiteralParameters({"p", "s"})
94+
@SqlType(StandardTypes.BIGINT)
95+
public static long bucketShortDecimal(@LiteralParameter("p") long numPrecision, @LiteralParameter("s") long numScale, @SqlType("decimal(p, s)") long value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
96+
{
97+
return Transforms.bucket((int) numberOfBuckets)
98+
.bind(Types.DecimalType.of((int) numPrecision, (int) numScale))
99+
.apply(BigDecimal.valueOf(value));
100+
}
101+
102+
@LiteralParameters({"p", "s"})
103+
@SqlType(StandardTypes.BIGINT)
104+
public static long bucketLongDecimal(@LiteralParameter("p") long numPrecision, @LiteralParameter("s") long numScale, @SqlType("decimal(p, s)") Slice value, @SqlType(StandardTypes.INTEGER) long numberOfBuckets)
105+
{
106+
return Transforms.bucket((int) numberOfBuckets)
107+
.bind(Types.DecimalType.of((int) numPrecision, (int) numScale))
108+
.apply(new BigDecimal(decodeUnscaledValue(value), (int) numScale, new MathContext((int) numPrecision)));
109+
}
110+
}
111+
}
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*/
14+
package com.facebook.presto.iceberg;
15+
16+
import com.facebook.presto.common.CatalogSchemaName;
17+
import com.facebook.presto.iceberg.function.IcebergBucketFunction;
18+
import com.facebook.presto.metadata.FunctionExtractor;
19+
import com.facebook.presto.operator.scalar.AbstractTestFunctions;
20+
import com.facebook.presto.sql.analyzer.FeaturesConfig;
21+
import com.facebook.presto.sql.analyzer.FunctionsConfig;
22+
import com.facebook.presto.type.DateOperators;
23+
import com.facebook.presto.type.TimestampOperators;
24+
import com.facebook.presto.type.TimestampWithTimeZoneOperators;
25+
import org.testcontainers.shaded.com.google.common.collect.ImmutableList;
26+
import org.testng.annotations.BeforeClass;
27+
import org.testng.annotations.Test;
28+
29+
import java.math.BigDecimal;
30+
31+
import static com.facebook.presto.SessionTestUtils.TEST_SESSION;
32+
import static com.facebook.presto.common.type.BigintType.BIGINT;
33+
import static com.facebook.presto.common.type.Decimals.encodeScaledValue;
34+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.Bucket.bucketLongDecimal;
35+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.Bucket.bucketShortDecimal;
36+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.bucketDate;
37+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.bucketInteger;
38+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.bucketTimestamp;
39+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.bucketTimestampWithTimeZone;
40+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.bucketVarbinary;
41+
import static com.facebook.presto.iceberg.function.IcebergBucketFunction.bucketVarchar;
42+
import static io.airlift.slice.Slices.utf8Slice;
43+
44+
public class TestIcebergScalarFunctions
45+
extends AbstractTestFunctions
46+
{
47+
public TestIcebergScalarFunctions()
48+
{
49+
super(TEST_SESSION, new FeaturesConfig(), new FunctionsConfig(), false);
50+
}
51+
52+
@BeforeClass
53+
public void registerFunction()
54+
{
55+
ImmutableList.Builder<Class<?>> functions = ImmutableList.builder();
56+
functions.add(IcebergBucketFunction.class)
57+
.add(IcebergBucketFunction.Bucket.class);
58+
functionAssertions.addConnectorFunctions(FunctionExtractor.extractFunctions(functions.build(),
59+
new CatalogSchemaName("iceberg", "system")), "iceberg");
60+
}
61+
62+
@Test
63+
public void testBucketFunction()
64+
{
65+
String catalogSchema = "iceberg.system";
66+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast(10 as tinyint), 3)", BIGINT, bucketInteger(10, 3));
67+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast(1950 as smallint), 4)", BIGINT, bucketInteger(1950, 4));
68+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast(2375645 as int), 5)", BIGINT, bucketInteger(2375645, 5));
69+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast(2779099983928392323 as bigint), 6)", BIGINT, bucketInteger(2779099983928392323L, 6));
70+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast(456.43 as DECIMAL(5,2)), 12)", BIGINT, bucketShortDecimal(5, 2, 45643, 12));
71+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast('12345678901234567890.1234567890' as DECIMAL(30,10)), 12)", BIGINT, bucketLongDecimal(30, 10, encodeScaledValue(new BigDecimal("12345678901234567890.1234567890")), 12));
72+
73+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast('nasdbsdnsdms' as varchar), 7)", BIGINT, bucketVarchar(utf8Slice("nasdbsdnsdms"), 7));
74+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast('nasdbsdnsdms' as varbinary), 8)", BIGINT, bucketVarbinary(utf8Slice("nasdbsdnsdms"), 8));
75+
76+
functionAssertions.assertFunction(catalogSchema + ".bucket(cast('2018-04-06' as date), 9)", BIGINT, bucketDate(DateOperators.castFromSlice(utf8Slice("2018-04-06")), 9));
77+
functionAssertions.assertFunction(catalogSchema + ".bucket(CAST('2018-04-06 04:35:00.000' AS TIMESTAMP),10)", BIGINT, bucketTimestamp(TimestampOperators.castFromSlice(TEST_SESSION.getSqlFunctionProperties(), utf8Slice("2018-04-06 04:35:00.000")), 10));
78+
functionAssertions.assertFunction(catalogSchema + ".bucket(CAST('2018-04-06 04:35:00.000 GMT' AS TIMESTAMP WITH TIME ZONE), 11)", BIGINT, bucketTimestampWithTimeZone(TimestampWithTimeZoneOperators.castFromSlice(TEST_SESSION.getSqlFunctionProperties(), utf8Slice("2018-04-06 04:35:00.000 GMT")), 11));
79+
}
80+
}

presto-main-base/src/test/java/com/facebook/presto/operator/scalar/FunctionAssertions.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,12 @@ public FunctionAssertions addScalarFunctions(Class<?> clazz)
278278
return this;
279279
}
280280

281+
/**
 * Registers the given functions through {@code metadata.registerConnectorFunctions}.
 *
 * @param functionInfos the functions to register; passed through unchanged
 * @param namespace the namespace argument forwarded to the registration call
 * @return this instance, so that calls can be chained
 */
public FunctionAssertions addConnectorFunctions(List<? extends SqlFunction> functionInfos, String namespace)
282+
{
283+
metadata.registerConnectorFunctions(namespace, functionInfos);
284+
return this;
285+
}
286+
281287
public void assertFunction(String projection, Type expectedType, Object expected)
282288
{
283289
if (expected instanceof Slice) {

0 commit comments

Comments
 (0)