Skip to content

Commit af4fc61

Browse files
committed
Implement support for connector specific builtin functions
1 parent 2b59d23 commit af4fc61

File tree

19 files changed

+483
-30
lines changed

19 files changed

+483
-30
lines changed

presto-docs/src/main/sphinx/develop/functions.rst

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22
Functions
33
=========
44

5+
Functions in Presto can be implemented at Plugin and the Connector level.
6+
The following two sections describe how to implement them.
7+
58
Plugin Implementation
69
---------------------
710

811
The function framework is used to implement SQL functions. Presto includes a
912
number of built-in functions. In order to implement new functions, you can
10-
write a plugin that returns one more functions from ``getFunctions()``:
13+
write a plugin that returns one or more functions from ``getFunctions()``:
1114

1215
.. code-block:: java
1316
@@ -31,10 +34,44 @@ Note that the ``ImmutableSet`` class is a utility class from Guava.
3134
The ``getFunctions()`` method contains all of the classes for the functions
3235
that we will implement below in this tutorial.
3336

37+
Functions registered using this method are available in the default
38+
namespace ``presto.default``.
39+
3440
For a full example in the codebase, see either the ``presto-ml`` module for machine
3541
learning functions or the ``presto-teradata-functions`` module for Teradata-compatible
3642
functions, both in the root of the Presto source.
3743

44+
Connector Functions Implementation
45+
----------------------------------
46+
47+
To implement new functions at the connector level, in your
48+
connector implementation, override the ``getSystemFunctions()`` method that returns one
49+
or more functions:
50+
51+
.. code-block:: java
52+
53+
public class ExampleFunctionsConnector
54+
implements Connector
55+
{
56+
@Override
57+
public Set<Class<?>> getSystemFunctions()
58+
{
59+
return ImmutableSet.<Class<?>>builder()
60+
.add(ExampleNullFunction.class)
61+
.add(IsNullFunction.class)
62+
.add(IsEqualOrNullFunction.class)
63+
.add(ExampleStringFunction.class)
64+
.add(ExampleAverageFunction.class)
65+
.build();
66+
}
67+
}
68+
69+
Functions registered using this interface are available in the namespace
70+
``<catalog-name>.system`` where ``<catalog-name>`` is the catalog name used
71+
in the Presto deployment for this connector type.
72+
73+
At present, connector level functions do not support Window functions and Scalar operators.
74+
3875
Scalar Function Implementation
3976
------------------------------
4077

presto-main-base/src/main/java/com/facebook/presto/connector/ConnectorManager.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import com.facebook.airlift.log.Logger;
1717
import com.facebook.airlift.node.NodeInfo;
18+
import com.facebook.presto.common.CatalogSchemaName;
1819
import com.facebook.presto.common.block.BlockEncodingSerde;
1920
import com.facebook.presto.common.type.TypeManager;
2021
import com.facebook.presto.connector.informationSchema.InformationSchemaConnector;
@@ -82,6 +83,7 @@
8283
import java.util.concurrent.ConcurrentMap;
8384
import java.util.concurrent.atomic.AtomicBoolean;
8485

86+
import static com.facebook.presto.metadata.FunctionExtractor.extractFunctions;
8587
import static com.facebook.presto.spi.ConnectorId.createInformationSchemaConnectorId;
8688
import static com.facebook.presto.spi.ConnectorId.createSystemTablesConnectorId;
8789
import static com.google.common.base.Preconditions.checkArgument;
@@ -308,6 +310,10 @@ private synchronized void addConnectorInternal(MaterializedConnector connector)
308310
}
309311
connector.getConnectorCodecProvider().ifPresent(connectorCodecProvider -> connectorCodecManager.addConnectorCodecProvider(connectorId, connectorCodecProvider));
310312
metadataManager.getProcedureRegistry().addProcedures(connectorId, connector.getProcedures());
313+
Set<Class<?>> systemFunctions = connector.getSystemFunctions();
314+
if (!systemFunctions.isEmpty()) {
315+
metadataManager.registerConnectorFunctions(connectorId.getCatalogName(), extractFunctions(systemFunctions, new CatalogSchemaName(connectorId.getCatalogName(), "system")));
316+
}
311317

