diff --git a/pom.xml b/pom.xml index dd70d9bb351f6..11998e65cf58c 100644 --- a/pom.xml +++ b/pom.xml @@ -216,6 +216,7 @@ presto-function-server presto-router-example-plugin-scheduler presto-plan-checker-router-plugin + presto-clp diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml new file mode 100644 index 0000000000000..409915dff2334 --- /dev/null +++ b/presto-clp/pom.xml @@ -0,0 +1,134 @@ + + + 4.0.0 + + + com.facebook.presto + presto-root + 0.294-SNAPSHOT + + + presto-clp + Presto - CLP Connector + presto-plugin + + + ${project.parent.basedir} + + + + + com.mysql + mysql-connector-j + runtime + + + + com.facebook.airlift + bootstrap + + + + com.facebook.airlift + json + + + + com.facebook.airlift + log + + + + com.facebook.airlift + configuration + + + + com.google.inject + guice + + + + com.google.guava + guava + + + + javax.inject + javax.inject + + + + com.fasterxml.jackson.core + jackson-annotations + provided + + + + com.facebook.presto + presto-spi + provided + + + + com.facebook.presto + presto-common + provided + + + + io.airlift + units + provided + + + + io.airlift + slice + provided + + + + org.testng + testng + test + + + + com.facebook.presto + presto-main-base + test + + + + com.facebook.presto + presto-analyzer + test + + + + com.h2database + h2 + test + + + + com.facebook.presto + presto-parser + test + + + + org.apache.commons + commons-math3 + test + + + + commons-io + commons-io + test + + + diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java new file mode 100644 index 0000000000000..ccd9e1d607635 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java @@ -0,0 +1,117 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.type.Type; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class ClpColumnHandle + implements ColumnHandle +{ + private final String columnName; + private final String originalColumnName; + private final Type columnType; + private final boolean nullable; + + @JsonCreator + public ClpColumnHandle( + @JsonProperty("columnName") String columnName, + @JsonProperty("originalColumnName") String originalColumnName, + @JsonProperty("columnType") Type columnType, + @JsonProperty("nullable") boolean nullable) + { + this.columnName = columnName; + this.originalColumnName = originalColumnName; + this.columnType = columnType; + this.nullable = nullable; + } + + public ClpColumnHandle(String columnName, Type columnType, boolean nullable) + { + this(columnName, columnName, columnType, nullable); + } + + @JsonProperty + public String getColumnName() + { + return columnName; + } + + @JsonProperty + public String getOriginalColumnName() + { + return originalColumnName; + } + + @JsonProperty + public Type getColumnType() + { + return columnType; + } + + @JsonProperty + public boolean isNullable() + { + return nullable; + } + + public ColumnMetadata getColumnMetadata() + { + return ColumnMetadata.builder() + .setName(columnName) + .setType(columnType) + .setNullable(nullable) + .build(); + } + + @Override + public int hashCode() + { + return Objects.hash(columnName, originalColumnName, columnType, nullable); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ClpColumnHandle other = (ClpColumnHandle) obj; + return Objects.equals(this.columnName, other.columnName) && + Objects.equals(this.originalColumnName, other.originalColumnName) && + Objects.equals(this.columnType, other.columnType) && + Objects.equals(this.nullable, other.nullable); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("columnName", columnName) + .add("originalColumnName", originalColumnName) + .add("columnType", columnType) + .add("nullable", nullable) + .toString(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java new file mode 100644 index 0000000000000..761c4702a64b2 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java @@ -0,0 +1,173 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.configuration.Config; +import com.facebook.presto.spi.PrestoException; + +import java.util.regex.Pattern; + +public class ClpConfig +{ + public static final Pattern SAFE_SQL_TABLE_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9_]+$"); + + private boolean polymorphicTypeEnabled = true; + + private MetadataProviderType metadataProviderType = MetadataProviderType.MYSQL; + private String metadataDbUrl; + private String metadataDbName; + private String metadataDbUser; + private String metadataDbPassword; + private String metadataTablePrefix; + private long metadataRefreshInterval = 60; + private long metadataExpireInterval = 600; + + private SplitProviderType splitProviderType = SplitProviderType.MYSQL; + + public boolean isPolymorphicTypeEnabled() + { + return polymorphicTypeEnabled; + } + + @Config("clp.polymorphic-type-enabled") + public ClpConfig setPolymorphicTypeEnabled(boolean polymorphicTypeEnabled) + { + this.polymorphicTypeEnabled = polymorphicTypeEnabled; + return this; + } + + public MetadataProviderType getMetadataProviderType() + { + return metadataProviderType; + } + + @Config("clp.metadata-provider-type") + public ClpConfig setMetadataProviderType(MetadataProviderType metadataProviderType) + { + this.metadataProviderType = metadataProviderType; + return this; + } + + public String getMetadataDbUrl() + { + return metadataDbUrl; + } + + @Config("clp.metadata-db-url") + public ClpConfig setMetadataDbUrl(String metadataDbUrl) + { + this.metadataDbUrl = metadataDbUrl; + return this; + } + + public String getMetadataDbName() + { + return metadataDbName; + } + + @Config("clp.metadata-db-name") + public ClpConfig setMetadataDbName(String metadataDbName) + { + this.metadataDbName = metadataDbName; + return this; + } + + public String getMetadataDbUser() + { + return metadataDbUser; + } + + @Config("clp.metadata-db-user") + public ClpConfig setMetadataDbUser(String metadataDbUser) + { + this.metadataDbUser = metadataDbUser; + return this; + } + + public String getMetadataDbPassword() + { + return metadataDbPassword; + } + + @Config("clp.metadata-db-password") + public ClpConfig setMetadataDbPassword(String metadataDbPassword) + { + this.metadataDbPassword = metadataDbPassword; + return this; + } + + public String getMetadataTablePrefix() + { + return metadataTablePrefix; + } + + @Config("clp.metadata-table-prefix") + public ClpConfig setMetadataTablePrefix(String metadataTablePrefix) + { + if (metadataTablePrefix == null || !SAFE_SQL_TABLE_NAME_PATTERN.matcher(metadataTablePrefix).matches()) { + throw new PrestoException( + ClpErrorCode.CLP_UNSUPPORTED_CONFIG_OPTION, + "Invalid metadataTablePrefix: " + metadataTablePrefix + ". Only alphanumeric characters and underscores are allowed."); + } + + this.metadataTablePrefix = metadataTablePrefix; + return this; + } + + public long getMetadataRefreshInterval() + { + return metadataRefreshInterval; + } + + @Config("clp.metadata-refresh-interval") + public ClpConfig setMetadataRefreshInterval(long metadataRefreshInterval) + { + this.metadataRefreshInterval = metadataRefreshInterval; + return this; + } + + public long getMetadataExpireInterval() + { + return metadataExpireInterval; + } + + @Config("clp.metadata-expire-interval") + public ClpConfig setMetadataExpireInterval(long metadataExpireInterval) + { + this.metadataExpireInterval = metadataExpireInterval; + return this; + } + + public SplitProviderType getSplitProviderType() + { + return splitProviderType; + } + + @Config("clp.split-provider-type") + public ClpConfig setSplitProviderType(SplitProviderType splitProviderType) + { + this.splitProviderType = splitProviderType; + return this; + } + + public enum MetadataProviderType + { + MYSQL + } + + public enum SplitProviderType + { + MYSQL + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java new file mode 100644 index 0000000000000..335bc50b96f50 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java @@ -0,0 +1,82 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.bootstrap.LifeCycleManager; +import com.facebook.airlift.log.Logger; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; +import com.facebook.presto.spi.transaction.IsolationLevel; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class ClpConnector + implements Connector +{ + private static final Logger log = Logger.get(ClpConnector.class); + + private final LifeCycleManager lifeCycleManager; + private final ClpMetadata metadata; + private final ClpRecordSetProvider recordSetProvider; + private final ClpSplitManager splitManager; + + @Inject + public ClpConnector(LifeCycleManager lifeCycleManager, ClpMetadata metadata, ClpRecordSetProvider recordSetProvider, ClpSplitManager splitManager) + { + this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null"); + this.metadata = requireNonNull(metadata, "metadata is null"); + this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null"); + this.splitManager = requireNonNull(splitManager, "splitManager is null"); + } + + @Override + public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly) + { + return ClpTransactionHandle.INSTANCE; + } + + @Override + public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle) + { + return metadata; + } + + @Override + public ConnectorRecordSetProvider getRecordSetProvider() + { + return recordSetProvider; + } + + @Override + public ConnectorSplitManager getSplitManager() + { + return splitManager; + } + + @Override + public final void shutdown() + { + try { + lifeCycleManager.stop(); + } + catch (Exception e) { + log.error(e, "Error shutting down connector"); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java new file mode 100644 index 0000000000000..bec007135bccd --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java @@ -0,0 +1,70 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.json.JsonModule; +import com.facebook.presto.common.type.TypeManager; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.connector.Connector; +import com.facebook.presto.spi.connector.ConnectorContext; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.facebook.presto.spi.function.FunctionMetadataManager; +import com.facebook.presto.spi.function.StandardFunctionResolution; +import com.facebook.presto.spi.relation.RowExpressionService; +import com.google.inject.Injector; + +import java.util.Map; + +import static java.util.Objects.requireNonNull; + +public class ClpConnectorFactory + implements ConnectorFactory +{ + @Override + public String getName() + { + return "clp"; + } + + @Override + public ConnectorHandleResolver getHandleResolver() + { + return new ClpHandleResolver(); + } + + @Override + public Connector create(String catalogName, Map config, ConnectorContext context) + { + requireNonNull(catalogName, "catalogName is null"); + requireNonNull(config, "config is null"); + try { + Bootstrap app = new Bootstrap(new JsonModule(), new ClpModule(), binder -> { + binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager()); + binder.bind(NodeManager.class).toInstance(context.getNodeManager()); + binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService()); + binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution()); + binder.bind(TypeManager.class).toInstance(context.getTypeManager()); + }); + + Injector injector = app.doNotInitializeLogging().setRequiredConfigurationProperties(config).initialize(); + + return injector.getInstance(ClpConnector.class); + } + catch (Exception e) { + throw new RuntimeException(e); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java new file mode 100644 index 0000000000000..8cb2438277404 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java @@ -0,0 +1,43 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.ErrorCode; +import com.facebook.presto.common.ErrorType; +import com.facebook.presto.spi.ErrorCodeSupplier; + +import static com.facebook.presto.common.ErrorType.EXTERNAL; + +public enum ClpErrorCode + implements ErrorCodeSupplier +{ + CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION(0, EXTERNAL), + CLP_UNSUPPORTED_METADATA_SOURCE(1, EXTERNAL), + CLP_UNSUPPORTED_SPLIT_SOURCE(2, EXTERNAL), + CLP_UNSUPPORTED_TYPE(3, EXTERNAL), + CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL); + + private final ErrorCode errorCode; + + ClpErrorCode(int code, ErrorType type) + { + errorCode = new ErrorCode(code + 0x0400_0000, name(), type); + } + + @Override + public ErrorCode toErrorCode() + { + return errorCode; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java new file mode 100644 index 0000000000000..462ecc039b9c6 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java @@ -0,0 +1,55 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorHandleResolver; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public class ClpHandleResolver + implements ConnectorHandleResolver +{ + @Override + public Class getTableHandleClass() + { + return ClpTableHandle.class; + } + + @Override + public Class getTableLayoutHandleClass() + { + return ClpTableLayoutHandle.class; + } + + @Override + public Class getColumnHandleClass() + { + return ClpColumnHandle.class; + } + + @Override + public Class getSplitClass() + { + return ClpSplit.class; + } + + @Override + public Class getTransactionHandleClass() + { + return ClpTransactionHandle.class; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java new file mode 100644 index 0000000000000..efa43e37eb517 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java @@ -0,0 +1,206 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.ConnectorTableLayout; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.ConnectorTableLayoutResult; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.Constraint; +import com.facebook.presto.spi.SchemaTableName; +import com.facebook.presto.spi.SchemaTablePrefix; +import com.facebook.presto.spi.connector.ConnectorMetadata; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import javax.inject.Inject; + +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.function.Function; + +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.collect.ImmutableMap.toImmutableMap; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; + +/** + * For efficiency, this class maintains two caches for metadata from the + * {@link ClpMetadataProvider}: + * + */ +public class ClpMetadata + implements ConnectorMetadata +{ + public static final String DEFAULT_SCHEMA_NAME = "default"; + + private final ClpMetadataProvider clpMetadataProvider; + private final LoadingCache> columnHandleCache; + private final LoadingCache> tableHandleCache; + + @Inject + public ClpMetadata(ClpConfig clpConfig, ClpMetadataProvider clpMetadataProvider) + { + this.clpMetadataProvider = requireNonNull(clpMetadataProvider, "ClpMetadataProvider is null"); + + this.columnHandleCache = CacheBuilder.newBuilder() + .expireAfterWrite(clpConfig.getMetadataExpireInterval(), SECONDS) + .refreshAfterWrite(clpConfig.getMetadataRefreshInterval(), SECONDS) + .build(CacheLoader.from(this::loadColumnHandles)); + this.tableHandleCache = CacheBuilder.newBuilder() + .expireAfterWrite(clpConfig.getMetadataExpireInterval(), SECONDS) + .refreshAfterWrite(clpConfig.getMetadataRefreshInterval(), SECONDS) + .build(CacheLoader.from(this::loadTableHandles)); + } + + @Override + public List listSchemaNames(ConnectorSession session) + { + return ImmutableList.of(DEFAULT_SCHEMA_NAME); + } + + @Override + public List listTables(ConnectorSession session, Optional schemaName) + { + String schemaNameValue = schemaName.orElse(DEFAULT_SCHEMA_NAME); + if (!listSchemaNames(session).contains(schemaNameValue)) { + return ImmutableList.of(); + } + + return listTables(schemaNameValue).stream() + .map(tableHandle -> new SchemaTableName(schemaNameValue, tableHandle.getSchemaTableName().getTableName())) + .collect(toImmutableList()); + } + + @Override + public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName) + { + String schemaName = tableName.getSchemaName(); + if (!listSchemaNames(session).contains(schemaName)) { + return null; + } + + return listTables(schemaName).stream() + .filter(tableHandle -> tableHandle.getSchemaTableName().equals(tableName)) + .findFirst() + .orElse(null); + } + + @Override + public ConnectorTableLayoutResult getTableLayoutForConstraint( + ConnectorSession session, + ConnectorTableHandle table, + Constraint constraint, + Optional> desiredColumns) + { + ClpTableHandle tableHandle = (ClpTableHandle) table; + ConnectorTableLayout layout = new ConnectorTableLayout(new ClpTableLayoutHandle(tableHandle, Optional.empty())); + return new ConnectorTableLayoutResult(layout, constraint.getSummary()); + } + + @Override + public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle) + { + return new ConnectorTableLayout(handle); + } + + @Override + public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table) + { + ClpTableHandle clpTableHandle = (ClpTableHandle) table; + SchemaTableName schemaTableName = clpTableHandle.getSchemaTableName(); + List columns = listColumns(schemaTableName).stream() + .map(ClpColumnHandle::getColumnMetadata) + .collect(toImmutableList()); + + return new ConnectorTableMetadata(schemaTableName, columns); + } + + @Override + public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix) + { + requireNonNull(prefix, "prefix is null"); + String schemaName = prefix.getSchemaName(); + if (schemaName != null && !listSchemaNames(session).contains(schemaName)) { + return ImmutableMap.of(); + } + + List schemaTableNames; + if (prefix.getTableName() == null) { + schemaTableNames = listTables(session, Optional.ofNullable(prefix.getSchemaName())); + } + else { + SchemaTableName table = new SchemaTableName(schemaName, prefix.getTableName()); + if (listTables(session, Optional.ofNullable(schemaName)).contains(table)) { + schemaTableNames = ImmutableList.of(table); + } + else { + schemaTableNames = ImmutableList.of(); + } + } + + return schemaTableNames.stream() + .collect(toImmutableMap( + Function.identity(), + tableName -> getTableMetadata(session, getTableHandle(session, tableName)).getColumns())); + } + + @Override + public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle) + { + ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle; + return listColumns(clpTableHandle.getSchemaTableName()).stream().collect(toImmutableMap(ClpColumnHandle::getColumnName, column -> column)); + } + + @Override + public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle) + { + ClpColumnHandle clpColumnHandle = (ClpColumnHandle) columnHandle; + return clpColumnHandle.getColumnMetadata(); + } + + private List loadColumnHandles(SchemaTableName schemaTableName) + { + return clpMetadataProvider.listColumnHandles(schemaTableName); + } + + private List loadTableHandles(String schemaName) + { + return clpMetadataProvider.listTableHandles(schemaName); + } + + private List listColumns(SchemaTableName schemaTableName) + { + return columnHandleCache.getUnchecked(schemaTableName); + } + + private List listTables(String schemaName) + { + return tableHandleCache.getUnchecked(schemaName); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java new file mode 100644 index 0000000000000..fa0386376f12a --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider; +import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider; +import com.facebook.presto.plugin.clp.split.ClpSplitProvider; +import com.facebook.presto.spi.PrestoException; +import com.google.inject.Binder; +import com.google.inject.Scopes; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static com.facebook.presto.plugin.clp.ClpConfig.MetadataProviderType; +import static com.facebook.presto.plugin.clp.ClpConfig.SplitProviderType; +import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_METADATA_SOURCE; +import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_SPLIT_SOURCE; + +public class ClpModule + extends AbstractConfigurationAwareModule +{ + @Override + protected void setup(Binder binder) + { + binder.bind(ClpConnector.class).in(Scopes.SINGLETON); + binder.bind(ClpMetadata.class).in(Scopes.SINGLETON); + binder.bind(ClpRecordSetProvider.class).in(Scopes.SINGLETON); + binder.bind(ClpSplitManager.class).in(Scopes.SINGLETON); + configBinder(binder).bindConfig(ClpConfig.class); + + ClpConfig config = buildConfigObject(ClpConfig.class); + if (config.getMetadataProviderType() == MetadataProviderType.MYSQL) { + binder.bind(ClpMetadataProvider.class).to(ClpMySqlMetadataProvider.class).in(Scopes.SINGLETON); + } + else { + throw new PrestoException(CLP_UNSUPPORTED_METADATA_SOURCE, "Unsupported metadata provider type: " + config.getMetadataProviderType()); + } + + if (config.getSplitProviderType() == SplitProviderType.MYSQL) { + binder.bind(ClpSplitProvider.class).to(ClpMySqlSplitProvider.class).in(Scopes.SINGLETON); + } + else { + throw new PrestoException(CLP_UNSUPPORTED_SPLIT_SOURCE, "Unsupported split provider type: " + config.getSplitProviderType()); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java new file mode 100644 index 0000000000000..985c707a32483 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java @@ -0,0 +1,28 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.Plugin; +import com.facebook.presto.spi.connector.ConnectorFactory; +import com.google.common.collect.ImmutableList; + +public class ClpPlugin + implements Plugin +{ + @Override + public Iterable getConnectorFactories() + { + return ImmutableList.of(new ClpConnectorFactory()); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpRecordSetProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpRecordSetProvider.java new file mode 100644 index 0000000000000..452fe39c7d9ff --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpRecordSetProvider.java @@ -0,0 +1,33 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ColumnHandle; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.RecordSet; +import com.facebook.presto.spi.connector.ConnectorRecordSetProvider; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import java.util.List; + +public class ClpRecordSetProvider + implements ConnectorRecordSetProvider +{ + @Override + public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List columns) + { + throw new UnsupportedOperationException("getRecordSet is not supported"); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java new file mode 100644 index 0000000000000..687a8e3e18401 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java @@ -0,0 +1,65 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ConnectorSplit; +import com.facebook.presto.spi.HostAddress; +import com.facebook.presto.spi.NodeProvider; +import com.facebook.presto.spi.schedule.NodeSelectionStrategy; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Map; + +import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE; +import static java.util.Objects.requireNonNull; + +public class ClpSplit + implements ConnectorSplit +{ + private final String path; + + @JsonCreator + public ClpSplit(@JsonProperty("path") String path) + { + this.path = requireNonNull(path, "Split path is null"); + } + + @JsonProperty + public String getPath() + { + return path; + } + + @Override + public NodeSelectionStrategy getNodeSelectionStrategy() + { + return NO_PREFERENCE; + } + + @Override + public List getPreferredNodes(NodeProvider nodeProvider) + { + return ImmutableList.of(); + } + + @Override + public Map getInfo() + { + return ImmutableMap.of("path", path); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java new file mode 100644 index 0000000000000..cf7df605a1d0d --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java @@ -0,0 +1,49 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.plugin.clp.split.ClpSplitProvider; +import com.facebook.presto.spi.ConnectorSession; +import com.facebook.presto.spi.ConnectorSplitSource; +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.facebook.presto.spi.FixedSplitSource; +import com.facebook.presto.spi.connector.ConnectorSplitManager; +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +import javax.inject.Inject; + +import static java.util.Objects.requireNonNull; + +public class ClpSplitManager + implements ConnectorSplitManager +{ + private final ClpSplitProvider clpSplitProvider; + + @Inject + public ClpSplitManager(ClpSplitProvider clpSplitProvider) + { + this.clpSplitProvider = requireNonNull(clpSplitProvider, "clpSplitProvider is null"); + } + + @Override + public ConnectorSplitSource getSplits( + ConnectorTransactionHandle transactionHandle, + ConnectorSession session, + ConnectorTableLayoutHandle layout, + SplitSchedulingContext splitSchedulingContext) + { + ClpTableLayoutHandle layoutHandle = (ClpTableLayoutHandle) layout; + return new FixedSplitSource(clpSplitProvider.listSplits(layoutHandle)); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java new file mode 100644 index 0000000000000..07be04ef1584d --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java @@ -0,0 +1,77 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ConnectorTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class ClpTableHandle + implements ConnectorTableHandle +{ + private final SchemaTableName schemaTableName; + private final String tablePath; + + @JsonCreator + public ClpTableHandle(@JsonProperty("schemaTableName") SchemaTableName schemaTableName, @JsonProperty("tablePath") String tablePath) + { + this.schemaTableName = schemaTableName; + this.tablePath = tablePath; + } + + @JsonProperty + public SchemaTableName getSchemaTableName() + { + return schemaTableName; + } + + @JsonProperty + public String getTablePath() + { + return tablePath; + } + + @Override + public int hashCode() + { + return Objects.hash(schemaTableName, tablePath); + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + ClpTableHandle other = (ClpTableHandle) obj; + return this.schemaTableName.equals(other.schemaTableName) && this.tablePath.equals(other.tablePath); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("schemaTableName", schemaTableName) + .add("tablePath", tablePath) + .toString(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java new file mode 100644 index 0000000000000..f1f7d7c708815 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java @@ -0,0 +1,78 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.ConnectorTableLayoutHandle; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; +import java.util.Optional; + +import static com.google.common.base.MoreObjects.toStringHelper; + +public class ClpTableLayoutHandle + implements ConnectorTableLayoutHandle +{ + private final ClpTableHandle table; + private final Optional kqlQuery; + + @JsonCreator + public ClpTableLayoutHandle(@JsonProperty("table") ClpTableHandle table, @JsonProperty("kqlQuery") Optional kqlQuery) + { + this.table = table; + this.kqlQuery = kqlQuery; + } + + @JsonProperty + public ClpTableHandle getTable() + { + return table; + } + + @JsonProperty + public Optional getKqlQuery() + { + return kqlQuery; + } + + @Override + public boolean equals(Object o) + { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + ClpTableLayoutHandle that = (ClpTableLayoutHandle) o; + return Objects.equals(table, that.table) && + Objects.equals(kqlQuery, that.kqlQuery); + } + + @Override + public int hashCode() + { + return Objects.hash(table, kqlQuery); + } + + @Override + public String toString() + { + return toStringHelper(this) + .add("table", table) + .add("kqlQuery", kqlQuery) + .toString(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java new file mode 100644 index 0000000000000..f39cd639072d6 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java @@ -0,0 +1,22 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.spi.connector.ConnectorTransactionHandle; + +public enum ClpTransactionHandle + implements ConnectorTransactionHandle +{ + INSTANCE +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java new file mode 100644 index 0000000000000..33e4b748a30d4 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java @@ -0,0 +1,39 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.metadata; + +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.plugin.clp.ClpTableHandle; +import com.facebook.presto.spi.SchemaTableName; + +import java.util.List; + +/** + * A provider for metadata that describes what tables exist in the CLP connector, and what columns + * exist in each of those tables. + */ +public interface ClpMetadataProvider +{ + /** + * @param schemaTableName the name of the schema and the table + * @return the list of column handles for the given table. + */ + List listColumnHandles(SchemaTableName schemaTableName); + + /** + * @param schema the name of the schema + * @return the list of table handles in the specified schema + */ + List listTableHandles(String schema); +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java new file mode 100644 index 0000000000000..a52a24a0a15b4 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java @@ -0,0 +1,133 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.metadata; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.plugin.clp.ClpConfig; +import com.facebook.presto.plugin.clp.ClpTableHandle; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import static java.lang.String.format; + +public class ClpMySqlMetadataProvider + implements ClpMetadataProvider +{ + // Column names + public static final String COLUMN_METADATA_TABLE_COLUMN_NAME = "name"; + public static final String COLUMN_METADATA_TABLE_COLUMN_TYPE = "type"; + public static final String DATASETS_TABLE_COLUMN_NAME = "name"; + public static final String DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY = "archive_storage_directory"; + + // Table suffixes + public static final String COLUMN_METADATA_TABLE_SUFFIX = "_column_metadata"; + public static final String DATASETS_TABLE_SUFFIX = "datasets"; + + // SQL templates + private static final String SQL_SELECT_COLUMN_METADATA_TEMPLATE = "SELECT * FROM `%s%s" + COLUMN_METADATA_TABLE_SUFFIX + "`"; + private static final String SQL_SELECT_DATASETS_TEMPLATE = format( + "SELECT `%s`, `%s` FROM `%%s%s`", + DATASETS_TABLE_COLUMN_NAME, + DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY, + DATASETS_TABLE_SUFFIX); + + private static final Logger log = Logger.get(ClpMySqlMetadataProvider.class); + + private final ClpConfig config; + + @Inject + public ClpMySqlMetadataProvider(ClpConfig config) + { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } + catch (ClassNotFoundException e) { + log.error(e, "Failed to load MySQL JDBC driver"); + throw new RuntimeException("MySQL JDBC driver not found", e); + } + this.config = config; + } + + @Override + public List listColumnHandles(SchemaTableName schemaTableName) + { + String query = format(SQL_SELECT_COLUMN_METADATA_TEMPLATE, config.getMetadataTablePrefix(), schemaTableName.getTableName()); + ClpSchemaTree schemaTree = new ClpSchemaTree(config.isPolymorphicTypeEnabled()); + try (Connection connection = getConnection(); PreparedStatement statement = connection.prepareStatement(query)) { + try (ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + schemaTree.addColumn( + resultSet.getString(COLUMN_METADATA_TABLE_COLUMN_NAME), + resultSet.getByte(COLUMN_METADATA_TABLE_COLUMN_TYPE)); + } + } + } + catch (SQLException e) { + log.warn("Failed to load table schema for %s: %s", schemaTableName.getTableName(), e); + } + return schemaTree.collectColumnHandles(); + } + + @Override + public List listTableHandles(String schemaName) + { + ImmutableList.Builder tableHandles = new ImmutableList.Builder<>(); + String query = format(SQL_SELECT_DATASETS_TEMPLATE, config.getMetadataTablePrefix()); + try (Connection connection = getConnection(); + Statement statement = connection.createStatement(); + ResultSet resultSet = statement.executeQuery(query)) { + while (resultSet.next()) { + String tableName = resultSet.getString(DATASETS_TABLE_COLUMN_NAME); + String archiveStorageDirectory = resultSet.getString(DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY); + if (isValidIdentifier(tableName) && archiveStorageDirectory != null && !archiveStorageDirectory.isEmpty()) { + tableHandles.add(new ClpTableHandle(new SchemaTableName(schemaName, tableName), archiveStorageDirectory)); + } + else { + log.warn("Ignoring invalid table name found in metadata: %s", tableName); + } + } + } + catch (SQLException e) { + log.warn("Failed to load table names: %s", e); + } + return tableHandles.build(); + } + + private Connection getConnection() + throws SQLException + { + Connection connection = DriverManager.getConnection(config.getMetadataDbUrl(), config.getMetadataDbUser(), config.getMetadataDbPassword()); + String dbName = config.getMetadataDbName(); + if (dbName != null && !dbName.isEmpty()) { + connection.createStatement().execute(format("USE `%s`", dbName)); + } + return connection; + } + + private boolean isValidIdentifier(String identifier) + { + return identifier != null && ClpConfig.SAFE_SQL_TABLE_NAME_PATTERN.matcher(identifier).matches(); + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTree.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTree.java new file mode 100644 index 0000000000000..90d0b911d0d41 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTree.java @@ -0,0 +1,209 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.metadata; + +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.spi.PrestoException; +import com.google.common.collect.ImmutableList; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_TYPE; + +/** + * A representation of CLP's schema tree built by turning hierarchical column names (e.g., a.b.c) + * with their types into a tree. The class handles name/type conflicts when the + * `clp.polymorphic-type-enabled` option is enabled, and maps serialized CLP types to Presto types. + */ +public class ClpSchemaTree +{ + private final ClpNode root; + private final boolean polymorphicTypeEnabled; + + ClpSchemaTree(boolean polymorphicTypeEnabled) + { + this.polymorphicTypeEnabled = polymorphicTypeEnabled; + this.root = new ClpNode(""); // Root doesn't have an original name + } + + /** + * Adds a column to the internal CLP schema tree, creating intermediate nested nodes as needed. + * Handles potential name conflicts when polymorphic types are enabled by suffixing column names + * with type display names. + * + * @param fullName Fully qualified column name using dot notation (e.g., "a.b.c"). + * @param type Serialized byte value representing the CLP column's type. + */ + public void addColumn(String fullName, byte type) + { + Type prestoType = mapColumnType(type); + String[] path = fullName.split("\\."); + ClpNode current = root; + + for (int i = 0; i < path.length - 1; i++) { + String segment = path[i]; + ClpNode existingNode = current.children.get(segment); + + if (polymorphicTypeEnabled && existingNode != null && existingNode.type != null) { + // Conflict: An intermediate segment already exists as a leaf node. Rename it. + String existingSuffix = getTypeSuffix(existingNode.type); + String renamedExisting = segment + "_" + existingSuffix; + current.children.remove(segment); + current.children.put(renamedExisting, existingNode); + } + current = current.children.computeIfAbsent(segment, ClpNode::new); + current.type = null; + } + + String leafName = path[path.length - 1]; + String finalLeafName = resolvePolymorphicConflicts(current, leafName, prestoType); + + ClpNode leaf = new ClpNode(leafName, prestoType); + current.children.put(finalLeafName, leaf); + } + + /** + * Traverses the CLP schema tree and collects all leaf and nested structure nodes into a flat + * list of column handles. For nested structures, builds a RowType from child nodes. + * + * @return List of ClpColumnHandle objects representing the full schema. + */ + public List collectColumnHandles() + { + ImmutableList.Builder columns = new ImmutableList.Builder<>(); + for (Map.Entry entry : root.children.entrySet()) { + String name = entry.getKey(); + ClpNode child = entry.getValue(); + if (child.isLeaf()) { + columns.add(new ClpColumnHandle(name, child.originalName, child.type, true)); + } + else { + Type rowType = buildRowType(child); + columns.add(new ClpColumnHandle(name, child.originalName, rowType, true)); + } + } + return columns.build(); + } + + private Type mapColumnType(byte type) + { + switch (ClpSchemaTreeNodeType.fromType(type)) { + case Integer: + return BIGINT; + case Float: + return DOUBLE; + case ClpString: + case VarString: + case DateString: + case NullValue: + return VARCHAR; + case UnstructuredArray: + return new ArrayType(VARCHAR); + case Boolean: + return BOOLEAN; + default: + throw new PrestoException(CLP_UNSUPPORTED_TYPE, "Unsupported type: " + type); + } + } + + private String resolvePolymorphicConflicts(ClpNode parent, String baseName, Type newType) + { + if (!polymorphicTypeEnabled) { + return baseName; + } + + boolean conflictDetected = false; + if (parent.children.containsKey(baseName)) { + ClpNode existing = parent.children.get(baseName); + if (existing.type == null) { + conflictDetected = true; + } + else if (!existing.type.equals(newType)) { + String existingSuffix = getTypeSuffix(existing.type); + String renamedExisting = baseName + "_" + existingSuffix; + parent.children.remove(baseName); + parent.children.put(renamedExisting, existing); + parent.conflictingBaseNames.add(baseName); + conflictDetected = true; + } + } + else if (parent.conflictingBaseNames.contains(baseName)) { + conflictDetected = true; + } + + if (conflictDetected) { + String newSuffix = getTypeSuffix(newType); + return baseName + "_" + newSuffix; + } + + return baseName; + } + + private String getTypeSuffix(Type type) + { + return (type instanceof ArrayType) ? "array" : type.getDisplayName(); + } + + private Type buildRowType(ClpNode node) + { + List fields = new ArrayList<>(); + List sortedKeys = new ArrayList<>(node.children.keySet()); + Collections.sort(sortedKeys); + + for (String name : sortedKeys) { + ClpNode child = node.children.get(name); + Type fieldType = child.isLeaf() ? child.type : buildRowType(child); + fields.add(new RowType.Field(Optional.of(name), fieldType)); + } + return RowType.from(fields); + } + + private static class ClpNode + { + Type type; // Only non-null for leaf nodes + String originalName; + Map children = new HashMap<>(); + Set conflictingBaseNames = new HashSet<>(); + + ClpNode(String originalName) + { + this.originalName = originalName; + } + + ClpNode(String originalName, Type type) + { + this.originalName = originalName; + this.type = type; + } + + boolean isLeaf() + { + return children.isEmpty(); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTreeNodeType.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTreeNodeType.java new file mode 100644 index 0000000000000..5e034f67caad1 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTreeNodeType.java @@ -0,0 +1,68 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.metadata; + +/** + * The CLP schema-tree node types used in clp-s archives. + */ +public enum ClpSchemaTreeNodeType +{ + Integer((byte) 0), + Float((byte) 1), + ClpString((byte) 2), + VarString((byte) 3), + Boolean((byte) 4), + Object((byte) 5), + UnstructuredArray((byte) 6), + NullValue((byte) 7), + DateString((byte) 8), + StructuredArray((byte) 9); + + private static final ClpSchemaTreeNodeType[] LOOKUP_TABLE; + private final byte type; + + ClpSchemaTreeNodeType(byte type) + { + this.type = type; + } + + public static ClpSchemaTreeNodeType fromType(byte type) + { + if (type < 0 || type >= LOOKUP_TABLE.length || LOOKUP_TABLE[type] == null) { + throw new IllegalArgumentException("Invalid type code: " + type); + } + return LOOKUP_TABLE[type]; + } + + public byte getType() + { + return type; + } + + static { + byte maxType = 0; + for (ClpSchemaTreeNodeType nodeType : values()) { + if (nodeType.type > maxType) { + maxType = nodeType.type; + } + } + + ClpSchemaTreeNodeType[] lookup = new ClpSchemaTreeNodeType[maxType + 1]; + for (ClpSchemaTreeNodeType nodeType : values()) { + lookup[nodeType.type] = nodeType; + } + + LOOKUP_TABLE = lookup; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java new file mode 100644 index 0000000000000..ac646a061ba4b --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java @@ -0,0 +1,99 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.split; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.ClpConfig; +import com.facebook.presto.plugin.clp.ClpSplit; +import com.facebook.presto.plugin.clp.ClpTableHandle; +import com.facebook.presto.plugin.clp.ClpTableLayoutHandle; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; + +import static java.lang.String.format; + +public class ClpMySqlSplitProvider + implements ClpSplitProvider +{ + // Column names + public static final String ARCHIVES_TABLE_COLUMN_ID = "id"; + + // Table suffixes + public static final String ARCHIVE_TABLE_SUFFIX = "_archives"; + + // SQL templates + private static final String SQL_SELECT_ARCHIVES_TEMPLATE = format("SELECT `%s` FROM `%%s%%s%s`", ARCHIVES_TABLE_COLUMN_ID, ARCHIVE_TABLE_SUFFIX); + + private static final Logger log = Logger.get(ClpMySqlSplitProvider.class); + + private final ClpConfig config; + + @Inject + public ClpMySqlSplitProvider(ClpConfig config) + { + try { + Class.forName("com.mysql.cj.jdbc.Driver"); + } + catch (ClassNotFoundException e) { + log.error(e, "Failed to load MySQL JDBC driver"); + throw new RuntimeException("MySQL JDBC driver not found", e); + } + this.config = config; + } + + @Override + public List listSplits(ClpTableLayoutHandle clpTableLayoutHandle) + { + ImmutableList.Builder splits = new ImmutableList.Builder<>(); + ClpTableHandle clpTableHandle = clpTableLayoutHandle.getTable(); + String tablePath = clpTableHandle.getTablePath(); + String tableName = clpTableHandle.getSchemaTableName().getTableName(); + String archivePathQuery = format(SQL_SELECT_ARCHIVES_TEMPLATE, config.getMetadataTablePrefix(), tableName); + + try (Connection connection = getConnection()) { + // Fetch archive IDs and create splits + try (PreparedStatement statement = connection.prepareStatement(archivePathQuery); ResultSet resultSet = statement.executeQuery()) { + while (resultSet.next()) { + final String archiveId = resultSet.getString(ARCHIVES_TABLE_COLUMN_ID); + final String archivePath = tablePath + "/" + archiveId; + splits.add(new ClpSplit(archivePath)); + } + } + } + catch (SQLException e) { + log.warn("Database error while processing splits for %s: %s", tableName, e); + } + + return splits.build(); + } + + private Connection getConnection() + throws SQLException + { + Connection connection = DriverManager.getConnection(config.getMetadataDbUrl(), config.getMetadataDbUser(), config.getMetadataDbPassword()); + String dbName = config.getMetadataDbName(); + if (dbName != null && !dbName.isEmpty()) { + connection.createStatement().execute(format("USE `%s`", dbName)); + } + return connection; + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java new file mode 100644 index 0000000000000..2868cfeb0a7bc --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.split; + +import com.facebook.presto.plugin.clp.ClpSplit; +import com.facebook.presto.plugin.clp.ClpTableLayoutHandle; + +import java.util.List; + +/** + * A provider for splits from a CLP dataset. + */ +public interface ClpSplitProvider +{ + /** + * @param clpTableLayoutHandle the table layout handle + * @return the list of splits for the specified table layout + */ + List listSplits(ClpTableLayoutHandle clpTableLayoutHandle); +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java new file mode 100644 index 0000000000000..99305fa34a634 --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java @@ -0,0 +1,217 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType; +import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider; +import org.apache.commons.io.FileUtils; +import org.apache.commons.math3.util.Pair; + +import java.io.File; +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; +import java.util.Map; + +import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_NAME; +import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_TYPE; +import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY; +import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_NAME; +import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_SUFFIX; +import static com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider.ARCHIVES_TABLE_COLUMN_ID; +import static com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider.ARCHIVE_TABLE_SUFFIX; +import static java.lang.String.format; +import static java.util.UUID.randomUUID; +import static org.testng.Assert.fail; + +public final class ClpMetadataDbSetUp +{ + public static final String METADATA_DB_PASSWORD = ""; + public static final String METADATA_DB_TABLE_PREFIX = "clp_"; + public static final String METADATA_DB_URL_TEMPLATE = "jdbc:h2:file:%s;MODE=MySQL;DATABASE_TO_UPPER=FALSE"; + public static final String METADATA_DB_USER = "sa"; + public static final String ARCHIVE_STORAGE_DIRECTORY_BASE = "/tmp/archives/"; + + private static final Logger log = Logger.get(ClpMetadataDbSetUp.class); + private static final String DATASETS_TABLE_NAME = METADATA_DB_TABLE_PREFIX + DATASETS_TABLE_SUFFIX; + private static final String ARCHIVE_TABLE_COLUMN_PAGINATION_ID = "pagination_id"; + + private ClpMetadataDbSetUp() + { + throw new UnsupportedOperationException("This is a utility class and cannot be instantiated"); + } + + public static DbHandle getDbHandle(String dbName) + { + return new DbHandle(format("/tmp/presto-clp-test-%s/%s", randomUUID(), dbName)); + } + + public static ClpMetadata setupMetadata(DbHandle dbHandle, Map>> clpFields) + { + final String metadataDbUrl = format(METADATA_DB_URL_TEMPLATE, dbHandle.dbPath); + final String columnMetadataTableSuffix = "_column_metadata"; + + try (Connection conn = DriverManager.getConnection(metadataDbUrl, METADATA_DB_USER, METADATA_DB_PASSWORD); Statement stmt = conn.createStatement()) { + createDatasetsTable(stmt); + + for (Map.Entry>> entry : clpFields.entrySet()) { + String tableName = entry.getKey(); + String columnMetadataTableName = METADATA_DB_TABLE_PREFIX + tableName + columnMetadataTableSuffix; + String createColumnMetadataSQL = format( + "CREATE TABLE IF NOT EXISTS %s (" + + " %s VARCHAR(512) NOT NULL," + + " %s TINYINT NOT NULL," + + " PRIMARY KEY (%s, %s))", + columnMetadataTableName, + COLUMN_METADATA_TABLE_COLUMN_NAME, + COLUMN_METADATA_TABLE_COLUMN_TYPE, + COLUMN_METADATA_TABLE_COLUMN_NAME, + COLUMN_METADATA_TABLE_COLUMN_TYPE); + String insertColumnMetadataSQL = format( + "INSERT INTO %s (%s, %s) VALUES (?, ?)", + columnMetadataTableName, + COLUMN_METADATA_TABLE_COLUMN_NAME, + COLUMN_METADATA_TABLE_COLUMN_TYPE); + + stmt.execute(createColumnMetadataSQL); + updateDatasetsTable(conn, tableName); + + try (PreparedStatement pstmt = conn.prepareStatement(insertColumnMetadataSQL)) { + for (Pair record : entry.getValue()) { + pstmt.setString(1, record.getFirst()); + pstmt.setByte(2, record.getSecond().getType()); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + } + } + catch (SQLException e) { + fail(e.getMessage()); + } + + ClpConfig config = new ClpConfig() + .setPolymorphicTypeEnabled(true) + .setMetadataDbUrl(metadataDbUrl) + .setMetadataDbUser(METADATA_DB_USER) + .setMetadataDbPassword(METADATA_DB_PASSWORD) + .setMetadataTablePrefix(METADATA_DB_TABLE_PREFIX); + ClpMetadataProvider metadataProvider = new ClpMySqlMetadataProvider(config); + return new ClpMetadata(config, metadataProvider); + } + + public static ClpMySqlSplitProvider setupSplit(DbHandle dbHandle, Map> splits) + { + final String metadataDbUrl = format(METADATA_DB_URL_TEMPLATE, dbHandle.dbPath); + final String archiveTableFormat = METADATA_DB_TABLE_PREFIX + "%s" + ARCHIVE_TABLE_SUFFIX; + + try (Connection conn = DriverManager.getConnection(metadataDbUrl, METADATA_DB_USER, METADATA_DB_PASSWORD); Statement stmt = conn.createStatement()) { + createDatasetsTable(stmt); + + // Create and populate archive tables + for (Map.Entry> tableSplits : splits.entrySet()) { + String tableName = tableSplits.getKey(); + updateDatasetsTable(conn, tableName); + + String archiveTableName = format(archiveTableFormat, tableSplits.getKey()); + String createArchiveTableSQL = format( + "CREATE TABLE IF NOT EXISTS %s (" + + "%s BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, " + + "%s VARCHAR(64) NOT NULL)", + archiveTableName, + ARCHIVE_TABLE_COLUMN_PAGINATION_ID, + ARCHIVES_TABLE_COLUMN_ID); + + stmt.execute(createArchiveTableSQL); + + String insertArchiveTableSQL = format("INSERT INTO %s (%s) VALUES (?)", archiveTableName, ARCHIVES_TABLE_COLUMN_ID); + try (PreparedStatement pstmt = conn.prepareStatement(insertArchiveTableSQL)) { + for (String splitPath : tableSplits.getValue()) { + pstmt.setString(1, splitPath); + pstmt.addBatch(); + } + pstmt.executeBatch(); + } + } + } + catch (SQLException e) { + fail(e.getMessage()); + } + + return new ClpMySqlSplitProvider( + new ClpConfig() + .setPolymorphicTypeEnabled(true) + .setMetadataDbUrl(metadataDbUrl) + .setMetadataDbUser(METADATA_DB_USER) + .setMetadataDbPassword(METADATA_DB_PASSWORD) + .setMetadataTablePrefix(METADATA_DB_TABLE_PREFIX)); + } + + public static void tearDown(DbHandle dbHandle) + { + File dir = new File(dbHandle.dbPath).getParentFile(); + if (dir.exists()) { + try { + FileUtils.deleteDirectory(dir); + log.info("Deleted database dir" + dir.getAbsolutePath()); + } + catch (IOException e) { + log.warn("Failed to delete directory " + dir + ": " + e.getMessage()); + } + } + } + + private static void createDatasetsTable(Statement stmt) + throws SQLException + { + final String createDatasetsTableSql = format( + "CREATE TABLE IF NOT EXISTS %s (%s VARCHAR(255) PRIMARY KEY, %s VARCHAR(4096) NOT NULL)", + DATASETS_TABLE_NAME, + DATASETS_TABLE_COLUMN_NAME, + DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY); + stmt.execute(createDatasetsTableSql); + } + + private static void updateDatasetsTable(Connection conn, String tableName) + throws SQLException + { + final String insertDatasetsTableSql = format( + "INSERT INTO %s (%s, %s) VALUES (?, ?)", + DATASETS_TABLE_NAME, + DATASETS_TABLE_COLUMN_NAME, + DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY); + try (PreparedStatement pstmt = conn.prepareStatement(insertDatasetsTableSql)) { + pstmt.setString(1, tableName); + pstmt.setString(2, ARCHIVE_STORAGE_DIRECTORY_BASE + tableName); + pstmt.executeUpdate(); + } + } + + public static final class DbHandle + { + public String dbPath; + + private DbHandle(String dbPath) + { + this.dbPath = dbPath; + } + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java new file mode 100644 index 0000000000000..e8dd126da6ff2 --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java @@ -0,0 +1,138 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.RowType; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.math3.util.Pair; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.Optional; + +import static com.facebook.presto.common.type.BigintType.BIGINT; +import static com.facebook.presto.common.type.BooleanType.BOOLEAN; +import static com.facebook.presto.common.type.DoubleType.DOUBLE; +import static com.facebook.presto.common.type.VarcharType.VARCHAR; +import static com.facebook.presto.plugin.clp.ClpMetadata.DEFAULT_SCHEMA_NAME; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.DbHandle; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.getDbHandle; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.setupMetadata; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Boolean; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.ClpString; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.UnstructuredArray; +import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString; +import static com.facebook.presto.testing.TestingConnectorSession.SESSION; +import static org.testng.Assert.assertEquals; + +@Test(singleThreaded = true) +public class TestClpMetadata +{ + private static final String TABLE_NAME = "test"; + private DbHandle dbHandle; + private ClpMetadata metadata; + + @BeforeMethod + public void setUp() + { + dbHandle = getDbHandle("metadata_testdb"); + metadata = setupMetadata( + dbHandle, + ImmutableMap.of( + TABLE_NAME, + ImmutableList.of( + new Pair<>("a", Integer), + new Pair<>("a", ClpString), + new Pair<>("b", Float), + new Pair<>("b", ClpString), + new Pair<>("c.d", Boolean), + new Pair<>("c.e", VarString), + new Pair<>("f.g.h", UnstructuredArray)))); + } + + @AfterMethod + public void tearDown() + { + ClpMetadataDbSetUp.tearDown(dbHandle); + } + + @Test + public void testListSchemaNames() + { + assertEquals(metadata.listSchemaNames(SESSION), ImmutableList.of(DEFAULT_SCHEMA_NAME)); + } + + @Test + public void testListTables() + { + HashSet tables = new HashSet<>(); + tables.add(new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME)); + assertEquals(new HashSet<>(metadata.listTables(SESSION, Optional.empty())), tables); + } + + @Test + public void testGetTableMetadata() + { + ClpTableHandle clpTableHandle = (ClpTableHandle) metadata.getTableHandle(SESSION, new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME)); + ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, clpTableHandle); + ImmutableSet columnMetadata = ImmutableSet.builder() + .add(ColumnMetadata.builder() + .setName("a_bigint") + .setType(BIGINT) + .setNullable(true) + .build()) + .add(ColumnMetadata.builder() + .setName("a_varchar") + .setType(VARCHAR) + .setNullable(true) + .build()) + .add(ColumnMetadata.builder() + .setName("b_double") + .setType(DOUBLE) + .setNullable(true) + .build()) + .add(ColumnMetadata.builder() + .setName("b_varchar") + .setType(VARCHAR) + .setNullable(true) + .build()) + .add(ColumnMetadata.builder() + .setName("c") + .setType(RowType.from(ImmutableList.of( + RowType.field("d", BOOLEAN), + RowType.field("e", VARCHAR)))) + .setNullable(true) + .build()) + .add(ColumnMetadata.builder() + .setName("f") + .setType(RowType.from(ImmutableList.of( + RowType.field("g", + RowType.from(ImmutableList.of( + RowType.field("h", new ArrayType(VARCHAR)))))))) + .setNullable(true) + .build()) + .build(); + assertEquals(columnMetadata, ImmutableSet.copyOf(tableMetadata.getColumns())); + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java new file mode 100644 index 0000000000000..7dcd3c768d4e9 --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java @@ -0,0 +1,94 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.plugin.clp.split.ClpSplitProvider; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableSet; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static com.facebook.presto.plugin.clp.ClpMetadata.DEFAULT_SCHEMA_NAME; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.ARCHIVE_STORAGE_DIRECTORY_BASE; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.DbHandle; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.getDbHandle; +import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.setupSplit; +import static org.testng.Assert.assertEquals; + +@Test(singleThreaded = true) +public class TestClpSplit +{ + private DbHandle dbHandle; + private ClpSplitProvider clpSplitProvider; + private Map> tableSplits; + + @BeforeMethod + public void setUp() + { + dbHandle = getDbHandle("split_testdb"); + tableSplits = new HashMap<>(); + + int numKeys = 3; + int numValuesPerKey = 10; + + for (int i = 0; i < numKeys; i++) { + String key = "test_" + i; + List values = new ArrayList<>(); + + for (int j = 0; j < numValuesPerKey; j++) { + values.add("id_" + j); + } + + tableSplits.put(key, values); + } + clpSplitProvider = setupSplit(dbHandle, tableSplits); + } + + @AfterMethod + public void tearDown() + { + ClpMetadataDbSetUp.tearDown(dbHandle); + } + + @Test + public void testListSplits() + { + for (Map.Entry> entry : tableSplits.entrySet()) { + String tableName = entry.getKey(); + String tablePath = ARCHIVE_STORAGE_DIRECTORY_BASE + tableName; + List expectedSplits = entry.getValue(); + ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle( + new ClpTableHandle(new SchemaTableName(DEFAULT_SCHEMA_NAME, tableName), tablePath), Optional.empty()); + List splits = clpSplitProvider.listSplits(layoutHandle); + assertEquals(splits.size(), expectedSplits.size()); + + ImmutableSet actualSplitPaths = splits.stream() + .map(ClpSplit::getPath) + .collect(ImmutableSet.toImmutableSet()); + + ImmutableSet expectedSplitPaths = expectedSplits.stream() + .map(split -> tablePath + "/" + split) + .collect(ImmutableSet.toImmutableSet()); + + assertEquals(actualSplitPaths, expectedSplitPaths); + } + } +} diff --git a/presto-docs/src/main/sphinx/connector.rst b/presto-docs/src/main/sphinx/connector.rst index d337fe4ed12d1..a45ab8f9bafa4 100644 --- a/presto-docs/src/main/sphinx/connector.rst +++ b/presto-docs/src/main/sphinx/connector.rst @@ -14,6 +14,7 @@ from different data sources. connector/blackhole connector/cassandra connector/clickhouse + connector/clp connector/deltalake connector/druid connector/elasticsearch diff --git a/presto-docs/src/main/sphinx/connector/clp.rst b/presto-docs/src/main/sphinx/connector/clp.rst new file mode 100644 index 0000000000000..e1beb5c180c1b --- /dev/null +++ b/presto-docs/src/main/sphinx/connector/clp.rst @@ -0,0 +1,179 @@ +============= +CLP Connector +============= + +.. contents:: + :local: + :backlinks: none + :depth: 1 + +Overview +-------- + +The CLP Connector enables SQL-based querying of `CLP `_ archives via Presto. This +document describes how to configure the CLP Connector for use with a CLP cluster, as well as essential details for +understanding the CLP connector. + + +Configuration +------------- + +To configure the CLP connector, create a catalog properties file ``etc/catalog/clp.properties`` with at least the +following contents, modifying the properties as appropriate: + +.. code-block:: ini + + connector.name=clp + clp.metadata-provider-type=mysql + clp.metadata-db-url=jdbc:mysql://localhost:3306 + clp.metadata-db-name=clp_db + clp.metadata-db-user=clp_user + clp.metadata-db-password=clp_password + clp.metadata-table-prefix=clp_ + clp.split-provider-type=mysql + + +Configuration Properties +------------------------ + +The following configuration properties are available: + +================================== ======================================================================== ========= +Property Name Description Default +================================== ======================================================================== ========= +``clp.polymorphic-type-enabled`` Enables or disables support for polymorphic types in CLP, allowing the ``false`` + same field to have different types. This is useful for schema-less, + semi-structured data where the same field may appear with different + types. + + When enabled, type annotations are added to conflicting field names to + distinguish between types. For example, if an ``id`` column appears with + both an ``int`` and ``string`` types, the connector will create two + columns named ``id_bigint`` and ``id_varchar``. + + Supported type annotations include ``bigint``, ``varchar``, ``double``, + ``boolean``, and ``array(varchar)`` (See `Data Types`_ for details). For + columns with only one type, the original column name is used. +``clp.metadata-provider-type`` Specifies the metadata provider type. Currently, the only supported ``mysql`` + type is a MySQL database, which is also used by the CLP package to store + metadata. Additional providers can be supported by implementing the + ``ClpMetadataProvider`` interface. +``clp.metadata-db-url`` The JDBC URL used to connect to the metadata database. This property is + required if ``clp.metadata-provider-type`` is set to ``mysql``. +``clp.metadata-db-name`` The name of the metadata database. This option is required if + ``clp.metadata-provider-type`` is set to ``mysql`` and the database name + is not specified in the URL. +``clp.metadata-db-user`` The database user with access to the metadata database. This option is + required if ``clp.metadata-provider-type`` is set to ``mysql`` and the + database name is not specified in the URL. +``clp.metadata-db-password`` The password for the metadata database user. This option is required if + ``clp.metadata-provider-type`` is set to ``mysql``. +``clp.metadata-table-prefix`` A string prefix prepended to all metadata table names when querying the + database. Useful for namespacing or avoiding collisions. This option is + required if ``clp.metadata-provider-type`` is set to ``mysql``. +``clp.metadata-expire-interval`` Defines how long, in seconds, metadata entries remain valid before they 600 + need to be refreshed. +``clp.metadata-refresh-interval`` Specifies how frequently metadata is refreshed from the source, in 60 + seconds. Set this to a lower value for frequently changing datasets or + to a higher value to reduce load. +``clp.split-provider-type`` Specifies the split provider type. By default, it uses the same type as ``mysql`` + the metadata provider with the same connection parameters. Additional + types can be supported by implementing the ``ClpSplitProvider`` + interface. +================================== ======================================================================== ========= + + +Metadata and Split Providers +---------------------------- + +The CLP connector relies on metadata and split providers to retrieve information from various sources. By default, it +uses a MySQL database for both metadata and split storage. We recommend using the CLP package for log ingestion, which +automatically populates the database with the required information. + +If you prefer to use a different source--or the same source with a custom implementation--you can provide your own +implementations of the ``ClpMetadataProvider`` and ``ClpSplitProvider`` interfaces, and configure the connector +accordingly. + +Data Types +---------- + +The data type mappings are as follows: + +====================== ==================== +CLP Type Presto Type +====================== ==================== +``Integer`` ``BIGINT`` +``Float`` ``DOUBLE`` +``ClpString`` ``VARCHAR`` +``VarString`` ``VARCHAR`` +``DateString`` ``VARCHAR`` +``Boolean`` ``BOOLEAN`` +``UnstructuredArray`` ``ARRAY(VARCHAR)`` +``Object`` ``ROW`` +(others) (unsupported) +====================== ==================== + +String Types +^^^^^^^^^^^^ + +CLP uses three distinct string types: ``ClpString`` (strings with whitespace), ``VarString`` (strings without +whitespace), and ``DateString`` (strings representing dates). Currently, all three are mapped to Presto's ``VARCHAR`` +type. + +Array Types +^^^^^^^^^^^ + +CLP supports two array types: ``UnstructuredArray`` and ``StructuredArray``. Unstructured arrays are stored as strings +in CLP and elements can be any type. However, in Presto arrays are homogeneous, so the elements are converted to strings +when read. ``StructuredArray`` type is not supported in Presto. + +Object Types +^^^^^^^^^^^^ + +CLP stores metadata using a global schema tree structure that captures all possible fields from various log structures. +Internal nodes may represent objects containing nested fields as their children. In Presto, we map these internal object +nodes to the ``ROW`` data type, including all subfields as fields within the ``ROW``. + +For instance, consider a table containing two distinct JSON log types: + +Log Type 1: + +.. code-block:: json + + { + "msg": { + "ts": 0, + "status": "ok" + } + } + +Log Type 2: + +.. code-block:: json + + { + "msg": { + "ts": 1, + "status": "error", + "thread_num": 4, + "backtrace": "" + } + } + +In CLP's schema tree, these two structures are combined into a unified internal node (``msg``) with four child nodes: +``ts``, ``status``, ``thread_num`` and ``backtrace``. In Presto, we represent this combined structure using the +following ``ROW`` type: + +.. code-block:: sql + + ROW(ts BIGINT, status VARCHAR, thread_num BIGINT, backtrace VARCHAR) + +Each JSON log maps to this unified ``ROW`` type, with absent fields represented as ``NULL``. The child nodes (``ts``, +``status``, ``thread_num``, ``backtrace``) become fields within the ``ROW``, clearly reflecting the nested and varying +structures of the original JSON logs. + +SQL support +----------- + +The connector only provides read access to data. It does not support DDL operations, such as creating or dropping +tables. Currently, we only support one ``default`` schema. diff --git a/presto-server/src/main/provisio/presto.xml b/presto-server/src/main/provisio/presto.xml index c66cf1159ce5e..4ff13686fee21 100644 --- a/presto-server/src/main/provisio/presto.xml +++ b/presto-server/src/main/provisio/presto.xml @@ -233,6 +233,12 @@ + + + + + +