diff --git a/pom.xml b/pom.xml
index a9a936da2315c..f57eb32378792 100644
--- a/pom.xml
+++ b/pom.xml
@@ -215,6 +215,7 @@
presto-base-arrow-flight
presto-function-server
presto-router-example-plugin-scheduler
+ presto-clp
diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml
new file mode 100644
index 0000000000000..bc0c83a94ef68
--- /dev/null
+++ b/presto-clp/pom.xml
@@ -0,0 +1,134 @@
+
+
+ 4.0.0
+
+
+ com.facebook.presto
+ presto-root
+ 0.293
+
+
+ presto-clp
+ Presto - CLP Connector
+ presto-plugin
+
+
+ ${project.parent.basedir}
+
+
+
+
+ com.mysql
+ mysql-connector-j
+ runtime
+
+
+
+ com.facebook.airlift
+ bootstrap
+
+
+
+ com.facebook.airlift
+ json
+
+
+
+ com.facebook.airlift
+ log
+
+
+
+ com.facebook.airlift
+ configuration
+
+
+
+ com.google.inject
+ guice
+
+
+
+ com.google.guava
+ guava
+
+
+
+ javax.inject
+ javax.inject
+
+
+
+ com.fasterxml.jackson.core
+ jackson-annotations
+ provided
+
+
+
+ com.facebook.presto
+ presto-spi
+ provided
+
+
+
+ com.facebook.presto
+ presto-common
+ provided
+
+
+
+ io.airlift
+ units
+ provided
+
+
+
+ io.airlift
+ slice
+ provided
+
+
+
+ org.testng
+ testng
+ test
+
+
+
+ com.facebook.presto
+ presto-main-base
+ test
+
+
+
+ com.facebook.presto
+ presto-analyzer
+ test
+
+
+
+ com.h2database
+ h2
+ test
+
+
+
+ com.facebook.presto
+ presto-parser
+ test
+
+
+
+ org.apache.commons
+ commons-math3
+ test
+
+
+
+ commons-io
+ commons-io
+ test
+
+
+
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java
new file mode 100644
index 0000000000000..ccd9e1d607635
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpColumnHandle.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class ClpColumnHandle
+ implements ColumnHandle
+{
+ private final String columnName;
+ private final String originalColumnName;
+ private final Type columnType;
+ private final boolean nullable;
+
+ @JsonCreator
+ public ClpColumnHandle(
+ @JsonProperty("columnName") String columnName,
+ @JsonProperty("originalColumnName") String originalColumnName,
+ @JsonProperty("columnType") Type columnType,
+ @JsonProperty("nullable") boolean nullable)
+ {
+ this.columnName = columnName;
+ this.originalColumnName = originalColumnName;
+ this.columnType = columnType;
+ this.nullable = nullable;
+ }
+
+ public ClpColumnHandle(String columnName, Type columnType, boolean nullable)
+ {
+ this(columnName, columnName, columnType, nullable);
+ }
+
+ @JsonProperty
+ public String getColumnName()
+ {
+ return columnName;
+ }
+
+ @JsonProperty
+ public String getOriginalColumnName()
+ {
+ return originalColumnName;
+ }
+
+ @JsonProperty
+ public Type getColumnType()
+ {
+ return columnType;
+ }
+
+ @JsonProperty
+ public boolean isNullable()
+ {
+ return nullable;
+ }
+
+ public ColumnMetadata getColumnMetadata()
+ {
+ return ColumnMetadata.builder()
+ .setName(columnName)
+ .setType(columnType)
+ .setNullable(nullable)
+ .build();
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(columnName, originalColumnName, columnType, nullable);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ ClpColumnHandle other = (ClpColumnHandle) obj;
+ return Objects.equals(this.columnName, other.columnName) &&
+ Objects.equals(this.originalColumnName, other.originalColumnName) &&
+ Objects.equals(this.columnType, other.columnType) &&
+ Objects.equals(this.nullable, other.nullable);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("columnName", columnName)
+ .add("originalColumnName", originalColumnName)
+ .add("columnType", columnType)
+ .add("nullable", nullable)
+ .toString();
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java
new file mode 100644
index 0000000000000..761c4702a64b2
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.airlift.configuration.Config;
+import com.facebook.presto.spi.PrestoException;
+
+import java.util.regex.Pattern;
+
+public class ClpConfig
+{
+ public static final Pattern SAFE_SQL_TABLE_NAME_PATTERN = Pattern.compile("^[a-zA-Z0-9_]+$");
+
+ private boolean polymorphicTypeEnabled = true;
+
+ private MetadataProviderType metadataProviderType = MetadataProviderType.MYSQL;
+ private String metadataDbUrl;
+ private String metadataDbName;
+ private String metadataDbUser;
+ private String metadataDbPassword;
+ private String metadataTablePrefix;
+ private long metadataRefreshInterval = 60;
+ private long metadataExpireInterval = 600;
+
+ private SplitProviderType splitProviderType = SplitProviderType.MYSQL;
+
+ public boolean isPolymorphicTypeEnabled()
+ {
+ return polymorphicTypeEnabled;
+ }
+
+ @Config("clp.polymorphic-type-enabled")
+ public ClpConfig setPolymorphicTypeEnabled(boolean polymorphicTypeEnabled)
+ {
+ this.polymorphicTypeEnabled = polymorphicTypeEnabled;
+ return this;
+ }
+
+ public MetadataProviderType getMetadataProviderType()
+ {
+ return metadataProviderType;
+ }
+
+ @Config("clp.metadata-provider-type")
+ public ClpConfig setMetadataProviderType(MetadataProviderType metadataProviderType)
+ {
+ this.metadataProviderType = metadataProviderType;
+ return this;
+ }
+
+ public String getMetadataDbUrl()
+ {
+ return metadataDbUrl;
+ }
+
+ @Config("clp.metadata-db-url")
+ public ClpConfig setMetadataDbUrl(String metadataDbUrl)
+ {
+ this.metadataDbUrl = metadataDbUrl;
+ return this;
+ }
+
+ public String getMetadataDbName()
+ {
+ return metadataDbName;
+ }
+
+ @Config("clp.metadata-db-name")
+ public ClpConfig setMetadataDbName(String metadataDbName)
+ {
+ this.metadataDbName = metadataDbName;
+ return this;
+ }
+
+ public String getMetadataDbUser()
+ {
+ return metadataDbUser;
+ }
+
+ @Config("clp.metadata-db-user")
+ public ClpConfig setMetadataDbUser(String metadataDbUser)
+ {
+ this.metadataDbUser = metadataDbUser;
+ return this;
+ }
+
+ public String getMetadataDbPassword()
+ {
+ return metadataDbPassword;
+ }
+
+ @Config("clp.metadata-db-password")
+ public ClpConfig setMetadataDbPassword(String metadataDbPassword)
+ {
+ this.metadataDbPassword = metadataDbPassword;
+ return this;
+ }
+
+ public String getMetadataTablePrefix()
+ {
+ return metadataTablePrefix;
+ }
+
+ @Config("clp.metadata-table-prefix")
+ public ClpConfig setMetadataTablePrefix(String metadataTablePrefix)
+ {
+ if (metadataTablePrefix == null || !SAFE_SQL_TABLE_NAME_PATTERN.matcher(metadataTablePrefix).matches()) {
+ throw new PrestoException(
+ ClpErrorCode.CLP_UNSUPPORTED_CONFIG_OPTION,
+ "Invalid metadataTablePrefix: " + metadataTablePrefix + ". Only alphanumeric characters and underscores are allowed.");
+ }
+
+ this.metadataTablePrefix = metadataTablePrefix;
+ return this;
+ }
+
+ public long getMetadataRefreshInterval()
+ {
+ return metadataRefreshInterval;
+ }
+
+ @Config("clp.metadata-refresh-interval")
+ public ClpConfig setMetadataRefreshInterval(long metadataRefreshInterval)
+ {
+ this.metadataRefreshInterval = metadataRefreshInterval;
+ return this;
+ }
+
+ public long getMetadataExpireInterval()
+ {
+ return metadataExpireInterval;
+ }
+
+ @Config("clp.metadata-expire-interval")
+ public ClpConfig setMetadataExpireInterval(long metadataExpireInterval)
+ {
+ this.metadataExpireInterval = metadataExpireInterval;
+ return this;
+ }
+
+ public SplitProviderType getSplitProviderType()
+ {
+ return splitProviderType;
+ }
+
+ @Config("clp.split-provider-type")
+ public ClpConfig setSplitProviderType(SplitProviderType splitProviderType)
+ {
+ this.splitProviderType = splitProviderType;
+ return this;
+ }
+
+ public enum MetadataProviderType
+ {
+ MYSQL
+ }
+
+ public enum SplitProviderType
+ {
+ MYSQL
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java
new file mode 100644
index 0000000000000..335bc50b96f50
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnector.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.airlift.bootstrap.LifeCycleManager;
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+import com.facebook.presto.spi.transaction.IsolationLevel;
+
+import javax.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+public class ClpConnector
+ implements Connector
+{
+ private static final Logger log = Logger.get(ClpConnector.class);
+
+ private final LifeCycleManager lifeCycleManager;
+ private final ClpMetadata metadata;
+ private final ClpRecordSetProvider recordSetProvider;
+ private final ClpSplitManager splitManager;
+
+ @Inject
+ public ClpConnector(LifeCycleManager lifeCycleManager, ClpMetadata metadata, ClpRecordSetProvider recordSetProvider, ClpSplitManager splitManager)
+ {
+ this.lifeCycleManager = requireNonNull(lifeCycleManager, "lifeCycleManager is null");
+ this.metadata = requireNonNull(metadata, "metadata is null");
+ this.recordSetProvider = requireNonNull(recordSetProvider, "recordSetProvider is null");
+ this.splitManager = requireNonNull(splitManager, "splitManager is null");
+ }
+
+ @Override
+ public ConnectorTransactionHandle beginTransaction(IsolationLevel isolationLevel, boolean readOnly)
+ {
+ return ClpTransactionHandle.INSTANCE;
+ }
+
+ @Override
+ public ConnectorMetadata getMetadata(ConnectorTransactionHandle transactionHandle)
+ {
+ return metadata;
+ }
+
+ @Override
+ public ConnectorRecordSetProvider getRecordSetProvider()
+ {
+ return recordSetProvider;
+ }
+
+ @Override
+ public ConnectorSplitManager getSplitManager()
+ {
+ return splitManager;
+ }
+
+ @Override
+ public final void shutdown()
+ {
+ try {
+ lifeCycleManager.stop();
+ }
+ catch (Exception e) {
+ log.error(e, "Error shutting down connector");
+ }
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java
new file mode 100644
index 0000000000000..bec007135bccd
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConnectorFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.airlift.bootstrap.Bootstrap;
+import com.facebook.airlift.json.JsonModule;
+import com.facebook.presto.common.type.TypeManager;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.NodeManager;
+import com.facebook.presto.spi.connector.Connector;
+import com.facebook.presto.spi.connector.ConnectorContext;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.facebook.presto.spi.function.FunctionMetadataManager;
+import com.facebook.presto.spi.function.StandardFunctionResolution;
+import com.facebook.presto.spi.relation.RowExpressionService;
+import com.google.inject.Injector;
+
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+public class ClpConnectorFactory
+ implements ConnectorFactory
+{
+ @Override
+ public String getName()
+ {
+ return "clp";
+ }
+
+ @Override
+ public ConnectorHandleResolver getHandleResolver()
+ {
+ return new ClpHandleResolver();
+ }
+
+ @Override
+ public Connector create(String catalogName, Map config, ConnectorContext context)
+ {
+ requireNonNull(catalogName, "catalogName is null");
+ requireNonNull(config, "config is null");
+ try {
+ Bootstrap app = new Bootstrap(new JsonModule(), new ClpModule(), binder -> {
+ binder.bind(FunctionMetadataManager.class).toInstance(context.getFunctionMetadataManager());
+ binder.bind(NodeManager.class).toInstance(context.getNodeManager());
+ binder.bind(RowExpressionService.class).toInstance(context.getRowExpressionService());
+ binder.bind(StandardFunctionResolution.class).toInstance(context.getStandardFunctionResolution());
+ binder.bind(TypeManager.class).toInstance(context.getTypeManager());
+ });
+
+ Injector injector = app.doNotInitializeLogging().setRequiredConfigurationProperties(config).initialize();
+
+ return injector.getInstance(ClpConnector.class);
+ }
+ catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java
new file mode 100644
index 0000000000000..8cb2438277404
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.common.ErrorCode;
+import com.facebook.presto.common.ErrorType;
+import com.facebook.presto.spi.ErrorCodeSupplier;
+
+import static com.facebook.presto.common.ErrorType.EXTERNAL;
+
+public enum ClpErrorCode
+ implements ErrorCodeSupplier
+{
+ CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION(0, EXTERNAL),
+ CLP_UNSUPPORTED_METADATA_SOURCE(1, EXTERNAL),
+ CLP_UNSUPPORTED_SPLIT_SOURCE(2, EXTERNAL),
+ CLP_UNSUPPORTED_TYPE(3, EXTERNAL),
+ CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL);
+
+ private final ErrorCode errorCode;
+
+ ClpErrorCode(int code, ErrorType type)
+ {
+ errorCode = new ErrorCode(code + 0x0400_0000, name(), type);
+ }
+
+ @Override
+ public ErrorCode toErrorCode()
+ {
+ return errorCode;
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java
new file mode 100644
index 0000000000000..462ecc039b9c6
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpHandleResolver.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorHandleResolver;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public class ClpHandleResolver
+ implements ConnectorHandleResolver
+{
+ @Override
+ public Class extends ConnectorTableHandle> getTableHandleClass()
+ {
+ return ClpTableHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorTableLayoutHandle> getTableLayoutHandleClass()
+ {
+ return ClpTableLayoutHandle.class;
+ }
+
+ @Override
+ public Class extends ColumnHandle> getColumnHandleClass()
+ {
+ return ClpColumnHandle.class;
+ }
+
+ @Override
+ public Class extends ConnectorSplit> getSplitClass()
+ {
+ return ClpSplit.class;
+ }
+
+ @Override
+ public Class extends ConnectorTransactionHandle> getTransactionHandleClass()
+ {
+ return ClpTransactionHandle.class;
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java
new file mode 100644
index 0000000000000..73c94dbdfba1d
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpMetadata.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.ConnectorTableLayout;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.ConnectorTableLayoutResult;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.Constraint;
+import com.facebook.presto.spi.SchemaTableName;
+import com.facebook.presto.spi.SchemaTablePrefix;
+import com.facebook.presto.spi.connector.ConnectorMetadata;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import javax.inject.Inject;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.Function;
+
+import static com.google.common.collect.ImmutableList.toImmutableList;
+import static com.google.common.collect.ImmutableMap.toImmutableMap;
+import static java.util.Objects.requireNonNull;
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+/**
+ * For efficiency, this class maintains two caches for metadata from the
+ * {@link ClpMetadataProvider}:
+ *
+ * - columnHandleCache: Maps {@link SchemaTableName} to a list of {@link ClpColumnHandle}s.
+ *
+ * - tableHandleCache: Maps schema names to a list of {@link ClpTableHandle}s.
+ *
+ */
+public class ClpMetadata
+ implements ConnectorMetadata
+{
+ public static final String DEFAULT_SCHEMA_NAME = "default";
+
+ private final ClpMetadataProvider clpMetadataProvider;
+ private final LoadingCache> columnHandleCache;
+ private final LoadingCache> tableHandleCache;
+
+ @Inject
+ public ClpMetadata(ClpConfig clpConfig, ClpMetadataProvider clpMetadataProvider)
+ {
+ this.columnHandleCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(clpConfig.getMetadataExpireInterval(), SECONDS)
+ .refreshAfterWrite(clpConfig.getMetadataRefreshInterval(), SECONDS)
+ .build(CacheLoader.from(this::loadColumnHandles));
+ this.tableHandleCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(clpConfig.getMetadataExpireInterval(), SECONDS)
+ .refreshAfterWrite(clpConfig.getMetadataRefreshInterval(), SECONDS)
+ .build(CacheLoader.from(this::loadTableHandles));
+
+ this.clpMetadataProvider = requireNonNull(clpMetadataProvider, "ClpMetadataProvider is null");
+ }
+
+ @Override
+ public List listSchemaNames(ConnectorSession session)
+ {
+ return ImmutableList.of(DEFAULT_SCHEMA_NAME);
+ }
+
+ @Override
+ public List listTables(ConnectorSession session, Optional schemaName)
+ {
+ String schemaNameValue = schemaName.orElse(DEFAULT_SCHEMA_NAME);
+ if (!listSchemaNames(session).contains(schemaNameValue)) {
+ return ImmutableList.of();
+ }
+
+ return listTables(schemaNameValue).stream()
+ .map(tableHandle -> new SchemaTableName(schemaNameValue, tableHandle.getSchemaTableName().getTableName()))
+ .collect(toImmutableList());
+ }
+
+ @Override
+ public ConnectorTableHandle getTableHandle(ConnectorSession session, SchemaTableName tableName)
+ {
+ String schemaName = tableName.getSchemaName();
+ if (!listSchemaNames(session).contains(schemaName)) {
+ return null;
+ }
+
+ return listTables(schemaName).stream()
+ .filter(tableHandle -> tableHandle.getSchemaTableName().equals(tableName))
+ .findFirst()
+ .orElse(null);
+ }
+
+ @Override
+ public ConnectorTableLayoutResult getTableLayoutForConstraint(
+ ConnectorSession session,
+ ConnectorTableHandle table,
+ Constraint constraint,
+ Optional> desiredColumns)
+ {
+ ClpTableHandle tableHandle = (ClpTableHandle) table;
+ ConnectorTableLayout layout = new ConnectorTableLayout(new ClpTableLayoutHandle(tableHandle, Optional.empty()));
+ return new ConnectorTableLayoutResult(layout, constraint.getSummary());
+ }
+
+ @Override
+ public ConnectorTableLayout getTableLayout(ConnectorSession session, ConnectorTableLayoutHandle handle)
+ {
+ return new ConnectorTableLayout(handle);
+ }
+
+ @Override
+ public ConnectorTableMetadata getTableMetadata(ConnectorSession session, ConnectorTableHandle table)
+ {
+ ClpTableHandle clpTableHandle = (ClpTableHandle) table;
+ SchemaTableName schemaTableName = clpTableHandle.getSchemaTableName();
+ List columns = listColumns(schemaTableName).stream()
+ .map(ClpColumnHandle::getColumnMetadata)
+ .collect(toImmutableList());
+
+ return new ConnectorTableMetadata(schemaTableName, columns);
+ }
+
+ @Override
+ public Map> listTableColumns(ConnectorSession session, SchemaTablePrefix prefix)
+ {
+ requireNonNull(prefix, "prefix is null");
+ String schemaName = prefix.getSchemaName();
+ if (schemaName != null && !listSchemaNames(session).contains(schemaName)) {
+ return ImmutableMap.of();
+ }
+
+ List schemaTableNames;
+ if (prefix.getTableName() == null) {
+ schemaTableNames = listTables(session, Optional.ofNullable(prefix.getSchemaName()));
+ }
+ else {
+ SchemaTableName table = new SchemaTableName(schemaName, prefix.getTableName());
+ if (listTables(session, Optional.ofNullable(schemaName)).contains(table)) {
+ schemaTableNames = ImmutableList.of(table);
+ }
+ else {
+ schemaTableNames = ImmutableList.of();
+ }
+ }
+
+ return schemaTableNames.stream()
+ .collect(toImmutableMap(
+ Function.identity(),
+ tableName -> getTableMetadata(session, getTableHandle(session, tableName)).getColumns()));
+ }
+
+ @Override
+ public Map getColumnHandles(ConnectorSession session, ConnectorTableHandle tableHandle)
+ {
+ ClpTableHandle clpTableHandle = (ClpTableHandle) tableHandle;
+ return listColumns(clpTableHandle.getSchemaTableName()).stream().collect(toImmutableMap(ClpColumnHandle::getColumnName, column -> column));
+ }
+
+ @Override
+ public ColumnMetadata getColumnMetadata(ConnectorSession session, ConnectorTableHandle tableHandle, ColumnHandle columnHandle)
+ {
+ ClpColumnHandle clpColumnHandle = (ClpColumnHandle) columnHandle;
+ return clpColumnHandle.getColumnMetadata();
+ }
+
+ private List loadColumnHandles(SchemaTableName schemaTableName)
+ {
+ return clpMetadataProvider.listColumnHandles(schemaTableName);
+ }
+
+ private List loadTableHandles(String schemaName)
+ {
+ return clpMetadataProvider.listTableHandles(schemaName);
+ }
+
+ private List listColumns(SchemaTableName schemaTableName)
+ {
+ return columnHandleCache.getUnchecked(schemaTableName);
+ }
+
+ private List listTables(String schemaName)
+ {
+ return tableHandleCache.getUnchecked(schemaName);
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java
new file mode 100644
index 0000000000000..fa0386376f12a
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.airlift.configuration.AbstractConfigurationAwareModule;
+import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
+import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
+import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider;
+import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
+import com.facebook.presto.spi.PrestoException;
+import com.google.inject.Binder;
+import com.google.inject.Scopes;
+
+import static com.facebook.airlift.configuration.ConfigBinder.configBinder;
+import static com.facebook.presto.plugin.clp.ClpConfig.MetadataProviderType;
+import static com.facebook.presto.plugin.clp.ClpConfig.SplitProviderType;
+import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_METADATA_SOURCE;
+import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_SPLIT_SOURCE;
+
+public class ClpModule
+ extends AbstractConfigurationAwareModule
+{
+ @Override
+ protected void setup(Binder binder)
+ {
+ binder.bind(ClpConnector.class).in(Scopes.SINGLETON);
+ binder.bind(ClpMetadata.class).in(Scopes.SINGLETON);
+ binder.bind(ClpRecordSetProvider.class).in(Scopes.SINGLETON);
+ binder.bind(ClpSplitManager.class).in(Scopes.SINGLETON);
+ configBinder(binder).bindConfig(ClpConfig.class);
+
+ ClpConfig config = buildConfigObject(ClpConfig.class);
+ if (config.getMetadataProviderType() == MetadataProviderType.MYSQL) {
+ binder.bind(ClpMetadataProvider.class).to(ClpMySqlMetadataProvider.class).in(Scopes.SINGLETON);
+ }
+ else {
+ throw new PrestoException(CLP_UNSUPPORTED_METADATA_SOURCE, "Unsupported metadata provider type: " + config.getMetadataProviderType());
+ }
+
+ if (config.getSplitProviderType() == SplitProviderType.MYSQL) {
+ binder.bind(ClpSplitProvider.class).to(ClpMySqlSplitProvider.class).in(Scopes.SINGLETON);
+ }
+ else {
+ throw new PrestoException(CLP_UNSUPPORTED_SPLIT_SOURCE, "Unsupported split provider type: " + config.getSplitProviderType());
+ }
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java
new file mode 100644
index 0000000000000..985c707a32483
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpPlugin.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.spi.Plugin;
+import com.facebook.presto.spi.connector.ConnectorFactory;
+import com.google.common.collect.ImmutableList;
+
+public class ClpPlugin
+ implements Plugin
+{
+ @Override
+ public Iterable getConnectorFactories()
+ {
+ return ImmutableList.of(new ClpConnectorFactory());
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpRecordSetProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpRecordSetProvider.java
new file mode 100644
index 0000000000000..452fe39c7d9ff
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpRecordSetProvider.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.spi.ColumnHandle;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.RecordSet;
+import com.facebook.presto.spi.connector.ConnectorRecordSetProvider;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+import java.util.List;
+
+public class ClpRecordSetProvider
+ implements ConnectorRecordSetProvider
+{
+ @Override
+ public RecordSet getRecordSet(ConnectorTransactionHandle transactionHandle, ConnectorSession session, ConnectorSplit split, List extends ColumnHandle> columns)
+ {
+ throw new UnsupportedOperationException("getRecordSet is not supported");
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java
new file mode 100644
index 0000000000000..687a8e3e18401
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplit.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.spi.ConnectorSplit;
+import com.facebook.presto.spi.HostAddress;
+import com.facebook.presto.spi.NodeProvider;
+import com.facebook.presto.spi.schedule.NodeSelectionStrategy;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+import java.util.List;
+import java.util.Map;
+
+import static com.facebook.presto.spi.schedule.NodeSelectionStrategy.NO_PREFERENCE;
+import static java.util.Objects.requireNonNull;
+
+public class ClpSplit
+ implements ConnectorSplit
+{
+ private final String path;
+
+ @JsonCreator
+ public ClpSplit(@JsonProperty("path") String path)
+ {
+ this.path = requireNonNull(path, "Split path is null");
+ }
+
+ @JsonProperty
+ public String getPath()
+ {
+ return path;
+ }
+
+ @Override
+ public NodeSelectionStrategy getNodeSelectionStrategy()
+ {
+ return NO_PREFERENCE;
+ }
+
+ @Override
+ public List getPreferredNodes(NodeProvider nodeProvider)
+ {
+ return ImmutableList.of();
+ }
+
+ @Override
+ public Map getInfo()
+ {
+ return ImmutableMap.of("path", path);
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java
new file mode 100644
index 0000000000000..cf7df605a1d0d
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpSplitManager.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
+import com.facebook.presto.spi.ConnectorSession;
+import com.facebook.presto.spi.ConnectorSplitSource;
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.facebook.presto.spi.FixedSplitSource;
+import com.facebook.presto.spi.connector.ConnectorSplitManager;
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+import javax.inject.Inject;
+
+import static java.util.Objects.requireNonNull;
+
+public class ClpSplitManager
+ implements ConnectorSplitManager
+{
+ private final ClpSplitProvider clpSplitProvider;
+
+ @Inject
+ public ClpSplitManager(ClpSplitProvider clpSplitProvider)
+ {
+ this.clpSplitProvider = requireNonNull(clpSplitProvider, "clpSplitProvider is null");
+ }
+
+ @Override
+ public ConnectorSplitSource getSplits(
+ ConnectorTransactionHandle transactionHandle,
+ ConnectorSession session,
+ ConnectorTableLayoutHandle layout,
+ SplitSchedulingContext splitSchedulingContext)
+ {
+ ClpTableLayoutHandle layoutHandle = (ClpTableLayoutHandle) layout;
+ return new FixedSplitSource(clpSplitProvider.listSplits(layoutHandle));
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java
new file mode 100644
index 0000000000000..74a6c08684983
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableHandle.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.spi.ConnectorTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class ClpTableHandle
+ implements ConnectorTableHandle
+{
+ private final SchemaTableName schemaTableName;
+ private final String tablePath;
+ private final StorageType storageType;
+
+ @JsonCreator
+ public ClpTableHandle(
+ @JsonProperty("schemaTableName") SchemaTableName schemaTableName,
+ @JsonProperty("tablePath") String tablePath,
+ @JsonProperty("storageType") StorageType storageType)
+ {
+ this.schemaTableName = schemaTableName;
+ this.tablePath = tablePath;
+ this.storageType = storageType;
+ }
+
+ @JsonProperty
+ public SchemaTableName getSchemaTableName()
+ {
+ return schemaTableName;
+ }
+
+ @JsonProperty
+ public String getTablePath()
+ {
+ return tablePath;
+ }
+
+ @JsonProperty
+ public StorageType getStorageType()
+ {
+ return storageType;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(schemaTableName, tablePath, storageType);
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ ClpTableHandle other = (ClpTableHandle) obj;
+ return this.schemaTableName.equals(other.schemaTableName) &&
+ this.tablePath.equals(other.tablePath) &&
+ this.storageType == other.storageType;
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("schemaTableName", schemaTableName)
+ .add("tablePath", tablePath)
+ .add("storageType", storageType)
+ .toString();
+ }
+
+ public enum StorageType
+ {
+ FS, // Local File System
+ S3
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java
new file mode 100644
index 0000000000000..f1f7d7c708815
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTableLayoutHandle.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.spi.ConnectorTableLayoutHandle;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+public class ClpTableLayoutHandle
+ implements ConnectorTableLayoutHandle
+{
+ private final ClpTableHandle table;
+ private final Optional kqlQuery;
+
+ @JsonCreator
+ public ClpTableLayoutHandle(@JsonProperty("table") ClpTableHandle table, @JsonProperty("kqlQuery") Optional kqlQuery)
+ {
+ this.table = table;
+ this.kqlQuery = kqlQuery;
+ }
+
+ @JsonProperty
+ public ClpTableHandle getTable()
+ {
+ return table;
+ }
+
+ @JsonProperty
+ public Optional getKqlQuery()
+ {
+ return kqlQuery;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ClpTableLayoutHandle that = (ClpTableLayoutHandle) o;
+ return Objects.equals(table, that.table) &&
+ Objects.equals(kqlQuery, that.kqlQuery);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(table, kqlQuery);
+ }
+
+ @Override
+ public String toString()
+ {
+ return toStringHelper(this)
+ .add("table", table)
+ .add("kqlQuery", kqlQuery)
+ .toString();
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java
new file mode 100644
index 0000000000000..f39cd639072d6
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpTransactionHandle.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.spi.connector.ConnectorTransactionHandle;
+
+public enum ClpTransactionHandle
+ implements ConnectorTransactionHandle
+{
+ INSTANCE
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java
new file mode 100644
index 0000000000000..33e4b748a30d4
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMetadataProvider.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.metadata;
+
+import com.facebook.presto.plugin.clp.ClpColumnHandle;
+import com.facebook.presto.plugin.clp.ClpTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+
+import java.util.List;
+
+/**
+ * A provider for metadata that describes what tables exist in the CLP connector, and what columns
+ * exist in each of those tables.
+ */
+public interface ClpMetadataProvider
+{
+ /**
+ * @param schemaTableName the name of the schema and the table
+ * @return the list of column handles for the given table.
+ */
+ List listColumnHandles(SchemaTableName schemaTableName);
+
+ /**
+ * @param schema the name of the schema
+ * @return the list of table handles in the specified schema
+ */
+ List listTableHandles(String schema);
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java
new file mode 100644
index 0000000000000..1697b454875d2
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpMySqlMetadataProvider.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.metadata;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.plugin.clp.ClpColumnHandle;
+import com.facebook.presto.plugin.clp.ClpConfig;
+import com.facebook.presto.plugin.clp.ClpTableHandle;
+import com.facebook.presto.spi.SchemaTableName;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+
+import static java.lang.String.format;
+
+public class ClpMySqlMetadataProvider
+ implements ClpMetadataProvider
+{
+ // Column names
+ public static final String COLUMN_METADATA_TABLE_COLUMN_NAME = "name";
+ public static final String COLUMN_METADATA_TABLE_COLUMN_TYPE = "type";
+ public static final String DATASETS_TABLE_COLUMN_NAME = "name";
+ public static final String DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_TYPE = "archive_storage_type";
+ public static final String DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY = "archive_storage_directory";
+
+ // Table suffixes
+ public static final String COLUMN_METADATA_TABLE_SUFFIX = "_column_metadata";
+ public static final String DATASETS_TABLE_SUFFIX = "datasets";
+
+ // SQL templates
+ private static final String SQL_SELECT_COLUMN_METADATA_TEMPLATE = "SELECT * FROM `%s%s" + COLUMN_METADATA_TABLE_SUFFIX + "`";
+ private static final String SQL_SELECT_DATASETS_TEMPLATE = format(
+ "SELECT `%s`, `%s`, `%s` FROM `%%s%s`",
+ DATASETS_TABLE_COLUMN_NAME,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_TYPE,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY,
+ DATASETS_TABLE_SUFFIX);
+
+ private static final Logger log = Logger.get(ClpMySqlMetadataProvider.class);
+
+ private final ClpConfig config;
+
+ @Inject
+ public ClpMySqlMetadataProvider(ClpConfig config)
+ {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ }
+ catch (ClassNotFoundException e) {
+ log.error(e, "Failed to load MySQL JDBC driver");
+ throw new RuntimeException("MySQL JDBC driver not found", e);
+ }
+ this.config = config;
+ }
+
+ @Override
+ public List listColumnHandles(SchemaTableName schemaTableName)
+ {
+ String query = format(SQL_SELECT_COLUMN_METADATA_TEMPLATE, config.getMetadataTablePrefix(), schemaTableName.getTableName());
+ ClpSchemaTree schemaTree = new ClpSchemaTree(config.isPolymorphicTypeEnabled());
+ try (Connection connection = getConnection(); PreparedStatement statement = connection.prepareStatement(query)) {
+ try (ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ schemaTree.addColumn(
+ resultSet.getString(COLUMN_METADATA_TABLE_COLUMN_NAME),
+ resultSet.getByte(COLUMN_METADATA_TABLE_COLUMN_TYPE));
+ }
+ }
+ }
+ catch (SQLException e) {
+ log.warn("Failed to load table schema for %s: %s", schemaTableName.getTableName(), e);
+ }
+ return schemaTree.collectColumnHandles();
+ }
+
+ @Override
+ public List listTableHandles(String schemaName)
+ {
+ ImmutableList.Builder tableHandles = new ImmutableList.Builder<>();
+ String query = format(SQL_SELECT_DATASETS_TEMPLATE, config.getMetadataTablePrefix());
+ try (Connection connection = getConnection();
+ Statement statement = connection.createStatement();
+ ResultSet resultSet = statement.executeQuery(query)) {
+ while (resultSet.next()) {
+ String tableName = resultSet.getString(DATASETS_TABLE_COLUMN_NAME);
+ String archiveStorageType = resultSet.getString(DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_TYPE);
+ String archiveStorageDirectory = resultSet.getString(DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY);
+ if (isValidIdentifier(tableName) && archiveStorageDirectory != null && !archiveStorageDirectory.isEmpty()) {
+ tableHandles.add(
+ new ClpTableHandle(
+ new SchemaTableName(schemaName, tableName),
+ archiveStorageDirectory,
+ ClpTableHandle.StorageType.valueOf(archiveStorageType.toUpperCase())));
+ }
+ else {
+ log.warn("Ignoring invalid table name found in metadata: %s", tableName);
+ }
+ }
+ }
+ catch (SQLException e) {
+ log.warn("Failed to load table names: %s", e);
+ }
+ return tableHandles.build();
+ }
+
+ private Connection getConnection()
+ throws SQLException
+ {
+ Connection connection = DriverManager.getConnection(config.getMetadataDbUrl(), config.getMetadataDbUser(), config.getMetadataDbPassword());
+ String dbName = config.getMetadataDbName();
+ if (dbName != null && !dbName.isEmpty()) {
+ connection.createStatement().execute(format("USE `%s`", dbName));
+ }
+ return connection;
+ }
+
+ private boolean isValidIdentifier(String identifier)
+ {
+ return identifier != null && ClpConfig.SAFE_SQL_TABLE_NAME_PATTERN.matcher(identifier).matches();
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTree.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTree.java
new file mode 100644
index 0000000000000..90d0b911d0d41
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTree.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.metadata;
+
+import com.facebook.presto.common.type.ArrayType;
+import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.common.type.Type;
+import com.facebook.presto.plugin.clp.ClpColumnHandle;
+import com.facebook.presto.spi.PrestoException;
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.common.type.DoubleType.DOUBLE;
+import static com.facebook.presto.common.type.VarcharType.VARCHAR;
+import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_TYPE;
+
+/**
+ * A representation of CLP's schema tree built by turning hierarchical column names (e.g., a.b.c)
+ * with their types into a tree. The class handles name/type conflicts when the
+ * `clp.polymorphic-type-enabled` option is enabled, and maps serialized CLP types to Presto types.
+ */
+public class ClpSchemaTree
+{
+ private final ClpNode root;
+ private final boolean polymorphicTypeEnabled;
+
+ ClpSchemaTree(boolean polymorphicTypeEnabled)
+ {
+ this.polymorphicTypeEnabled = polymorphicTypeEnabled;
+ this.root = new ClpNode(""); // Root doesn't have an original name
+ }
+
+ /**
+ * Adds a column to the internal CLP schema tree, creating intermediate nested nodes as needed.
+ * Handles potential name conflicts when polymorphic types are enabled by suffixing column names
+ * with type display names.
+ *
+ * @param fullName Fully qualified column name using dot notation (e.g., "a.b.c").
+ * @param type Serialized byte value representing the CLP column's type.
+ */
+ public void addColumn(String fullName, byte type)
+ {
+ Type prestoType = mapColumnType(type);
+ String[] path = fullName.split("\\.");
+ ClpNode current = root;
+
+ for (int i = 0; i < path.length - 1; i++) {
+ String segment = path[i];
+ ClpNode existingNode = current.children.get(segment);
+
+ if (polymorphicTypeEnabled && existingNode != null && existingNode.type != null) {
+ // Conflict: An intermediate segment already exists as a leaf node. Rename it.
+ String existingSuffix = getTypeSuffix(existingNode.type);
+ String renamedExisting = segment + "_" + existingSuffix;
+ current.children.remove(segment);
+ current.children.put(renamedExisting, existingNode);
+ }
+ current = current.children.computeIfAbsent(segment, ClpNode::new);
+ current.type = null;
+ }
+
+ String leafName = path[path.length - 1];
+ String finalLeafName = resolvePolymorphicConflicts(current, leafName, prestoType);
+
+ ClpNode leaf = new ClpNode(leafName, prestoType);
+ current.children.put(finalLeafName, leaf);
+ }
+
+ /**
+ * Traverses the CLP schema tree and collects all leaf and nested structure nodes into a flat
+ * list of column handles. For nested structures, builds a RowType from child nodes.
+ *
+ * @return List of ClpColumnHandle objects representing the full schema.
+ */
+ public List collectColumnHandles()
+ {
+ ImmutableList.Builder columns = new ImmutableList.Builder<>();
+ for (Map.Entry entry : root.children.entrySet()) {
+ String name = entry.getKey();
+ ClpNode child = entry.getValue();
+ if (child.isLeaf()) {
+ columns.add(new ClpColumnHandle(name, child.originalName, child.type, true));
+ }
+ else {
+ Type rowType = buildRowType(child);
+ columns.add(new ClpColumnHandle(name, child.originalName, rowType, true));
+ }
+ }
+ return columns.build();
+ }
+
+ private Type mapColumnType(byte type)
+ {
+ switch (ClpSchemaTreeNodeType.fromType(type)) {
+ case Integer:
+ return BIGINT;
+ case Float:
+ return DOUBLE;
+ case ClpString:
+ case VarString:
+ case DateString:
+ case NullValue:
+ return VARCHAR;
+ case UnstructuredArray:
+ return new ArrayType(VARCHAR);
+ case Boolean:
+ return BOOLEAN;
+ default:
+ throw new PrestoException(CLP_UNSUPPORTED_TYPE, "Unsupported type: " + type);
+ }
+ }
+
+ private String resolvePolymorphicConflicts(ClpNode parent, String baseName, Type newType)
+ {
+ if (!polymorphicTypeEnabled) {
+ return baseName;
+ }
+
+ boolean conflictDetected = false;
+ if (parent.children.containsKey(baseName)) {
+ ClpNode existing = parent.children.get(baseName);
+ if (existing.type == null) {
+ conflictDetected = true;
+ }
+ else if (!existing.type.equals(newType)) {
+ String existingSuffix = getTypeSuffix(existing.type);
+ String renamedExisting = baseName + "_" + existingSuffix;
+ parent.children.remove(baseName);
+ parent.children.put(renamedExisting, existing);
+ parent.conflictingBaseNames.add(baseName);
+ conflictDetected = true;
+ }
+ }
+ else if (parent.conflictingBaseNames.contains(baseName)) {
+ conflictDetected = true;
+ }
+
+ if (conflictDetected) {
+ String newSuffix = getTypeSuffix(newType);
+ return baseName + "_" + newSuffix;
+ }
+
+ return baseName;
+ }
+
+ private String getTypeSuffix(Type type)
+ {
+ return (type instanceof ArrayType) ? "array" : type.getDisplayName();
+ }
+
+ private Type buildRowType(ClpNode node)
+ {
+ List fields = new ArrayList<>();
+ List sortedKeys = new ArrayList<>(node.children.keySet());
+ Collections.sort(sortedKeys);
+
+ for (String name : sortedKeys) {
+ ClpNode child = node.children.get(name);
+ Type fieldType = child.isLeaf() ? child.type : buildRowType(child);
+ fields.add(new RowType.Field(Optional.of(name), fieldType));
+ }
+ return RowType.from(fields);
+ }
+
+ private static class ClpNode
+ {
+ Type type; // Only non-null for leaf nodes
+ String originalName;
+ Map children = new HashMap<>();
+ Set conflictingBaseNames = new HashSet<>();
+
+ ClpNode(String originalName)
+ {
+ this.originalName = originalName;
+ }
+
+ ClpNode(String originalName, Type type)
+ {
+ this.originalName = originalName;
+ this.type = type;
+ }
+
+ boolean isLeaf()
+ {
+ return children.isEmpty();
+ }
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTreeNodeType.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTreeNodeType.java
new file mode 100644
index 0000000000000..5e034f67caad1
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpSchemaTreeNodeType.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.metadata;
+
+/**
+ * The CLP schema-tree node types used in clp-s archives.
+ */
+public enum ClpSchemaTreeNodeType
+{
+ Integer((byte) 0),
+ Float((byte) 1),
+ ClpString((byte) 2),
+ VarString((byte) 3),
+ Boolean((byte) 4),
+ Object((byte) 5),
+ UnstructuredArray((byte) 6),
+ NullValue((byte) 7),
+ DateString((byte) 8),
+ StructuredArray((byte) 9);
+
+ private static final ClpSchemaTreeNodeType[] LOOKUP_TABLE;
+ private final byte type;
+
+ ClpSchemaTreeNodeType(byte type)
+ {
+ this.type = type;
+ }
+
+ public static ClpSchemaTreeNodeType fromType(byte type)
+ {
+ if (type < 0 || type >= LOOKUP_TABLE.length || LOOKUP_TABLE[type] == null) {
+ throw new IllegalArgumentException("Invalid type code: " + type);
+ }
+ return LOOKUP_TABLE[type];
+ }
+
+ public byte getType()
+ {
+ return type;
+ }
+
+ static {
+ byte maxType = 0;
+ for (ClpSchemaTreeNodeType nodeType : values()) {
+ if (nodeType.type > maxType) {
+ maxType = nodeType.type;
+ }
+ }
+
+ ClpSchemaTreeNodeType[] lookup = new ClpSchemaTreeNodeType[maxType + 1];
+ for (ClpSchemaTreeNodeType nodeType : values()) {
+ lookup[nodeType.type] = nodeType;
+ }
+
+ LOOKUP_TABLE = lookup;
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java
new file mode 100644
index 0000000000000..ac646a061ba4b
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpMySqlSplitProvider.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.split;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.plugin.clp.ClpConfig;
+import com.facebook.presto.plugin.clp.ClpSplit;
+import com.facebook.presto.plugin.clp.ClpTableHandle;
+import com.facebook.presto.plugin.clp.ClpTableLayoutHandle;
+import com.google.common.collect.ImmutableList;
+
+import javax.inject.Inject;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.List;
+
+import static java.lang.String.format;
+
+public class ClpMySqlSplitProvider
+ implements ClpSplitProvider
+{
+ // Column names
+ public static final String ARCHIVES_TABLE_COLUMN_ID = "id";
+
+ // Table suffixes
+ public static final String ARCHIVE_TABLE_SUFFIX = "_archives";
+
+ // SQL templates
+ private static final String SQL_SELECT_ARCHIVES_TEMPLATE = format("SELECT `%s` FROM `%%s%%s%s`", ARCHIVES_TABLE_COLUMN_ID, ARCHIVE_TABLE_SUFFIX);
+
+ private static final Logger log = Logger.get(ClpMySqlSplitProvider.class);
+
+ private final ClpConfig config;
+
+ @Inject
+ public ClpMySqlSplitProvider(ClpConfig config)
+ {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ }
+ catch (ClassNotFoundException e) {
+ log.error(e, "Failed to load MySQL JDBC driver");
+ throw new RuntimeException("MySQL JDBC driver not found", e);
+ }
+ this.config = config;
+ }
+
+ @Override
+ public List listSplits(ClpTableLayoutHandle clpTableLayoutHandle)
+ {
+ ImmutableList.Builder splits = new ImmutableList.Builder<>();
+ ClpTableHandle clpTableHandle = clpTableLayoutHandle.getTable();
+ String tablePath = clpTableHandle.getTablePath();
+ String tableName = clpTableHandle.getSchemaTableName().getTableName();
+ String archivePathQuery = format(SQL_SELECT_ARCHIVES_TEMPLATE, config.getMetadataTablePrefix(), tableName);
+
+ try (Connection connection = getConnection()) {
+ // Fetch archive IDs and create splits
+ try (PreparedStatement statement = connection.prepareStatement(archivePathQuery); ResultSet resultSet = statement.executeQuery()) {
+ while (resultSet.next()) {
+ final String archiveId = resultSet.getString(ARCHIVES_TABLE_COLUMN_ID);
+ final String archivePath = tablePath + "/" + archiveId;
+ splits.add(new ClpSplit(archivePath));
+ }
+ }
+ }
+ catch (SQLException e) {
+ log.warn("Database error while processing splits for %s: %s", tableName, e);
+ }
+
+ return splits.build();
+ }
+
+ private Connection getConnection()
+ throws SQLException
+ {
+ Connection connection = DriverManager.getConnection(config.getMetadataDbUrl(), config.getMetadataDbUser(), config.getMetadataDbPassword());
+ String dbName = config.getMetadataDbName();
+ if (dbName != null && !dbName.isEmpty()) {
+ connection.createStatement().execute(format("USE `%s`", dbName));
+ }
+ return connection;
+ }
+}
diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java
new file mode 100644
index 0000000000000..2868cfeb0a7bc
--- /dev/null
+++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpSplitProvider.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.split;
+
+import com.facebook.presto.plugin.clp.ClpSplit;
+import com.facebook.presto.plugin.clp.ClpTableLayoutHandle;
+
+import java.util.List;
+
+/**
+ * A provider for splits from a CLP dataset.
+ */
+public interface ClpSplitProvider
+{
+ /**
+ * @param clpTableLayoutHandle the table layout handle
+ * @return the list of splits for the specified table layout
+ */
+ List listSplits(ClpTableLayoutHandle clpTableLayoutHandle);
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java
new file mode 100644
index 0000000000000..6b7220cc1dd43
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpMetadataDbSetUp.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider;
+import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider;
+import com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType;
+import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.math3.util.Pair;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_NAME;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_TYPE;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_TYPE;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_NAME;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_SUFFIX;
+import static com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider.ARCHIVES_TABLE_COLUMN_ID;
+import static com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider.ARCHIVE_TABLE_SUFFIX;
+import static java.lang.String.format;
+import static java.util.UUID.randomUUID;
+import static org.testng.Assert.fail;
+
+public final class ClpMetadataDbSetUp
+{
+ public static final String METADATA_DB_PASSWORD = "";
+ public static final String METADATA_DB_TABLE_PREFIX = "clp_";
+ public static final String METADATA_DB_URL_TEMPLATE = "jdbc:h2:file:%s;MODE=MySQL;DATABASE_TO_UPPER=FALSE";
+ public static final String METADATA_DB_USER = "sa";
+ public static final String ARCHIVE_STORAGE_DIRECTORY_BASE = "/tmp/archives/";
+
+ private static final Logger log = Logger.get(ClpMetadataDbSetUp.class);
+ private static final String DATASETS_TABLE_NAME = METADATA_DB_TABLE_PREFIX + DATASETS_TABLE_SUFFIX;
+ private static final String ARCHIVE_TABLE_COLUMN_PAGINATION_ID = "pagination_id";
+
+ private ClpMetadataDbSetUp()
+ {
+ throw new UnsupportedOperationException("This is a utility class and cannot be instantiated");
+ }
+
+ public static DbHandle getDbHandle(String dbName)
+ {
+ return new DbHandle(format("/tmp/presto-clp-test-%s/%s", randomUUID(), dbName));
+ }
+
+ public static ClpMetadata setupMetadata(DbHandle dbHandle, Map>> clpFields)
+ {
+ final String metadataDbUrl = format(METADATA_DB_URL_TEMPLATE, dbHandle.dbPath);
+ final String columnMetadataTableSuffix = "_column_metadata";
+
+ try (Connection conn = DriverManager.getConnection(metadataDbUrl, METADATA_DB_USER, METADATA_DB_PASSWORD); Statement stmt = conn.createStatement()) {
+ createDatasetsTable(stmt);
+
+ for (Map.Entry>> entry : clpFields.entrySet()) {
+ String tableName = entry.getKey();
+ String columnMetadataTableName = METADATA_DB_TABLE_PREFIX + tableName + columnMetadataTableSuffix;
+ String createColumnMetadataSQL = format(
+ "CREATE TABLE IF NOT EXISTS %s (" +
+ " %s VARCHAR(512) NOT NULL," +
+ " %s TINYINT NOT NULL," +
+ " PRIMARY KEY (%s, %s))",
+ columnMetadataTableName,
+ COLUMN_METADATA_TABLE_COLUMN_NAME,
+ COLUMN_METADATA_TABLE_COLUMN_TYPE,
+ COLUMN_METADATA_TABLE_COLUMN_NAME,
+ COLUMN_METADATA_TABLE_COLUMN_TYPE);
+ String insertColumnMetadataSQL = format(
+ "INSERT INTO %s (%s, %s) VALUES (?, ?)",
+ columnMetadataTableName,
+ COLUMN_METADATA_TABLE_COLUMN_NAME,
+ COLUMN_METADATA_TABLE_COLUMN_TYPE);
+
+ stmt.execute(createColumnMetadataSQL);
+ updateDatasetsTable(conn, tableName);
+
+ try (PreparedStatement pstmt = conn.prepareStatement(insertColumnMetadataSQL)) {
+ for (Pair record : entry.getValue()) {
+ pstmt.setString(1, record.getFirst());
+ pstmt.setByte(2, record.getSecond().getType());
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ }
+ }
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ ClpConfig config = new ClpConfig()
+ .setPolymorphicTypeEnabled(true)
+ .setMetadataDbUrl(metadataDbUrl)
+ .setMetadataDbUser(METADATA_DB_USER)
+ .setMetadataDbPassword(METADATA_DB_PASSWORD)
+ .setMetadataTablePrefix(METADATA_DB_TABLE_PREFIX);
+ ClpMetadataProvider metadataProvider = new ClpMySqlMetadataProvider(config);
+ return new ClpMetadata(config, metadataProvider);
+ }
+
+ public static ClpMySqlSplitProvider setupSplit(DbHandle dbHandle, Map> splits)
+ {
+ final String metadataDbUrl = format(METADATA_DB_URL_TEMPLATE, dbHandle.dbPath);
+ final String archiveTableFormat = METADATA_DB_TABLE_PREFIX + "%s" + ARCHIVE_TABLE_SUFFIX;
+
+ try (Connection conn = DriverManager.getConnection(metadataDbUrl, METADATA_DB_USER, METADATA_DB_PASSWORD); Statement stmt = conn.createStatement()) {
+ createDatasetsTable(stmt);
+
+ // Create and populate archive tables
+ for (Map.Entry> tableSplits : splits.entrySet()) {
+ String tableName = tableSplits.getKey();
+ updateDatasetsTable(conn, tableName);
+
+ String archiveTableName = format(archiveTableFormat, tableSplits.getKey());
+ String createArchiveTableSQL = format(
+ "CREATE TABLE IF NOT EXISTS %s (" +
+ "%s BIGINT UNSIGNED AUTO_INCREMENT PRIMARY KEY, " +
+ "%s VARCHAR(64) NOT NULL)",
+ archiveTableName,
+ ARCHIVE_TABLE_COLUMN_PAGINATION_ID,
+ ARCHIVES_TABLE_COLUMN_ID);
+
+ stmt.execute(createArchiveTableSQL);
+
+ String insertArchiveTableSQL = format("INSERT INTO %s (%s) VALUES (?)", archiveTableName, ARCHIVES_TABLE_COLUMN_ID);
+ try (PreparedStatement pstmt = conn.prepareStatement(insertArchiveTableSQL)) {
+ for (String splitPath : tableSplits.getValue()) {
+ pstmt.setString(1, splitPath);
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ }
+ }
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+
+ return new ClpMySqlSplitProvider(
+ new ClpConfig()
+ .setPolymorphicTypeEnabled(true)
+ .setMetadataDbUrl(metadataDbUrl)
+ .setMetadataDbUser(METADATA_DB_USER)
+ .setMetadataDbPassword(METADATA_DB_PASSWORD)
+ .setMetadataTablePrefix(METADATA_DB_TABLE_PREFIX));
+ }
+
+ public static void tearDown(DbHandle dbHandle)
+ {
+ File dir = new File(dbHandle.dbPath).getParentFile();
+ if (dir.exists()) {
+ try {
+ FileUtils.deleteDirectory(dir);
+ log.info("Deleted database dir" + dir.getAbsolutePath());
+ }
+ catch (IOException e) {
+ log.warn("Failed to delete directory " + dir + ": " + e.getMessage());
+ }
+ }
+ }
+
+ private static void createDatasetsTable(Statement stmt)
+ throws SQLException
+ {
+ final String createDatasetsTableSql = format(
+ "CREATE TABLE IF NOT EXISTS %s (" +
+ " %s VARCHAR(255) PRIMARY KEY," +
+ " %s VARCHAR(64) NOT NULL," +
+ " %s VARCHAR(4096) NOT NULL)",
+ DATASETS_TABLE_NAME,
+ DATASETS_TABLE_COLUMN_NAME,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_TYPE,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY);
+ stmt.execute(createDatasetsTableSql);
+ }
+
+ private static void updateDatasetsTable(Connection conn, String tableName)
+ throws SQLException
+ {
+ final String insertDatasetsTableSql = format(
+ "INSERT INTO %s (%s, %s, %s) VALUES (?, ?, ?)",
+ DATASETS_TABLE_NAME,
+ DATASETS_TABLE_COLUMN_NAME,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_TYPE,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY);
+ try (PreparedStatement pstmt = conn.prepareStatement(insertDatasetsTableSql)) {
+ pstmt.setString(1, tableName);
+ pstmt.setString(2, "fs");
+ pstmt.setString(3, ARCHIVE_STORAGE_DIRECTORY_BASE + tableName);
+ pstmt.executeUpdate();
+ }
+ }
+
+ public static final class DbHandle
+ {
+ public String dbPath;
+
+ private DbHandle(String dbPath)
+ {
+ this.dbPath = dbPath;
+ }
+ }
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java
new file mode 100644
index 0000000000000..e8dd126da6ff2
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpMetadata.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.common.type.ArrayType;
+import com.facebook.presto.common.type.RowType;
+import com.facebook.presto.spi.ColumnMetadata;
+import com.facebook.presto.spi.ConnectorTableMetadata;
+import com.facebook.presto.spi.SchemaTableName;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import org.apache.commons.math3.util.Pair;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.HashSet;
+import java.util.Optional;
+
+import static com.facebook.presto.common.type.BigintType.BIGINT;
+import static com.facebook.presto.common.type.BooleanType.BOOLEAN;
+import static com.facebook.presto.common.type.DoubleType.DOUBLE;
+import static com.facebook.presto.common.type.VarcharType.VARCHAR;
+import static com.facebook.presto.plugin.clp.ClpMetadata.DEFAULT_SCHEMA_NAME;
+import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.DbHandle;
+import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.getDbHandle;
+import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.setupMetadata;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Boolean;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.ClpString;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Float;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.UnstructuredArray;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString;
+import static com.facebook.presto.testing.TestingConnectorSession.SESSION;
+import static org.testng.Assert.assertEquals;
+
+@Test(singleThreaded = true)
+public class TestClpMetadata
+{
+ private static final String TABLE_NAME = "test";
+ private DbHandle dbHandle;
+ private ClpMetadata metadata;
+
+ @BeforeMethod
+ public void setUp()
+ {
+ dbHandle = getDbHandle("metadata_testdb");
+ metadata = setupMetadata(
+ dbHandle,
+ ImmutableMap.of(
+ TABLE_NAME,
+ ImmutableList.of(
+ new Pair<>("a", Integer),
+ new Pair<>("a", ClpString),
+ new Pair<>("b", Float),
+ new Pair<>("b", ClpString),
+ new Pair<>("c.d", Boolean),
+ new Pair<>("c.e", VarString),
+ new Pair<>("f.g.h", UnstructuredArray))));
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ ClpMetadataDbSetUp.tearDown(dbHandle);
+ }
+
+ @Test
+ public void testListSchemaNames()
+ {
+ assertEquals(metadata.listSchemaNames(SESSION), ImmutableList.of(DEFAULT_SCHEMA_NAME));
+ }
+
+ @Test
+ public void testListTables()
+ {
+ HashSet tables = new HashSet<>();
+ tables.add(new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME));
+ assertEquals(new HashSet<>(metadata.listTables(SESSION, Optional.empty())), tables);
+ }
+
+ @Test
+ public void testGetTableMetadata()
+ {
+ ClpTableHandle clpTableHandle = (ClpTableHandle) metadata.getTableHandle(SESSION, new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME));
+ ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, clpTableHandle);
+ ImmutableSet columnMetadata = ImmutableSet.builder()
+ .add(ColumnMetadata.builder()
+ .setName("a_bigint")
+ .setType(BIGINT)
+ .setNullable(true)
+ .build())
+ .add(ColumnMetadata.builder()
+ .setName("a_varchar")
+ .setType(VARCHAR)
+ .setNullable(true)
+ .build())
+ .add(ColumnMetadata.builder()
+ .setName("b_double")
+ .setType(DOUBLE)
+ .setNullable(true)
+ .build())
+ .add(ColumnMetadata.builder()
+ .setName("b_varchar")
+ .setType(VARCHAR)
+ .setNullable(true)
+ .build())
+ .add(ColumnMetadata.builder()
+ .setName("c")
+ .setType(RowType.from(ImmutableList.of(
+ RowType.field("d", BOOLEAN),
+ RowType.field("e", VARCHAR))))
+ .setNullable(true)
+ .build())
+ .add(ColumnMetadata.builder()
+ .setName("f")
+ .setType(RowType.from(ImmutableList.of(
+ RowType.field("g",
+ RowType.from(ImmutableList.of(
+ RowType.field("h", new ArrayType(VARCHAR))))))))
+ .setNullable(true)
+ .build())
+ .build();
+ assertEquals(columnMetadata, ImmutableSet.copyOf(tableMetadata.getColumns()));
+ }
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java
new file mode 100644
index 0000000000000..2284778085818
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpSplit.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.presto.plugin.clp.split.ClpSplitProvider;
+import com.facebook.presto.spi.SchemaTableName;
+import com.google.common.collect.ImmutableSet;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static com.facebook.presto.plugin.clp.ClpMetadata.DEFAULT_SCHEMA_NAME;
+import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.ARCHIVE_STORAGE_DIRECTORY_BASE;
+import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.DbHandle;
+import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.getDbHandle;
+import static com.facebook.presto.plugin.clp.ClpMetadataDbSetUp.setupSplit;
+import static com.facebook.presto.plugin.clp.ClpTableHandle.StorageType.FS;
+import static org.testng.Assert.assertEquals;
+
+@Test(singleThreaded = true)
+public class TestClpSplit
+{
+ private DbHandle dbHandle;
+ private ClpSplitProvider clpSplitProvider;
+ private Map> tableSplits;
+
+ @BeforeMethod
+ public void setUp()
+ {
+ dbHandle = getDbHandle("split_testdb");
+ tableSplits = new HashMap<>();
+
+ int numKeys = 3;
+ int numValuesPerKey = 10;
+
+ for (int i = 0; i < numKeys; i++) {
+ String key = "test_" + i;
+ List values = new ArrayList<>();
+
+ for (int j = 0; j < numValuesPerKey; j++) {
+ values.add("id_" + j);
+ }
+
+ tableSplits.put(key, values);
+ }
+ clpSplitProvider = setupSplit(dbHandle, tableSplits);
+ }
+
+ @AfterMethod
+ public void tearDown()
+ {
+ ClpMetadataDbSetUp.tearDown(dbHandle);
+ }
+
+ @Test
+ public void testListSplits()
+ {
+ for (Map.Entry> entry : tableSplits.entrySet()) {
+ String tableName = entry.getKey();
+ String tablePath = ARCHIVE_STORAGE_DIRECTORY_BASE + tableName;
+ List expectedSplits = entry.getValue();
+ ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle(
+ new ClpTableHandle(new SchemaTableName(DEFAULT_SCHEMA_NAME, tableName),
+ tablePath, FS),
+ Optional.empty());
+ List splits = clpSplitProvider.listSplits(layoutHandle);
+ assertEquals(splits.size(), expectedSplits.size());
+
+ ImmutableSet actualSplitPaths = splits.stream()
+ .map(ClpSplit::getPath)
+ .collect(ImmutableSet.toImmutableSet());
+
+ ImmutableSet expectedSplitPaths = expectedSplits.stream()
+ .map(split -> tablePath + "/" + split)
+ .collect(ImmutableSet.toImmutableSet());
+
+ assertEquals(actualSplitPaths, expectedSplitPaths);
+ }
+ }
+}
diff --git a/presto-docs/src/main/sphinx/connector.rst b/presto-docs/src/main/sphinx/connector.rst
index d337fe4ed12d1..a45ab8f9bafa4 100644
--- a/presto-docs/src/main/sphinx/connector.rst
+++ b/presto-docs/src/main/sphinx/connector.rst
@@ -14,6 +14,7 @@ from different data sources.
connector/blackhole
connector/cassandra
connector/clickhouse
+ connector/clp
connector/deltalake
connector/druid
connector/elasticsearch
diff --git a/presto-docs/src/main/sphinx/connector/clp.rst b/presto-docs/src/main/sphinx/connector/clp.rst
new file mode 100644
index 0000000000000..e1beb5c180c1b
--- /dev/null
+++ b/presto-docs/src/main/sphinx/connector/clp.rst
@@ -0,0 +1,179 @@
+=============
+CLP Connector
+=============
+
+.. contents::
+ :local:
+ :backlinks: none
+ :depth: 1
+
+Overview
+--------
+
+The CLP Connector enables SQL-based querying of `CLP `_ archives via Presto. This
+document describes how to configure the CLP Connector for use with a CLP cluster, as well as essential details for
+understanding the CLP connector.
+
+
+Configuration
+-------------
+
+To configure the CLP connector, create a catalog properties file ``etc/catalog/clp.properties`` with at least the
+following contents, modifying the properties as appropriate:
+
+.. code-block:: ini
+
+ connector.name=clp
+ clp.metadata-provider-type=mysql
+ clp.metadata-db-url=jdbc:mysql://localhost:3306
+ clp.metadata-db-name=clp_db
+ clp.metadata-db-user=clp_user
+ clp.metadata-db-password=clp_password
+ clp.metadata-table-prefix=clp_
+ clp.split-provider-type=mysql
+
+
+Configuration Properties
+------------------------
+
+The following configuration properties are available:
+
+================================== ======================================================================== =========
+Property Name Description Default
+================================== ======================================================================== =========
+``clp.polymorphic-type-enabled`` Enables or disables support for polymorphic types in CLP, allowing the ``false``
+ same field to have different types. This is useful for schema-less,
+ semi-structured data where the same field may appear with different
+ types.
+
+ When enabled, type annotations are added to conflicting field names to
+ distinguish between types. For example, if an ``id`` column appears with
+ both an ``int`` and ``string`` types, the connector will create two
+ columns named ``id_bigint`` and ``id_varchar``.
+
+ Supported type annotations include ``bigint``, ``varchar``, ``double``,
+ ``boolean``, and ``array(varchar)`` (See `Data Types`_ for details). For
+ columns with only one type, the original column name is used.
+``clp.metadata-provider-type`` Specifies the metadata provider type. Currently, the only supported ``mysql``
+ type is a MySQL database, which is also used by the CLP package to store
+ metadata. Additional providers can be supported by implementing the
+ ``ClpMetadataProvider`` interface.
+``clp.metadata-db-url`` The JDBC URL used to connect to the metadata database. This property is
+ required if ``clp.metadata-provider-type`` is set to ``mysql``.
+``clp.metadata-db-name`` The name of the metadata database. This option is required if
+ ``clp.metadata-provider-type`` is set to ``mysql`` and the database name
+ is not specified in the URL.
+``clp.metadata-db-user`` The database user with access to the metadata database. This option is
+ required if ``clp.metadata-provider-type`` is set to ``mysql`` and the
+ database name is not specified in the URL.
+``clp.metadata-db-password`` The password for the metadata database user. This option is required if
+ ``clp.metadata-provider-type`` is set to ``mysql``.
+``clp.metadata-table-prefix`` A string prefix prepended to all metadata table names when querying the
+ database. Useful for namespacing or avoiding collisions. This option is
+ required if ``clp.metadata-provider-type`` is set to ``mysql``.
+``clp.metadata-expire-interval`` Defines how long, in seconds, metadata entries remain valid before they 600
+ need to be refreshed.
+``clp.metadata-refresh-interval`` Specifies how frequently metadata is refreshed from the source, in 60
+ seconds. Set this to a lower value for frequently changing datasets or
+ to a higher value to reduce load.
+``clp.split-provider-type`` Specifies the split provider type. By default, it uses the same type as ``mysql``
+ the metadata provider with the same connection parameters. Additional
+ types can be supported by implementing the ``ClpSplitProvider``
+ interface.
+================================== ======================================================================== =========
+
+
+Metadata and Split Providers
+----------------------------
+
+The CLP connector relies on metadata and split providers to retrieve information from various sources. By default, it
+uses a MySQL database for both metadata and split storage. We recommend using the CLP package for log ingestion, which
+automatically populates the database with the required information.
+
+If you prefer to use a different source--or the same source with a custom implementation--you can provide your own
+implementations of the ``ClpMetadataProvider`` and ``ClpSplitProvider`` interfaces, and configure the connector
+accordingly.
+
+Data Types
+----------
+
+The data type mappings are as follows:
+
+====================== ====================
+CLP Type Presto Type
+====================== ====================
+``Integer`` ``BIGINT``
+``Float`` ``DOUBLE``
+``ClpString`` ``VARCHAR``
+``VarString`` ``VARCHAR``
+``DateString`` ``VARCHAR``
+``Boolean`` ``BOOLEAN``
+``UnstructuredArray`` ``ARRAY(VARCHAR)``
+``Object`` ``ROW``
+(others) (unsupported)
+====================== ====================
+
+String Types
+^^^^^^^^^^^^
+
+CLP uses three distinct string types: ``ClpString`` (strings with whitespace), ``VarString`` (strings without
+whitespace), and ``DateString`` (strings representing dates). Currently, all three are mapped to Presto's ``VARCHAR``
+type.
+
+Array Types
+^^^^^^^^^^^
+
+CLP supports two array types: ``UnstructuredArray`` and ``StructuredArray``. Unstructured arrays are stored as strings
+in CLP and elements can be any type. However, in Presto arrays are homogeneous, so the elements are converted to strings
+when read. ``StructuredArray`` type is not supported in Presto.
+
+Object Types
+^^^^^^^^^^^^
+
+CLP stores metadata using a global schema tree structure that captures all possible fields from various log structures.
+Internal nodes may represent objects containing nested fields as their children. In Presto, we map these internal object
+nodes to the ``ROW`` data type, including all subfields as fields within the ``ROW``.
+
+For instance, consider a table containing two distinct JSON log types:
+
+Log Type 1:
+
+.. code-block:: json
+
+ {
+ "msg": {
+ "ts": 0,
+ "status": "ok"
+ }
+ }
+
+Log Type 2:
+
+.. code-block:: json
+
+ {
+ "msg": {
+ "ts": 1,
+ "status": "error",
+ "thread_num": 4,
+ "backtrace": ""
+ }
+ }
+
+In CLP's schema tree, these two structures are combined into a unified internal node (``msg``) with four child nodes:
+``ts``, ``status``, ``thread_num`` and ``backtrace``. In Presto, we represent this combined structure using the
+following ``ROW`` type:
+
+.. code-block:: sql
+
+ ROW(ts BIGINT, status VARCHAR, thread_num BIGINT, backtrace VARCHAR)
+
+Each JSON log maps to this unified ``ROW`` type, with absent fields represented as ``NULL``. The child nodes (``ts``,
+``status``, ``thread_num``, ``backtrace``) become fields within the ``ROW``, clearly reflecting the nested and varying
+structures of the original JSON logs.
+
+SQL support
+-----------
+
+The connector only provides read access to data. It does not support DDL operations, such as creating or dropping
+tables. Currently, we only support one ``default`` schema.
diff --git a/presto-server/src/main/provisio/presto.xml b/presto-server/src/main/provisio/presto.xml
index c66cf1159ce5e..4ff13686fee21 100644
--- a/presto-server/src/main/provisio/presto.xml
+++ b/presto-server/src/main/provisio/presto.xml
@@ -233,6 +233,12 @@
+
+
+
+
+
+