312318
connector.getAccessControl()
313319
.ifPresent(accessControl -> accessControlManager.addCatalogAccessControl(connectorId, accessControl));
@@ -390,6 +396,8 @@ private static class MaterializedConnector
390396
private final ConnectorSplitManager splitManager;
391397
private final Set<SystemTable> systemTables;
392398
private final Set<Procedure> procedures;
399+
400+
private final Set<Class<?>> functions;
393401
private final ConnectorPageSourceProvider pageSourceProvider;
394402
private final Optional<ConnectorPageSinkProvider> pageSinkProvider;
395403
private final Optional<ConnectorIndexProvider> indexProvider;
@@ -512,6 +520,10 @@ public MaterializedConnector(ConnectorId connectorId, Connector connector)
512520
List<PropertyMetadata<?>> analyzeProperties = connector.getAnalyzeProperties();
513521
requireNonNull(analyzeProperties, "Connector %s returned a null analyze properties set");
514522
this.analyzeProperties = ImmutableList.copyOf(analyzeProperties);
523+
524+
Set<Class<?>> systemFunctions = connector.getSystemFunctions();
525+
requireNonNull(systemFunctions, "Connector %s returned a null system function set");
526+
this.functions = ImmutableSet.copyOf(systemFunctions);
515527
}
516528

517529
public ConnectorId getConnectorId()
@@ -539,6 +551,11 @@ public Set<Procedure> getProcedures()
539551
return procedures;
540552
}
541553

