diff --git a/pom.xml b/pom.xml index 52ed78bd8ad51..d3f5e1fb01c06 100644 --- a/pom.xml +++ b/pom.xml @@ -153,6 +153,7 @@ presto-parser presto-main-base presto-main + presto-built-in-worker-function-tools presto-ml presto-geospatial-toolkit presto-benchmark @@ -815,6 +816,12 @@ ${project.version} + + com.facebook.presto + presto-built-in-worker-function-tools + ${project.version} + + com.facebook.presto presto-main-base diff --git a/presto-built-in-worker-function-tools/pom.xml b/presto-built-in-worker-function-tools/pom.xml new file mode 100644 index 0000000000000..ab040be18de4b --- /dev/null +++ b/presto-built-in-worker-function-tools/pom.xml @@ -0,0 +1,57 @@ + + + + presto-root + com.facebook.presto + 0.295-SNAPSHOT + + 4.0.0 + + presto-built-in-worker-function-tools + presto-built-in-worker-function-tools + + + ${project.parent.basedir} + 17 + true + + + + + com.facebook.presto + presto-spi + + + com.facebook.presto + presto-function-namespace-managers-common + + + com.facebook.airlift + http-client + + + com.google.inject + guice + + + com.google.guava + guava + + + com.facebook.airlift + json + + + com.facebook.presto + presto-common + + + com.facebook.airlift + log + + + com.facebook.airlift + configuration + + + diff --git a/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/ForNativeFunctionRegistryInfo.java b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/ForNativeFunctionRegistryInfo.java new file mode 100644 index 0000000000000..000a6b2e6d0e1 --- /dev/null +++ b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/ForNativeFunctionRegistryInfo.java @@ -0,0 +1,26 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.builtin.tools; + +import com.google.inject.BindingAnnotation; + +import java.lang.annotation.Retention; + +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@BindingAnnotation +public @interface ForNativeFunctionRegistryInfo +{ +} diff --git a/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/NativeSidecarFunctionRegistryTool.java b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/NativeSidecarFunctionRegistryTool.java new file mode 100644 index 0000000000000..eff2db8a844a0 --- /dev/null +++ b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/NativeSidecarFunctionRegistryTool.java @@ -0,0 +1,120 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.builtin.tools; + +import com.facebook.airlift.http.client.HttpClient; +import com.facebook.airlift.http.client.HttpUriBuilder; +import com.facebook.airlift.http.client.JsonResponseHandler; +import com.facebook.airlift.http.client.Request; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata; +import com.facebook.presto.functionNamespace.UdfFunctionSignatureMap; +import com.facebook.presto.spi.Node; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.StandardErrorCode; +import com.facebook.presto.spi.function.SqlFunction; +import com.google.common.collect.ImmutableMap; + +import java.net.URI; +import java.util.List; +import java.util.Map; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.util.Objects.requireNonNull; + +public class NativeSidecarFunctionRegistryTool + implements WorkerFunctionRegistryTool +{ + private final int maxRetries; + private final long retryDelayMs; + private static final Logger log = Logger.get(NativeSidecarFunctionRegistryTool.class); + private final JsonCodec>> nativeFunctionSignatureMapJsonCodec; + private final NodeManager nodeManager; + private final HttpClient httpClient; + private static final String FUNCTION_SIGNATURES_ENDPOINT = "/v1/functions"; + + public NativeSidecarFunctionRegistryTool( + HttpClient httpClient, + JsonCodec>> nativeFunctionSignatureMapJsonCodec, + NodeManager nodeManager, + int nativeSidecarRegistryToolNumRetries, + long nativeSidecarRegistryToolRetryDelayMs) + { + this.nativeFunctionSignatureMapJsonCodec = + requireNonNull(nativeFunctionSignatureMapJsonCodec, "nativeFunctionSignatureMapJsonCodec is null"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.httpClient = requireNonNull(httpClient, "typeManager is null"); + this.maxRetries = nativeSidecarRegistryToolNumRetries; + this.retryDelayMs = nativeSidecarRegistryToolRetryDelayMs; + } + + @Override + public List getWorkerFunctions() + { + return getNativeFunctionSignatureMap() + .getUDFSignatureMap() + .entrySet() + .stream() + .flatMap(entry -> entry.getValue().stream() + .map(metaInfo -> WorkerFunctionUtil.createSqlInvokedFunction(entry.getKey(), metaInfo, "presto"))) + .collect(toImmutableList()); + } + + private UdfFunctionSignatureMap getNativeFunctionSignatureMap() + { + try { + Request request = Request.Builder.prepareGet().setUri(getSidecarLocationOnStartup()).build(); + Map> nativeFunctionSignatureMap = httpClient.execute(request, JsonResponseHandler.createJsonResponseHandler(nativeFunctionSignatureMapJsonCodec)); + return new UdfFunctionSignatureMap(ImmutableMap.copyOf(nativeFunctionSignatureMap)); + } + catch (Exception e) { + throw new PrestoException(StandardErrorCode.INVALID_ARGUMENTS, "Failed to get functions from sidecar.", e); + } + } + + private URI getSidecarLocationOnStartup() + { + Node sidecarNode = null; + for (int attempt = 1; attempt <= maxRetries; attempt++) { + try { + sidecarNode = nodeManager.getSidecarNode(); + if (sidecarNode != null) { + break; + } + } + catch (Exception e) { + log.error("Error getting sidecar node (attempt " + attempt + "): " + e.getMessage()); + if (attempt == maxRetries) { + throw new RuntimeException("Failed to get sidecar node", e); + } + else { + try { + Thread.sleep(retryDelayMs); + } + catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Retry fetching sidecar function registry interrupted", ie); + } + } + } + } + + return HttpUriBuilder + .uriBuilderFrom(sidecarNode.getHttpUri()) + .appendPath(FUNCTION_SIGNATURES_ENDPOINT) + .build(); + } +} diff --git a/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/NativeSidecarRegistryToolConfig.java b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/NativeSidecarRegistryToolConfig.java new file mode 100644 index 0000000000000..9bc9af778111a --- /dev/null +++ b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/NativeSidecarRegistryToolConfig.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.builtin.tools; + +import com.facebook.airlift.configuration.Config; +import com.facebook.airlift.configuration.ConfigDescription; + +import java.time.Duration; + +public class NativeSidecarRegistryToolConfig +{ + private int nativeSidecarRegistryToolNumRetries = 8; + private long nativeSidecarRegistryToolRetryDelayMs = Duration.ofMinutes(1).toMillis(); + + public int getNativeSidecarRegistryToolNumRetries() + { + return nativeSidecarRegistryToolNumRetries; + } + + @Config("native-sidecar-registry-tool-num-retries") + @ConfigDescription("Max times to retry fetching sidecar node") + public NativeSidecarRegistryToolConfig setNativeSidecarRegistryToolNumRetries(int nativeSidecarRegistryToolNumRetries) + { + this.nativeSidecarRegistryToolNumRetries = nativeSidecarRegistryToolNumRetries; + return this; + } + + public long getNativeSidecarRegistryToolRetryDelayMs() + { + return nativeSidecarRegistryToolRetryDelayMs; + } + + @Config("native-sidecar-registry-tool-retry-delay-ms") + @ConfigDescription("Cooldown period to retry when fetching sidecar node fails") + public NativeSidecarRegistryToolConfig setNativeSidecarRegistryToolRetryDelayMs(int nativeSidecarRegistryToolRetryDelayMs) + { + this.nativeSidecarRegistryToolRetryDelayMs = nativeSidecarRegistryToolRetryDelayMs; + return this; + } +} diff --git a/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/WorkerFunctionRegistryTool.java b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/WorkerFunctionRegistryTool.java new file mode 100644 index 0000000000000..99c7cfe13bc78 --- /dev/null +++ b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/WorkerFunctionRegistryTool.java @@ -0,0 +1,23 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.builtin.tools; + +import com.facebook.presto.spi.function.SqlFunction; + +import java.util.List; + +public interface WorkerFunctionRegistryTool +{ + List getWorkerFunctions(); +} diff --git a/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/WorkerFunctionUtil.java b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/WorkerFunctionUtil.java new file mode 100644 index 0000000000000..d880429f7489f --- /dev/null +++ b/presto-built-in-worker-function-tools/src/main/java/com/facebook/presto/builtin/tools/WorkerFunctionUtil.java @@ -0,0 +1,175 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.builtin.tools; + +import com.facebook.presto.common.CatalogSchemaName; +import com.facebook.presto.common.QualifiedObjectName; +import com.facebook.presto.common.type.NamedTypeSignature; +import com.facebook.presto.common.type.StandardTypes; +import com.facebook.presto.common.type.TypeSignature; +import com.facebook.presto.common.type.TypeSignatureParameter; +import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata; +import com.facebook.presto.spi.function.AggregationFunctionMetadata; +import com.facebook.presto.spi.function.LongVariableConstraint; +import com.facebook.presto.spi.function.Parameter; +import com.facebook.presto.spi.function.RoutineCharacteristics; +import com.facebook.presto.spi.function.SqlInvokedFunction; +import com.facebook.presto.spi.function.TypeVariableConstraint; +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.spi.function.FunctionVersion.notVersioned; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; + +public class WorkerFunctionUtil +{ + private WorkerFunctionUtil() {} + + public static synchronized SqlInvokedFunction createSqlInvokedFunction(String functionName, JsonBasedUdfFunctionMetadata jsonBasedUdfFunctionMetaData, String catalogName) + { + checkState(jsonBasedUdfFunctionMetaData.getRoutineCharacteristics().getLanguage().equals(RoutineCharacteristics.Language.CPP), "WorkerFunctionUtil only supports CPP UDF"); + QualifiedObjectName qualifiedFunctionName = QualifiedObjectName.valueOf(new CatalogSchemaName(catalogName, jsonBasedUdfFunctionMetaData.getSchema()), functionName); + List parameterNameList = jsonBasedUdfFunctionMetaData.getParamNames(); + List parameterTypeList = convertApplicableTypeToVariable(jsonBasedUdfFunctionMetaData.getParamTypes()); + List typeVariableConstraintsList = jsonBasedUdfFunctionMetaData.getTypeVariableConstraints().isPresent() ? + jsonBasedUdfFunctionMetaData.getTypeVariableConstraints().get() : ImmutableList.of(); + List longVariableConstraintList = jsonBasedUdfFunctionMetaData.getLongVariableConstraints().isPresent() ? + jsonBasedUdfFunctionMetaData.getLongVariableConstraints().get() : ImmutableList.of(); + + TypeSignature outputType = convertApplicableTypeToVariable(jsonBasedUdfFunctionMetaData.getOutputType()); + ImmutableList.Builder parameterBuilder = ImmutableList.builder(); + for (int i = 0; i < parameterNameList.size(); i++) { + parameterBuilder.add(new Parameter(parameterNameList.get(i), parameterTypeList.get(i))); + } + + Optional aggregationFunctionMetadata = + jsonBasedUdfFunctionMetaData.getAggregateMetadata() + .map(metadata -> new AggregationFunctionMetadata( + convertApplicableTypeToVariable(metadata.getIntermediateType()), + metadata.isOrderSensitive())); + + return new SqlInvokedFunction( + qualifiedFunctionName, + parameterBuilder.build(), + typeVariableConstraintsList, + longVariableConstraintList, + outputType, + jsonBasedUdfFunctionMetaData.getDocString(), + jsonBasedUdfFunctionMetaData.getRoutineCharacteristics(), + "", + jsonBasedUdfFunctionMetaData.getVariableArity(), + notVersioned(), + jsonBasedUdfFunctionMetaData.getFunctionKind(), + aggregationFunctionMetadata); + } + + // Todo: Improve the handling of parameter type differentiation in native execution. + // HACK: Currently, we lack support for correctly identifying the parameterKind, specifically between TYPE and VARIABLE, + // in native execution. The following utility functions help bridge this gap by parsing the type signature and verifying whether its base + // and parameters are of a supported type. The valid types list are non - parametric types that Presto supports. + public static List convertApplicableTypeToVariable(List typeSignatures) + { + List newTypeSignaturesList = new ArrayList<>(); + for (TypeSignature typeSignature : typeSignatures) { + if (!typeSignature.getParameters().isEmpty()) { + TypeSignature newTypeSignature = + new TypeSignature( + typeSignature.getBase(), + getTypeSignatureParameters( + typeSignature, + typeSignature.getParameters())); + newTypeSignaturesList.add(newTypeSignature); + } + else { + newTypeSignaturesList.add(typeSignature); + } + } + return newTypeSignaturesList; + } + + public static TypeSignature convertApplicableTypeToVariable(TypeSignature typeSignature) + { + List typeSignaturesList = convertApplicableTypeToVariable(ImmutableList.of(typeSignature)); + checkArgument(!typeSignaturesList.isEmpty(), "Type signature list is empty for : " + typeSignature); + return typeSignaturesList.get(0); + } + + private static List getTypeSignatureParameters( + TypeSignature typeSignature, + List typeSignatureParameterList) + { + List newParameterTypeList = new ArrayList<>(); + for (TypeSignatureParameter parameter : typeSignatureParameterList) { + if (parameter.isLongLiteral()) { + newParameterTypeList.add(parameter); + continue; + } + + boolean isNamedTypeSignature = parameter.isNamedTypeSignature(); + TypeSignature parameterTypeSignature; + // If it's a named type signatures only in the case of row signature types. + if (isNamedTypeSignature) { + parameterTypeSignature = parameter.getNamedTypeSignature().getTypeSignature(); + } + else { + parameterTypeSignature = parameter.getTypeSignature(); + } + + if (parameterTypeSignature.getParameters().isEmpty()) { + boolean changeTypeToVariable = isDecimalTypeBase(typeSignature.getBase()); + if (changeTypeToVariable) { + newParameterTypeList.add( + TypeSignatureParameter.of(parameterTypeSignature.getBase())); + } + else { + if (isNamedTypeSignature) { + newParameterTypeList.add(TypeSignatureParameter.of(parameter.getNamedTypeSignature())); + } + else { + newParameterTypeList.add(TypeSignatureParameter.of(parameterTypeSignature)); + } + } + } + else { + TypeSignature newTypeSignature = + new TypeSignature( + parameterTypeSignature.getBase(), + getTypeSignatureParameters( + parameterTypeSignature.getStandardTypeSignature(), + parameterTypeSignature.getParameters())); + if (isNamedTypeSignature) { + newParameterTypeList.add( + TypeSignatureParameter.of( + new NamedTypeSignature( + Optional.empty(), + newTypeSignature))); + } + else { + newParameterTypeList.add(TypeSignatureParameter.of(newTypeSignature)); + } + } + } + return newParameterTypeList; + } + + private static boolean isDecimalTypeBase(String typeBase) + { + return typeBase.equals(StandardTypes.DECIMAL); + } +} diff --git a/presto-main-base/pom.xml b/presto-main-base/pom.xml index f724e286320a5..c51c38e73f2f7 100644 --- a/presto-main-base/pom.xml +++ b/presto-main-base/pom.xml @@ -461,6 +461,11 @@ io.netty netty-transport + + com.facebook.presto + presto-built-in-worker-function-tools + test + diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInFunctionKind.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInFunctionKind.java index 4d12bb7f97d16..71ca25dd468cc 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInFunctionKind.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInFunctionKind.java @@ -20,7 +20,8 @@ public enum BuiltInFunctionKind { ENGINE(0), - PLUGIN(1); + PLUGIN(1), + WORKER(2); private final int value; diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInPluginFunctionNamespaceManager.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInPluginFunctionNamespaceManager.java index 6a694f436f17e..75e0d3415d935 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInPluginFunctionNamespaceManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInPluginFunctionNamespaceManager.java @@ -13,226 +13,33 @@ */ package com.facebook.presto.metadata; -import com.facebook.presto.common.Page; -import com.facebook.presto.common.QualifiedObjectName; -import com.facebook.presto.common.block.BlockEncodingSerde; -import com.facebook.presto.common.function.SqlFunctionResult; -import com.facebook.presto.common.type.TypeManager; -import com.facebook.presto.common.type.UserDefinedType; -import com.facebook.presto.spi.PrestoException; -import com.facebook.presto.spi.function.AlterRoutineCharacteristics; -import com.facebook.presto.spi.function.FunctionHandle; -import com.facebook.presto.spi.function.FunctionMetadata; -import com.facebook.presto.spi.function.FunctionNamespaceManager; -import com.facebook.presto.spi.function.FunctionNamespaceTransactionHandle; -import com.facebook.presto.spi.function.Parameter; -import com.facebook.presto.spi.function.ScalarFunctionImplementation; -import com.facebook.presto.spi.function.Signature; +import com.facebook.presto.spi.function.FunctionImplementationType; import com.facebook.presto.spi.function.SqlFunction; -import com.facebook.presto.spi.function.SqlInvokedFunction; -import com.facebook.presto.spi.function.SqlInvokedScalarFunctionImplementation; -import com.google.common.base.Supplier; -import com.google.common.base.Suppliers; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.UncheckedExecutionException; import java.util.Collection; import java.util.List; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; import static com.facebook.presto.metadata.BuiltInFunctionKind.PLUGIN; import static com.facebook.presto.spi.function.FunctionImplementationType.SQL; -import static com.facebook.presto.spi.function.FunctionKind.SCALAR; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Throwables.throwIfInstanceOf; -import static com.google.common.collect.ImmutableList.toImmutableList; -import static java.util.Collections.emptyList; -import static java.util.Objects.requireNonNull; -import static java.util.concurrent.TimeUnit.HOURS; public class BuiltInPluginFunctionNamespaceManager - implements FunctionNamespaceManager + extends BuiltInSpecialFunctionNamespaceManager { - private volatile FunctionMap functions = new FunctionMap(); - private final FunctionAndTypeManager functionAndTypeManager; - private final Supplier cachedFunctions = - Suppliers.memoize(this::checkForNamingConflicts); - private final LoadingCache specializedFunctionKeyCache; - private final LoadingCache specializedScalarCache; - public BuiltInPluginFunctionNamespaceManager(FunctionAndTypeManager functionAndTypeManager) { - this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null"); - specializedFunctionKeyCache = CacheBuilder.newBuilder() - .maximumSize(1000) - .expireAfterWrite(1, HOURS) - .build(CacheLoader.from(this::doGetSpecializedFunctionKey)); - specializedScalarCache = CacheBuilder.newBuilder() - .maximumSize(1000) - .expireAfterWrite(1, HOURS) - .build(CacheLoader.from(key -> { - checkArgument( - key.getFunction() instanceof SqlInvokedFunction, - "Unsupported scalar function class: %s", - key.getFunction().getClass()); - return new SqlInvokedScalarFunctionImplementation(((SqlInvokedFunction) key.getFunction()).getBody()); - })); + super(functionAndTypeManager); } - public synchronized void registerPluginFunctions(List functions) + @Override + public synchronized void registerBuiltInSpecialFunctions(List functions) { checkForNamingConflicts(functions); this.functions = new FunctionMap(this.functions, functions); } @Override - public FunctionHandle getFunctionHandle(Optional transactionHandle, Signature signature) - { - return new BuiltInFunctionHandle(signature, PLUGIN); - } - - @Override - public Collection getFunctions(Optional transactionHandle, QualifiedObjectName functionName) - { - if (functions.list().isEmpty() || - (!functionName.getCatalogSchemaName().equals(functionAndTypeManager.getDefaultNamespace()))) { - return emptyList(); - } - return cachedFunctions.get().get(functionName); - } - - /** - * likePattern / escape is not used for optimization, returning all functions. - */ - @Override - public Collection listFunctions(Optional likePattern, Optional escape) - { - return cachedFunctions.get().list(); - } - - public FunctionMetadata getFunctionMetadata(FunctionHandle functionHandle) - { - checkArgument(functionHandle instanceof BuiltInFunctionHandle, "Expect BuiltInFunctionHandle"); - Signature signature = ((BuiltInFunctionHandle) functionHandle).getSignature(); - SpecializedFunctionKey functionKey; - try { - functionKey = specializedFunctionKeyCache.getUnchecked(signature); - } - catch (UncheckedExecutionException e) { - throwIfInstanceOf(e.getCause(), PrestoException.class); - throw e; - } - SqlFunction function = functionKey.getFunction(); - checkArgument(function instanceof SqlInvokedFunction, "BuiltInPluginFunctionNamespaceManager only support SqlInvokedFunctions"); - SqlInvokedFunction sqlFunction = (SqlInvokedFunction) function; - List argumentNames = sqlFunction.getParameters().stream().map(Parameter::getName).collect(toImmutableList()); - return new FunctionMetadata( - signature.getName(), - signature.getArgumentTypes(), - argumentNames, - signature.getReturnType(), - signature.getKind(), - sqlFunction.getRoutineCharacteristics().getLanguage(), - SQL, - function.isDeterministic(), - function.isCalledOnNullInput(), - sqlFunction.getVersion(), - sqlFunction.getComplexTypeFunctionDescriptor()); - } - - public ScalarFunctionImplementation getScalarFunctionImplementation(FunctionHandle functionHandle) - { - checkArgument(functionHandle instanceof BuiltInFunctionHandle, "Expect BuiltInFunctionHandle"); - return getScalarFunctionImplementation(((BuiltInFunctionHandle) functionHandle).getSignature()); - } - - @Override - public void setBlockEncodingSerde(BlockEncodingSerde blockEncodingSerde) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); - } - - @Override - public FunctionNamespaceTransactionHandle beginTransaction() - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); - } - - @Override - public void commit(FunctionNamespaceTransactionHandle transactionHandle) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); - } - - @Override - public void abort(FunctionNamespaceTransactionHandle transactionHandle) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); - } - - @Override - public void createFunction(SqlInvokedFunction function, boolean replace) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); - } - - @Override - public void dropFunction(QualifiedObjectName functionName, Optional parameterTypes, boolean exists) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support drop function"); - } - - @Override - public void alterFunction(QualifiedObjectName functionName, Optional parameterTypes, AlterRoutineCharacteristics alterRoutineCharacteristics) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not alter function"); - } - - @Override - public void addUserDefinedType(UserDefinedType userDefinedType) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support adding user defined types"); - } - - @Override - public Optional getUserDefinedType(QualifiedObjectName typeName) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support getting user defined types"); - } - - @Override - public CompletableFuture executeFunction(String source, FunctionHandle functionHandle, Page input, List channels, TypeManager typeManager) - { - throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not execute function"); - } - - private ScalarFunctionImplementation getScalarFunctionImplementation(Signature signature) - { - checkArgument(signature.getKind() == SCALAR, "%s is not a scalar function", signature); - checkArgument(signature.getTypeVariableConstraints().isEmpty(), "%s has unbound type parameters", signature); - - try { - return specializedScalarCache.getUnchecked(getSpecializedFunctionKey(signature)); - } - catch (UncheckedExecutionException e) { - throwIfInstanceOf(e.getCause(), PrestoException.class); - throw e; - } - } - - private synchronized FunctionMap checkForNamingConflicts() - { - Optional> functionNamespaceManager = - functionAndTypeManager.getServingFunctionNamespaceManager(functionAndTypeManager.getDefaultNamespace()); - checkArgument(functionNamespaceManager.isPresent(), "Cannot find function namespace for catalog '%s'", functionAndTypeManager.getDefaultNamespace().getCatalogName()); - checkForNamingConflicts(functionNamespaceManager.get().listFunctions(Optional.empty(), Optional.empty())); - return functions; - } - - private synchronized void checkForNamingConflicts(Collection functions) + protected synchronized void checkForNamingConflicts(Collection functions) { for (SqlFunction function : functions) { for (SqlFunction existingFunction : this.functions.list()) { @@ -241,19 +48,15 @@ private synchronized void checkForNamingConflicts(Collection +{ + protected volatile FunctionMap functions = new FunctionMap(); + private final FunctionAndTypeManager functionAndTypeManager; + private final Supplier cachedFunctions = + Suppliers.memoize(this::createFunctionMap); + private final LoadingCache specializedFunctionKeyCache; + private final LoadingCache specializedScalarCache; + + public BuiltInSpecialFunctionNamespaceManager(FunctionAndTypeManager functionAndTypeManager) + { + this.functionAndTypeManager = requireNonNull(functionAndTypeManager, "functionAndTypeManager is null"); + specializedFunctionKeyCache = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterWrite(1, HOURS) + .build(CacheLoader.from(this::doGetSpecializedFunctionKey)); + specializedScalarCache = CacheBuilder.newBuilder() + .maximumSize(1000) + .expireAfterWrite(1, HOURS) + .build(CacheLoader.from(key -> { + checkArgument( + key.getFunction() instanceof SqlInvokedFunction, + "Unsupported scalar function class: %s", + key.getFunction().getClass()); + return new SqlInvokedScalarFunctionImplementation(((SqlInvokedFunction) key.getFunction()).getBody()); + })); + } + + @Override + public Collection getFunctions(Optional transactionHandle, QualifiedObjectName functionName) + { + if (functions.list().isEmpty() || + (!functionName.getCatalogSchemaName().equals(functionAndTypeManager.getDefaultNamespace()))) { + return emptyList(); + } + return cachedFunctions.get().get(functionName); + } + + /** + * likePattern / escape is not used for optimization, returning all functions. + */ + @Override + public Collection listFunctions(Optional likePattern, Optional escape) + { + return cachedFunctions.get().list(); + } + + @Override + public FunctionHandle getFunctionHandle(Optional transactionHandle, Signature signature) + { + return new BuiltInFunctionHandle(signature, getBuiltInFunctionKind()); + } + + public FunctionMetadata getFunctionMetadata(FunctionHandle functionHandle) + { + checkArgument(functionHandle instanceof BuiltInFunctionHandle, "Expect BuiltInFunctionHandle"); + Signature signature = ((BuiltInFunctionHandle) functionHandle).getSignature(); + SpecializedFunctionKey functionKey; + try { + functionKey = specializedFunctionKeyCache.getUnchecked(signature); + } + catch (UncheckedExecutionException e) { + throwIfInstanceOf(e.getCause(), PrestoException.class); + throw e; + } + SqlFunction function = functionKey.getFunction(); + checkArgument(function instanceof SqlInvokedFunction, "BuiltInPluginFunctionNamespaceManager only support SqlInvokedFunctions"); + SqlInvokedFunction sqlFunction = (SqlInvokedFunction) function; + List argumentNames = sqlFunction.getParameters().stream().map(Parameter::getName).collect(toImmutableList()); + return new FunctionMetadata( + signature.getName(), + signature.getArgumentTypes(), + argumentNames, + signature.getReturnType(), + signature.getKind(), + sqlFunction.getRoutineCharacteristics().getLanguage(), + getDefaultFunctionMetadataImplementationType(), + function.isDeterministic(), + function.isCalledOnNullInput(), + sqlFunction.getVersion(), + sqlFunction.getComplexTypeFunctionDescriptor()); + } + + @Override + public void setBlockEncodingSerde(BlockEncodingSerde blockEncodingSerde) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); + } + + @Override + public FunctionNamespaceTransactionHandle beginTransaction() + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); + } + + @Override + public void commit(FunctionNamespaceTransactionHandle transactionHandle) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); + } + + @Override + public void abort(FunctionNamespaceTransactionHandle transactionHandle) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); + } + + @Override + public void createFunction(SqlInvokedFunction function, boolean replace) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support setting block encoding"); + } + + @Override + public void dropFunction(QualifiedObjectName functionName, Optional parameterTypes, boolean exists) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support drop function"); + } + + @Override + public void alterFunction(QualifiedObjectName functionName, Optional parameterTypes, AlterRoutineCharacteristics alterRoutineCharacteristics) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not alter function"); + } + + @Override + public void addUserDefinedType(UserDefinedType userDefinedType) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support adding user defined types"); + } + + @Override + public Optional getUserDefinedType(QualifiedObjectName typeName) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not support getting user defined types"); + } + + @Override + public CompletableFuture executeFunction(String source, FunctionHandle functionHandle, Page input, List channels, TypeManager typeManager) + { + throw new UnsupportedOperationException("BuiltInPluginFunctionNamespaceManager does not execute function"); + } + + protected abstract void checkForNamingConflicts(Collection functions); + + protected abstract BuiltInFunctionKind getBuiltInFunctionKind(); + + protected abstract FunctionImplementationType getDefaultFunctionMetadataImplementationType(); + + public abstract void registerBuiltInSpecialFunctions(List functions); + + @Override + public ScalarFunctionImplementation getScalarFunctionImplementation(FunctionHandle functionHandle) + { + checkArgument(functionHandle instanceof BuiltInFunctionHandle, "Expect BuiltInFunctionHandle"); + return getScalarFunctionImplementation(((BuiltInFunctionHandle) functionHandle).getSignature()); + } + + private synchronized FunctionMap createFunctionMap() + { + Optional> functionNamespaceManager = + functionAndTypeManager.getServingFunctionNamespaceManager(functionAndTypeManager.getDefaultNamespace()); + checkArgument(functionNamespaceManager.isPresent(), "Cannot find function namespace for catalog '%s'", functionAndTypeManager.getDefaultNamespace().getCatalogName()); + checkForNamingConflicts(functionNamespaceManager.get().listFunctions(Optional.empty(), Optional.empty())); + return functions; + } + + private ScalarFunctionImplementation getScalarFunctionImplementation(Signature signature) + { + checkArgument(signature.getKind() == SCALAR, "%s is not a scalar function", signature); + checkArgument(signature.getTypeVariableConstraints().isEmpty(), "%s has unbound type parameters", signature); + + try { + return specializedScalarCache.getUnchecked(getSpecializedFunctionKey(signature)); + } + catch (UncheckedExecutionException e) { + throwIfInstanceOf(e.getCause(), PrestoException.class); + throw e; + } + } + + private SpecializedFunctionKey doGetSpecializedFunctionKey(Signature signature) + { + return functionAndTypeManager.getSpecializedFunctionKey(signature, getFunctions(Optional.empty(), signature.getName())); + } + + private SpecializedFunctionKey getSpecializedFunctionKey(Signature signature) + { + try { + return specializedFunctionKeyCache.getUnchecked(signature); + } + catch (UncheckedExecutionException e) { + throwIfInstanceOf(e.getCause(), PrestoException.class); + throw e; + } + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInWorkerFunctionNamespaceManager.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInWorkerFunctionNamespaceManager.java new file mode 100644 index 0000000000000..41ef943001cb3 --- /dev/null +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/BuiltInWorkerFunctionNamespaceManager.java @@ -0,0 +1,59 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.metadata; + +import com.facebook.presto.spi.function.FunctionImplementationType; +import com.facebook.presto.spi.function.SqlFunction; + +import java.util.Collection; +import java.util.List; + +import static com.facebook.presto.metadata.BuiltInFunctionKind.WORKER; +import static com.facebook.presto.spi.function.FunctionImplementationType.CPP; + +public class BuiltInWorkerFunctionNamespaceManager + extends BuiltInSpecialFunctionNamespaceManager +{ + public BuiltInWorkerFunctionNamespaceManager(FunctionAndTypeManager functionAndTypeManager) + { + super(functionAndTypeManager); + } + + @Override + public synchronized void registerBuiltInSpecialFunctions(List functions) + { + // only register functions once + if (!this.functions.list().isEmpty()) { + return; + } + this.functions = new FunctionMap(this.functions, functions); + } + + @Override + protected synchronized void checkForNamingConflicts(Collection functions) + { + } + + @Override + protected BuiltInFunctionKind getBuiltInFunctionKind() + { + return WORKER; + } + + @Override + protected FunctionImplementationType getDefaultFunctionMetadataImplementationType() + { + return CPP; + } +} diff --git a/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java b/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java index a409fdddb4afc..a00a42726c113 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java +++ b/presto-main-base/src/main/java/com/facebook/presto/metadata/FunctionAndTypeManager.java @@ -13,6 +13,7 @@ */ package com.facebook.presto.metadata; +import com.facebook.airlift.log.Logger; import com.facebook.presto.Session; import com.facebook.presto.common.CatalogSchemaName; import com.facebook.presto.common.Page; @@ -39,6 +40,7 @@ import com.facebook.presto.spi.function.AggregationFunctionImplementation; import com.facebook.presto.spi.function.AlterRoutineCharacteristics; import com.facebook.presto.spi.function.FunctionHandle; +import com.facebook.presto.spi.function.FunctionImplementationType; import com.facebook.presto.spi.function.FunctionMetadata; import com.facebook.presto.spi.function.FunctionMetadataManager; import com.facebook.presto.spi.function.FunctionNamespaceManager; @@ -94,6 +96,7 @@ import static com.facebook.presto.SystemSessionProperties.isListBuiltInFunctionsOnly; import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature; import static com.facebook.presto.metadata.BuiltInFunctionKind.PLUGIN; +import static com.facebook.presto.metadata.BuiltInFunctionKind.WORKER; import static com.facebook.presto.metadata.BuiltInTypeAndFunctionNamespaceManager.JAVA_BUILTIN_NAMESPACE; import static com.facebook.presto.metadata.CastType.toOperatorType; import static com.facebook.presto.metadata.FunctionSignatureMatcher.constructFunctionNotFoundErrorMessage; @@ -132,6 +135,7 @@ public class FunctionAndTypeManager implements FunctionMetadataManager, TypeManager { private static final Pattern DEFAULT_NAMESPACE_PREFIX_PATTERN = Pattern.compile("[a-z]+\\.[a-z]+"); + private static final Logger log = Logger.get(FunctionAndTypeManager.class); private final TransactionManager transactionManager; private final TableFunctionRegistry tableFunctionRegistry; private final BlockEncodingSerde blockEncodingSerde; @@ -147,9 +151,11 @@ public class FunctionAndTypeManager private final LoadingCache functionCache; private final CacheStatsMBean cacheStatsMBean; private final boolean nativeExecution; + private final boolean isBuiltInSidecarFunctionsEnabled; private final CatalogSchemaName defaultNamespace; private final AtomicReference servingTypeManager; private final AtomicReference>> servingTypeManagerParametricTypesSupplier; + private final BuiltInWorkerFunctionNamespaceManager builtInWorkerFunctionNamespaceManager; private final BuiltInPluginFunctionNamespaceManager builtInPluginFunctionNamespaceManager; private final FunctionsConfig functionsConfig; private final Set types; @@ -185,9 +191,11 @@ public FunctionAndTypeManager( this.functionSignatureMatcher = new FunctionSignatureMatcher(this); this.typeCoercer = new TypeCoercer(functionsConfig, this); this.nativeExecution = featuresConfig.isNativeExecutionEnabled(); + this.isBuiltInSidecarFunctionsEnabled = featuresConfig.isBuiltInSidecarFunctionsEnabled(); this.defaultNamespace = configureDefaultNamespace(functionsConfig.getDefaultNamespacePrefix()); this.servingTypeManager = new AtomicReference<>(builtInTypeAndFunctionNamespaceManager); this.servingTypeManagerParametricTypesSupplier = new AtomicReference<>(this::getServingTypeManagerParametricTypes); + this.builtInWorkerFunctionNamespaceManager = new BuiltInWorkerFunctionNamespaceManager(this); this.builtInPluginFunctionNamespaceManager = new BuiltInPluginFunctionNamespaceManager(this); } @@ -361,8 +369,12 @@ public FunctionMetadata getFunctionMetadata(FunctionHandle functionHandle) if (isBuiltInPluginFunctionHandle(functionHandle)) { return builtInPluginFunctionNamespaceManager.getFunctionMetadata(functionHandle); } + if (isBuiltInWorkerFunctionHandle(functionHandle)) { + return builtInWorkerFunctionNamespaceManager.getFunctionMetadata(functionHandle); + } Optional> functionNamespaceManager = getServingFunctionNamespaceManager(functionHandle.getCatalogSchemaName()); checkArgument(functionNamespaceManager.isPresent(), "Cannot find function namespace for '%s'", functionHandle.getCatalogSchemaName()); + return functionNamespaceManager.get().getFunctionMetadata(functionHandle); } @@ -457,9 +469,16 @@ public void registerBuiltInFunctions(List functions) builtInTypeAndFunctionNamespaceManager.registerBuiltInFunctions(functions); } + public void registerWorkerFunctions(List functions) + { + if (isBuiltInSidecarFunctionsEnabled) { + builtInWorkerFunctionNamespaceManager.registerBuiltInSpecialFunctions(functions); + } + } + public void registerPluginFunctions(List functions) { - builtInPluginFunctionNamespaceManager.registerPluginFunctions(functions); + builtInPluginFunctionNamespaceManager.registerBuiltInSpecialFunctions(functions); } public void registerConnectorFunctions(String catalogName, List functions) @@ -490,6 +509,7 @@ public List listFunctions(Session session, Optional likePat defaultNamespace.getCatalogName()).listFunctions(likePattern, escape).stream() .collect(toImmutableList())); functions.addAll(builtInPluginFunctionNamespaceManager.listFunctions(likePattern, escape).stream().collect(toImmutableList())); + functions.addAll(builtInWorkerFunctionNamespaceManager.listFunctions(likePattern, escape).stream().collect(toImmutableList())); } else { functions.addAll(SessionFunctionUtils.listFunctions(session.getSessionFunctions())); @@ -497,6 +517,7 @@ public List listFunctions(Session session, Optional likePat .flatMap(manager -> manager.listFunctions(likePattern, escape).stream()) .collect(toImmutableList())); functions.addAll(builtInPluginFunctionNamespaceManager.listFunctions(likePattern, escape).stream().collect(toImmutableList())); + functions.addAll(builtInWorkerFunctionNamespaceManager.listFunctions(likePattern, escape).stream().collect(toImmutableList())); } return functions.build().stream() @@ -642,6 +663,10 @@ public ScalarFunctionImplementation getScalarFunctionImplementation(FunctionHand if (isBuiltInPluginFunctionHandle(functionHandle)) { return builtInPluginFunctionNamespaceManager.getScalarFunctionImplementation(functionHandle); } + if (isBuiltInWorkerFunctionHandle(functionHandle)) { + return builtInWorkerFunctionNamespaceManager.getScalarFunctionImplementation(functionHandle); + } + Optional> functionNamespaceManager = getServingFunctionNamespaceManager(functionHandle.getCatalogSchemaName()); checkArgument(functionNamespaceManager.isPresent(), "Cannot find function namespace for '%s'", functionHandle.getCatalogSchemaName()); return functionNamespaceManager.get().getScalarFunctionImplementation(functionHandle); @@ -965,13 +990,15 @@ private Collection getFunctions( return ImmutableList.builder() .addAll(functionNamespaceManager.getFunctions(transactionHandle, functionName)) .addAll(builtInPluginFunctionNamespaceManager.getFunctions(transactionHandle, functionName)) + .addAll(builtInWorkerFunctionNamespaceManager.getFunctions(transactionHandle, functionName)) .build(); } /** * Gets the function handle of the function if there is a match. We enforce explicit naming for dynamic function namespaces. * All unqualified function names will only be resolved against the built-in default function namespace. We get all the candidates - * from the current default namespace and additionally all the candidates from builtInPluginFunctionNamespaceManager. + * from the current default namespace and additionally all the candidates from builtInPluginFunctionNamespaceManager and + * builtInWorkerFunctionNamespaceManager. * * @throws PrestoException if there are no matches or multiple matches */ @@ -986,18 +1013,36 @@ private FunctionHandle getMatchingFunctionHandle( getMatchingFunction(functionNamespaceManager.getFunctions(transactionHandle, functionName), parameterTypes, coercionAllowed); Optional matchingPluginFunctionSignature = getMatchingFunction(builtInPluginFunctionNamespaceManager.getFunctions(transactionHandle, functionName), parameterTypes, coercionAllowed); + Optional matchingWorkerFunctionSignature = + getMatchingFunction(builtInWorkerFunctionNamespaceManager.getFunctions(transactionHandle, functionName), parameterTypes, coercionAllowed); if (matchingDefaultFunctionSignature.isPresent() && matchingPluginFunctionSignature.isPresent()) { throw new PrestoException(AMBIGUOUS_FUNCTION_CALL, format("Function '%s' has two matching signatures. Please specify parameter types. \n" + "First match : '%s', Second match: '%s'", functionName, matchingDefaultFunctionSignature.get(), matchingPluginFunctionSignature.get())); } + if (matchingPluginFunctionSignature.isPresent()) { + return builtInPluginFunctionNamespaceManager.getFunctionHandle(transactionHandle, matchingPluginFunctionSignature.get()); + } + + if (matchingDefaultFunctionSignature.isPresent() && matchingWorkerFunctionSignature.isPresent()) { + FunctionHandle defaultFunctionHandle = functionNamespaceManager.getFunctionHandle(transactionHandle, matchingDefaultFunctionSignature.get()); + FunctionHandle workerFunctionHandle = builtInWorkerFunctionNamespaceManager.getFunctionHandle(transactionHandle, matchingWorkerFunctionSignature.get()); + + if (functionNamespaceManager.getFunctionMetadata(defaultFunctionHandle).getImplementationType().equals(FunctionImplementationType.JAVA)) { + return defaultFunctionHandle; + } + if (functionNamespaceManager.getFunctionMetadata(defaultFunctionHandle).getImplementationType().equals(FunctionImplementationType.SQL)) { + return workerFunctionHandle; + } + } + if (matchingDefaultFunctionSignature.isPresent()) { return functionNamespaceManager.getFunctionHandle(transactionHandle, matchingDefaultFunctionSignature.get()); } - if (matchingPluginFunctionSignature.isPresent()) { - return builtInPluginFunctionNamespaceManager.getFunctionHandle(transactionHandle, matchingPluginFunctionSignature.get()); + if (matchingWorkerFunctionSignature.isPresent()) { + return builtInWorkerFunctionNamespaceManager.getFunctionHandle(transactionHandle, matchingWorkerFunctionSignature.get()); } throw new PrestoException(FUNCTION_NOT_FOUND, constructFunctionNotFoundErrorMessage(functionName, parameterTypes, @@ -1017,6 +1062,11 @@ private boolean isBuiltInPluginFunctionHandle(FunctionHandle functionHandle) return (functionHandle instanceof BuiltInFunctionHandle) && ((BuiltInFunctionHandle) functionHandle).getBuiltInFunctionKind().equals(PLUGIN); } + private boolean isBuiltInWorkerFunctionHandle(FunctionHandle functionHandle) + { + return (functionHandle instanceof BuiltInFunctionHandle) && ((BuiltInFunctionHandle) functionHandle).getBuiltInFunctionKind().equals(WORKER); + } + private static class FunctionResolutionCacheKey { private final QualifiedObjectName functionName; diff --git a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java index 2b0b89d99b472..06b1fee1fdce7 100644 --- a/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java +++ b/presto-main-base/src/main/java/com/facebook/presto/sql/analyzer/FeaturesConfig.java @@ -310,6 +310,8 @@ public class FeaturesConfig private boolean pushdownSubfieldForMapFunctions = true; private long maxSerializableObjectSize = 1000; + private boolean builtInSidecarFunctionsEnabled; + public enum PartitioningPrecisionStrategy { // Let Presto decide when to repartition @@ -3111,4 +3113,17 @@ public long getMaxSerializableObjectSize() { return maxSerializableObjectSize; } + + @Config("built-in-sidecar-functions-enabled") + @ConfigDescription("Enable using CPP functions from sidecar over coordinator SQL implementations.") + public FeaturesConfig setBuiltInSidecarFunctionsEnabled(boolean builtInSidecarFunctionsEnabled) + { + this.builtInSidecarFunctionsEnabled = builtInSidecarFunctionsEnabled; + return this; + } + + public boolean isBuiltInSidecarFunctionsEnabled() + { + return this.builtInSidecarFunctionsEnabled; + } } diff --git a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestConvertApplicableTypeToVariable.java b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestConvertApplicableTypeToVariable.java similarity index 98% rename from presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestConvertApplicableTypeToVariable.java rename to presto-main-base/src/test/java/com/facebook/presto/metadata/TestConvertApplicableTypeToVariable.java index 93f9ee75f2b82..9e9a8b629124a 100644 --- a/presto-native-sidecar-plugin/src/test/java/com/facebook/presto/sidecar/TestConvertApplicableTypeToVariable.java +++ b/presto-main-base/src/test/java/com/facebook/presto/metadata/TestConvertApplicableTypeToVariable.java @@ -11,7 +11,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.facebook.presto.sidecar; +package com.facebook.presto.metadata; import com.facebook.presto.common.type.NamedTypeSignature; import com.facebook.presto.common.type.TypeSignature; @@ -21,8 +21,8 @@ import java.util.List; import java.util.Optional; +import static com.facebook.presto.builtin.tools.WorkerFunctionUtil.convertApplicableTypeToVariable; import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature; -import static com.facebook.presto.sidecar.functionNamespace.NativeFunctionNamespaceManager.convertApplicableTypeToVariable; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertTrue; diff --git a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java index 25a802b1e06ea..cc29fe7160751 100644 --- a/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java +++ b/presto-main-base/src/test/java/com/facebook/presto/sql/analyzer/TestFeaturesConfig.java @@ -201,6 +201,7 @@ public void testDefaults() .setPushRemoteExchangeThroughGroupId(false) .setOptimizeMultipleApproxPercentileOnSameFieldEnabled(true) .setNativeExecutionEnabled(false) + .setBuiltInSidecarFunctionsEnabled(false) .setDisableTimeStampWithTimeZoneForNative(false) .setDisableIPAddressForNative(false) .setNativeExecutionExecutablePath("./presto_server") @@ -416,6 +417,7 @@ public void testExplicitPropertyMappings() .put("optimizer.push-remote-exchange-through-group-id", "true") .put("optimizer.optimize-multiple-approx-percentile-on-same-field", "false") .put("native-execution-enabled", "true") + .put("built-in-sidecar-functions-enabled", "true") .put("disable-timestamp-with-timezone-for-native-execution", "true") .put("disable-ipaddress-for-native-execution", "true") .put("native-execution-executable-path", "/bin/echo") @@ -628,6 +630,7 @@ public void testExplicitPropertyMappings() .setPushRemoteExchangeThroughGroupId(true) .setOptimizeMultipleApproxPercentileOnSameFieldEnabled(false) .setNativeExecutionEnabled(true) + .setBuiltInSidecarFunctionsEnabled(true) .setDisableTimeStampWithTimeZoneForNative(true) .setDisableIPAddressForNative(true) .setNativeExecutionExecutablePath("/bin/echo") diff --git a/presto-main/pom.xml b/presto-main/pom.xml index 07e0f4828ceb0..194d70a0610c8 100644 --- a/presto-main/pom.xml +++ b/presto-main/pom.xml @@ -23,6 +23,11 @@ presto-main-base + + com.facebook.presto + presto-built-in-worker-function-tools + + com.facebook.airlift jmx-http @@ -53,6 +58,11 @@ presto-common + + com.facebook.presto + presto-function-namespace-managers-common + + io.jsonwebtoken jjwt-api diff --git a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java index 552b6da5d812e..6462e9b8fb57b 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/PrestoServer.java @@ -33,6 +33,7 @@ import com.facebook.drift.transport.netty.server.DriftNettyServerTransport; import com.facebook.presto.ClientRequestFilterManager; import com.facebook.presto.ClientRequestFilterModule; +import com.facebook.presto.builtin.tools.WorkerFunctionRegistryTool; import com.facebook.presto.dispatcher.QueryPrerequisitesManager; import com.facebook.presto.dispatcher.QueryPrerequisitesManagerModule; import com.facebook.presto.eventlistener.EventListenerManager; @@ -43,6 +44,7 @@ import com.facebook.presto.metadata.Catalog; import com.facebook.presto.metadata.CatalogManager; import com.facebook.presto.metadata.DiscoveryNodeManager; +import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.SessionPropertyManager; import com.facebook.presto.metadata.StaticCatalogStore; @@ -54,6 +56,7 @@ import com.facebook.presto.server.security.PasswordAuthenticatorManager; import com.facebook.presto.server.security.PrestoAuthenticatorManager; import com.facebook.presto.server.security.ServerSecurityModule; +import com.facebook.presto.spi.function.SqlFunction; import com.facebook.presto.sql.analyzer.FeaturesConfig; import com.facebook.presto.sql.expressions.ExpressionOptimizerManager; import com.facebook.presto.sql.parser.SqlParserOptions; @@ -198,6 +201,11 @@ public void run() PluginNodeManager pluginNodeManager = new PluginNodeManager(nodeManager, nodeInfo.getEnvironment()); planCheckerProviderManager.loadPlanCheckerProviders(pluginNodeManager); + if (injector.getInstance(FeaturesConfig.class).isBuiltInSidecarFunctionsEnabled()) { + List functions = injector.getInstance(WorkerFunctionRegistryTool.class).getWorkerFunctions(); + injector.getInstance(FunctionAndTypeManager.class).registerWorkerFunctions(functions); + } + injector.getInstance(ClientRequestFilterManager.class).loadClientRequestFilters(); injector.getInstance(ExpressionOptimizerManager.class).loadExpressionOptimizerFactories(); diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 634b8654634a6..c564764143ce7 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -16,7 +16,10 @@ import com.facebook.airlift.concurrent.BoundedExecutor; import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; import com.facebook.airlift.discovery.client.ServiceAnnouncement; +import com.facebook.airlift.http.client.HttpClient; import com.facebook.airlift.http.server.TheServlet; +import com.facebook.airlift.json.JsonCodec; +import com.facebook.airlift.json.JsonCodecFactory; import com.facebook.airlift.json.JsonObjectMapperProvider; import com.facebook.airlift.stats.GcMonitor; import com.facebook.airlift.stats.JmxGcMonitor; @@ -32,6 +35,10 @@ import com.facebook.presto.PagesIndexPageSorter; import com.facebook.presto.SystemSessionProperties; import com.facebook.presto.block.BlockJsonSerde; +import com.facebook.presto.builtin.tools.ForNativeFunctionRegistryInfo; +import com.facebook.presto.builtin.tools.NativeSidecarFunctionRegistryTool; +import com.facebook.presto.builtin.tools.NativeSidecarRegistryToolConfig; +import com.facebook.presto.builtin.tools.WorkerFunctionRegistryTool; import com.facebook.presto.catalogserver.CatalogServerClient; import com.facebook.presto.catalogserver.RandomCatalogServerAddressSelector; import com.facebook.presto.catalogserver.RemoteMetadataManager; @@ -79,6 +86,7 @@ import com.facebook.presto.execution.scheduler.TableWriteInfo; import com.facebook.presto.execution.scheduler.nodeSelection.NodeSelectionStats; import com.facebook.presto.execution.scheduler.nodeSelection.SimpleTtlNodeSelectorConfig; +import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata; import com.facebook.presto.index.IndexManager; import com.facebook.presto.memory.LocalMemoryManager; import com.facebook.presto.memory.LocalMemoryManagerExporter; @@ -243,6 +251,7 @@ import com.google.inject.Module; import com.google.inject.Provides; import com.google.inject.Scopes; +import com.google.inject.TypeLiteral; import com.google.inject.multibindings.MapBinder; import io.airlift.slice.Slice; import jakarta.annotation.PreDestroy; @@ -251,6 +260,7 @@ import jakarta.servlet.Servlet; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; import java.util.concurrent.LinkedBlockingQueue; @@ -266,6 +276,7 @@ import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder; import static com.facebook.airlift.jaxrs.JaxrsBinder.jaxrsBinder; import static com.facebook.airlift.json.JsonBinder.jsonBinder; +import static com.facebook.airlift.json.JsonCodec.listJsonCodec; import static com.facebook.airlift.json.JsonCodecBinder.jsonCodecBinder; import static com.facebook.airlift.json.smile.SmileCodecBinder.smileCodecBinder; import static com.facebook.airlift.units.DataSize.Unit.MEGABYTE; @@ -394,6 +405,11 @@ else if (serverConfig.isCoordinator()) { .withAddressSelector(((addressSelectorBinder, annotation, prefix) -> addressSelectorBinder.bind(AddressSelector.class).annotatedWith(annotation).to(FixedAddressSelector.class))); + binder.bind(new TypeLiteral>>>() {}) + .toInstance(new JsonCodecFactory().mapJsonCodec(String.class, listJsonCodec(JsonBasedUdfFunctionMetadata.class))); + httpClientBinder(binder).bindHttpClient("native-function-registry", ForNativeFunctionRegistryInfo.class); + configBinder(binder).bindConfig(NativeSidecarRegistryToolConfig.class); + // node scheduler // TODO: remove from NodePartitioningManager and move to CoordinatorModule configBinder(binder).bindConfig(NodeSchedulerConfig.class); @@ -891,6 +907,22 @@ public static FragmentResultCacheManager createFragmentResultCacheManager(FileFr return new NoOpFragmentResultCacheManager(); } + @Provides + @Singleton + public WorkerFunctionRegistryTool provideWorkerFunctionRegistryTool( + NativeSidecarRegistryToolConfig config, + @ForNativeFunctionRegistryInfo HttpClient httpClient, + JsonCodec>> nativeFunctionSignatureMapJsonCodec, + NodeManager nodeManager) + { + return new NativeSidecarFunctionRegistryTool( + httpClient, + nativeFunctionSignatureMapJsonCodec, + nodeManager, + config.getNativeSidecarRegistryToolNumRetries(), + config.getNativeSidecarRegistryToolRetryDelayMs()); + } + public static class ExecutorCleanup { private final List executors; diff --git a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java index 1cb04591b15da..43c574058f2bb 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/testing/TestingPrestoServer.java @@ -34,6 +34,7 @@ import com.facebook.drift.transport.netty.server.DriftNettyServerTransport; import com.facebook.presto.ClientRequestFilterManager; import com.facebook.presto.ClientRequestFilterModule; +import com.facebook.presto.builtin.tools.WorkerFunctionRegistryTool; import com.facebook.presto.connector.ConnectorManager; import com.facebook.presto.cost.StatsCalculator; import com.facebook.presto.dispatcher.DispatchManager; @@ -51,6 +52,7 @@ import com.facebook.presto.memory.LocalMemoryManager; import com.facebook.presto.metadata.AllNodes; import com.facebook.presto.metadata.CatalogManager; +import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.metadata.InternalNode; import com.facebook.presto.metadata.InternalNodeManager; import com.facebook.presto.metadata.Metadata; @@ -147,6 +149,8 @@ public class TestingPrestoServer private final boolean preserveData; private final LifeCycleManager lifeCycleManager; private final PluginManager pluginManager; + private final FunctionAndTypeManager functionAndTypeManager; + private final WorkerFunctionRegistryTool workerFunctionRegistryTool; private final ConnectorManager connectorManager; private final TestingHttpServer server; private final CatalogManager catalogManager; @@ -367,6 +371,9 @@ public TestingPrestoServer( connectorManager = injector.getInstance(ConnectorManager.class); + functionAndTypeManager = injector.getInstance(FunctionAndTypeManager.class); + workerFunctionRegistryTool = injector.getInstance(WorkerFunctionRegistryTool.class); + server = injector.getInstance(TestingHttpServer.class); catalogManager = injector.getInstance(CatalogManager.class); transactionManager = injector.getInstance(TransactionManager.class); @@ -501,6 +508,11 @@ private Map getServerProperties( return ImmutableMap.copyOf(serverProperties); } + public void registerWorkerFunctions() + { + functionAndTypeManager.registerWorkerFunctions(workerFunctionRegistryTool.getWorkerFunctions()); + } + @Override public void close() throws IOException diff --git a/presto-native-execution/pom.xml b/presto-native-execution/pom.xml index ab6d81aba382b..8f849f866335a 100644 --- a/presto-native-execution/pom.xml +++ b/presto-native-execution/pom.xml @@ -36,6 +36,18 @@ test + + org.jetbrains + annotations + test + + + + org.weakref + jmxutils + test + + io.airlift.tpch tpch @@ -116,6 +128,22 @@ test + + com.facebook.presto + presto-parser + test + + + com.google.guava + guava + + + com.facebook.presto + presto-spi + + + + com.facebook.presto presto-iceberg diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java index bc3049b63ff59..9e7581f1702f5 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java @@ -123,6 +123,7 @@ public static class HiveQueryRunnerBuilder private String security; private boolean addStorageFormatToPath; private boolean coordinatorSidecarEnabled; + private boolean builtInWorkerFunctionsEnabled; private boolean enableRuntimeMetricsCollection; private boolean enableSsdCache; private boolean failOnNestedLoopJoin; @@ -218,6 +219,12 @@ public HiveQueryRunnerBuilder setCoordinatorSidecarEnabled(boolean coordinatorSi return this; } + public HiveQueryRunnerBuilder setBuiltInWorkerFunctionsEnabled(boolean builtInWorkerFunctionsEnabled) + { + this.builtInWorkerFunctionsEnabled = builtInWorkerFunctionsEnabled; + return this; + } + public HiveQueryRunnerBuilder setStorageFormat(String storageFormat) { this.storageFormat = storageFormat; @@ -261,7 +268,7 @@ public QueryRunner build() Optional> externalWorkerLauncher = Optional.empty(); if (this.useExternalWorkerLauncher) { externalWorkerLauncher = getExternalWorkerLauncher("hive", serverBinary, cacheMaxSize, remoteFunctionServerUds, - failOnNestedLoopJoin, coordinatorSidecarEnabled, enableRuntimeMetricsCollection, enableSsdCache); + failOnNestedLoopJoin, coordinatorSidecarEnabled, builtInWorkerFunctionsEnabled, enableRuntimeMetricsCollection, enableSsdCache); } return HiveQueryRunner.createQueryRunner( ImmutableList.of(), @@ -350,7 +357,7 @@ public QueryRunner build() Optional> externalWorkerLauncher = Optional.empty(); if (this.useExternalWorkerLauncher) { externalWorkerLauncher = getExternalWorkerLauncher("iceberg", serverBinary, cacheMaxSize, remoteFunctionServerUds, - false, false, false, false); + false, false, false, false, false); } return IcebergQueryRunner.builder() .setExtraProperties(extraProperties) @@ -446,6 +453,7 @@ public static Optional> getExternalWorkerLaunc Optional remoteFunctionServerUds, Boolean failOnNestedLoopJoin, boolean isCoordinatorSidecarEnabled, + boolean isBuiltInWorkerFunctionsEnabled, boolean enableRuntimeMetricsCollection, boolean enableSsdCache) { @@ -468,6 +476,10 @@ public static Optional> getExternalWorkerLaunc "native-sidecar=true%n" + "presto.default-namespace=native.default%n", configProperties); } + else if (isBuiltInWorkerFunctionsEnabled) { + configProperties = format("%s%n" + + "native-sidecar=true%n", configProperties); + } if (enableRuntimeMetricsCollection) { configProperties = format("%s%n" + diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestBuiltInNativeFunctions.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestBuiltInNativeFunctions.java new file mode 100644 index 0000000000000..f22707cf0bfad --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestBuiltInNativeFunctions.java @@ -0,0 +1,178 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.facebook.presto.nativeworker; + +import com.facebook.presto.cost.CostCalculator; +import com.facebook.presto.cost.CostCalculatorUsingExchanges; +import com.facebook.presto.cost.CostCalculatorWithEstimatedExchanges; +import com.facebook.presto.cost.CostComparator; +import com.facebook.presto.cost.TaskCountEstimator; +import com.facebook.presto.execution.QueryManagerConfig; +import com.facebook.presto.execution.TaskManagerConfig; +import com.facebook.presto.metadata.InMemoryNodeManager; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.nodeManager.PluginNodeManager; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.sql.analyzer.FeaturesConfig; +import com.facebook.presto.sql.analyzer.QueryExplainer; +import com.facebook.presto.sql.expressions.ExpressionOptimizerManager; +import com.facebook.presto.sql.planner.PartitioningProviderManager; +import com.facebook.presto.sql.planner.PlanFragmenter; +import com.facebook.presto.sql.planner.PlanOptimizers; +import com.facebook.presto.sql.planner.optimizations.PlanOptimizer; +import com.facebook.presto.sql.planner.sanity.PlanChecker; +import com.facebook.presto.sql.tree.ExplainType; +import com.facebook.presto.testing.QueryRunner; +import com.facebook.presto.tests.AbstractTestQueryFramework; +import com.facebook.presto.tests.DistributedQueryRunner; +import com.google.common.collect.ImmutableMap; +import org.intellij.lang.annotations.Language; +import org.testng.annotations.Test; +import org.weakref.jmx.MBeanExporter; +import org.weakref.jmx.testing.TestingMBeanServer; + +import java.util.List; +import java.util.regex.Pattern; + +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createLineitem; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createNation; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrders; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createOrdersEx; +import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.createRegion; +import static com.facebook.presto.transaction.TransactionBuilder.transaction; +import static com.facebook.presto.util.AnalyzerUtil.createParsingOptions; +import static java.util.Collections.emptyList; +import static org.testng.Assert.fail; + +public class TestBuiltInNativeFunctions + extends AbstractTestQueryFramework +{ + @Override + protected void createTables() + { + QueryRunner queryRunner = (QueryRunner) getExpectedQueryRunner(); + createLineitem(queryRunner); + createNation(queryRunner); + createOrders(queryRunner); + createOrdersEx(queryRunner); + createRegion(queryRunner); + } + + @Override + protected QueryRunner createQueryRunner() + throws Exception + { + DistributedQueryRunner queryRunner = (DistributedQueryRunner) PrestoNativeQueryRunnerUtils.nativeHiveQueryRunnerBuilder() + .setExtraProperties(ImmutableMap.of("built-in-sidecar-functions-enabled", "true")) + .setAddStorageFormatToPath(true) + .setBuiltInWorkerFunctionsEnabled(true) + .build(); + + queryRunner.registerNativeFunctions(); + + return queryRunner; + } + + @Override + protected QueryRunner createExpectedQueryRunner() + throws Exception + { + return PrestoNativeQueryRunnerUtils.javaHiveQueryRunnerBuilder() + .setAddStorageFormatToPath(true) + .build(); + } + + private void assertJsonPlan(@Language("SQL") String query, boolean withBuiltInSidecarEnabled, @Language("RegExp") String jsonPlanRegex, boolean shouldContainRegex) + { + QueryRunner queryRunner; + if (withBuiltInSidecarEnabled) { + queryRunner = getQueryRunner(); + } + else { + queryRunner = (QueryRunner) getExpectedQueryRunner(); + } + + QueryExplainer explainer = getQueryExplainerFromProvidedQueryRunner(queryRunner); + transaction(queryRunner.getTransactionManager(), queryRunner.getAccessControl()) + .singleStatement() + .execute(queryRunner.getDefaultSession(), transactionSession -> { + String actualPlan = explainer.getJsonPlan(transactionSession, getSqlParser().createStatement(query, createParsingOptions(transactionSession)), ExplainType.Type.LOGICAL, emptyList(), WarningCollector.NOOP, query); + Pattern p = Pattern.compile(jsonPlanRegex, Pattern.MULTILINE); + if (shouldContainRegex) { + if (!p.matcher(actualPlan).find()) { + fail("Query plan text does not contain regex"); + } + } + else { + if (p.matcher(actualPlan).find()) { + fail("Query plan text contains bad pattern"); + } + } + + return null; + }); + } + + private QueryExplainer getQueryExplainerFromProvidedQueryRunner(QueryRunner queryRunner) + { + Metadata metadata = queryRunner.getMetadata(); + FeaturesConfig featuresConfig = createFeaturesConfig(); + boolean noExchange = queryRunner.getNodeCount() == 1; + TaskCountEstimator taskCountEstimator = new TaskCountEstimator(queryRunner::getNodeCount); + CostCalculator costCalculator = new CostCalculatorUsingExchanges(taskCountEstimator); + List optimizers = new PlanOptimizers( + metadata, + getSqlParser(), + noExchange, + new MBeanExporter(new TestingMBeanServer()), + queryRunner.getSplitManager(), + queryRunner.getPlanOptimizerManager(), + queryRunner.getPageSourceManager(), + queryRunner.getStatsCalculator(), + costCalculator, + new CostCalculatorWithEstimatedExchanges(costCalculator, taskCountEstimator), + new CostComparator(featuresConfig), + taskCountEstimator, + new PartitioningProviderManager(), + featuresConfig, + new ExpressionOptimizerManager( + new PluginNodeManager(new InMemoryNodeManager()), + queryRunner.getMetadata().getFunctionAndTypeManager()), + new TaskManagerConfig()) + .getPlanningTimeOptimizers(); + return new QueryExplainer( + optimizers, + new PlanFragmenter(metadata, queryRunner.getNodePartitioningManager(), new QueryManagerConfig(), featuresConfig, queryRunner.getPlanCheckerProviderManager()), + metadata, + queryRunner.getAccessControl(), + getSqlParser(), + queryRunner.getStatsCalculator(), + costCalculator, + ImmutableMap.of(), + new PlanChecker(featuresConfig, false, queryRunner.getPlanCheckerProviderManager())); + } + + @Test + public void testUdfQueries() + { + assertQuery("SELECT ARRAY['abc']"); + assertQuery("SELECT ARRAY[1, 2, 3]"); + assertQuery("SELECT map_remove_null_values( MAP( ARRAY['a', 'b', 'c'], ARRAY[1, NULL, 3] ) )"); + assertQuery("SELECT presto.default.map_remove_null_values( MAP( ARRAY['a', 'b', 'c'], ARRAY[1, NULL, 3] ) )"); + assertQueryFails("SELECT native.default.map_remove_null_values( MAP( ARRAY['a', 'b', 'c'], ARRAY[1, NULL, 3] ) )", ".*Function native.default.map_remove_null_values not registered.*"); + assertJsonPlan("SELECT map_remove_null_values( MAP( ARRAY['a', 'b', 'c'], ARRAY[1, NULL, 3] ) )", true, "lambda", false); + assertJsonPlan("SELECT map_remove_null_values( MAP( ARRAY['a', 'b', 'c'], ARRAY[1, NULL, 3] ) )", false, "lambda", true); + } +} diff --git a/presto-native-sidecar-plugin/pom.xml b/presto-native-sidecar-plugin/pom.xml index 153ad18295f8c..b2bc40e8f2bd4 100644 --- a/presto-native-sidecar-plugin/pom.xml +++ b/presto-native-sidecar-plugin/pom.xml @@ -260,6 +260,10 @@ + + com.facebook.presto + presto-built-in-worker-function-tools + diff --git a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/functionNamespace/NativeFunctionNamespaceManager.java b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/functionNamespace/NativeFunctionNamespaceManager.java index 11ef917f5522d..f1b7142a188ee 100644 --- a/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/functionNamespace/NativeFunctionNamespaceManager.java +++ b/presto-native-sidecar-plugin/src/main/java/com/facebook/presto/sidecar/functionNamespace/NativeFunctionNamespaceManager.java @@ -14,14 +14,10 @@ package com.facebook.presto.sidecar.functionNamespace; import com.facebook.airlift.log.Logger; -import com.facebook.presto.common.CatalogSchemaName; import com.facebook.presto.common.QualifiedObjectName; -import com.facebook.presto.common.type.NamedTypeSignature; -import com.facebook.presto.common.type.StandardTypes; import com.facebook.presto.common.type.Type; import com.facebook.presto.common.type.TypeManager; import com.facebook.presto.common.type.TypeSignature; -import com.facebook.presto.common.type.TypeSignatureParameter; import com.facebook.presto.common.type.UserDefinedType; import com.facebook.presto.functionNamespace.AbstractSqlInvokedFunctionNamespaceManager; import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata; @@ -38,7 +34,6 @@ import com.facebook.presto.spi.function.FunctionMetadata; import com.facebook.presto.spi.function.FunctionMetadataManager; import com.facebook.presto.spi.function.FunctionNamespaceTransactionHandle; -import com.facebook.presto.spi.function.LongVariableConstraint; import com.facebook.presto.spi.function.Parameter; import com.facebook.presto.spi.function.ScalarFunctionImplementation; import com.facebook.presto.spi.function.Signature; @@ -48,35 +43,29 @@ import com.facebook.presto.spi.function.SqlFunctionSupplier; import com.facebook.presto.spi.function.SqlInvokedAggregationFunctionImplementation; import com.facebook.presto.spi.function.SqlInvokedFunction; -import com.facebook.presto.spi.function.TypeVariableConstraint; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Suppliers; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.UncheckedExecutionException; import jakarta.inject.Inject; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Supplier; +import static com.facebook.presto.builtin.tools.WorkerFunctionUtil.createSqlInvokedFunction; import static com.facebook.presto.common.type.TypeSignatureUtils.resolveIntermediateType; import static com.facebook.presto.spi.StandardErrorCode.DUPLICATE_FUNCTION_ERROR; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_INTERNAL_ERROR; import static com.facebook.presto.spi.StandardErrorCode.GENERIC_USER_ERROR; import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED; -import static com.facebook.presto.spi.function.FunctionVersion.notVersioned; -import static com.facebook.presto.spi.function.RoutineCharacteristics.Language.CPP; import static com.google.common.base.Preconditions.checkArgument; -import static com.google.common.base.Preconditions.checkState; import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.collect.MoreCollectors.onlyElement; import static java.lang.String.format; @@ -138,7 +127,7 @@ private synchronized void populateNamespaceManager(UdfFunctionSignatureMap udfFu { Map> udfSignatureMap = udfFunctionSignatureMap.getUDFSignatureMap(); udfSignatureMap.forEach((name, metaInfoList) -> { - List functions = metaInfoList.stream().map(metaInfo -> createSqlInvokedFunction(name, metaInfo)).collect(toImmutableList()); + List functions = metaInfoList.stream().map(metaInfo -> createSqlInvokedFunction(name, metaInfo, getCatalogName())).collect(toImmutableList()); functions.forEach(this::createFunction); }); } @@ -200,44 +189,6 @@ private AggregationFunctionImplementation processSqlFunctionHandle(SqlFunctionHa return aggregationImplementationByHandle.get(sqlFunctionHandle); } - protected synchronized SqlInvokedFunction createSqlInvokedFunction(String functionName, JsonBasedUdfFunctionMetadata jsonBasedUdfFunctionMetaData) - { - checkState(jsonBasedUdfFunctionMetaData.getRoutineCharacteristics().getLanguage().equals(CPP), "NativeFunctionNamespaceManager only supports CPP UDF"); - QualifiedObjectName qualifiedFunctionName = QualifiedObjectName.valueOf(new CatalogSchemaName(getCatalogName(), jsonBasedUdfFunctionMetaData.getSchema()), functionName); - List parameterNameList = jsonBasedUdfFunctionMetaData.getParamNames(); - List parameterTypeList = convertApplicableTypeToVariable(jsonBasedUdfFunctionMetaData.getParamTypes()); - List typeVariableConstraintsList = jsonBasedUdfFunctionMetaData.getTypeVariableConstraints().isPresent() ? - jsonBasedUdfFunctionMetaData.getTypeVariableConstraints().get() : Collections.emptyList(); - List longVariableConstraintList = jsonBasedUdfFunctionMetaData.getLongVariableConstraints().isPresent() ? - jsonBasedUdfFunctionMetaData.getLongVariableConstraints().get() : Collections.emptyList(); - - TypeSignature outputType = convertApplicableTypeToVariable(jsonBasedUdfFunctionMetaData.getOutputType()); - ImmutableList.Builder parameterBuilder = ImmutableList.builder(); - for (int i = 0; i < parameterNameList.size(); i++) { - parameterBuilder.add(new Parameter(parameterNameList.get(i), parameterTypeList.get(i))); - } - - Optional aggregationFunctionMetadata = - jsonBasedUdfFunctionMetaData.getAggregateMetadata() - .map(metadata -> new AggregationFunctionMetadata( - convertApplicableTypeToVariable(metadata.getIntermediateType()), - metadata.isOrderSensitive())); - - return new SqlInvokedFunction( - qualifiedFunctionName, - parameterBuilder.build(), - typeVariableConstraintsList, - longVariableConstraintList, - outputType, - jsonBasedUdfFunctionMetaData.getDocString(), - jsonBasedUdfFunctionMetaData.getRoutineCharacteristics(), - "", - jsonBasedUdfFunctionMetaData.getVariableArity(), - notVersioned(), - jsonBasedUdfFunctionMetaData.getFunctionKind(), - aggregationFunctionMetadata); - } - @Override protected Collection fetchFunctionsDirect(QualifiedObjectName functionName) { @@ -320,109 +271,12 @@ public final FunctionHandle getFunctionHandle(Optional typeSignaturesList = convertApplicableTypeToVariable(ImmutableList.of(typeSignature)); - checkArgument(!typeSignaturesList.isEmpty(), "Type signature list is empty for : " + typeSignature); - return typeSignaturesList.get(0); - } - - public static List convertApplicableTypeToVariable(List typeSignatures) - { - List newTypeSignaturesList = new ArrayList<>(); - for (TypeSignature typeSignature : typeSignatures) { - if (!typeSignature.getParameters().isEmpty()) { - TypeSignature newTypeSignature = - new TypeSignature( - typeSignature.getBase(), - getTypeSignatureParameters( - typeSignature, - typeSignature.getParameters())); - newTypeSignaturesList.add(newTypeSignature); - } - else { - newTypeSignaturesList.add(typeSignature); - } - } - return newTypeSignaturesList; - } - @VisibleForTesting public FunctionDefinitionProvider getFunctionDefinitionProvider() { return functionDefinitionProvider; } - private static List getTypeSignatureParameters( - TypeSignature typeSignature, - List typeSignatureParameterList) - { - List newParameterTypeList = new ArrayList<>(); - for (TypeSignatureParameter parameter : typeSignatureParameterList) { - if (parameter.isLongLiteral()) { - newParameterTypeList.add(parameter); - continue; - } - - boolean isNamedTypeSignature = parameter.isNamedTypeSignature(); - TypeSignature parameterTypeSignature; - // If it's a named type signatures only in the case of row signature types. - if (isNamedTypeSignature) { - parameterTypeSignature = parameter.getNamedTypeSignature().getTypeSignature(); - } - else { - parameterTypeSignature = parameter.getTypeSignature(); - } - - if (parameterTypeSignature.getParameters().isEmpty()) { - boolean changeTypeToVariable = isDecimalTypeBase(typeSignature.getBase()); - if (changeTypeToVariable) { - newParameterTypeList.add( - TypeSignatureParameter.of(parameterTypeSignature.getBase())); - } - else { - if (isNamedTypeSignature) { - newParameterTypeList.add(TypeSignatureParameter.of(parameter.getNamedTypeSignature())); - } - else { - newParameterTypeList.add(TypeSignatureParameter.of(parameterTypeSignature)); - } - } - } - else { - TypeSignature newTypeSignature = - new TypeSignature( - parameterTypeSignature.getBase(), - getTypeSignatureParameters( - parameterTypeSignature.getStandardTypeSignature(), - parameterTypeSignature.getParameters())); - if (isNamedTypeSignature) { - newParameterTypeList.add( - TypeSignatureParameter.of( - new NamedTypeSignature( - Optional.empty(), - newTypeSignature))); - } - else { - newParameterTypeList.add(TypeSignatureParameter.of(newTypeSignature)); - } - } - } - return newParameterTypeList; - } - - private static boolean isDecimalTypeBase(String typeBase) - { - return typeBase.equals(StandardTypes.DECIMAL); - } - - // Hack ends here - private synchronized void createFunction(SqlInvokedFunction function) { checkFunctionLanguageSupported(function); diff --git a/presto-spark-base/pom.xml b/presto-spark-base/pom.xml index 8f3dd0f91a66d..37dc1d4fb541b 100644 --- a/presto-spark-base/pom.xml +++ b/presto-spark-base/pom.xml @@ -117,6 +117,10 @@ com.facebook.airlift.drift * + + com.facebook.presto + presto-function-namespace-managers-common + diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java index 72999fbe97c16..12d1a4b712425 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/AbstractTestQueryFramework.java @@ -565,7 +565,7 @@ protected SubPlan subplan(String sql, Session session) } } - private QueryExplainer getQueryExplainer() + protected QueryExplainer getQueryExplainer() { Metadata metadata = queryRunner.getMetadata(); FeaturesConfig featuresConfig = createFeaturesConfig(); @@ -630,6 +630,12 @@ protected ExpectedQueryRunner getExpectedQueryRunner() return expectedQueryRunner; } + protected SqlParser getSqlParser() + { + checkState(sqlParser != null, "sqlParser not set"); + return sqlParser; + } + public interface QueryRunnerSupplier { QueryRunner get() diff --git a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java index d9ce3439011ab..67bf65f67eb55 100644 --- a/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java +++ b/presto-tests/src/main/java/com/facebook/presto/tests/DistributedQueryRunner.java @@ -1040,6 +1040,13 @@ public void loadPlanCheckerProviderManager(String planCheckerProviderName, Map