diff --git a/pom.xml b/pom.xml index 94c7baf21bc30..980a063f688e8 100644 --- a/pom.xml +++ b/pom.xml @@ -212,6 +212,7 @@ presto-native-sidecar-plugin presto-base-arrow-flight presto-function-server + presto-clp @@ -761,6 +762,12 @@ ${project.version} + + com.facebook.presto + presto-clp + ${project.version} + + com.facebook.presto presto-expressions diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml new file mode 100644 index 0000000000000..5682de9be673b --- /dev/null +++ b/presto-clp/pom.xml @@ -0,0 +1,134 @@ + + + 4.0.0 + + + com.facebook.presto + presto-root + 0.292-SNAPSHOT + + + presto-clp + Presto - CLP Connector + presto-plugin + + + ${project.parent.basedir} + + + + + mysql + mysql-connector-java + runtime + + + + com.facebook.airlift + bootstrap + + + + com.facebook.airlift + json + + + + com.facebook.airlift + log + + + + com.facebook.airlift + configuration + + + + com.google.inject + guice + + + + com.google.code.findbugs + jsr305 + true + + + + com.google.guava + guava + + + + javax.inject + javax.inject + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.facebook.presto + presto-spi + provided + + + + com.facebook.presto + presto-common + provided + + + + io.airlift + units + provided + + + + io.airlift + slice + provided + + + + org.testng + testng + test + + + + com.facebook.presto + presto-main + test + + + + com.facebook.presto + presto-analyzer + test + + + + com.h2database + h2 + test + + + + com.facebook.presto + presto-parser + test + + + + org.apache.commons + commons-math3 + test + + + diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java new file mode 100644 index 0000000000000..98a05bf15ac7c --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use 
this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.type.Type; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class ClpColumnHandle + implements ColumnHandle +{ + private final String columnName; + private final String originalColumnName; + private final Type columnType; + private final boolean nullable; + + @JsonCreator + public ClpColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("originalColumnName") String originalColumnName, + @JsonProperty("columnType") Type columnType, + @JsonProperty("nullable") boolean nullable) + { + this.columnName = columnName; + this.originalColumnName = originalColumnName; + this.columnType = columnType; + this.nullable = nullable; + } + + public ClpColumnHandle(String columnName, Type columnType, boolean nullable) + { + this(columnName, columnName, columnType, nullable); + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public String getOriginalColumnName() + { + return originalColumnName; + } + + @JsonProperty + public Type getColumnType() + { + return columnType; + } + + @JsonProperty + public boolean isNullable() + { + return nullable; + } + + public ColumnMetadata getColumnMetadata() + { + 
ColumnMetadata.Builder builder = ColumnMetadata.builder() + .setName(columnName) + .setType(columnType) + .setNullable(nullable); + return builder.build(); + } + + @Override + public int hashCode() + { + return Objects.hash(columnName, columnType); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ClpColumnHandle other = (ClpColumnHandle) obj; + return Objects.equals(this.columnName, other.columnName) && + Objects.equals(this.columnType, other.columnType); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("columnName", columnName) + .add("columnType", columnType) + .add("nullable", nullable) + .toString(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java new file mode 100644 index 0000000000000..641bd5860e984 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java @@ -0,0 +1,189 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.configuration.Config; +import com.facebook.presto.spi.PrestoException; + +import java.util.regex.Pattern; + +public class ClpConfig +{ + public enum ArchiveSource + { + LOCAL, + S3 + } + + public enum MetadataProviderType + { + MYSQL + } + + public enum SplitProviderType + { + MYSQL + } + + private boolean polymorphicTypeEnabled = true; + private MetadataProviderType metadataProviderType = MetadataProviderType.MYSQL; + private String metadataDbUrl; + private String metadataDbName; + private String metadataDbUser; + private String metadataDbPassword; + private String metadataTablePrefix; + private long metadataRefreshInterval = 60; + private long metadataExpireInterval = 600; + private ArchiveSource archiveSource = ArchiveSource.LOCAL; + private SplitProviderType splitProviderType = SplitProviderType.MYSQL; + + public static final Pattern SAFE_SQL_IDENTIFIER = Pattern.compile("^[a-zA-Z0-9_]+$"); + + public boolean isPolymorphicTypeEnabled() + { + return polymorphicTypeEnabled; + } + + @Config("clp.polymorphic-type-enabled") + public ClpConfig setPolymorphicTypeEnabled(boolean polymorphicTypeEnabled) + { + this.polymorphicTypeEnabled = polymorphicTypeEnabled; + return this; + } + + public MetadataProviderType getMetadataProviderType() + { + return metadataProviderType; + } + + @Config("clp.metadata-provider-type") + public ClpConfig setMetadataProviderType(MetadataProviderType metadataProviderType) + { + this.metadataProviderType = metadataProviderType; + return this; + } + + public String getMetadataDbUrl() + { + return metadataDbUrl; + } + + @Config("clp.metadata-db-url") + public ClpConfig setMetadataDbUrl(String metadataDbUrl) + { + this.metadataDbUrl = metadataDbUrl; + return this; + } + + public String getMetadataDbName() + { + return metadataDbName; + } + + @Config("clp.metadata-db-name") + public ClpConfig setMetadataDbName(String metadataDbName) + { + this.metadataDbName = 
metadataDbName; + return this; + } + + public String getMetadataDbUser() + { + return metadataDbUser; + } + + @Config("clp.metadata-db-user") + public ClpConfig setMetadataDbUser(String metadataDbUser) + { + this.metadataDbUser = metadataDbUser; + return this; + } + + public String getMetadataDbPassword() + { + return metadataDbPassword; + } + + @Config("clp.metadata-db-password") + public ClpConfig setMetadataDbPassword(String metadataDbPassword) + { + this.metadataDbPassword = metadataDbPassword; + return this; + } + + public String getMetadataTablePrefix() + { + return metadataTablePrefix; + } + + @Config("clp.metadata-table-prefix") + public ClpConfig setMetadataTablePrefix(String metadataTablePrefix) + { + if (metadataTablePrefix == null || !SAFE_SQL_IDENTIFIER.matcher(metadataTablePrefix).matches()) { + throw new PrestoException(ClpErrorCode.CLP_UNSUPPORTED_CONFIG_OPTION, "Invalid metadataTablePrefix: " + + metadataTablePrefix + ". Only alphanumeric characters and underscores are allowed."); + } + + this.metadataTablePrefix = metadataTablePrefix; + return this; + } + + public long getMetadataRefreshInterval() + { + return metadataRefreshInterval; + } + + @Config("clp.metadata-refresh-interval") + public ClpConfig setMetadataRefreshInterval(long metadataRefreshInterval) + { + this.metadataRefreshInterval = metadataRefreshInterval; + return this; + } + + public long getMetadataExpireInterval() + { + return metadataExpireInterval; + } + + @Config("clp.metadata-expire-interval") + public ClpConfig setMetadataExpireInterval(long metadataExpireInterval) + { + this.metadataExpireInterval = metadataExpireInterval; + return this; + } + + public ArchiveSource getInputSource() + { + return archiveSource; + } + + @Config("clp.input-source") + public ClpConfig setInputSource(ArchiveSource archiveSource) + { + this.archiveSource = archiveSource; + return this; + } + + public SplitProviderType getSplitProviderType() + { + return splitProviderType; + } + + 
@Config("clp.split-provider-type") + public ClpConfig setSplitProviderType(SplitProviderType splitProviderType) + { + this.splitProviderType = splitProviderType; + return this; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java new file mode 100644 index 0000000000000..fe4f66df99516 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java @@ -0,0 +1,90 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.bootstrap.LifeCycleManager; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.transaction.IsolationLevel; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class ClpConnector + implements Connector +{ + private static final Logger log = Logger.get(ClpConnector.class); + + private final LifeCycleManager lifeCycleManager; + private final ClpMetadata metadata; + private final ClpSplitManager splitManager; + private final FunctionMetadataManager functionManager; + private final StandardFunctionResolution functionResolution; + + @Inject + public ClpConnector(LifeCycleManager lifeCycleManager, + ClpMetadata metadata, + ClpSplitManager splitManager, + FunctionMetadataManager functionManager, + StandardFunctionResolution functionResolution) + { + this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + this.functionManager = requireNonNull(functionManager, "functionManager is null"); + this.functionResolution = requireNonNull(functionResolution, "functionResolution is null"); + } + + @Override + public ConnectorPlanOptimizerProvider getConnectorPlanOptimizerProvider() + { + return new ClpPlanOptimizerProvider(functionManager, functionResolution); + } + + @Override + public ConnectorTransactionHandle 
beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + { + return ClpTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) + { + return metadata; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public final void shutdown() + { + try { + lifeCycleManager.stop(); + } + catch (Exception e) { + log.error(e, "Error shutting down connector"); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java new file mode 100644 index 0000000000000..a984943df87b7 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.json.JsonModule; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.relation.RowExpressionService; +import com.google.inject.Injector; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class ClpConnectorFactory + implements ConnectorFactory +{ + @Override + public String getName() + { + return "clp"; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return new ClpHandleResolver(); + } + + @Override + public Connector create(String catalogName, Map config, ConnectorContext context) + { + requireNonNull(catalogName, "catalogName is null"); + requireNonNull(config, "config is null"); + try { + Bootstrap app = new Bootstrap(new JsonModule(), new ClpModule(), binder -> { + binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + binder.bind(NodeManager.class).toInstance(context.getNodeManager()); + binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager()); + binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); + binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService()); + }); + + Injector injector = app.doNotInitializeLogging().setRequiredConfigurationProperties(config).initialize(); + + return injector.getInstance(ClpConnector.class); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git 
a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java new file mode 100644 index 0000000000000..8cb2438277404 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.ErrorCode; +import com.facebook.presto.common.ErrorType; +import com.facebook.presto.spi.ErrorCodeSupplier; + +import static com.facebook.presto.common.ErrorType.EXTERNAL; + +public enum ClpErrorCode + implements ErrorCodeSupplier +{ + CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION(0, EXTERNAL), + CLP_UNSUPPORTED_METADATA_SOURCE(1, EXTERNAL), + CLP_UNSUPPORTED_SPLIT_SOURCE(2, EXTERNAL), + CLP_UNSUPPORTED_TYPE(3, EXTERNAL), + CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL); + + private final ErrorCode errorCode; + + ClpErrorCode(int code, ErrorType type) + { + errorCode = new ErrorCode(code + 0x0400_0000, name(), type); + } + + @Override + public ErrorCode toErrorCode() + { + return errorCode; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpExpression.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpExpression.java new file mode 100644 index 0000000000000..df32727bcf27b --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpExpression.java @@ -0,0 +1,70 @@ +/* + 
* Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.relation.RowExpression; + +import java.util.Optional; + +/** + * Represents the result of converting a Presto RowExpression into a CLP-compatible KQL query. + * There are three possible cases: + * 1. The entire RowExpression is convertible to KQL: `definition` is set, `remainingExpression` is empty. + * 2. Part of the RowExpression is convertible: the KQL part is stored in `definition`, + * and the remaining untranslatable part is stored in `remainingExpression`. + * 3. None of the expression is convertible: the full RowExpression is stored in `remainingExpression`, + * and `definition` is empty. + */ +public class ClpExpression +{ + // Optional KQL query string representing the fully or partially translatable part of the expression. + private final Optional definition; + + // The remaining (non-translatable) portion of the RowExpression, if any. + private final Optional remainingExpression; + + public ClpExpression(Optional definition, Optional remainingExpression) + { + this.definition = definition; + this.remainingExpression = remainingExpression; + } + + // Creates an empty ClpExpression (no KQL definition, no remaining expression). + public ClpExpression() + { + this (Optional.empty(), Optional.empty()); + } + + // Creates a ClpExpression from a fully translatable KQL string. 
+ public ClpExpression(String definition) + { + this(Optional.of(definition), Optional.empty()); + } + + // Creates a ClpExpression from a non-translatable RowExpression. + public ClpExpression(RowExpression remainingExpression) + { + this(Optional.empty(), Optional.of(remainingExpression)); + } + + public Optional getDefinition() + { + return definition; + } + + public Optional getRemainingExpression() + { + return remainingExpression; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java new file mode 100644 index 0000000000000..e332534aadd7b --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpFilterToKqlConverter.java @@ -0,0 +1,687 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.function.OperatorType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.function.FunctionHandle; +import com.facebook.presto.spi.function.FunctionMetadata; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.relation.CallExpression; +import com.facebook.presto.spi.relation.ConstantExpression; +import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.RowExpressionVisitor; +import com.facebook.presto.spi.relation.SpecialFormExpression; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.google.common.collect.ImmutableSet; +import io.airlift.slice.Slice; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.common.function.OperatorType.EQUAL; +import static com.facebook.presto.common.function.OperatorType.GREATER_THAN; +import static com.facebook.presto.common.function.OperatorType.GREATER_THAN_OR_EQUAL; +import static com.facebook.presto.common.function.OperatorType.LESS_THAN; +import static com.facebook.presto.common.function.OperatorType.LESS_THAN_OR_EQUAL; +import static com.facebook.presto.common.function.OperatorType.NOT_EQUAL; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION; +import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND; +import static java.util.Objects.requireNonNull; + +/** + * ClpFilterToKqlConverter translates Presto 
RowExpressions into KQL (Kibana Query Language) filters + * used as CLP queries. This is used primarily for pushing down supported filters to the CLP engine. + * This class implements the RowExpressionVisitor interface and recursively walks Presto filter expressions, + * attempting to convert supported expressions (e.g., comparisons, logical AND/OR, LIKE, IN, IS NULL, + * and SUBSTR-based expressions) into corresponding KQL filter strings. Any part of the expression that + * cannot be translated is preserved as a "remaining expression" for potential fallback processing. + * Supported translations include: + * - Variable-to-literal comparisons (e.g., =, !=, <, >, <=, >=) + * - String pattern matches using LIKE + * - Membership checks using IN + * - NULL checks via IS NULL + * - Substring comparisons (e.g., SUBSTR(x, start, len) = "val") mapped to wildcard KQL queries + * - Dereferencing fields from row-typed variables + * - Logical operators AND, OR, and NOT + */ +public class ClpFilterToKqlConverter + implements RowExpressionVisitor +{ + private static final Set LOGICAL_BINARY_OPS_FILTER = + ImmutableSet.of(EQUAL, NOT_EQUAL, LESS_THAN, LESS_THAN_OR_EQUAL, GREATER_THAN, GREATER_THAN_OR_EQUAL); + + private final StandardFunctionResolution standardFunctionResolution; + private final FunctionMetadataManager functionMetadataManager; + private final Map assignments; + + public ClpFilterToKqlConverter(StandardFunctionResolution standardFunctionResolution, + FunctionMetadataManager functionMetadataManager, + Map assignments) + { + this.standardFunctionResolution = + requireNonNull(standardFunctionResolution, "standardFunctionResolution is null"); + this.functionMetadataManager = requireNonNull(functionMetadataManager, "function metadata manager is null"); + this.assignments = requireNonNull(assignments, "assignments is null"); + } + + @Override + public ClpExpression visitCall(CallExpression node, Void context) + { + FunctionHandle functionHandle = 
node.getFunctionHandle(); + if (standardFunctionResolution.isNotFunction(functionHandle)) { + return handleNot(node); + } + + if (standardFunctionResolution.isLikeFunction(functionHandle)) { + return handleLike(node); + } + + FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(node.getFunctionHandle()); + Optional operatorTypeOptional = functionMetadata.getOperatorType(); + if (operatorTypeOptional.isPresent()) { + OperatorType operatorType = operatorTypeOptional.get(); + if (operatorType.isComparisonOperator() && operatorType != OperatorType.IS_DISTINCT_FROM) { + return handleLogicalBinary(operatorType, node); + } + } + + return new ClpExpression(node); + } + + @Override + public ClpExpression visitConstant(ConstantExpression node, Void context) + { + return new ClpExpression(getLiteralString(node)); + } + + @Override + public ClpExpression visitVariableReference(VariableReferenceExpression node, Void context) + { + return new ClpExpression(getVariableName(node)); + } + + @Override + public ClpExpression visitSpecialForm(SpecialFormExpression node, Void context) + { + switch (node.getForm()) { + case AND: + return handleAnd(node); + case OR: + return handleOr(node); + case IN: + return handleIn(node); + case IS_NULL: + return handleIsNull(node); + case DEREFERENCE: + return handleDereference(node); + default: + return new ClpExpression(node); + } + } + + // For all other expressions, return the original expression + @Override + public ClpExpression visitExpression(RowExpression node, Void context) + { + return new ClpExpression(node); + } + + private static String getLiteralString(ConstantExpression literal) + { + if (literal.getValue() instanceof Slice) { + return ((Slice) literal.getValue()).toStringUtf8(); + } + return literal.toString(); + } + + private String getVariableName(VariableReferenceExpression variable) + { + return ((ClpColumnHandle) assignments.get(variable)).getOriginalColumnName(); + } + + /** + * Handles the logical 
NOT expression. + * Example: + * Input: NOT (col1 = 5) + * Output: NOT col1: 5 + */ + private ClpExpression handleNot(CallExpression node) + { + if (node.getArguments().size() != 1) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "NOT operator must have exactly one argument. Received: " + node); + } + + RowExpression input = node.getArguments().get(0); + ClpExpression expression = input.accept(this, null); + if (expression.getRemainingExpression().isPresent() || !expression.getDefinition().isPresent()) { + return new ClpExpression(node); + } + return new ClpExpression("NOT " + expression.getDefinition().get()); + } + + /** + * Handles the logical AND expression. + * Combines all definable child expressions into a single KQL query joined by AND. + * Any unsupported children are collected into remaining expressions. + * Example: + * Input: col1 = 5 AND col2 = 'abc' + * Output: (col1: 5 AND col2: "abc") + */ + private ClpExpression handleAnd(SpecialFormExpression node) + { + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("("); + ArrayList remainingExpressions = new ArrayList<>(); + boolean hasDefinition = false; + for (RowExpression argument : node.getArguments()) { + ClpExpression expression = argument.accept(this, null); + if (expression.getDefinition().isPresent()) { + hasDefinition = true; + queryBuilder.append(expression.getDefinition().get()); + queryBuilder.append(" AND "); + } + if (expression.getRemainingExpression().isPresent()) { + remainingExpressions.add(expression.getRemainingExpression().get()); + } + } + if (!hasDefinition) { + return new ClpExpression(node); + } + else if (!remainingExpressions.isEmpty()) { + if (remainingExpressions.size() == 1) { + return new ClpExpression(Optional.of(queryBuilder.substring(0, queryBuilder.length() - 5) + ")"), + Optional.of(remainingExpressions.get(0))); + } + else { + return new ClpExpression(Optional.of(queryBuilder.substring(0, queryBuilder.length() - 5) + ")"), + 
Optional.of(new SpecialFormExpression(node.getSourceLocation(), + AND, + BOOLEAN, + remainingExpressions))); + } + } + // Remove the last " AND " from the query + return new ClpExpression(queryBuilder.substring(0, queryBuilder.length() - 5) + ")"); + } + + /** + * Handles the logical OR expression. + * Combines all fully convertible child expressions into a single CLP query joined by OR. + * Returns the original node if any child is unsupported. + * Example: + * Input: col1 = 5 OR col1 = 10 + * Output: (col1: 5 OR col1: 10) + */ + private ClpExpression handleOr(SpecialFormExpression node) + { + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("("); + ArrayList remainingExpressions = new ArrayList<>(); + for (RowExpression argument : node.getArguments()) { + ClpExpression expression = argument.accept(this, null); + if (expression.getRemainingExpression().isPresent() || !expression.getDefinition().isPresent()) { + return new ClpExpression(node); + } + queryBuilder.append(expression.getDefinition().get()); + queryBuilder.append(" OR "); + } + // Remove the last " OR " from the query + return new ClpExpression(queryBuilder.substring(0, queryBuilder.length() - 4) + ")"); + } + + /** + * Handles the IN predicate. 
+ * Example: + * Input: col1 IN (1, 2, 3) + * Output: (col1: 1 OR col1: 2 OR col1: 3) + */ + private ClpExpression handleIn(SpecialFormExpression node) + { + ClpExpression variable = node.getArguments().get(0).accept(this, null); + if (!variable.getDefinition().isPresent()) { + return new ClpExpression(node); + } + String variableName = variable.getDefinition().get(); + StringBuilder queryBuilder = new StringBuilder(); + queryBuilder.append("("); + for (RowExpression argument : node.getArguments().subList(1, node.getArguments().size())) { + if (!(argument instanceof ConstantExpression)) { + return new ClpExpression(node); + } + ConstantExpression literal = (ConstantExpression) argument; + String literalString = getLiteralString(literal); + queryBuilder.append(variableName).append(": "); + if (literal.getType() instanceof VarcharType) { + queryBuilder.append("\"").append(literalString).append("\""); + } + else { + queryBuilder.append(literalString); + } + queryBuilder.append(" OR "); + } + // Remove the last " OR " from the query + return new ClpExpression(queryBuilder.substring(0, queryBuilder.length() - 4) + ")"); + } + + /** + * Handles the IS NULL predicate. + * Example: + * Input: col1 IS NULL + * Output: NOT col1: * + */ + private ClpExpression handleIsNull(SpecialFormExpression node) + { + if (node.getArguments().size() != 1) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "IS NULL operator must have exactly one argument. Received: " + node); + } + + ClpExpression expression = node.getArguments().get(0).accept(this, null); + if (!expression.getDefinition().isPresent()) { + return new ClpExpression(node); + } + + String variableName = expression.getDefinition().get(); + return new ClpExpression(String.format("NOT %s: *", variableName)); + } + + /** + * Handles dereference expressions on RowTypes (e.g., col.row_field). + * Converts row dereferences into dot-separated field access. 
+ * Example: + * Input: address.city (from a RowType 'address') + * Output: address.city + */ + private ClpExpression handleDereference(RowExpression expression) + { + if (expression instanceof VariableReferenceExpression) { + return expression.accept(this, null); + } + + if (!(expression instanceof SpecialFormExpression)) { + return new ClpExpression(expression); + } + + SpecialFormExpression specialForm = (SpecialFormExpression) expression; + List arguments = specialForm.getArguments(); + if (arguments.size() != 2) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "DEREFERENCE expects 2 arguments"); + } + + RowExpression base = arguments.get(0); + RowExpression index = arguments.get(1); + if (!(index instanceof ConstantExpression)) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "DEREFERENCE index must be a constant"); + } + + ConstantExpression constExpr = (ConstantExpression) index; + Object value = constExpr.getValue(); + if (!(value instanceof Long)) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "DEREFERENCE index constant is not a long"); + } + + int fieldIndex = ((Long) value).intValue(); + + Type baseType = base.getType(); + if (!(baseType instanceof RowType)) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, "DEREFERENCE base is not a RowType: " + baseType); + } + + RowType rowType = (RowType) baseType; + if (fieldIndex < 0 || fieldIndex >= rowType.getFields().size()) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "Invalid field index " + fieldIndex + " for RowType: " + rowType); + } + + RowType.Field field = rowType.getFields().get(fieldIndex); + String fieldName = field.getName().orElse("field" + fieldIndex); + + ClpExpression baseString = handleDereference(base); + if (!baseString.getDefinition().isPresent()) { + return new ClpExpression(expression); + } + return new ClpExpression(baseString.getDefinition().get() + "." 
+ fieldName); + } + + /** + * Handles LIKE expressions. + * Transforms SQL LIKE into KQL queries using wildcards (* and ?). + * Supports constant patterns or constant casts only. + * Example: + * Input: col1 LIKE 'a_bc%' + * Output: col1: "a?bc*" + */ + private ClpExpression handleLike(CallExpression node) + { + if (node.getArguments().size() != 2) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "LIKE operator must have exactly two arguments. Received: " + node); + } + ClpExpression variable = node.getArguments().get(0).accept(this, null); + if (!variable.getDefinition().isPresent()) { + return new ClpExpression(node); + } + + String variableName = variable.getDefinition().get(); + RowExpression argument = node.getArguments().get(1); + + String pattern; + if (argument instanceof ConstantExpression) { + ConstantExpression literal = (ConstantExpression) argument; + pattern = getLiteralString(literal); + } + else if (argument instanceof CallExpression) { + CallExpression callExpression = (CallExpression) argument; + if (!standardFunctionResolution.isCastFunction(callExpression.getFunctionHandle())) { + return new ClpExpression(node); + } + if (callExpression.getArguments().size() != 1) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "CAST function must have exactly one argument. 
Received: " + callExpression); + } + if (!(callExpression.getArguments().get(0) instanceof ConstantExpression)) { + return new ClpExpression(node); + } + pattern = getLiteralString((ConstantExpression) callExpression.getArguments().get(0)); + } + else { + return new ClpExpression(node); + } + pattern = pattern.replace("%", "*").replace("_", "?"); + return new ClpExpression(String.format("%s: \"%s\"", variableName, pattern)); + } + + private static class SubstrInfo + { + String variableName; + RowExpression startExpression; + RowExpression lengthExpression; + SubstrInfo(String variableName, RowExpression start, RowExpression length) + { + this.variableName = variableName; + this.startExpression = start; + this.lengthExpression = length; + } + } + + /** + * Parse SUBSTR(...) calls that appear either as: + * SUBSTR(x, start) + * or + * SUBSTR(x, start, length) + */ + private Optional parseSubstringCall(CallExpression callExpression) + { + FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(callExpression.getFunctionHandle()); + String functionName = functionMetadata.getName().getObjectName(); + if (!functionName.equals("substr")) { + return Optional.empty(); + } + + int argCount = callExpression.getArguments().size(); + if (argCount < 2 || argCount > 3) { + return Optional.empty(); + } + + ClpExpression variable = callExpression.getArguments().get(0).accept(this, null); + if (!variable.getDefinition().isPresent()) { + return Optional.empty(); + } + + String varName = variable.getDefinition().get(); + RowExpression startExpression = callExpression.getArguments().get(1); + RowExpression lengthExpression = null; + if (argCount == 3) { + lengthExpression = callExpression.getArguments().get(2); + } + + return Optional.of(new SubstrInfo(varName, startExpression, lengthExpression)); + } + + /** + * Attempt to parse "start" or "length" as an integer. 
+ */ + private Optional parseIntValue(RowExpression expression) + { + if (expression instanceof ConstantExpression) { + try { + return Optional.of(Integer.parseInt(getLiteralString((ConstantExpression) expression))); + } + catch (NumberFormatException ignored) { } + } + else if (expression instanceof CallExpression) { + CallExpression call = (CallExpression) expression; + FunctionMetadata functionMetadata = functionMetadataManager.getFunctionMetadata(call.getFunctionHandle()); + Optional operatorTypeOptional = functionMetadata.getOperatorType(); + if (operatorTypeOptional.isPresent() && operatorTypeOptional.get().equals(OperatorType.NEGATION)) { + RowExpression arg0 = call.getArguments().get(0); + if (arg0 instanceof ConstantExpression) { + try { + return Optional.of(-Integer.parseInt(getLiteralString((ConstantExpression) arg0))); + } + catch (NumberFormatException ignored) { } + } + } + } + return Optional.empty(); + } + + /** + * If lengthExpression is a constant integer that matches targetString.length(), + * return that length. Otherwise empty. + */ + private Optional parseLengthLiteralOrFunction(RowExpression lengthExpression, String targetString) + { + if (lengthExpression instanceof ConstantExpression) { + String val = getLiteralString((ConstantExpression) lengthExpression); + try { + int parsed = Integer.parseInt(val); + if (parsed == targetString.length()) { + return Optional.of(parsed); + } + } + catch (NumberFormatException ignored) { } + } + return Optional.empty(); + } + + /** + * Translate SUBSTR(x, start) or SUBSTR(x, start, length) = 'someString' to KQL. 
+ * Examples: + * SUBSTR(message, 1, 3) = 'abc' + * → message: "abc*" + * SUBSTR(message, 4, 3) = 'abc' + * → message: "???abc*" + * SUBSTR(message, 2) = 'hello' + * → message: "?hello" + * SUBSTR(message, -5) = 'hello' + * → message: "*hello" + */ + private ClpExpression interpretSubstringEquality(SubstrInfo info, String targetString) + { + if (info.lengthExpression != null) { + Optional maybeStart = parseIntValue(info.startExpression); + Optional maybeLen = parseLengthLiteralOrFunction(info.lengthExpression, targetString); + + if (maybeStart.isPresent() && maybeLen.isPresent()) { + int start = maybeStart.get(); + int len = maybeLen.get(); + if (start > 0 && len == targetString.length()) { + StringBuilder result = new StringBuilder(); + result.append(info.variableName).append(": \""); + for (int i = 1; i < start; i++) { + result.append("?"); + } + result.append(targetString).append("*\""); + return new ClpExpression(result.toString()); + } + } + } + else { + Optional maybeStart = parseIntValue(info.startExpression); + if (maybeStart.isPresent()) { + int start = maybeStart.get(); + if (start > 0) { + StringBuilder result = new StringBuilder(); + result.append(info.variableName).append(": \""); + for (int i = 1; i < start; i++) { + result.append("?"); + } + result.append(targetString).append("\""); + return new ClpExpression(result.toString()); + } + if (start == -targetString.length()) { + return new ClpExpression(String.format("%s: \"*%s\"", info.variableName, targetString)); + } + } + } + + return new ClpExpression(Optional.empty(), Optional.empty()); + } + + /** + * Checks whether the given expression matches the pattern SUBSTR(x, ...) = 'someString', + * and if so, attempts to convert it into a KQL query using wildcards and construct a CLP expression. 
+ */ + private ClpExpression tryInterpretSubstringEquality( + OperatorType operator, + RowExpression possibleSubstring, + RowExpression possibleLiteral) + { + if (!operator.equals(OperatorType.EQUAL)) { + return new ClpExpression(); + } + + if (!(possibleSubstring instanceof CallExpression) || + !(possibleLiteral instanceof ConstantExpression)) { + return new ClpExpression(); + } + + Optional maybeSubstringCall = parseSubstringCall((CallExpression) possibleSubstring); + if (!maybeSubstringCall.isPresent()) { + return new ClpExpression(); + } + + String targetString = getLiteralString((ConstantExpression) possibleLiteral); + return interpretSubstringEquality(maybeSubstringCall.get(), targetString); + } + + /** + * Builds a CLP expression from a basic comparison between a variable and a literal. + * Handles different operator types (EQUAL, NOT_EQUAL, and logical binary ops like <, >, etc.) + * and formats them appropriately based on whether the literal is a string or a non-string type. + * Examples: + * col = 'abc' → col: "abc" + * col != 42 → NOT col: 42 + * 5 < col → col > 5 + */ + private ClpExpression buildClpExpression( + String variableName, + String literalString, + OperatorType operator, + Type literalType, + RowExpression originalNode) + { + if (operator.equals(OperatorType.EQUAL)) { + if (literalType instanceof VarcharType) { + return new ClpExpression(String.format("%s: \"%s\"", variableName, literalString)); + } + else { + return new ClpExpression(String.format("%s: %s", variableName, literalString)); + } + } + else if (operator.equals(OperatorType.NOT_EQUAL)) { + if (literalType instanceof VarcharType) { + return new ClpExpression(String.format("NOT %s: \"%s\"", variableName, literalString)); + } + else { + return new ClpExpression(String.format("NOT %s: %s", variableName, literalString)); + } + } + else if (LOGICAL_BINARY_OPS_FILTER.contains(operator) && !(literalType instanceof VarcharType)) { + return new ClpExpression(String.format("%s %s %s", 
variableName, operator.getOperator(), literalString)); + } + return new ClpExpression(originalNode); + } + + /** + * Handles logical binary operators (e.g., =, !=, <, >) between two expressions. + * Supports constant on either side by flipping the operator when needed. + * Also checks for SUBSTR(x, ...) = 'value' patterns and delegates to substring handler. + */ + private ClpExpression handleLogicalBinary(OperatorType operator, CallExpression node) + { + if (node.getArguments().size() != 2) { + throw new PrestoException(CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION, + "Logical binary operator must have exactly two arguments. Received: " + node); + } + RowExpression left = node.getArguments().get(0); + RowExpression right = node.getArguments().get(1); + + ClpExpression maybeLeftSubstring = tryInterpretSubstringEquality(operator, left, right); + if (maybeLeftSubstring.getDefinition().isPresent()) { + return maybeLeftSubstring; + } + + ClpExpression maybeRightSubstring = tryInterpretSubstringEquality(operator, right, left); + if (maybeRightSubstring.getDefinition().isPresent()) { + return maybeRightSubstring; + } + + ClpExpression leftExpression = left.accept(this, null); + ClpExpression rightExpression = right.accept(this, null); + Optional leftDefinition = leftExpression.getDefinition(); + Optional rightDefinition = rightExpression.getDefinition(); + if (!leftDefinition.isPresent() || !rightDefinition.isPresent()) { + return new ClpExpression(node); + } + + boolean leftIsConstant = (left instanceof ConstantExpression); + boolean rightIsConstant = (right instanceof ConstantExpression); + + Type leftType = left.getType(); + Type rightType = right.getType(); + + if (rightIsConstant) { + return buildClpExpression( + leftDefinition.get(), // variable + rightDefinition.get(), // literal + operator, + rightType, + node); + } + else if (leftIsConstant) { + OperatorType newOperator = OperatorType.flip(operator); + return buildClpExpression( + rightDefinition.get(), // variable + 
leftDefinition.get(), // literal + newOperator, + leftType, + node); + } + // fallback + return new ClpExpression(node); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java new file mode 100644 index 0000000000000..462ecc039b9c6 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public class ClpHandleResolver + implements ConnectorHandleResolver +{ + @Override + public Class getTableHandleClass() + { + return ClpTableHandle.class; + } + + @Override + public Class getTableLayoutHandleClass() + { + return ClpTableLayoutHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return ClpColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return ClpSplit.class; + } + + @Override + public Class getTransactionHandleClass() + { + return ClpTransactionHandle.class; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java new file mode 100644 index 0000000000000..1172d278b63be --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java @@ -0,0 +1,194 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class ClpMetadata + implements ConnectorMetadata +{ + private static final String DEFAULT_SCHEMA_NAME = "default"; + private final ClpMetadataProvider clpMetadataProvider; + private final LoadingCache> columnHandleCache; + private final LoadingCache> tableNameCache; + + @Inject + public ClpMetadata(ClpConfig clpConfig, ClpMetadataProvider clpMetadataProvider) + { + this.columnHandleCache = CacheBuilder.newBuilder() + .expireAfterWrite(clpConfig.getMetadataExpireInterval(), SECONDS) + .refreshAfterWrite(clpConfig.getMetadataRefreshInterval(), SECONDS) + .build(CacheLoader.from(this::loadColumnHandles)); + this.tableNameCache = CacheBuilder.newBuilder() + .expireAfterWrite(clpConfig.getMetadataExpireInterval(), SECONDS) + 
.refreshAfterWrite(clpConfig.getMetadataRefreshInterval(), SECONDS) + .build(CacheLoader.from(this::loadTableNames)); + + this.clpMetadataProvider = clpMetadataProvider; + } + + private List loadColumnHandles(SchemaTableName schemaTableName) + { + return clpMetadataProvider.listColumnHandles(schemaTableName); + } + + private List loadTableNames(String schemaName) + { + return clpMetadataProvider.listTableNames(schemaName); + } + + private List listTables(String schemaName) + { + return tableNameCache.getUnchecked(schemaName); + } + + private List listColumns(SchemaTableName schemaTableName) + { + return columnHandleCache.getUnchecked(schemaTableName); + } + + @Override + public List listSchemaNames(ConnectorSession session) + { + return ImmutableList.of(DEFAULT_SCHEMA_NAME); + } + + @Override + public List listTables(ConnectorSession session, Optional schemaName) + { + String schemaNameValue = schemaName.orElse(DEFAULT_SCHEMA_NAME); + if (!listSchemaNames(session).contains(schemaNameValue)) { + return ImmutableList.of(); + } + + return listTables(schemaNameValue).stream() + .map(tableName -> new SchemaTableName(schemaNameValue, tableName)) + .collect(ImmutableList.toImmutableList()); + } + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + String schemaName = tableName.getSchemaName(); + if (!listSchemaNames(session).contains(schemaName)) { + return null; + } + + if (!listTables(schemaName).contains(tableName.getTableName())) { + return null; + } + + return new ClpTableHandle(tableName); + } + + @Override + public List getTableLayouts(ConnectorSession session, + ConnectorTableHandle table, + Constraint constraint, + Optional> desiredColumns) + { + ClpTableHandle tableHandle = (ClpTableHandle) table; + ConnectorTableLayout layout = new ConnectorTableLayout(new ClpTableLayoutHandle(tableHandle, Optional.empty())); + return ImmutableList.of(new ConnectorTableLayoutResult(layout, 
constraint.getSummary())); + } + + @Override + public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) + { + return new ConnectorTableLayout(handle); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + ClpTableHandle clpTableHandle = (ClpTableHandle) table; + SchemaTableName schemaTableName = clpTableHandle.getSchemaTableName(); + List columns = listColumns(schemaTableName).stream() + .map(ClpColumnHandle::getColumnMetadata) + .collect(ImmutableList.toImmutableList()); + + return new ConnectorTableMetadata(schemaTableName, columns); + } + + @Override + public Map> listTableColumns(ConnectorSession session, + SchemaTablePrefix prefix) + { + requireNonNull(prefix, "prefix is null"); + String schemaName = prefix.getSchemaName(); + if (schemaName != null && !schemaName.equals(DEFAULT_SCHEMA_NAME)) { + return ImmutableMap.of(); + } + + List schemaTableNames; + if (prefix.getTableName() == null) { + schemaTableNames = listTables(session, Optional.of(prefix.getSchemaName())); + } + else { + schemaTableNames = ImmutableList.of(new SchemaTableName(prefix.getSchemaName(), prefix.getTableName())); + } + + return schemaTableNames.stream() + .collect(ImmutableMap.toImmutableMap( + Function.identity(), + tableName -> getTableMetadata(session, getTableHandle(session, tableName)).getColumns())); + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) + { + ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle; + return listColumns(clpTableHandle.getSchemaTableName()).stream() + .collect(ImmutableMap.toImmutableMap( + ClpColumnHandle::getColumnName, + column -> column)); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, + ConnectorTableHandle tableHandle, + ColumnHandle columnHandle) + { + ClpColumnHandle clpColumnHandle = (ClpColumnHandle) columnHandle; + return 
clpColumnHandle.getColumnMetadata(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java new file mode 100644 index 0000000000000..da8db4d83efa4 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider; +import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider; +import com.facebook.presto.plugin.clp.split.ClpSplitProvider; +import com.facebook.presto.spi.PrestoException; +import com.google.inject.Binder; +import com.google.inject.Scopes; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; + +public class ClpModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + binder.bind(ClpConnector.class).in(Scopes.SINGLETON); + binder.bind(ClpMetadata.class).in(Scopes.SINGLETON); + binder.bind(ClpSplitManager.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(ClpConfig.class); + + ClpConfig config = buildConfigObject(ClpConfig.class); + if (config.getMetadataProviderType() == 
ClpConfig.MetadataProviderType.MYSQL) { + binder.bind(ClpMetadataProvider.class).to(ClpMySqlMetadataProvider.class).in(Scopes.SINGLETON); + } + else { + throw new PrestoException(ClpErrorCode.CLP_UNSUPPORTED_METADATA_SOURCE, + "Unsupported metadata provider type: " + config.getMetadataProviderType()); + } + + if (config.getSplitProviderType() == ClpConfig.SplitProviderType.MYSQL) { + binder.bind(ClpSplitProvider.class).to(ClpMySqlSplitProvider.class).in(Scopes.SINGLETON); + } + else { + throw new PrestoException(ClpErrorCode.CLP_UNSUPPORTED_SPLIT_SOURCE, + "Unsupported split provider type: " + config.getSplitProviderType()); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java new file mode 100644 index 0000000000000..8703f538a0661 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizer.java @@ -0,0 +1,115 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorPlanOptimizer; +import com.facebook.presto.spi.ConnectorPlanRewriter; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.TableHandle; +import com.facebook.presto.spi.VariableAllocator; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.plan.FilterNode; +import com.facebook.presto.spi.plan.PlanNode; +import com.facebook.presto.spi.plan.PlanNodeIdAllocator; +import com.facebook.presto.spi.plan.TableScanNode; +import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.VariableReferenceExpression; + +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.spi.ConnectorPlanRewriter.rewriteWith; + +public class ClpPlanOptimizer + implements ConnectorPlanOptimizer +{ + private static final Logger log = Logger.get(ClpPlanOptimizer.class); + private final FunctionMetadataManager functionManager; + private final StandardFunctionResolution functionResolution; + + public ClpPlanOptimizer(FunctionMetadataManager functionManager, + StandardFunctionResolution functionResolution) + { + this.functionManager = functionManager; + this.functionResolution = functionResolution; + } + + @Override + public PlanNode optimize(PlanNode maxSubplan, + ConnectorSession session, + VariableAllocator variableAllocator, + PlanNodeIdAllocator idAllocator) + { + return rewriteWith(new Rewriter(idAllocator), maxSubplan); + } + + private class Rewriter + extends ConnectorPlanRewriter + { + private final PlanNodeIdAllocator idAllocator; + + public Rewriter(PlanNodeIdAllocator idAllocator) + { + this.idAllocator = idAllocator; + } + + @Override + public PlanNode visitFilter(FilterNode node, RewriteContext context) + { + 
if (!(node.getSource() instanceof TableScanNode)) { + return node; + } + + TableScanNode tableScanNode = (TableScanNode) node.getSource(); + Map assignments = tableScanNode.getAssignments(); + TableHandle tableHandle = tableScanNode.getTable(); + ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle.getConnectorHandle(); + ClpExpression clpExpression = node.getPredicate() + .accept(new ClpFilterToKqlConverter(functionResolution, functionManager, assignments), + null); + Optional kqlQuery = clpExpression.getDefinition(); + Optional remainingPredicate = clpExpression.getRemainingExpression(); + if (!kqlQuery.isPresent()) { + return node; + } + log.debug("KQL query: %s", kqlQuery.get()); + ClpTableLayoutHandle clpTableLayoutHandle = new ClpTableLayoutHandle(clpTableHandle, kqlQuery); + TableScanNode newTableScanNode = new TableScanNode( + tableScanNode.getSourceLocation(), + idAllocator.getNextId(), + new TableHandle( + tableHandle.getConnectorId(), + clpTableHandle, + tableHandle.getTransaction(), + Optional.of(clpTableLayoutHandle)), + tableScanNode.getOutputVariables(), + tableScanNode.getAssignments(), + tableScanNode.getTableConstraints(), + tableScanNode.getCurrentConstraint(), + tableScanNode.getEnforcedConstraint(), + tableScanNode.getCteMaterializationInfo()); + if (!remainingPredicate.isPresent()) { + return newTableScanNode; + } + + return new FilterNode(node.getSourceLocation(), + idAllocator.getNextId(), + newTableScanNode, + remainingPredicate.get()); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizerProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizerProvider.java new file mode 100644 index 0000000000000..2268ce26c238e --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlanOptimizerProvider.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with 
the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ConnectorPlanOptimizer; +import com.facebook.presto.spi.connector.ConnectorPlanOptimizerProvider; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.google.common.collect.ImmutableSet; + +import javax.inject.Inject; + +import java.util.Set; + +public class ClpPlanOptimizerProvider + implements ConnectorPlanOptimizerProvider +{ + private final FunctionMetadataManager functionManager; + private final StandardFunctionResolution functionResolution; + + @Inject + public ClpPlanOptimizerProvider(FunctionMetadataManager functionManager, + StandardFunctionResolution functionResolution) + { + this.functionManager = functionManager; + this.functionResolution = functionResolution; + } + + @Override + public Set getLogicalPlanOptimizers() + { + return ImmutableSet.of(); + } + + @Override + public Set getPhysicalPlanOptimizers() + { + return ImmutableSet.of(new ClpPlanOptimizer(functionManager, functionResolution)); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java new file mode 100644 index 0000000000000..985c707a32483 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.google.common.collect.ImmutableList; + +public class ClpPlugin + implements Plugin +{ + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(new ClpConnectorFactory()); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java new file mode 100644 index 0000000000000..680a5c672ff87 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java @@ -0,0 +1,85 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.NodeProvider; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; + +public class ClpSplit + implements ConnectorSplit +{ + private final SchemaTableName schemaTableName; + private final String archivePath; + private final Optional query; + + @JsonCreator + public ClpSplit(@JsonProperty("schemaTableName") @Nullable SchemaTableName schemaTableName, + @JsonProperty("archivePath") @Nullable String archivePath, + @JsonProperty("query") Optional query) + { + this.schemaTableName = schemaTableName; + this.archivePath = archivePath; + this.query = query; + } + + @JsonProperty + @Nullable + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @JsonProperty + public String getArchivePath() + { + return archivePath; + } + + @JsonProperty + public Optional getQuery() + { + return query; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() + { + return NO_PREFERENCE; + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) + { + return ImmutableList.of(); + } + + @Override + public Object getInfo() + { + return this; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java new file mode 100644 index 0000000000000..07c77036bf405 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java @@ -0,0 
+1,46 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.plugin.clp.split.ClpSplitProvider; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import javax.inject.Inject; + +public class ClpSplitManager + implements ConnectorSplitManager +{ + private final ClpSplitProvider clpSplitProvider; + + @Inject + public ClpSplitManager(ClpSplitProvider clpSplitProvider) + { + this.clpSplitProvider = clpSplitProvider; + } + + @Override + public ConnectorSplitSource getSplits(ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorTableLayoutHandle layout, + SplitSchedulingContext splitSchedulingContext) + { + ClpTableLayoutHandle layoutHandle = (ClpTableLayoutHandle) layout; + return new FixedSplitSource(clpSplitProvider.listSplits(layoutHandle)); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java new file mode 100644 index 0000000000000..fb6dc88bbb584 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java 
@@ -0,0 +1,64 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +public class ClpTableHandle + implements ConnectorTableHandle +{ + private final SchemaTableName schemaTableName; + + @JsonCreator + public ClpTableHandle(@JsonProperty("schemaTableName") SchemaTableName schemaTableName) + { + this.schemaTableName = schemaTableName; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @Override + public int hashCode() + { + return Objects.hash(schemaTableName); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ClpTableHandle other = (ClpTableHandle) obj; + return this.schemaTableName.equals(other.schemaTableName); + } + + @Override + public String toString() + { + return schemaTableName.toString(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java new file mode 100644 index 0000000000000..d524101bed863 --- /dev/null +++ 
b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java @@ -0,0 +1,73 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; +import java.util.Optional; + +public class ClpTableLayoutHandle + implements ConnectorTableLayoutHandle +{ + private final ClpTableHandle table; + private final Optional query; + + @JsonCreator + public ClpTableLayoutHandle(@JsonProperty("table") ClpTableHandle table, + @JsonProperty("query") Optional query) + { + this.table = table; + this.query = query; + } + + @JsonProperty + public ClpTableHandle getTable() + { + return table; + } + + @JsonProperty + public Optional getQuery() + { + return query; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClpTableLayoutHandle that = (ClpTableLayoutHandle) o; + return Objects.equals(table, that.table); + } + + @Override + public int hashCode() + { + return Objects.hash(table); + } + + @Override + public String toString() + { + return table.toString(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java 
b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java new file mode 100644 index 0000000000000..f39cd639072d6 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public enum ClpTransactionHandle + implements ConnectorTransactionHandle +{ + INSTANCE +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java new file mode 100644 index 0000000000000..7ed353c7a222c --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java @@ -0,0 +1,32 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp.metadata; + +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.spi.SchemaTableName; + +import java.util.List; + +public interface ClpMetadataProvider +{ + /** + * Returns the list of column handles for the given table. + */ + public List listColumnHandles(SchemaTableName schemaTableName); + + /** + * Returns the list of table names in the given schema. + */ + public List listTableNames(String schema); +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java new file mode 100644 index 0000000000000..4644568d1d300 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java @@ -0,0 +1,112 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp.metadata; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.plugin.clp.ClpConfig; +import com.facebook.presto.spi.SchemaTableName; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.List; + +public class ClpMySqlMetadataProvider + implements ClpMetadataProvider +{ + private static final Logger log = Logger.get(ClpMySqlMetadataProvider.class); + + public static final String COLUMN_METADATA_PREFIX = "column_metadata_"; + private static final String QUERY_SELECT_COLUMNS = "SELECT * FROM %s" + COLUMN_METADATA_PREFIX + "%s"; + private static final String TABLE_METADATA_TABLE_SUFFIX = "table_metadata"; + private static final String QUERY_SELECT_TABLES = "SELECT table_name FROM %s" + TABLE_METADATA_TABLE_SUFFIX; + + private final ClpConfig config; + + public ClpMySqlMetadataProvider(ClpConfig config) + { + try { + Class.forName("com.mysql.jdbc.Driver"); + } + catch (ClassNotFoundException e) { + log.error(e, "Failed to load MySQL JDBC driver"); + throw new RuntimeException("MySQL JDBC driver not found", e); + } + this.config = config; + } + + private Connection getConnection() throws SQLException + { + Connection connection = DriverManager.getConnection(config.getMetadataDbUrl(), config.getMetadataDbUser(), config.getMetadataDbPassword()); + String dbName = config.getMetadataDbName(); + if (dbName != null && !dbName.isEmpty()) { + connection.createStatement().execute("USE " + dbName); + } + return connection; + } + + private boolean isValidIdentifier(String identifier) + { + return identifier != null && ClpConfig.SAFE_SQL_IDENTIFIER.matcher(identifier).matches(); + } + + @Override + public List listColumnHandles(SchemaTableName schemaTableName) + { + String query = 
String.format(QUERY_SELECT_COLUMNS, config.getMetadataTablePrefix(), schemaTableName.getTableName()); + ClpSchemaTree schemaTree = new ClpSchemaTree(config.isPolymorphicTypeEnabled()); + try (Connection connection = getConnection(); + PreparedStatement statement = connection.prepareStatement(query)) { + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + schemaTree.addColumn(resultSet.getString("name"), resultSet.getByte("type")); + } + } + } + catch (SQLException e) { + log.error("Failed to load table schema for %s: %s" + schemaTableName.getTableName(), e); + } + return schemaTree.collectColumnHandles(); + } + + @Override + public List listTableNames(String schema) + { + List tableNames = new ArrayList<>(); + + String query = String.format(QUERY_SELECT_TABLES, config.getMetadataTablePrefix()); + try (Connection connection = getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + while (resultSet.next()) { + String tableName = resultSet.getString("table_name"); + if (isValidIdentifier(tableName)) { + tableNames.add(tableName); + } + else { + log.warn("Ignoring invalid table name found in metadata: %s", tableName); + } + } + } + catch (SQLException e) { + log.error("Failed to load table names: %s", e); + } + return tableNames; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpNodeType.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpNodeType.java new file mode 100644 index 0000000000000..a2b30bde98c98 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpNodeType.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
/**
 * Node types of a CLP schema, each tagged with the byte code used to represent it.
 * {@link #fromType(byte)} maps a code back to its constant.
 */
public enum ClpNodeType
{
    Integer((byte) 0),
    Float((byte) 1),
    ClpString((byte) 2),
    VarString((byte) 3),
    Boolean((byte) 4),
    Object((byte) 5),
    UnstructuredArray((byte) 6),
    NullValue((byte) 7),
    DateString((byte) 8),
    StructuredArray((byte) 9);

    // Indexed by type code; slots without a matching constant stay null.
    private static final ClpNodeType[] LOOKUP_TABLE = buildLookupTable();

    private final byte type;

    ClpNodeType(byte type)
    {
        this.type = type;
    }

    /**
     * Builds the code-to-constant table, sized to the largest code in use.
     */
    private static ClpNodeType[] buildLookupTable()
    {
        ClpNodeType[] constants = values();
        int highestCode = 0;
        for (ClpNodeType constant : constants) {
            highestCode = Math.max(highestCode, constant.type);
        }

        ClpNodeType[] table = new ClpNodeType[highestCode + 1];
        for (ClpNodeType constant : constants) {
            table[constant.type] = constant;
        }
        return table;
    }

    /**
     * Resolves a type code to its constant.
     *
     * @param type the byte code to look up
     * @return the constant declared with that code
     * @throws IllegalArgumentException if the code is negative, beyond the largest
     *         known code, or has no constant assigned
     */
    public static ClpNodeType fromType(byte type)
    {
        boolean inRange = type >= 0 && type < LOOKUP_TABLE.length;
        ClpNodeType resolved = inRange ? LOOKUP_TABLE[type] : null;
        if (resolved == null) {
            throw new IllegalArgumentException("Invalid type code: " + type);
        }
        return resolved;
    }

    /**
     * @return the byte code this constant was declared with
     */
    public byte getType()
    {
        return type;
    }
}
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.metadata; + +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.plugin.clp.ClpErrorCode; +import com.facebook.presto.spi.PrestoException; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +public class ClpSchemaTree +{ + static class ClpNode + { + Type type; // Only non-null for leaf nodes + String originalName; + Map children = new HashMap<>(); + Set conflictingBaseNames = new HashSet<>(); + + boolean isLeaf() + { + return children.isEmpty(); + } + } + + private final ClpNode root; + private final boolean polymorphicTypeEnabled; + ClpSchemaTree(boolean polymorphicTypeEnabled) + { + this.polymorphicTypeEnabled = polymorphicTypeEnabled; + this.root = new ClpNode(); + } + + private Type mapColumnType(byte type) + { + switch (ClpNodeType.fromType(type)) { + case Integer: + return BigintType.BIGINT; + case Float: + return DoubleType.DOUBLE; + case ClpString: + case VarString: + case DateString: + case NullValue: + return VarcharType.VARCHAR; + case UnstructuredArray: + return new 
ArrayType(VarcharType.VARCHAR); + case Boolean: + return BooleanType.BOOLEAN; + default: + throw new PrestoException(ClpErrorCode.CLP_UNSUPPORTED_TYPE, "Unsupported type: " + type); + } + } + + /** + * Adds a column to the internal CLP schema tree, creating intermediate nested nodes as needed. + * Handles potential name conflicts when polymorphic types are enabled by suffixing column names + * with type display names. + * + * @param fullName Fully qualified column name using dot notation (e.g., "a.b.c"). + * @param type Serialized byte value representing the CLP column's type. + */ + public void addColumn(String fullName, byte type) + { + Type prestoType = mapColumnType(type); + String[] path = fullName.split("\\."); + ClpNode current = root; + + for (int i = 0; i < path.length - 1; i++) { + String segment = path[i]; + current.children.putIfAbsent(segment, new ClpNode()); + current = current.children.get(segment); + } + + String leafName = path[path.length - 1]; + String finalLeafName = leafName; + + if (polymorphicTypeEnabled) { + boolean conflictDetected = false; + + if (current.children.containsKey(leafName)) { + ClpNode existing = current.children.get(leafName); + + if (existing.type != null && !existing.type.equals(prestoType)) { + String existingSuffix = existing.type.getDisplayName(); + String renamedExisting = leafName + "_" + existingSuffix; + + current.children.remove(leafName); + current.children.put(renamedExisting, existing); + + current.conflictingBaseNames.add(leafName); + conflictDetected = true; + } + } + else if (current.conflictingBaseNames.contains(leafName)) { + conflictDetected = true; + } + + if (conflictDetected) { + String newSuffix = prestoType.getDisplayName(); + finalLeafName = leafName + "_" + newSuffix; + } + } + + ClpNode leaf = new ClpNode(); + leaf.type = prestoType; + leaf.originalName = leafName; + current.children.put(finalLeafName, leaf); + } + + /** + * Traverses the CLP schema tree and collects all leaf and nested structure 
nodes + * into a flat list of column handles. For nested structures, builds a RowType + * from child nodes. + * + * @return List of ClpColumnHandle objects representing the full schema. + */ + public List collectColumnHandles() + { + List columns = new ArrayList<>(); + for (Map.Entry entry : root.children.entrySet()) { + String name = entry.getKey(); + ClpNode child = entry.getValue(); + if (child.isLeaf()) { + columns.add(new ClpColumnHandle(name, child.originalName, child.type, true)); + } + else { + Type rowType = buildRowType(child); + columns.add(new ClpColumnHandle(name, child.originalName, rowType, true)); + } + } + return columns; + } + + private Type buildRowType(ClpNode node) + { + List fields = new ArrayList<>(); + for (Map.Entry entry : node.children.entrySet()) { + String name = entry.getKey(); + ClpNode child = entry.getValue(); + Type fieldType = child.isLeaf() ? child.type : buildRowType(child); + fields.add(new RowType.Field(Optional.of(name), fieldType)); + } + return RowType.from(fields); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java new file mode 100644 index 0000000000000..c2e422281d607 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java @@ -0,0 +1,108 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp.split; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.ClpConfig; +import com.facebook.presto.plugin.clp.ClpSplit; +import com.facebook.presto.plugin.clp.ClpTableLayoutHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +public class ClpMySqlSplitProvider + implements ClpSplitProvider +{ + private static final Logger log = Logger.get(ClpMySqlSplitProvider.class); + + private static final String ARCHIVE_TABLE_SUFFIX = "_archives"; + private static final String TABLE_METADATA_TABLE_SUFFIX = "table_metadata"; + private static final String QUERY_SELECT_ARCHIVE_IDS = "SELECT archive_id FROM %s%s" + ARCHIVE_TABLE_SUFFIX; + private static final String QUERY_SELECT_TABLE_METADATA = "SELECT table_path FROM %s" + TABLE_METADATA_TABLE_SUFFIX + " WHERE table_name = '%s'"; + + private final ClpConfig config; + + public ClpMySqlSplitProvider(ClpConfig config) + { + try { + Class.forName("com.mysql.jdbc.Driver"); + } + catch (ClassNotFoundException e) { + log.error(e, "Failed to load MySQL JDBC driver"); + throw new RuntimeException("MySQL JDBC driver not found", e); + } + this.config = config; + } + + private Connection getConnection() throws SQLException + { + Connection connection = DriverManager.getConnection(config.getMetadataDbUrl(), config.getMetadataDbUser(), config.getMetadataDbPassword()); + String dbName = config.getMetadataDbName(); + if (dbName != null && !dbName.isEmpty()) { + connection.createStatement().execute("USE " + dbName); + } + return connection; + } + + @Override + public List listSplits(ClpTableLayoutHandle clpTableLayoutHandle) + { + List splits = new ArrayList<>(); + SchemaTableName tableSchemaName = 
clpTableLayoutHandle.getTable().getSchemaTableName(); + String tableName = tableSchemaName.getTableName(); + + String tablePathQuery = String.format(QUERY_SELECT_TABLE_METADATA, config.getMetadataTablePrefix(), tableName); + String archivePathQuery = String.format(QUERY_SELECT_ARCHIVE_IDS, config.getMetadataTablePrefix(), tableName); + + try (Connection connection = getConnection()) { + // Fetch table path + String tablePath; + try (PreparedStatement statement = connection.prepareStatement(tablePathQuery); + ResultSet resultSet = statement.executeQuery()) { + if (!resultSet.next()) { + log.error("Table metadata not found for table: %s", tableName); + return ImmutableList.of(); + } + tablePath = resultSet.getString("table_path"); + } + + if (tablePath == null || tablePath.isEmpty()) { + log.error("Table path is null for table: %s", tableName); + return ImmutableList.of(); + } + + // Fetch archive IDs and create splits + try (PreparedStatement statement = connection.prepareStatement(archivePathQuery); + ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + final String archiveId = resultSet.getString("archive_id"); + final String archivePath = tablePath + "/" + archiveId; + splits.add(new ClpSplit(tableSchemaName, archivePath, clpTableLayoutHandle.getQuery())); + } + } + } + catch (SQLException e) { + log.error("Database error while processing splits for %s: %s", tableName, e); + } + + return splits; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java new file mode 100644 index 0000000000000..c3686be6f0e5f --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java @@ -0,0 +1,27 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.split; + +import com.facebook.presto.plugin.clp.ClpSplit; +import com.facebook.presto.plugin.clp.ClpTableLayoutHandle; + +import java.util.List; + +public interface ClpSplitProvider +{ + /** + * Returns a list of splits for the given table layout handle. + */ + List listSplits(ClpTableLayoutHandle clpTableLayoutHandle); +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java new file mode 100644 index 0000000000000..f565417e9007e --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java @@ -0,0 +1,203 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.BigintType; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpNodeType; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; +import org.apache.commons.math3.util.Pair; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.File; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.testing.TestingConnectorSession.SESSION; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +@Test(singleThreaded = true) +public class TestClpMetadata +{ + private ClpMetadata metadata; + + private static final String TABLE_NAME = "test"; + private static final String TABLE_SCHEMA = "default"; + + @BeforeMethod + public void setUp() + { + final String metadataDbUrl = "jdbc:h2:file:/tmp/metadata_testdb;MODE=MySQL;DATABASE_TO_UPPER=FALSE"; + final String metadataDbUser = "sa"; + final String metadataDbPassword = ""; + final String metadataDbTablePrefix = "clp_"; + final String columnMetadataTablePrefix = "column_metadata_"; + final String tableMetadataSuffix = "table_metadata"; + + ClpConfig config 
= new ClpConfig().setPolymorphicTypeEnabled(true) + .setMetadataDbUrl(metadataDbUrl) + .setMetadataDbUser("sa") + .setMetadataDbPassword("") + .setMetadataTablePrefix(metadataDbTablePrefix); + ClpMetadataProvider metadataProvider = new ClpMySqlMetadataProvider(config); + metadata = new ClpMetadata(config, metadataProvider); + + final String tableMetadataTableName = metadataDbTablePrefix + tableMetadataSuffix; + final String columnMetadataTableName = metadataDbTablePrefix + columnMetadataTablePrefix + TABLE_NAME; + + final String createTableMetadataSQL = String.format( + "CREATE TABLE IF NOT EXISTS %s (" + + " table_name VARCHAR(512) PRIMARY KEY," + + " table_path VARCHAR(1024) NOT NULL)", tableMetadataTableName); + + final String createColumnMetadataSQL = String.format( + "CREATE TABLE IF NOT EXISTS %s (" + + " name VARCHAR(512) NOT NULL," + + " type TINYINT NOT NULL," + + " PRIMARY KEY (name, type))", columnMetadataTableName); + + final String insertTableMetadataSQL = String.format( + "INSERT INTO %s (table_name, table_path) VALUES (?, ?)", tableMetadataTableName); + + final String insertColumnMetadataSQL = String.format( + "INSERT INTO %s (name, type) VALUES (?, ?)", columnMetadataTableName); + + try (Connection conn = DriverManager.getConnection(metadataDbUrl, metadataDbUser, metadataDbPassword); + Statement stmt = conn.createStatement()) { + stmt.execute(createTableMetadataSQL); + stmt.execute(createColumnMetadataSQL); + + // Insert table metadata + try (PreparedStatement pstmt = conn.prepareStatement(insertTableMetadataSQL)) { + pstmt.setString(1, TABLE_NAME); + pstmt.setString(2, "/tmp/archives/" + TABLE_NAME); + pstmt.executeUpdate(); + } + + // Insert column metadata in batch + List> records = Arrays.asList( + new Pair<>("a", ClpNodeType.Integer), + new Pair<>("a", ClpNodeType.VarString), + new Pair<>("b", ClpNodeType.Float), + new Pair<>("b", ClpNodeType.ClpString), + new Pair<>("c.d", ClpNodeType.Boolean), + new Pair<>("c.e", ClpNodeType.VarString), + new 
Pair<>("f.g.h", ClpNodeType.UnstructuredArray)); + + try (PreparedStatement pstmt = conn.prepareStatement(insertColumnMetadataSQL)) { + for (Pair record : records) { + pstmt.setString(1, record.getFirst()); + pstmt.setByte(2, record.getSecond().getType()); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + } + catch (SQLException e) { + fail(e.getMessage()); + } + } + + @AfterMethod + public void tearDown() + { + File dbFile = new File("/tmp/metadata_testdb.mv.db"); + File lockFile = new File("/tmp/metadata_testdb.trace.db"); // Optional, H2 sometimes creates this + if (dbFile.exists()) { + dbFile.delete(); + System.out.println("Deleted database file: " + dbFile.getAbsolutePath()); + } + if (lockFile.exists()) { + lockFile.delete(); + } + } + + @Test + public void testListSchemaNames() + { + assertEquals(metadata.listSchemaNames(SESSION), ImmutableList.of(TABLE_SCHEMA)); + } + + @Test + public void testListTables() + { + HashSet tables = new HashSet<>(); + tables.add(new SchemaTableName(TABLE_SCHEMA, TABLE_NAME)); + assertEquals(new HashSet<>(metadata.listTables(SESSION, Optional.empty())), tables); + } + + @Test + public void testGetTableMetadata() + { + ClpTableHandle clpTableHandle = + (ClpTableHandle) metadata.getTableHandle(SESSION, new SchemaTableName(TABLE_SCHEMA, TABLE_NAME)); + ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, clpTableHandle); + HashSet columnMetadata = new HashSet<>(); + columnMetadata.add(ColumnMetadata.builder() + .setName("a_bigint") + .setType(BigintType.BIGINT) + .setNullable(true) + .build()); + columnMetadata.add(ColumnMetadata.builder() + .setName("a_varchar") + .setType(VarcharType.VARCHAR) + .setNullable(true) + .build()); + columnMetadata.add(ColumnMetadata.builder() + .setName("b_double") + .setType(DoubleType.DOUBLE) + .setNullable(true) + .build()); + columnMetadata.add(ColumnMetadata.builder() + .setName("b_varchar") + .setType(VarcharType.VARCHAR) + .setNullable(true) + .build()); + 
columnMetadata.add(ColumnMetadata.builder() + .setName("c") + .setType(RowType.from(ImmutableList.of( + RowType.field("d", BooleanType.BOOLEAN), + RowType.field("e", VarcharType.VARCHAR)))) + .setNullable(true) + .build()); + columnMetadata.add(ColumnMetadata.builder() + .setName("f") + .setType(RowType.from(ImmutableList.of( + RowType.field("g", + RowType.from(ImmutableList.of( + RowType.field("h", new ArrayType(VarcharType.VARCHAR)))))))) + .setNullable(true) + .build()); + assertEquals(columnMetadata, new HashSet<>(tableMetadata.getColumns())); + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPlanOptimizer.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPlanOptimizer.java new file mode 100644 index 0000000000000..66d47a2e2805d --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpPlanOptimizer.java @@ -0,0 +1,208 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.relation.RowExpression; +import org.testng.annotations.Test; + +import java.util.Optional; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +@Test +public class TestClpPlanOptimizer + extends TestClpQueryBase +{ + private void testFilter(String sqlExpression, Optional expectedKqlExpression, + Optional expectedRemainingExpression, SessionHolder sessionHolder) + { + RowExpression pushDownExpression = getRowExpression(sqlExpression, sessionHolder); + ClpExpression clpExpression = pushDownExpression.accept(new ClpFilterToKqlConverter( + standardFunctionResolution, + functionAndTypeManager, + variableToColumnHandleMap), + null); + Optional kqlExpression = clpExpression.getDefinition(); + Optional remainingExpression = clpExpression.getRemainingExpression(); + if (expectedKqlExpression.isPresent()) { + assertTrue(kqlExpression.isPresent()); + assertEquals(kqlExpression.get(), expectedKqlExpression.get()); + } + + if (expectedRemainingExpression.isPresent()) { + assertTrue(remainingExpression.isPresent()); + assertEquals(remainingExpression.get(), getRowExpression(expectedRemainingExpression.get(), sessionHolder)); + } + else { + assertFalse(remainingExpression.isPresent()); + } + } + + @Test + public void testStringMatchPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + // Exact match + testFilter("city.Name = 'hello world'", Optional.of("city.Name: \"hello world\""), Optional.empty(), sessionHolder); + testFilter("'hello world' = city.Name", Optional.of("city.Name: \"hello world\""), Optional.empty(), sessionHolder); + + // Like predicates that are transformed into substring match + testFilter("city.Name like 'hello%'", Optional.of("city.Name: \"hello*\""), Optional.empty(), sessionHolder); + testFilter("city.Name like '%hello'", Optional.of("city.Name: \"*hello\""), 
Optional.empty(), sessionHolder); + + // Like predicates that are transformed into CARDINALITY(SPLIT(x, 'some string', 2)) = 2 form, and they are not pushed down for now + testFilter("city.Name like '%hello%'", Optional.empty(), Optional.of("city.Name like '%hello%'"), sessionHolder); + + // Like predicates that are kept in the original forms + testFilter("city.Name like 'hello_'", Optional.of("city.Name: \"hello?\""), Optional.empty(), sessionHolder); + testFilter("city.Name like '_hello'", Optional.of("city.Name: \"?hello\""), Optional.empty(), sessionHolder); + testFilter("city.Name like 'hello_w%'", Optional.of("city.Name: \"hello?w*\""), Optional.empty(), sessionHolder); + testFilter("city.Name like '%hello_w'", Optional.of("city.Name: \"*hello?w\""), Optional.empty(), sessionHolder); + testFilter("city.Name like 'hello%world'", Optional.of("city.Name: \"hello*world\""), Optional.empty(), sessionHolder); + testFilter("city.Name like 'hello%wor%ld'", Optional.of("city.Name: \"hello*wor*ld\""), Optional.empty(), sessionHolder); + } + + @Test + public void testSubStringPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + testFilter("substr(city.Name, 1, 2) = 'he'", Optional.of("city.Name: \"he*\""), Optional.empty(), sessionHolder); + testFilter("substr(city.Name, 5, 2) = 'he'", Optional.of("city.Name: \"????he*\""), Optional.empty(), sessionHolder); + testFilter("substr(city.Name, 5) = 'he'", Optional.of("city.Name: \"????he\""), Optional.empty(), sessionHolder); + testFilter("substr(city.Name, -2) = 'he'", Optional.of("city.Name: \"*he\""), Optional.empty(), sessionHolder); + + // Invalid substring index is not pushed down + testFilter("substr(city.Name, 1, 5) = 'he'", Optional.empty(), Optional.of("substr(city.Name, 1, 5) = 'he'"), sessionHolder); + testFilter("substr(city.Name, -5) = 'he'", Optional.empty(), Optional.of("substr(city.Name, -5) = 'he'"), sessionHolder); + } + + @Test + public void testNumericComparisonPushdown() + { + 
SessionHolder sessionHolder = new SessionHolder(); + + testFilter("fare > 0", Optional.of("fare > 0"), Optional.empty(), sessionHolder); + testFilter("fare >= 0", Optional.of("fare >= 0"), Optional.empty(), sessionHolder); + testFilter("fare < 0", Optional.of("fare < 0"), Optional.empty(), sessionHolder); + testFilter("fare <= 0", Optional.of("fare <= 0"), Optional.empty(), sessionHolder); + testFilter("fare = 0", Optional.of("fare: 0"), Optional.empty(), sessionHolder); + testFilter("fare != 0", Optional.of("NOT fare: 0"), Optional.empty(), sessionHolder); + testFilter("fare <> 0", Optional.of("NOT fare: 0"), Optional.empty(), sessionHolder); + testFilter("0 < fare", Optional.of("fare > 0"), Optional.empty(), sessionHolder); + testFilter("0 <= fare", Optional.of("fare >= 0"), Optional.empty(), sessionHolder); + testFilter("0 > fare", Optional.of("fare < 0"), Optional.empty(), sessionHolder); + testFilter("0 >= fare", Optional.of("fare <= 0"), Optional.empty(), sessionHolder); + testFilter("0 = fare", Optional.of("fare: 0"), Optional.empty(), sessionHolder); + testFilter("0 != fare", Optional.of("NOT fare: 0"), Optional.empty(), sessionHolder); + testFilter("0 <> fare", Optional.of("NOT fare: 0"), Optional.empty(), sessionHolder); + } + + @Test + public void testOrPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + testFilter("fare > 0 OR city.Name like 'b%'", Optional.of("(fare > 0 OR city.Name: \"b*\")"), Optional.empty(), + sessionHolder); + testFilter("lower(city.Region.Name) = 'hello world' OR city.Region.Id != 1", Optional.empty(), Optional.of("(lower(city.Region.Name) = 'hello world' OR city.Region.Id != 1)"), + sessionHolder); + + // Multiple ORs + testFilter("fare > 0 OR city.Name like 'b%' OR lower(city.Region.Name) = 'hello world' OR city.Region.Id != 1", + Optional.empty(), + Optional.of("fare > 0 OR city.Name like 'b%' OR lower(city.Region.Name) = 'hello world' OR city.Region.Id != 1"), + sessionHolder); + testFilter("fare > 0 OR 
city.Name like 'b%' OR city.Region.Id != 1", + Optional.of("((fare > 0 OR city.Name: \"b*\") OR NOT city.Region.Id: 1)"), + Optional.empty(), + sessionHolder); + } + + @Test + public void testAndPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + testFilter("fare > 0 AND city.Name like 'b%'", Optional.of("(fare > 0 AND city.Name: \"b*\")"), Optional.empty(), sessionHolder); + testFilter("lower(city.Region.Name) = 'hello world' AND city.Region.Id != 1", Optional.of("(NOT city.Region.Id: 1)"), Optional.of("lower(city.Region.Name) = 'hello world'"), + sessionHolder); + + // Multiple ANDs + testFilter("fare > 0 AND city.Name like 'b%' AND lower(city.Region.Name) = 'hello world' AND city.Region.Id != 1", + Optional.of("(((fare > 0 AND city.Name: \"b*\")) AND NOT city.Region.Id: 1)"), + Optional.of("(lower(city.Region.Name) = 'hello world')"), + sessionHolder); + testFilter("fare > 0 AND city.Name like '%b%' AND lower(city.Region.Name) = 'hello world' AND city.Region.Id != 1", + Optional.of("(((fare > 0)) AND NOT city.Region.Id: 1)"), + Optional.of("city.Name like '%b%' AND lower(city.Region.Name) = 'hello world'"), + sessionHolder); + } + + @Test + public void testNotPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + testFilter("city.Region.Name NOT LIKE 'hello%'", Optional.of("NOT city.Region.Name: \"hello*\""), Optional.empty(), sessionHolder); + testFilter("NOT (city.Region.Name LIKE 'hello%')", Optional.of("NOT city.Region.Name: \"hello*\""), Optional.empty(), sessionHolder); + testFilter("city.Name != 'hello world'", Optional.of("NOT city.Name: \"hello world\""), Optional.empty(), sessionHolder); + testFilter("city.Name <> 'hello world'", Optional.of("NOT city.Name: \"hello world\""), Optional.empty(), sessionHolder); + testFilter("NOT (city.Name = 'hello world')", Optional.of("NOT city.Name: \"hello world\""), Optional.empty(), sessionHolder); + testFilter("fare != 0", Optional.of("NOT fare: 0"), Optional.empty(), 
sessionHolder); + testFilter("fare <> 0", Optional.of("NOT fare: 0"), Optional.empty(), sessionHolder); + testFilter("NOT (fare = 0)", Optional.of("NOT fare: 0"), Optional.empty(), sessionHolder); + + // Multiple NOTs + testFilter("NOT (NOT fare = 0)", Optional.of("NOT NOT fare: 0"), Optional.empty(), sessionHolder); + testFilter("NOT (fare = 0 AND city.Name = 'hello world')", Optional.of("NOT (fare: 0 AND city.Name: \"hello world\")"), Optional.empty(), sessionHolder); + testFilter("NOT (fare = 0 OR city.Name = 'hello world')", Optional.of("NOT (fare: 0 OR city.Name: \"hello world\")"), Optional.empty(), sessionHolder); + } + + @Test + public void testInPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + testFilter("city.Name IN ('hello world', 'hello world 2')", Optional.of("(city.Name: \"hello world\" OR city.Name: \"hello world 2\")"), Optional.empty(), sessionHolder); + } + + @Test + public void testIsNullPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + testFilter("city.Name IS NULL", Optional.of("NOT city.Name: *"), Optional.empty(), sessionHolder); + testFilter("city.Name IS NOT NULL", Optional.of("NOT NOT city.Name: *"), Optional.empty(), sessionHolder); + testFilter("NOT (city.Name IS NULL)", Optional.of("NOT NOT city.Name: *"), Optional.empty(), sessionHolder); + } + + @Test + public void testComplexPushdown() + { + SessionHolder sessionHolder = new SessionHolder(); + + testFilter("(fare > 0 OR city.Name like 'b%') AND (lower(city.Region.Name) = 'hello world' OR city.Name IS NULL)", + Optional.of("((fare > 0 OR city.Name: \"b*\"))"), + Optional.of("(lower(city.Region.Name) = 'hello world' OR city.Name IS NULL)"), + sessionHolder); + testFilter("city.Region.Id = 1 AND (fare > 0 OR city.Name NOT like 'b%') AND (lower(city.Region.Name) = 'hello world' OR city.Name IS NULL)", + Optional.of("((city.Region.Id: 1 AND (fare > 0 OR NOT city.Name: \"b*\")))"), + Optional.of("lower(city.Region.Name) = 'hello world' OR 
city.Name IS NULL"), + sessionHolder); + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java new file mode 100644 index 0000000000000..56b6a5bbb78b7 --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpQueryBase.java @@ -0,0 +1,120 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.Session; +import com.facebook.presto.SystemSessionProperties; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.metadata.FunctionAndTypeManager; +import com.facebook.presto.metadata.Metadata; +import com.facebook.presto.metadata.MetadataManager; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.WarningCollector; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.relation.RowExpression; +import com.facebook.presto.spi.relation.VariableReferenceExpression; +import com.facebook.presto.sql.ExpressionUtils; +import com.facebook.presto.sql.parser.ParsingOptions; +import com.facebook.presto.sql.parser.SqlParser; +import com.facebook.presto.sql.planner.TypeProvider; +import com.facebook.presto.sql.relational.FunctionResolution; +import 
com.facebook.presto.sql.relational.SqlToRowExpressionTranslator; +import com.facebook.presto.sql.tree.Expression; +import com.facebook.presto.sql.tree.NodeRef; +import com.facebook.presto.testing.TestingSession; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.metadata.FunctionAndTypeManager.createTestFunctionAndTypeManager; +import static com.facebook.presto.metadata.SessionPropertyManager.createTestingSessionPropertyManager; +import static com.facebook.presto.sql.analyzer.ExpressionAnalyzer.getExpressionTypes; +import static com.facebook.presto.testing.TestingConnectorSession.SESSION; +import static java.util.stream.Collectors.toMap; + +public class TestClpQueryBase +{ + protected static final FunctionAndTypeManager functionAndTypeManager = createTestFunctionAndTypeManager(); + protected static final StandardFunctionResolution standardFunctionResolution = new FunctionResolution(functionAndTypeManager.getFunctionAndTypeResolver()); + protected static final Metadata metadata = MetadataManager.createTestMetadataManager(); + + protected static ClpColumnHandle city = new ClpColumnHandle("city", RowType.from(ImmutableList.of( + RowType.field("Name", VARCHAR), + RowType.field("Region", RowType.from(ImmutableList.of( + RowType.field("Id", BIGINT), + RowType.field("Name", VARCHAR) + ))))), true); + protected static final ClpColumnHandle fare = new ClpColumnHandle("fare", DOUBLE, true); + protected static final ClpColumnHandle isHoliday = new ClpColumnHandle("isHoliday", BOOLEAN, true); + protected static final Map variableToColumnHandleMap = 
+ Stream.of(city, fare, isHoliday) + .collect(toMap( + ch -> new VariableReferenceExpression(Optional.empty(), ch.getColumnName(), ch.getColumnType()), + ch -> ch)); + protected final TypeProvider typeProvider = TypeProvider.fromVariables(variableToColumnHandleMap.keySet()); + + protected static class SessionHolder + { + private final ConnectorSession connectorSession; + private final Session session; + + public SessionHolder() + { + connectorSession = SESSION; + session = TestingSession.testSessionBuilder(createTestingSessionPropertyManager(new SystemSessionProperties().getSessionProperties())).build(); + } + + public ConnectorSession getConnectorSession() + { + return connectorSession; + } + + public Session getSession() + { + return session; + } + } + + public static Expression expression(String sql) + { + return ExpressionUtils.rewriteIdentifiersToSymbolReferences(new SqlParser().createExpression(sql, new ParsingOptions(ParsingOptions.DecimalLiteralTreatment.AS_DECIMAL))); + } + + protected RowExpression toRowExpression(Expression expression, Session session) + { + Map, Type> expressionTypes = getExpressionTypes( + session, + metadata, + new SqlParser(), + typeProvider, + expression, + ImmutableMap.of(), + WarningCollector.NOOP); + return SqlToRowExpressionTranslator.translate(expression, expressionTypes, ImmutableMap.of(), functionAndTypeManager, session); + } + + protected RowExpression getRowExpression(String sqlExpression, SessionHolder sessionHolder) + { + return toRowExpression(expression(sqlExpression), sessionHolder.getSession()); + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java new file mode 100644 index 0000000000000..61024e81a2b9f --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use 
this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider; +import com.facebook.presto.plugin.clp.split.ClpSplitProvider; +import com.facebook.presto.spi.SchemaTableName; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.io.File; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +@Test(singleThreaded = true) +public class TestClpSplit +{ + private ClpConfig config; + private static final String TABLE_NAME_1 = "test_1"; + private static final String TABLE_NAME_2 = "test_2"; + private static final String TABLE_NAME_3 = "test_3"; + private static final String TABLE_SCHEMA = "default"; + private static final List TABLE_NAME_LIST = Arrays.asList(TABLE_NAME_1, TABLE_NAME_2, TABLE_NAME_3); + private static final int NUM_SPLITS = 10; + + @BeforeMethod + public void setUp() + { + final String metadataDbUrl = "jdbc:h2:file:/tmp/split_testdb;MODE=MySQL;DATABASE_TO_UPPER=FALSE"; + final String metadataDbUser = "sa"; + final String metadataDbPassword = ""; + final String metadataDbTablePrefix = "clp_"; + final String tableMetadataSuffix = "table_metadata"; + final String archiveTableSuffix = 
"_archives"; + + this.config = new ClpConfig().setPolymorphicTypeEnabled(true) + .setMetadataDbUrl(metadataDbUrl) + .setMetadataDbUser("sa") + .setMetadataDbPassword("") + .setMetadataTablePrefix(metadataDbTablePrefix); + + final String tableMetadataTableName = metadataDbTablePrefix + tableMetadataSuffix; + final String archiveTableFormat = metadataDbTablePrefix + "%s" + archiveTableSuffix; + + final String createTableMetadataSQL = String.format( + "CREATE TABLE IF NOT EXISTS %s (" + + " table_name VARCHAR(512) PRIMARY KEY," + + " table_path VARCHAR(1024) NOT NULL)", tableMetadataTableName); + + try (Connection conn = DriverManager.getConnection(metadataDbUrl, metadataDbUser, metadataDbPassword); + Statement stmt = conn.createStatement()) { + stmt.execute(createTableMetadataSQL); + + // Insert table metadata in batch + String insertTableMetadataSQL = String.format("INSERT INTO %s (table_name, table_path) VALUES (?, ?)", tableMetadataTableName); + try (PreparedStatement pstmt = conn.prepareStatement(insertTableMetadataSQL)) { + for (String tableName : TABLE_NAME_LIST) { + pstmt.setString(1, tableName); + pstmt.setString(2, "/tmp/archives/" + tableName); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + + // Create and populate archive tables + for (String tableName : TABLE_NAME_LIST) { + String archiveTableName = String.format(archiveTableFormat, tableName); + String createArchiveTableSQL = String.format( + "CREATE TABLE IF NOT EXISTS %s (" + + "id BIGINT AUTO_INCREMENT PRIMARY KEY, " + + "archive_id VARCHAR(128) NOT NULL" + + ")", + archiveTableName); + stmt.execute(createArchiveTableSQL); + + String insertArchiveTableSQL = String.format("INSERT INTO %s (archive_id) VALUES (?)", archiveTableName); + try (PreparedStatement pstmt = conn.prepareStatement(insertArchiveTableSQL)) { + for (int i = 0; i < NUM_SPLITS; i++) { + pstmt.setString(1, "id_" + i); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + } + } + catch (SQLException e) { + fail(e.getMessage()); + 
} + } + + @AfterMethod + public void tearDown() + { + File dbFile = new File("/tmp/split_testdb.mv.db"); + File lockFile = new File("/tmp/split_testdb.trace.db"); // Optional, H2 sometimes creates this + if (dbFile.exists()) { + dbFile.delete(); + System.out.println("Deleted database file: " + dbFile.getAbsolutePath()); + } + if (lockFile.exists()) { + lockFile.delete(); + } + } + + @Test + public void testListSplits() + { + ClpSplitProvider splitProvider = new ClpMySqlSplitProvider(config); + for (String tableName : TABLE_NAME_LIST) { + ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(new ClpTableHandle(new SchemaTableName(TABLE_SCHEMA, tableName)), Optional.empty()); + List splits = splitProvider.listSplits(layoutHandle); + assertEquals(splits.size(), NUM_SPLITS); + for (int i = 0; i < NUM_SPLITS; i++) { + assertEquals(splits.get(i).getArchivePath(), "/tmp/archives/" + tableName + "/id_" + i); + assertEquals(splits.get(i).getQuery(), Optional.empty()); + } + } + } +} diff --git a/presto-docs/src/main/sphinx/connector.rst b/presto-docs/src/main/sphinx/connector.rst index d337fe4ed12d1..a45ab8f9bafa4 100644 --- a/presto-docs/src/main/sphinx/connector.rst +++ b/presto-docs/src/main/sphinx/connector.rst @@ -14,6 +14,7 @@ from different data sources. connector/blackhole connector/cassandra connector/clickhouse + connector/clp connector/deltalake connector/druid connector/elasticsearch diff --git a/presto-docs/src/main/sphinx/connector/clp.rst b/presto-docs/src/main/sphinx/connector/clp.rst new file mode 100644 index 0000000000000..f37f6eba7df1a --- /dev/null +++ b/presto-docs/src/main/sphinx/connector/clp.rst @@ -0,0 +1,234 @@ +======================= +CLP Connector +======================= + +.. contents:: + :local: + :backlinks: none + :depth: 1 + +Overview +-------- + +The CLP Connector enables SQL-based querying of CLP-S archives from Presto. This document describes how to setup the +CLP Connector to run SQL queries. 
+ + +Configuration +------------- + +To configure the CLP connector, create a catalog properties file +``etc/catalog/clp.properties`` with the following contents, +replacing the properties as appropriate: + +.. code-block:: none + + connector.name=clp + clp.archive-source=local + clp.metadata-source=mysql + clp.metadata-db-url=jdbc:mysql://localhost:3306 + clp.metadata-db-name=clp_db + clp.metadata-db-user=clp_user + clp.metadata-db-password=clp_password + clp.metadata-table-prefix=clp_ + clp.split-source=mysql + + +Configuration Properties +------------------------ + +The following configuration properties are available: + +============================================= ============================================================================== +Property Name Description +============================================= ============================================================================== +``clp.archive-source`` The source of the CLP archive. +``clp.metadata-expire-interval`` The time interval after which metadata entries are considered expired. +``clp.metadata-refresh-interval`` The frequency at which metadata is refreshed from the source. +``clp.polymorphic-type-enabled`` Enables or disables support for polymorphic types within CLP. +``clp.metadata-source`` The source from which metadata is fetched. +``clp.metadata-db-url`` The connection URL for the metadata database. +``clp.metadata-db-name`` The name of the metadata database. +``clp.metadata-db-user`` The database user with access to the metadata database. +``clp.metadata-db-password`` The password for the metadata database user. +``clp.metadata-table-prefix`` A prefix applied to table names in the metadata database. +``clp.split-source`` The source of split information for query execution. 
+============================================= ==============================================================================
+
+``clp.archive-source``
+^^^^^^^^^^^^^^^^^^^^^^
+
+Specifies the source of the CLP archive. Supported values include ``local`` (local storage) and ``s3`` (Amazon S3).
+
+This property is optional. The default is ``local``.
+
+``clp.metadata-expire-interval``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Defines how long metadata entries remain valid before being considered expired, in seconds.
+
+This property is optional. The default is ``600``.
+
+``clp.metadata-refresh-interval``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Specifies how frequently metadata is refreshed from the source, in seconds. This ensures that metadata remains up to
+date.
+
+Set this to a lower value for frequently changing datasets or to a higher value to reduce load.
+
+This property is optional. The default is ``60``.
+
+``clp.polymorphic-type-enabled``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+Enables or disables support for polymorphic types in CLP, allowing the same field to have different types.
+This is useful for schema-less, semi-structured data where the same field may appear with different types.
+When enabled, type annotations are added to conflicting field names to distinguish between types. For example, if an
+``id`` column appears with both ``int`` and ``string`` types, the connector creates two columns named ``id_bigint`` and
+``id_varchar``.
+
+Supported type annotations include ``bigint``, ``varchar``, ``double``, ``boolean``, and
+``array(varchar)`` (see `Data Types`_ for details). For columns with only one type, the original column name is used.
+
+This property is optional. The default is ``false``.
+
+``clp.metadata-source``
+^^^^^^^^^^^^^^^^^^^^^^^
+Currently, the only supported source is a MySQL database, which is also used by the CLP package to store metadata.
+Additional sources can be supported by implementing the ``ClpMetadataProvider`` interface.
+
+This property is optional. The default is ``mysql``.
+
+``clp.metadata-db-url``
+^^^^^^^^^^^^^^^^^^^^^^^
+The JDBC URL used to connect to the metadata database.
+
+This property is required if ``clp.metadata-source`` is set to ``mysql``.
+
+``clp.metadata-db-name``
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+The name of the metadata database.
+
+This property is required if ``clp.metadata-source`` is set to ``mysql`` and the database name is not specified in the URL.
+
+``clp.metadata-db-user``
+^^^^^^^^^^^^^^^^^^^^^^^^
+
+The username used to authenticate with the metadata database.
+
+Ensure this user has read access to the relevant metadata tables.
+
+This property is required if ``clp.metadata-source`` is set to ``mysql``.
+
+``clp.metadata-db-password``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+The password for the user specified in ``clp.metadata-db-user``.
+
+This property is required if ``clp.metadata-source`` is set to ``mysql``.
+
+``clp.metadata-table-prefix``
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+A string prefix prepended to all metadata table names when querying the database. Useful for namespacing or avoiding
+collisions.
+
+This property is optional. The default is an empty string.
+
+``clp.split-source``
+^^^^^^^^^^^^^^^^^^^^
+
+Specifies the source of split information for tables. By default, it uses the same source as the metadata with the same
+connection parameters. Additional sources can be supported by implementing the ``ClpSplitProvider`` interface.
+
+This property is optional. The default is ``mysql``.
+
+Metadata and Split Providers
+----------------------------
+As mentioned earlier, the CLP connector relies on metadata and split providers to retrieve information from various
+sources. By default, it uses a MySQL database for both metadata and split storage. We recommend using the CLP package
+for log ingestion, which automatically populates the database with the required information.
However, if you prefer to +use a different source—or the same source with a custom implementation—you can provide your own implementations of +the ``ClpMetadataProvider`` and ``ClpSplitProvider`` interfaces, and configure the connector accordingly. + +Data Types +---------- + +The data type mappings are as follows: + +====================== ==================== +CLP Type Presto Type +====================== ==================== +``Integer`` ``BIGINT`` +``Float`` ``DOUBLE`` +``ClpString`` ``VARCHAR`` +``VarString`` ``VARCHAR`` +``DateString`` ``VARCHAR`` +``Boolean`` ``BOOLEAN`` +``UnstructuredArray`` ``ARRAY(VARCHAR)`` +``Object`` ``ROW`` +(others) (unsupported) +====================== ==================== + +String Types +^^^^^^^^^^^^ + +In CLP, we have three distinct string types: ``ClpString`` (strings with whitespace), ``VarString`` (strings without +whitespace), and ``DateString`` (strings representing dates). Currently, all three are mapped to Presto's ``VARCHAR`` +type. + +Array Types +^^^^^^^^^^^ + +CLP supports two array types: ``UnstructuredArray`` and ``StructuredArray``. Unstructured arrays are stored as strings +in CLP and elements can be any type. However, in Presto arrays are homogeneous, so the elements are converted to strings +when read. ``StructuredArray`` type is not supported yet. + +Object Types +^^^^^^^^^^^^ +CLP stores metadata using a global schema tree structure that captures all possible fields from various log structures. +Internal nodes may represent objects containing nested fields as their children. In Presto, we map these internal object +nodes specifically to the ``ROW`` data type, including all subfields as fields within the ``ROW``. + +For instance, consider a table containing two distinct JSON log types: + +Log Type 1: + +.. code-block:: json + + { + "msg": { + "ts": 0, + "status": "ok" + } + } + +Log Type 2: + +.. 
code-block:: json + + { + "msg": { + "ts": 1, + "status": "error", + "thread_num": 4, + "backtrace": "" + } + } + +In CLP's schema tree, these two structures are combined into a unified internal node (``msg``) with four child nodes: +``ts``, ``status``, ``thread_num`` and ``backtrace``. In Presto, we represent this combined structure using the +following ``ROW`` type: + +.. code-block:: sql + + ROW(ts BIGINT, status VARCHAR, thread_num BIGINT, backtrace VARCHAR) + +Each JSON log maps to this unified ``ROW`` type, with absent fields represented as ``NULL``. Thus, the child nodes +(``ts``, ``status``, ``thread_num``, ``backtrace``) become fields within the ``ROW``, clearly reflecting the nested and +varying structures of the original JSON logs. + +SQL support +----------- + +The connector only provides read access to data. It does not support DDL operations, such as creating or dropping +tables. Currently, we only support one ``default`` schema. diff --git a/presto-server/src/main/provisio/presto.xml b/presto-server/src/main/provisio/presto.xml index 4e82ac23b9a1f..0c88f798c727b 100644 --- a/presto-server/src/main/provisio/presto.xml +++ b/presto-server/src/main/provisio/presto.xml @@ -230,6 +230,12 @@ + + + + + +