554+
public Set<Class<?>> getSystemFunctions()
555+
{
556+
return functions;
557+
}
558+
542559
public ConnectorPageSourceProvider getPageSourceProvider()
543560
{
544561
return pageSourceProvider;

presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInTypeAndFunctionNamespaceManager.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -552,6 +552,15 @@ public BuiltInTypeAndFunctionNamespaceManager(
552552
FunctionsConfig functionsConfig,
553553
Set<Type> types,
554554
FunctionAndTypeManager functionAndTypeManager)
555+
{
556+
this(blockEncodingSerde, functionsConfig, types, functionAndTypeManager, true);
557+
}
558+
public BuiltInTypeAndFunctionNamespaceManager(
559+
BlockEncodingSerde blockEncodingSerde,
560+
FunctionsConfig functionsConfig,
561+
Set<Type> types,
562+
FunctionAndTypeManager functionAndTypeManager,
563+
boolean registerFunctions)
555564
{
556565
this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null");
557566
this.magicLiteralFunction = new MagicLiteralFunction(blockEncodingSerde);
@@ -605,7 +614,9 @@ public BuiltInTypeAndFunctionNamespaceManager(
605614
.expireAfterWrite(1, HOURS)
606615
.build(CacheLoader.from(this::instantiateParametricType));
607616

608-
registerBuiltInFunctions(getBuiltInFunctions(functionsConfig));
617+
if (registerFunctions) {
618+
registerBuiltInFunctions(getBuiltInFunctions(functionsConfig));
619+
}
609620
registerBuiltInTypes(functionsConfig);
610621

611622
for (Type type : requireNonNull(types, "types is null")) {

presto-main-base/src/main/java/com/facebook/presto/metadata/DelegatingMetadataManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,12 @@ public void registerBuiltInFunctions(List<? extends SqlFunction> functionInfos)
8686
delegate.registerBuiltInFunctions(functionInfos);
8787
}
8888

89+
@Override
90+
public void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functionInfos)
91+
{
92+
delegate.registerConnectorFunctions(catalogName, functionInfos);
93+
}
94+
8995
@Override
9096
public List<String> listSchemaNames(Session session, String catalogName)
9197
{

presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ public class FunctionAndTypeManager
150150
private final AtomicReference<TypeManager> servingTypeManager;
151151
private final AtomicReference<Supplier<Map<String, ParametricType>>> servingTypeManagerParametricTypesSupplier;
152152
private final BuiltInPluginFunctionNamespaceManager builtInPluginFunctionNamespaceManager;
153+
private final FunctionsConfig functionsConfig;
154+
private final Set<Type> types;
153155

154156
@Inject
155157
public FunctionAndTypeManager(
@@ -162,6 +164,8 @@ public FunctionAndTypeManager(
162164
{
163165
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
164166
this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null");
167+
this.functionsConfig = requireNonNull(functionsConfig, "functionsConfig is null");
168+
this.types = requireNonNull(types, "types is null");
165169
this.builtInTypeAndFunctionNamespaceManager = new BuiltInTypeAndFunctionNamespaceManager(blockEncodingSerde, functionsConfig, types, this);
166170
this.functionNamespaceManagers.put(JAVA_BUILTIN_NAMESPACE.getCatalogName(), builtInTypeAndFunctionNamespaceManager);
167171
this.functionInvokerProvider = new FunctionInvokerProvider(this);
@@ -449,6 +453,16 @@ public void registerPluginFunctions(List<? extends SqlFunction> functions)
449453
builtInPluginFunctionNamespaceManager.registerPluginFunctions(functions);
450454
}
451455

456+
public void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functions)
457+
{
458+
FunctionNamespaceManager builtInPluginFunctionNamespaceManager = functionNamespaceManagers.get(catalogName);
459+
if (builtInPluginFunctionNamespaceManager == null) {
460+
builtInPluginFunctionNamespaceManager = new BuiltInTypeAndFunctionNamespaceManager(blockEncodingSerde, functionsConfig, types, this, false);
461+
addFunctionNamespace(catalogName, builtInPluginFunctionNamespaceManager);
462+
}
463+
((BuiltInTypeAndFunctionNamespaceManager) builtInPluginFunctionNamespaceManager).registerBuiltInFunctions(functions);
464+
}
465+
452466
/**
453467
* likePattern / escape is an opportunistic optimization push down to function namespace managers.
454468
* Not all function namespace managers can handle it, thus the returned function list could

presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionExtractor.java

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,21 @@
3232
import static com.facebook.presto.metadata.BuiltInTypeAndFunctionNamespaceManager.JAVA_BUILTIN_NAMESPACE;
3333
import static com.google.common.base.Preconditions.checkArgument;
3434
import static com.google.common.collect.ImmutableList.toImmutableList;
35+
import static java.lang.String.format;
3536

3637
public final class FunctionExtractor
3738
{
3839
private FunctionExtractor() {}
3940

4041
public static List<SqlFunction> extractFunctions(Collection<Class<?>> classes)
42+
{
43+
return extractFunctions(classes, JAVA_BUILTIN_NAMESPACE);
44+
}
45+
46+
public static List<SqlFunction> extractFunctions(Collection<Class<?>> classes, CatalogSchemaName functionNamespace)
4147
{
4248
return classes.stream()
43-
.map(FunctionExtractor::extractFunctions)
49+
.map(c -> extractFunctions(c, functionNamespace))
4450
.flatMap(Collection::stream)
4551
.collect(toImmutableList());
4652
}
@@ -53,17 +59,21 @@ public static List<? extends SqlFunction> extractFunctions(Class<?> clazz)
5359
public static List<? extends SqlFunction> extractFunctions(Class<?> clazz, CatalogSchemaName defaultNamespace)
5460
{
5561
if (WindowFunction.class.isAssignableFrom(clazz)) {
62+
checkArgument(defaultNamespace.equals(JAVA_BUILTIN_NAMESPACE), format("Connector specific Window functions are not supported: Class [%s], Namespace [%s]", clazz.getName(), defaultNamespace));
5663
@SuppressWarnings("unchecked")
5764
Class<? extends WindowFunction> windowClazz = (Class<? extends WindowFunction>) clazz;
5865
return WindowAnnotationsParser.parseFunctionDefinition(windowClazz);
5966
}
6067

6168
if (clazz.isAnnotationPresent(AggregationFunction.class)) {
62-
return SqlAggregationFunction.createFunctionsByAnnotations(clazz);
69+
return SqlAggregationFunction.createFunctionsByAnnotations(clazz, defaultNamespace);
6370
}
6471

65-
if (clazz.isAnnotationPresent(ScalarFunction.class) ||
66-
clazz.isAnnotationPresent(ScalarOperator.class)) {
72+
if (clazz.isAnnotationPresent(ScalarFunction.class)) {
73+
return ScalarFromAnnotationsParser.parseFunctionDefinition(clazz, defaultNamespace);
74+
}
75+
if (clazz.isAnnotationPresent(ScalarOperator.class)) {
76+
checkArgument(defaultNamespace.equals(JAVA_BUILTIN_NAMESPACE), format("Connector specific Scalar Operator functions are not supported: Class [%s], Namespace [%s]", clazz.getName(), defaultNamespace));
6777
return ScalarFromAnnotationsParser.parseFunctionDefinition(clazz);
6878
}
6979

@@ -72,9 +82,9 @@ public static List<? extends SqlFunction> extractFunctions(Class<?> clazz, Catal
7282
}
7383

7484
List<SqlFunction> scalarFunctions = ImmutableList.<SqlFunction>builder()
75-
.addAll(ScalarFromAnnotationsParser.parseFunctionDefinitions(clazz))
85+
.addAll(ScalarFromAnnotationsParser.parseFunctionDefinitions(clazz, defaultNamespace))
7686
.addAll(SqlInvokedScalarFromAnnotationsParser.parseFunctionDefinitions(clazz, defaultNamespace))
77-
.addAll(CodegenScalarFromAnnotationsParser.parseFunctionDefinitions(clazz))
87+
.addAll(CodegenScalarFromAnnotationsParser.parseFunctionDefinitions(clazz, defaultNamespace))
7888
.build();
7989
checkArgument(!scalarFunctions.isEmpty(), "Class [%s] does not define any scalar functions", clazz.getName());
8090
return scalarFunctions;

presto-main-base/src/main/java/com/facebook/presto/metadata/Metadata.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,8 @@ public interface Metadata
7272

7373
void registerBuiltInFunctions(List<? extends SqlFunction> functions);
7474

75+
void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functionInfos);
76+
7577
List<String> listSchemaNames(Session session, String catalogName);
7678

7779
/**

presto-main-base/src/main/java/com/facebook/presto/metadata/MetadataManager.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -302,6 +302,12 @@ public void registerBuiltInFunctions(List<? extends SqlFunction> functionInfos)
302302
functionAndTypeManager.registerBuiltInFunctions(functionInfos);
303303
}
304304

305+
@Override
306+
public void registerConnectorFunctions(String catalogName, List<? extends SqlFunction> functionInfos)
307+
{
308+
functionAndTypeManager.registerConnectorFunctions(catalogName, functionInfos);
309+
}
310+
305311
@Override
306312
public List<String> listSchemaNames(Session session, String catalogName)
307313
{

presto-main-base/src/main/java/com/facebook/presto/metadata/SqlAggregationFunction.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.metadata;
1515

16+
import com.facebook.presto.common.CatalogSchemaName;
1617
import com.facebook.presto.common.QualifiedObjectName;
1718
import com.facebook.presto.common.type.TypeSignature;
1819
import com.facebook.presto.operator.aggregation.AggregationFromAnnotationsParser;
@@ -46,7 +47,12 @@ public static List<SqlAggregationFunction> createFunctionByAnnotations(Class<?>
4647

4748
public static List<SqlAggregationFunction> createFunctionsByAnnotations(Class<?> aggregationDefinition)
4849
{
49-
return AggregationFromAnnotationsParser.parseFunctionDefinitions(aggregationDefinition)
50+
return createFunctionsByAnnotations(aggregationDefinition, JAVA_BUILTIN_NAMESPACE);
51+
}
52+
53+
public static List<SqlAggregationFunction> createFunctionsByAnnotations(Class<?> aggregationDefinition, CatalogSchemaName functionNamespace)
54+
{
55+
return AggregationFromAnnotationsParser.parseFunctionDefinitions(aggregationDefinition, functionNamespace)
5056
.stream()
5157
.map(x -> (SqlAggregationFunction) x)
5258
.collect(toImmutableList());

presto-main-base/src/main/java/com/facebook/presto/operator/aggregation/AggregationFromAnnotationsParser.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
*/
1414
package com.facebook.presto.operator.aggregation;
1515

16+
import com.facebook.presto.common.CatalogSchemaName;
1617
import com.facebook.presto.common.type.TypeSignature;
1718
import com.facebook.presto.operator.ParametricImplementationsGroup;
1819
import com.facebook.presto.operator.annotations.FunctionsParserHelper;
@@ -34,6 +35,7 @@
3435
import java.util.Optional;
3536
import java.util.Set;
3637

38+
import static com.facebook.presto.metadata.BuiltInTypeAndFunctionNamespaceManager.JAVA_BUILTIN_NAMESPACE;
3739
import static com.facebook.presto.operator.aggregation.AggregationImplementation.Parser.parseImplementation;
3840
import static com.facebook.presto.operator.annotations.FunctionsParserHelper.parseDescription;
3941
import static com.google.common.base.Preconditions.checkArgument;
@@ -54,7 +56,7 @@ public static ParametricAggregation parseFunctionDefinitionWithTypesConstraint(C
5456
{
5557
requireNonNull(returnType, "returnType is null");
5658
requireNonNull(argumentTypes, "argumentTypes is null");
57-
for (ParametricAggregation aggregation : parseFunctionDefinitions(clazz)) {
59+
for (ParametricAggregation aggregation : parseFunctionDefinitions(clazz, JAVA_BUILTIN_NAMESPACE)) {
5860
if (aggregation.getSignature().getReturnType().equals(returnType) &&
5961
aggregation.getSignature().getArgumentTypes().equals(argumentTypes)) {
6062
return aggregation;
@@ -64,6 +66,11 @@ public static ParametricAggregation parseFunctionDefinitionWithTypesConstraint(C
6466
}
6567

6668
public static List<ParametricAggregation> parseFunctionDefinitions(Class<?> aggregationDefinition)
69+
{
70+
return parseFunctionDefinitions(aggregationDefinition, JAVA_BUILTIN_NAMESPACE);
71+
}
72+
73+
public static List<ParametricAggregation> parseFunctionDefinitions(Class<?> aggregationDefinition, CatalogSchemaName functionNamespace)
6774
{
6875
AggregationFunction aggregationAnnotation = aggregationDefinition.getAnnotation(AggregationFunction.class);
6976
requireNonNull(aggregationAnnotation, "aggregationAnnotation is null");
@@ -76,7 +83,7 @@ public static List<ParametricAggregation> parseFunctionDefinitions(Class<?> aggr
7683
for (Method outputFunction : getOutputFunctions(aggregationDefinition, stateClass)) {
7784
for (Method inputFunction : getInputFunctions(aggregationDefinition, stateClass)) {
7885
for (AggregationHeader header : parseHeaders(aggregationDefinition, outputFunction)) {
79-
AggregationImplementation onlyImplementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory);
86+
AggregationImplementation onlyImplementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory, functionNamespace);
8087
ParametricImplementationsGroup<AggregationImplementation> implementations = ParametricImplementationsGroup.of(onlyImplementation);
8188
builder.add(new ParametricAggregation(implementations.getSignature(), header, implementations));
8289
}
@@ -97,7 +104,7 @@ public static ParametricAggregation parseFunctionDefinition(Class<?> aggregation
97104
Optional<Method> aggregationStateSerializerFactory = getAggregationStateSerializerFactory(aggregationDefinition, stateClass);
98105
Method outputFunction = getOnlyElement(getOutputFunctions(aggregationDefinition, stateClass));
99106
for (Method inputFunction : getInputFunctions(aggregationDefinition, stateClass)) {
100-
AggregationImplementation implementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory);
107+
AggregationImplementation implementation = parseImplementation(aggregationDefinition, header, stateClass, inputFunction, outputFunction, combineFunction, aggregationStateSerializerFactory, JAVA_BUILTIN_NAMESPACE);
101108
implementationsBuilder.addImplementation(implementation);
102109
}
103110
}

0 commit comments

Comments
 (0)