diff --git a/pom.xml b/pom.xml
index f57eb32378792..57dc0439a8401 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2420,6 +2420,19 @@
2.1.0-3
+
+ com.facebook.presto
+ presto-clp
+ ${project.version}
+
+
+
+ com.facebook.presto
+ presto-clp
+ ${project.version}
+ test-jar
+
+
org.javassist
diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml
index 6e0b35d56bd3e..2a4be7f9df24f 100644
--- a/presto-clp/pom.xml
+++ b/presto-clp/pom.xml
@@ -129,6 +129,12 @@
test
+
+ com.facebook.presto
+ presto-tests
+ test
+
+
org.apache.commons
commons-math3
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpQueryRunner.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpQueryRunner.java
new file mode 100644
index 0000000000000..dc5b03e4111fd
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/ClpQueryRunner.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.Session;
+import com.facebook.presto.tests.DistributedQueryRunner;
+import com.google.common.collect.ImmutableMap;
+
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.BiFunction;
+
+import static com.facebook.presto.testing.TestingSession.testSessionBuilder;
+
+public class ClpQueryRunner
+{
+ private static final Logger log = Logger.get(ClpQueryRunner.class);
+
+ public static final String CLP_CATALOG = "clp";
+ public static final String CLP_CONNECTOR = CLP_CATALOG;
+ public static final int DEFAULT_NUM_OF_WORKERS = 1;
+ public static final String DEFAULT_SCHEMA = "default";
+
+ public static DistributedQueryRunner createQueryRunner(
+ String metadataDbUrl,
+ String metadataDbUser,
+ String metadataDbPassword,
+ String metadataDbTablePrefix,
+ Optional workerCount,
+ Optional> externalWorkerLauncher)
+ throws Exception
+ {
+ log.info("Creating CLP query runner with default session");
+ return createQueryRunner(
+ createDefaultSession(),
+ metadataDbUrl,
+ metadataDbUser,
+ metadataDbPassword,
+ metadataDbTablePrefix,
+ workerCount,
+ externalWorkerLauncher);
+ }
+
+ public static DistributedQueryRunner createQueryRunner(
+ Session session,
+ String metadataDbUrl,
+ String metadataDbUser,
+ String metadataDbPassword,
+ String metadataDbTablePrefix,
+ Optional workerCount,
+ Optional> externalWorkerLauncher)
+ throws Exception
+ {
+ DistributedQueryRunner clpQueryRunner = DistributedQueryRunner.builder(session)
+ .setNodeCount(workerCount.orElse(DEFAULT_NUM_OF_WORKERS))
+ .setExternalWorkerLauncher(externalWorkerLauncher)
+ .build();
+ Map clpProperties = ImmutableMap.builder()
+ .put("clp.metadata-provider-type", "mysql")
+ .put("clp.metadata-db-url", metadataDbUrl)
+ .put("clp.metadata-db-user", metadataDbUser)
+ .put("clp.metadata-db-password", metadataDbPassword)
+ .put("clp.metadata-table-prefix", metadataDbTablePrefix)
+ .put("clp.split-provider-type", "mysql")
+ .build();
+
+ clpQueryRunner.installPlugin(new ClpPlugin());
+ clpQueryRunner.createCatalog(CLP_CATALOG, CLP_CONNECTOR, clpProperties);
+ return clpQueryRunner;
+ }
+
+ /**
+ * Creates a default mock session for query use.
+ *
+ * @return a default session
+ */
+ private static Session createDefaultSession()
+ {
+ return testSessionBuilder()
+ .setCatalog(CLP_CATALOG)
+ .setSchema(DEFAULT_SCHEMA)
+ .build();
+ }
+
+ private ClpQueryRunner()
+ {
+ }
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/ClpMockMetadataDatabase.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/ClpMockMetadataDatabase.java
new file mode 100644
index 0000000000000..a96d854d4d0f1
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/ClpMockMetadataDatabase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.mockdb;
+
+import com.facebook.presto.plugin.clp.mockdb.table.ArchivesTableRows;
+import com.facebook.presto.plugin.clp.mockdb.table.ColumnMetadataTableRows;
+import com.facebook.presto.plugin.clp.mockdb.table.DatasetsTableRows;
+import com.google.common.collect.ImmutableList;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_NAME;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_TYPE;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_SUFFIX;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_NAME;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_SUFFIX;
+import static com.facebook.presto.plugin.clp.mockdb.table.ArchivesTableRows.COLUMN_BEGIN_TIMESTAMP;
+import static com.facebook.presto.plugin.clp.mockdb.table.ArchivesTableRows.COLUMN_END_TIMESTAMP;
+import static com.facebook.presto.plugin.clp.mockdb.table.ArchivesTableRows.COLUMN_ID;
+import static com.facebook.presto.plugin.clp.mockdb.table.ArchivesTableRows.COLUMN_PAGINATION_ID;
+import static com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider.ARCHIVES_TABLE_SUFFIX;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.testng.Assert.fail;
+
+/**
+ * File-backed H2 mock metadata database for CLP tests. Uses the same schema as the CLP package.
+ * Provides a builder-driven setup and a single-call teardown that drops all objects and deletes
+ * files.
+ */
+public class ClpMockMetadataDatabase
+{
+ private static final String MOCK_METADATA_DB_DEFAULT_USERNAME = "sa";
+ private static final String MOCK_METADATA_DB_DEFAULT_PASSWORD = "";
+
+ private static final String MOCK_METADATA_DB_DEFAULT_TABLE_PREFIX = "clp_";
+
+ private static final String MOCK_METADATA_DB_URL_TEMPLATE = "jdbc:h2:file:%s;MODE=MySQL;DATABASE_TO_UPPER=FALSE";
+
+ private String url;
+ private String archiveStorageDirectory;
+ private String username;
+ private String password;
+ private String tablePrefix;
+
+ /**
+ * Creates a new builder instance for constructing {@link ClpMockMetadataDatabase}.
+ *
+ * @return a new {@link Builder}
+ */
+ public static Builder builder()
+ {
+ return new Builder();
+ }
+
+ /**
+ * Tears down the mock database by dropping all objects (tables, views, etc.) and deleting the
+ * backing database file. Any exceptions during cleanup will cause the test to fail.
+ */
+ public void teardown()
+ {
+ try (Connection connection = DriverManager.getConnection(url, username, password); Statement stmt = connection.createStatement()) {
+ stmt.execute("DROP ALL OBJECTS DELETE FILES");
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public String getUrl()
+ {
+ return url;
+ }
+
+ public String getUsername()
+ {
+ return username;
+ }
+
+ public String getPassword()
+ {
+ return password;
+ }
+
+ public String getTablePrefix()
+ {
+ return tablePrefix;
+ }
+
+ public void addTableToDatasetsTableIfNotExist(List tableNames)
+ {
+ try (Connection connection = DriverManager.getConnection(url, username, password)) {
+ ImmutableList.Builder repeatedArchiveStorageDirectory = ImmutableList.builder();
+ for (String tableName : tableNames) {
+ createArchivesTableIfNotExist(connection, tableName);
+ createColumnMetadataTableIfNotExist(connection, tableName);
+ repeatedArchiveStorageDirectory.add(archiveStorageDirectory);
+ }
+ DatasetsTableRows datasetsTableRows = new DatasetsTableRows(tableNames, repeatedArchiveStorageDirectory.build());
+ datasetsTableRows.insertToTable(connection, tablePrefix);
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void addColumnMetadata(Map clpFields)
+ {
+ try (Connection connection = DriverManager.getConnection(url, username, password)) {
+ for (Map.Entry entry : clpFields.entrySet()) {
+ entry.getValue().insertToTable(connection, tablePrefix, entry.getKey());
+ }
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public void addSplits(Map splits)
+ {
+ try (Connection connection = DriverManager.getConnection(url, username, password)) {
+ for (Map.Entry entry : splits.entrySet()) {
+ entry.getValue().insertToTable(connection, tablePrefix, entry.getKey());
+ }
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ private ClpMockMetadataDatabase()
+ {
+ }
+
+ private void createArchivesTableIfNotExist(Connection connection, String tableName)
+ {
+ final String createTableSql = format(
+ "CREATE TABLE IF NOT EXISTS `%s` (" +
+ "`%s` BIGINT AUTO_INCREMENT PRIMARY KEY, " +
+ "`%s` VARCHAR(64) NOT NULL, " +
+ "`%s` BIGINT, " +
+ "`%s` BIGINT)",
+ format("%s%s%s", tablePrefix, tableName, ARCHIVES_TABLE_SUFFIX),
+ COLUMN_PAGINATION_ID,
+ COLUMN_ID,
+ COLUMN_BEGIN_TIMESTAMP,
+ COLUMN_END_TIMESTAMP);
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute(createTableSql);
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ private void createColumnMetadataTableIfNotExist(Connection connection, String tableName)
+ {
+ String createTableSql = format(
+ "CREATE TABLE IF NOT EXISTS %s (" +
+ "`%s` VARCHAR(512) NOT NULL, " +
+ "`%s` TINYINT NOT NULL, " +
+ "PRIMARY KEY (`%s`, `%s`))",
+ format("%s%s%s", tablePrefix, tableName, COLUMN_METADATA_TABLE_SUFFIX),
+ COLUMN_METADATA_TABLE_COLUMN_NAME,
+ COLUMN_METADATA_TABLE_COLUMN_TYPE,
+ COLUMN_METADATA_TABLE_COLUMN_NAME,
+ COLUMN_METADATA_TABLE_COLUMN_TYPE);
+ try (Statement stmt = connection.createStatement()) {
+ stmt.execute(createTableSql);
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ private void createDatasetsTableIfNotExist()
+ {
+ final String createTableSql = format(
+ "CREATE TABLE IF NOT EXISTS %s (%s VARCHAR(255) PRIMARY KEY, %s VARCHAR(4096) NOT NULL)",
+ format("%s%s", tablePrefix, DATASETS_TABLE_SUFFIX),
+ DATASETS_TABLE_COLUMN_NAME,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY);
+ try (Connection connection = DriverManager.getConnection(url, username, password); Statement stmt = connection.createStatement()) {
+ stmt.execute(createTableSql);
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public static final class Builder
+ {
+ private String url;
+ private String archiveStorageDirectory;
+ private String username;
+ private String password;
+ private String tablePrefix;
+
+ private Builder()
+ {
+ setDatabaseUrl(format("/tmp/%s", UUID.randomUUID()));
+ setUsername(MOCK_METADATA_DB_DEFAULT_USERNAME);
+ setPassword(MOCK_METADATA_DB_DEFAULT_PASSWORD);
+ setTablePrefix(MOCK_METADATA_DB_DEFAULT_TABLE_PREFIX);
+ }
+
+ public Builder setDatabaseUrl(String databaseFilePath)
+ {
+ this.url = format(MOCK_METADATA_DB_URL_TEMPLATE, databaseFilePath);
+ return this;
+ }
+
+ public Builder setArchiveStorageDirectory(String archiveStorageDirectory)
+ {
+ this.archiveStorageDirectory = archiveStorageDirectory;
+ return this;
+ }
+
+ public Builder setUsername(String username)
+ {
+ this.username = username;
+ return this;
+ }
+
+ public Builder setPassword(String password)
+ {
+ this.password = password;
+ return this;
+ }
+
+ public Builder setTablePrefix(String tablePrefix)
+ {
+ this.tablePrefix = tablePrefix;
+ return this;
+ }
+
+ /**
+ * Builds and returns the configured {@link ClpMockMetadataDatabase} instance.
+ *
+ * @return the constructed {@link ClpMockMetadataDatabase}
+ */
+ public ClpMockMetadataDatabase build()
+ {
+ validate();
+ ClpMockMetadataDatabase mockMetadataDatabase = new ClpMockMetadataDatabase();
+ mockMetadataDatabase.url = this.url;
+ mockMetadataDatabase.archiveStorageDirectory = this.archiveStorageDirectory;
+ mockMetadataDatabase.username = this.username;
+ mockMetadataDatabase.password = this.password;
+ mockMetadataDatabase.tablePrefix = this.tablePrefix;
+
+ mockMetadataDatabase.createDatasetsTableIfNotExist();
+ return mockMetadataDatabase;
+ }
+
+ /**
+ * Validates that all required parameters have been set and the datasets table has been
+ * created.
+ */
+ private void validate()
+ {
+ requireNonNull(url, "url is null");
+ requireNonNull(archiveStorageDirectory, "archiveStorageDirectory is null");
+ requireNonNull(username, "username is null");
+ requireNonNull(password, "password is null");
+ requireNonNull(tablePrefix, "tablePrefix is null");
+ }
+ }
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/ArchivesTableRows.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/ArchivesTableRows.java
new file mode 100644
index 0000000000000..007f93a000577
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/ArchivesTableRows.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.mockdb.table;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider.ARCHIVES_TABLE_SUFFIX;
+import static java.lang.String.format;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+public class ArchivesTableRows
+{
+ public static final String COLUMN_PAGINATION_ID = "pagination_id";
+ public static final String COLUMN_ID = "id";
+ public static final String COLUMN_BEGIN_TIMESTAMP = "begin_timestamp";
+ public static final String COLUMN_END_TIMESTAMP = "end_timestamp";
+
+ private final List ids;
+ private final List beginTimestamps;
+ private final List endTimestamps;
+ private final int numberOfRows;
+
+ public void insertToTable(Connection connection, String tablePrefix, String tableName)
+ {
+ final String insertSql = format(
+ "INSERT INTO `%s` (`%s`, `%s`, `%s`) VALUES (?, ?, ?)",
+ format("%s%s%s", tablePrefix, tableName, ARCHIVES_TABLE_SUFFIX),
+ COLUMN_ID,
+ COLUMN_BEGIN_TIMESTAMP,
+ COLUMN_END_TIMESTAMP);
+ try (PreparedStatement pstmt = connection.prepareStatement(insertSql)) {
+ for (int i = 0; i < numberOfRows; ++i) {
+ pstmt.setString(1, ids.get(i));
+ pstmt.setLong(2, beginTimestamps.get(i));
+ pstmt.setLong(3, endTimestamps.get(i));
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public ArchivesTableRows(
+ List ids,
+ List beginTimestamps,
+ List endTimestamps)
+ {
+ assertEquals(ids.size(), beginTimestamps.size());
+ assertEquals(beginTimestamps.size(), endTimestamps.size());
+ this.ids = ids;
+ this.beginTimestamps = beginTimestamps;
+ this.endTimestamps = endTimestamps;
+ this.numberOfRows = ids.size();
+ }
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/ColumnMetadataTableRows.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/ColumnMetadataTableRows.java
new file mode 100644
index 0000000000000..2d884227c14e1
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/ColumnMetadataTableRows.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.mockdb.table;
+
+import com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_NAME;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_COLUMN_TYPE;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.COLUMN_METADATA_TABLE_SUFFIX;
+import static java.lang.String.format;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+public class ColumnMetadataTableRows
+{
+ private final List names;
+ private final List types;
+ private final int numOfRows;
+
+ public void insertToTable(Connection connection, String tablePrefix, String tableName)
+ {
+ String insertSql = format(
+ "INSERT INTO `%s` (`%s`, `%s`) VALUES (?, ?)",
+ format("%s%s%s", tablePrefix, tableName, COLUMN_METADATA_TABLE_SUFFIX),
+ COLUMN_METADATA_TABLE_COLUMN_NAME,
+ COLUMN_METADATA_TABLE_COLUMN_TYPE);
+ try (PreparedStatement pstmt = connection.prepareStatement(insertSql)) {
+ for (int i = 0; i < numOfRows; ++i) {
+ pstmt.setString(1, names.get(i));
+ pstmt.setByte(2, types.get(i).getType());
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public ColumnMetadataTableRows(List names, List types)
+ {
+ assertEquals(names.size(), types.size());
+ this.names = names;
+ this.types = types;
+ numOfRows = names.size();
+ }
+}
diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/DatasetsTableRows.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/DatasetsTableRows.java
new file mode 100644
index 0000000000000..6eb5e4abca43a
--- /dev/null
+++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/mockdb/table/DatasetsTableRows.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.plugin.clp.mockdb.table;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.util.List;
+
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_COLUMN_NAME;
+import static com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider.DATASETS_TABLE_SUFFIX;
+import static java.lang.String.format;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.fail;
+
+public class DatasetsTableRows
+{
+ private final List names;
+ private final List archivesStorageDirectories;
+ private final int numOfRows;
+
+ public void insertToTable(Connection connection, String tablePrefix)
+ {
+ final String insertSql = format(
+ "INSERT INTO %s (%s, %s) VALUES (?, ?) " +
+ "ON DUPLICATE KEY UPDATE %s = VALUES(%s)",
+ format("%s%s", tablePrefix, DATASETS_TABLE_SUFFIX),
+ DATASETS_TABLE_COLUMN_NAME,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY,
+ DATASETS_TABLE_COLUMN_ARCHIVE_STORAGE_DIRECTORY);
+ try (PreparedStatement pstmt = connection.prepareStatement(insertSql)) {
+ for (int i = 0; i < numOfRows; ++i) {
+ pstmt.setString(1, names.get(i));
+ pstmt.setString(2, format("%s%s", archivesStorageDirectories.get(i), names.get(i)));
+ pstmt.addBatch();
+ }
+ pstmt.executeBatch();
+ }
+ catch (SQLException e) {
+ fail(e.getMessage());
+ }
+ }
+
+ public DatasetsTableRows(
+ List names,
+ List archivesStorageDirectories)
+ {
+ assertEquals(names.size(), archivesStorageDirectories.size());
+ this.names = names;
+ this.archivesStorageDirectories = archivesStorageDirectories;
+ this.numOfRows = names.size();
+ }
+}
diff --git a/presto-native-execution/pom.xml b/presto-native-execution/pom.xml
index fed141f2297fc..200ffa6834afc 100644
--- a/presto-native-execution/pom.xml
+++ b/presto-native-execution/pom.xml
@@ -253,6 +253,18 @@
commons-lang3
test
+
+
+ com.facebook.presto
+ presto-clp
+ runtime
+
+
+
+ com.facebook.presto
+ presto-clp
+ test-jar
+
diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java
index acd0244e054f5..564fc67fdbf70 100644
--- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java
+++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/PrestoNativeQueryRunnerUtils.java
@@ -26,6 +26,7 @@
import com.facebook.presto.hive.metastore.Table;
import com.facebook.presto.iceberg.FileFormat;
import com.facebook.presto.iceberg.IcebergQueryRunner;
+import com.facebook.presto.plugin.clp.ClpQueryRunner;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.testing.QueryRunner;
import com.facebook.presto.tests.DistributedQueryRunner;
@@ -64,6 +65,7 @@
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerSystemProperties;
import static com.facebook.presto.nativeworker.NativeQueryRunnerUtils.getNativeWorkerTpcdsProperties;
import static com.facebook.presto.nativeworker.SymlinkManifestGeneratorUtils.createSymlinkManifest;
+import static com.facebook.presto.plugin.clp.ClpQueryRunner.CLP_CATALOG;
import static java.lang.String.format;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
@@ -217,6 +219,25 @@ public static void createExternalTable(QueryRunner queryRunner, String sourceSch
}
}
+ public static QueryRunner createNativeClpQueryRunner(
+ String metadataDbUrl,
+ String metadataDbUser,
+ String metadataDbPassword,
+ String metadataDbTablePrefix)
+ throws Exception
+ {
+ NativeQueryRunnerParameters nativeQueryRunnerParameters = getNativeQueryRunnerParameters();
+ return ClpQueryRunner.createQueryRunner(
+ metadataDbUrl,
+ metadataDbUser,
+ metadataDbPassword,
+ metadataDbTablePrefix,
+ nativeQueryRunnerParameters.workerCount,
+ getExternalClpWorkerLauncher(
+ CLP_CATALOG,
+ nativeQueryRunnerParameters.serverBinary.toString()));
+ }
+
public static QueryRunner createJavaIcebergQueryRunner(boolean addStorageFormatToPath)
throws Exception
{
@@ -538,6 +559,50 @@ public static NativeQueryRunnerParameters getNativeQueryRunnerParameters()
return new NativeQueryRunnerParameters(prestoServerPath, dataDirectory, workerCount);
}
+ public static Optional> getExternalClpWorkerLauncher(
+ String catalogName,
+ String prestoServerPath)
+ {
+ return
+ Optional.of((workerIndex, discoveryUri) -> {
+ try {
+ Path dir = Paths.get("/tmp", PrestoNativeQueryRunnerUtils.class.getSimpleName());
+ Files.createDirectories(dir);
+ Path tempDirectoryPath = Files.createTempDirectory(dir, "worker");
+ log.info("Temp directory for Worker #%d: %s", workerIndex, tempDirectoryPath.toString());
+
+ // Write config file - use an ephemeral port for the worker.
+ String configProperties = format("discovery.uri=%s%n" +
+ "presto.version=testversion%n" +
+ "system-memory-gb=4%n" +
+ "http-server.http.port=0%n", discoveryUri);
+
+ Files.write(tempDirectoryPath.resolve("config.properties"), configProperties.getBytes());
+ Files.write(tempDirectoryPath.resolve("node.properties"),
+ format("node.id=%s%n" +
+ "node.internal-address=127.0.0.1%n" +
+ "node.environment=testing%n" +
+ "node.location=test-location", UUID.randomUUID()).getBytes());
+
+ Path catalogDirectoryPath = tempDirectoryPath.resolve("catalog");
+ Files.createDirectory(catalogDirectoryPath);
+ Files.write(catalogDirectoryPath.resolve(format("%s.properties", catalogName)),
+ "connector.name=clp".getBytes());
+
+ // Disable stack trace capturing as some queries (using TRY) generate a lot of exceptions.
+ return new ProcessBuilder(prestoServerPath, "--logtostderr=1", "--v=1", "--velox_ssd_odirect=false")
+ .directory(tempDirectoryPath.toFile())
+ .redirectErrorStream(true)
+ .redirectOutput(ProcessBuilder.Redirect.to(tempDirectoryPath.resolve("worker." + workerIndex + ".out").toFile()))
+ .redirectError(ProcessBuilder.Redirect.to(tempDirectoryPath.resolve("worker." + workerIndex + ".out").toFile()))
+ .start();
+ }
+ catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+
public static Optional> getExternalWorkerLauncher(
String catalogName,
String prestoServerPath,
diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeClpGeneralQueries.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeClpGeneralQueries.java
new file mode 100644
index 0000000000000..e7fe5da57bbfe
--- /dev/null
+++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoNativeClpGeneralQueries.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.facebook.presto.nativeworker;
+
+import com.facebook.airlift.log.Logger;
+import com.facebook.presto.plugin.clp.mockdb.ClpMockMetadataDatabase;
+import com.facebook.presto.plugin.clp.mockdb.table.ArchivesTableRows;
+import com.facebook.presto.plugin.clp.mockdb.table.ColumnMetadataTableRows;
+import com.facebook.presto.testing.QueryRunner;
+import com.facebook.presto.tests.AbstractTestQueryFramework;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.Test;
+
+import java.net.URL;
+import java.nio.file.Paths;
+
+import static com.facebook.presto.nativeworker.PrestoNativeQueryRunnerUtils.getNativeQueryRunnerParameters;
+import static com.facebook.presto.plugin.clp.ClpQueryRunner.DEFAULT_NUM_OF_WORKERS;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Boolean;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.ClpString;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.DateString;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.Integer;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.NullValue;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.UnstructuredArray;
+import static com.facebook.presto.plugin.clp.metadata.ClpSchemaTreeNodeType.VarString;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+public class TestPrestoNativeClpGeneralQueries
+ extends AbstractTestQueryFramework
+{
+ private static final Logger log = Logger.get(TestPrestoNativeClpGeneralQueries.class);
+ private static final String DEFAULT_TABLE_NAME = "test_e2e";
+ private ClpMockMetadataDatabase mockMetadataDatabase;
+
+ @Override
+ protected QueryRunner createQueryRunner()
+ throws Exception
+ {
+ URL resource = requireNonNull(
+ getClass().getClassLoader().getResource("clp-archives"),
+ "Test resource 'clp-archives' not found on classpath");
+ String archiveStorageDirectory = format("%s/", Paths.get(resource.toURI()));
+ mockMetadataDatabase = ClpMockMetadataDatabase.builder().setArchiveStorageDirectory(archiveStorageDirectory).build();
+ return PrestoNativeQueryRunnerUtils.createNativeClpQueryRunner(
+ mockMetadataDatabase.getUrl(),
+ mockMetadataDatabase.getUsername(),
+ mockMetadataDatabase.getPassword(),
+ mockMetadataDatabase.getTablePrefix());
+ }
+
+ @Override
+ protected void createTables()
+ {
+ mockMetadataDatabase.addTableToDatasetsTableIfNotExist(ImmutableList.of(DEFAULT_TABLE_NAME));
+ mockMetadataDatabase.addColumnMetadata(ImmutableMap.of(DEFAULT_TABLE_NAME, new ColumnMetadataTableRows(
+ ImmutableList.of(
+ "id",
+ "msg",
+ "msg",
+ "attr.command.q._id.uid.dollar_sign_binary.sub_type",
+ "attr.existing",
+ "tags",
+ "attr.obj.md.indexes",
+ "attr.build_u_u_i_d",
+ "t.dollar_sign_date"),
+ ImmutableList.of(
+ Integer,
+ ClpString,
+ VarString,
+ VarString,
+ Boolean,
+ UnstructuredArray,
+ UnstructuredArray,
+ NullValue,
+ DateString))));
+ mockMetadataDatabase.addSplits(ImmutableMap.of(DEFAULT_TABLE_NAME, new ArchivesTableRows(
+ ImmutableList.of("mongodb-processed-single-file-archive"),
+ ImmutableList.of(1679441694576L),
+ ImmutableList.of(1679442346492L))));
+ }
+
+ @Test
+ public void test()
+ {
+ QueryRunner queryRunner = getQueryRunner();
+ assertEquals(queryRunner.getNodeCount(), getNativeQueryRunnerParameters().workerCount.orElse(DEFAULT_NUM_OF_WORKERS) + 1);
+ assertTrue(queryRunner.tableExists(getSession(), DEFAULT_TABLE_NAME));
+
+ // H2QueryRunner currently can't change the timestamp format, and the default timestamp
+ // format of Presto is different, so for now we have to manually format the timestamp
+ // field.
+ assertQuery(
+ format("SELECT" +
+ " msg," +
+ " format_datetime(t.dollar_sign_date, 'yyyy-MM-dd HH:mm:ss.SSS')," +
+ " id," +
+ " attr," +
+ " tags" +
+ " FROM %s" +
+ " ORDER BY t.dollar_sign_date" +
+ " LIMIT 1", DEFAULT_TABLE_NAME),
+ "SELECT" +
+ " 'Initialized wire specification'," +
+ " TIMESTAMP '2023-03-22 12:34:54.576'," +
+ " 4915701," +
+ " ARRAY[" +
+ " NULL," +
+ " ARRAY[ARRAY[ARRAY[ARRAY[ARRAY[NULL]]]]]," +
+ " NULL," +
+ " ARRAY[ARRAY[NULL]]" +
+ " ]," +
+ " NULL");
+ }
+
+ @AfterTest
+ public void teardown()
+ {
+ if (null != mockMetadataDatabase) {
+ mockMetadataDatabase.teardown();
+ }
+ }
+}
diff --git a/presto-native-execution/src/test/resources/clp-archives/test_e2e/mongodb-processed-single-file-archive b/presto-native-execution/src/test/resources/clp-archives/test_e2e/mongodb-processed-single-file-archive
new file mode 100644
index 0000000000000..764cca0939ce6
Binary files /dev/null and b/presto-native-execution/src/test/resources/clp-archives/test_e2e/mongodb-processed-single-file-archive differ