diff --git a/presto-clp/pom.xml b/presto-clp/pom.xml index d13b59968aff5..8941da517cecb 100644 --- a/presto-clp/pom.xml +++ b/presto-clp/pom.xml @@ -75,6 +75,11 @@ jackson-databind + + com.fasterxml.jackson.dataformat + jackson-dataformat-yaml + + com.facebook.presto presto-spi @@ -154,4 +159,14 @@ test + + + + + org.yaml + snakeyaml + 2.1 + + + diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java index 121eb0d5ff17b..329d5451c71a5 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpConfig.java @@ -33,6 +33,8 @@ public class ClpConfig private long metadataRefreshInterval = 60; private long metadataExpireInterval = 600; + private String metadataYamlPath; + private String splitFilterConfig; private SplitFilterProviderType splitFilterProviderType = SplitFilterProviderType.MYSQL; private SplitProviderType splitProviderType = SplitProviderType.MYSQL; @@ -151,6 +153,18 @@ public ClpConfig setMetadataExpireInterval(long metadataExpireInterval) return this; } + public String getMetadataYamlPath() + { + return metadataYamlPath; + } + + @Config("clp.metadata-yaml-path") + public ClpConfig setMetadataYamlPath(String metadataYamlPath) + { + this.metadataYamlPath = metadataYamlPath; + return this; + } + public String getSplitFilterConfig() { return splitFilterConfig; @@ -189,7 +203,8 @@ public ClpConfig setSplitProviderType(SplitProviderType splitProviderType) public enum MetadataProviderType { - MYSQL + MYSQL, + YAML } public enum SplitFilterProviderType @@ -199,6 +214,7 @@ public enum SplitFilterProviderType public enum SplitProviderType { - MYSQL + MYSQL, + PINOT } } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java index 2530c013455cc..fb6626de25a61 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpErrorCode.java @@ -28,6 +28,7 @@ public enum ClpErrorCode CLP_UNSUPPORTED_SPLIT_SOURCE(2, EXTERNAL), CLP_UNSUPPORTED_TYPE(3, EXTERNAL), CLP_UNSUPPORTED_CONFIG_OPTION(4, EXTERNAL), + CLP_UNSUPPORTED_TABLE_SCHEMA_YAML(5, EXTERNAL), CLP_SPLIT_FILTER_CONFIG_NOT_FOUND(10, USER_ERROR), CLP_MANDATORY_SPLIT_FILTER_NOT_VALID(11, USER_ERROR), diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java index bf801d0d87242..6d979ca0bceb8 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/ClpModule.java @@ -16,7 +16,9 @@ import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; import com.facebook.presto.plugin.clp.metadata.ClpMySqlMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpYamlMetadataProvider; import com.facebook.presto.plugin.clp.split.ClpMySqlSplitProvider; +import com.facebook.presto.plugin.clp.split.ClpPinotSplitProvider; import com.facebook.presto.plugin.clp.split.ClpSplitProvider; import com.facebook.presto.plugin.clp.split.filter.ClpMySqlSplitFilterProvider; import com.facebook.presto.plugin.clp.split.filter.ClpSplitFilterProvider; @@ -56,6 +58,9 @@ protected void setup(Binder binder) if (config.getMetadataProviderType() == MetadataProviderType.MYSQL) { binder.bind(ClpMetadataProvider.class).to(ClpMySqlMetadataProvider.class).in(Scopes.SINGLETON); } + else if (config.getMetadataProviderType() == MetadataProviderType.YAML) { + binder.bind(ClpMetadataProvider.class).to(ClpYamlMetadataProvider.class).in(Scopes.SINGLETON); + } else { throw new PrestoException(CLP_UNSUPPORTED_METADATA_SOURCE, "Unsupported metadata provider type: " + config.getMetadataProviderType()); } @@ -63,6 +68,9 @@ protected void setup(Binder binder) if (config.getSplitProviderType() == SplitProviderType.MYSQL) { binder.bind(ClpSplitProvider.class).to(ClpMySqlSplitProvider.class).in(Scopes.SINGLETON); } + else if (config.getSplitProviderType() == SplitProviderType.PINOT) { + binder.bind(ClpSplitProvider.class).to(ClpPinotSplitProvider.class).in(Scopes.SINGLETON); + } else { throw new PrestoException(CLP_UNSUPPORTED_SPLIT_SOURCE, "Unsupported split provider type: " + config.getSplitProviderType()); } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpYamlMetadataProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpYamlMetadataProvider.java new file mode 100644 index 0000000000000..0bbb0b4c9ea79 --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/metadata/ClpYamlMetadataProvider.java @@ -0,0 +1,140 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.metadata; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.ClpColumnHandle; +import com.facebook.presto.plugin.clp.ClpConfig; +import com.facebook.presto.plugin.clp.ClpTableHandle; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.SchemaTableName; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import javax.inject.Inject; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static com.facebook.presto.plugin.clp.ClpConnectorFactory.CONNECTOR_NAME; +import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_UNSUPPORTED_TABLE_SCHEMA_YAML; +import static java.lang.String.format; + +public class ClpYamlMetadataProvider + implements ClpMetadataProvider +{ + private static final Logger log = Logger.get(ClpYamlMetadataProvider.class); + private final ClpConfig config; + private Map tableSchemaYamlMap; + + @Inject + public ClpYamlMetadataProvider(ClpConfig config) + { + this.config = config; + } + + @Override + public List listColumnHandles(SchemaTableName schemaTableName) + { + Path tableSchemaPath = Paths.get(tableSchemaYamlMap.get(schemaTableName)); + ClpSchemaTree schemaTree = new ClpSchemaTree(config.isPolymorphicTypeEnabled()); + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + + try { + Map root = mapper.readValue( + new File(tableSchemaPath.toString()), + new TypeReference>() {}); + ImmutableList.Builder namesBuilder = ImmutableList.builder(); + ImmutableList.Builder typesBuilder = ImmutableList.builder(); + collectTypes(root, "", namesBuilder, typesBuilder); + ImmutableList names = namesBuilder.build(); + ImmutableList types = typesBuilder.build(); + // The names and types should have same sizes + for (int i = 0; i < names.size(); i++) { + schemaTree.addColumn(names.get(i), types.get(i)); + } + return schemaTree.collectColumnHandles(); + } + catch (IOException e) { + log.error(format("Failed to parse table schema file %s, error: %s", tableSchemaPath, e.getMessage()), e); + } + return Collections.emptyList(); + } + + @Override + public List listTableHandles(String schemaName) + { + Path tablesSchemaPath = Paths.get(config.getMetadataYamlPath()); + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + + try { + Map root = mapper.readValue(new File(tablesSchemaPath.toString()), + new TypeReference>() {}); + + Object catalogObj = root.get(CONNECTOR_NAME); + if (!(catalogObj instanceof Map)) { + throw new PrestoException(CLP_UNSUPPORTED_TABLE_SCHEMA_YAML, format("The table schema does not contain field: %s", CONNECTOR_NAME)); + } + Object schemaObj = ((Map) catalogObj).get(schemaName); + ImmutableList.Builder tableHandlesBuilder = new ImmutableList.Builder<>(); + ImmutableMap.Builder tableSchemaYamlMapBuilder = new ImmutableMap.Builder<>(); + for (Map.Entry schemaEntry : ((Map) schemaObj).entrySet()) { + String tableName = schemaEntry.getKey(); + String tableSchemaYamlPath = schemaEntry.getValue().toString(); + // The splits' absolute paths will be stored in Pinot metadata database + SchemaTableName schemaTableName = new SchemaTableName(schemaName, tableName); + tableHandlesBuilder.add(new ClpTableHandle(schemaTableName, "")); + tableSchemaYamlMapBuilder.put(schemaTableName, tableSchemaYamlPath); + } + this.tableSchemaYamlMap = tableSchemaYamlMapBuilder.build(); + return tableHandlesBuilder.build(); + } + catch (IOException e) { + log.error(format("Failed to parse metadata file: %s, error: %s", config.getMetadataYamlPath(), e.getMessage()), e); + } + return Collections.emptyList(); + } + + private void collectTypes(Object node, String prefix, ImmutableList.Builder namesBuilder, ImmutableList.Builder typesBuilder) + { + if (node instanceof Number) { + namesBuilder.add(prefix); + typesBuilder.add(((Number) node).byteValue()); + return; + } + if (node instanceof List) { + for (Number type : (List) node) { + namesBuilder.add(prefix); + typesBuilder.add(type.byteValue()); + } + return; + } + for (Map.Entry entry : ((Map) node).entrySet()) { + if (!prefix.isEmpty()) { + collectTypes(entry.getValue(), format("%s.%s", prefix, entry.getKey()), namesBuilder, typesBuilder); + continue; + } + collectTypes(entry.getValue(), entry.getKey(), namesBuilder, typesBuilder); + } + } +} diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java index f99369b4a48bd..b60f63ff2ffe1 100644 --- a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/optimization/ClpFilterToKqlConverter.java @@ -57,6 +57,8 @@ import static com.facebook.presto.common.type.IntegerType.INTEGER; import static com.facebook.presto.common.type.RealType.REAL; import static com.facebook.presto.common.type.SmallintType.SMALLINT; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP; +import static com.facebook.presto.common.type.TimestampType.TIMESTAMP_MICROSECONDS; import static com.facebook.presto.common.type.TinyintType.TINYINT; import static com.facebook.presto.plugin.clp.ClpErrorCode.CLP_PUSHDOWN_UNSUPPORTED_EXPRESSION; import static com.facebook.presto.spi.relation.SpecialFormExpression.Form.AND; @@ -914,6 +916,8 @@ public static boolean isClpCompatibleNumericType(Type type) || type.equals(TINYINT) || type.equals(DOUBLE) || type.equals(REAL) + || type.equals(TIMESTAMP) + || type.equals(TIMESTAMP_MICROSECONDS) || type instanceof DecimalType; } diff --git a/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpPinotSplitProvider.java b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpPinotSplitProvider.java new file mode 100644 index 0000000000000..45b471b96773e --- /dev/null +++ b/presto-clp/src/main/java/com/facebook/presto/plugin/clp/split/ClpPinotSplitProvider.java @@ -0,0 +1,114 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp.split; + +import com.facebook.airlift.log.Logger; +import com.facebook.presto.plugin.clp.ClpConfig; +import com.facebook.presto.plugin.clp.ClpSplit; +import com.facebook.presto.plugin.clp.ClpTableHandle; +import com.facebook.presto.plugin.clp.ClpTableLayoutHandle; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.collect.ImmutableList; + +import javax.inject.Inject; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +import static com.facebook.presto.plugin.clp.ClpSplit.SplitType; +import static com.facebook.presto.plugin.clp.ClpSplit.SplitType.ARCHIVE; +import static com.facebook.presto.plugin.clp.ClpSplit.SplitType.IR; +import static java.lang.String.format; +import static java.util.concurrent.TimeUnit.SECONDS; + +public class ClpPinotSplitProvider + implements ClpSplitProvider +{ + private static final Logger log = Logger.get(ClpPinotSplitProvider.class); + private static final String SQL_SELECT_SPLITS_TEMPLATE = "SELECT tpath FROM %s WHERE 1 = 1 AND (%s) LIMIT 999999"; + private final ClpConfig config; + + @Inject + public ClpPinotSplitProvider(ClpConfig config) + { + this.config = config; + } + + @Override + public List listSplits(ClpTableLayoutHandle clpTableLayoutHandle) + { + ImmutableList.Builder splits = new ImmutableList.Builder<>(); + ClpTableHandle clpTableHandle = clpTableLayoutHandle.getTable(); + String tableName = clpTableHandle.getSchemaTableName().getTableName(); + try { + URL url = new URL(config.getMetadataDbUrl() + "/query/sql"); + HttpURLConnection conn = (HttpURLConnection) url.openConnection(); + conn.setRequestMethod("POST"); + conn.setRequestProperty("Content-Type", "application/json"); + conn.setRequestProperty("Accept", "application/json"); + conn.setDoOutput(true); + conn.setConnectTimeout((int) SECONDS.toMillis(5)); + conn.setReadTimeout((int) SECONDS.toMillis(30)); + + String query = format(SQL_SELECT_SPLITS_TEMPLATE, tableName, clpTableLayoutHandle.getMetadataSql().orElse("1 = 1")); + log.info("Pinot query: %s", query); + ObjectMapper mapper = new ObjectMapper(); + String body = format("{\"sql\": %s }", mapper.writeValueAsString(query)); + try (OutputStream os = conn.getOutputStream()) { + os.write(body.getBytes(StandardCharsets.UTF_8)); + } + + int code = conn.getResponseCode(); + InputStream is = (code >= 200 && code < 300) ? conn.getInputStream() : conn.getErrorStream(); + if (is == null) { + throw new IOException("Pinot HTTP " + code + " with empty body"); + } + + JsonNode root; + try (InputStream in = is) { + root = mapper.readTree(in); + } + JsonNode resultTable = root.get("resultTable"); + if (resultTable == null) { + throw new RuntimeException("No \"resultTable\" field found"); + } + JsonNode rows = resultTable.get("rows"); + if (rows == null) { + throw new RuntimeException("No \"rows\" field found"); + } + for (Iterator it = rows.elements(); it.hasNext(); ) { + JsonNode row = it.next(); + String splitPath = row.elements().next().asText(); + SplitType splitType = splitPath.endsWith(".clp.zst") ? IR : ARCHIVE; + splits.add(new ClpSplit(splitPath, splitType, clpTableLayoutHandle.getKqlQuery())); + } + List filteredSplits = splits.build(); + log.debug("Number of filtered splits: %s", filteredSplits.size()); + return filteredSplits; + } + catch (Exception e) { + log.error(e); + } + + return Collections.emptyList(); + } +} diff --git a/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpYamlMetadata.java b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpYamlMetadata.java new file mode 100644 index 0000000000000..e5a45d05692ae --- /dev/null +++ b/presto-clp/src/test/java/com/facebook/presto/plugin/clp/TestClpYamlMetadata.java @@ -0,0 +1,129 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.plugin.clp; + +import com.facebook.presto.plugin.clp.metadata.ClpMetadataProvider; +import com.facebook.presto.plugin.clp.metadata.ClpYamlMetadataProvider; +import com.facebook.presto.plugin.clp.split.ClpPinotSplitProvider; +import com.facebook.presto.plugin.clp.split.ClpSplitProvider; +import com.facebook.presto.spi.ColumnMetadata; +import com.facebook.presto.spi.ConnectorTableMetadata; +import com.facebook.presto.spi.SchemaTableName; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; +import org.testng.annotations.BeforeTest; +import org.testng.annotations.Test; + +import java.util.HashSet; +import java.util.List; +import java.util.Optional; + +import static com.facebook.presto.plugin.clp.ClpConfig.MetadataProviderType.YAML; +import static com.facebook.presto.plugin.clp.ClpMetadata.DEFAULT_SCHEMA_NAME; +import static com.facebook.presto.testing.TestingConnectorSession.SESSION; +import static org.testng.Assert.assertEquals; + +public class TestClpYamlMetadata +{ + private static final String PINOT_BROKER_URL = "http://localhost:8099"; + private static final String METADATA_YAML_PATH = "/home/xiaochong-dev/presto-e2e/pinot/tables-schema.yaml"; + private static final String TABLE_NAME = "cockroachdb"; + private ClpMetadata metadata; + private ClpSplitProvider clpSplitProvider; + + @BeforeTest + public void setUp() + { + ClpConfig config = new ClpConfig() + .setPolymorphicTypeEnabled(true) + .setMetadataDbUrl(PINOT_BROKER_URL) + .setMetadataProviderType(YAML) + .setMetadataYamlPath(METADATA_YAML_PATH); + ClpMetadataProvider metadataProvider = new ClpYamlMetadataProvider(config); + metadata = new ClpMetadata(config, metadataProvider); + clpSplitProvider = new ClpPinotSplitProvider(config); + } + + @Test + public void testListSchemaNames() + { + assertEquals(metadata.listSchemaNames(SESSION), ImmutableList.of(DEFAULT_SCHEMA_NAME)); + } + + @Test + public void testListTables() + { + ImmutableSet.Builder builder = ImmutableSet.builder(); + builder.add(new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME)); + assertEquals(new HashSet<>(metadata.listTables(SESSION, Optional.empty())), builder.build()); + } + + @Test + public void testListSplits() + { + ClpTableLayoutHandle layoutHandle = new ClpTableLayoutHandle( + new ClpTableHandle(new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME), ""), + Optional.empty(), + Optional.empty()); + List result = clpSplitProvider.listSplits(layoutHandle); + System.out.println("Hello world"); + } + + @Test + public void testGetTableMetadata() + { + ClpTableHandle clpTableHandle = (ClpTableHandle) metadata.getTableHandle(SESSION, new SchemaTableName(DEFAULT_SCHEMA_NAME, TABLE_NAME)); + ConnectorTableMetadata tableMetadata = metadata.getTableMetadata(SESSION, clpTableHandle); +// ImmutableSet columnMetadata = ImmutableSet.builder() +// .add(ColumnMetadata.builder() +// .setName("a_bigint") +// .setType(BIGINT) +// .setNullable(true) +// .build()) +// .add(ColumnMetadata.builder() +// .setName("a_varchar") +// .setType(VARCHAR) +// .setNullable(true) +// .build()) +// .add(ColumnMetadata.builder() +// .setName("b_double") +// .setType(DOUBLE) +// .setNullable(true) +// .build()) +// .add(ColumnMetadata.builder() +// .setName("b_varchar") +// .setType(VARCHAR) +// .setNullable(true) +// .build()) +// .add(ColumnMetadata.builder() +// .setName("c") +// .setType(RowType.from(ImmutableList.of( +// RowType.field("d", BOOLEAN), +// RowType.field("e", VARCHAR)))) +// .setNullable(true) +// .build()) +// .add(ColumnMetadata.builder() +// .setName("f") +// .setType(RowType.from(ImmutableList.of( +// RowType.field("g", +// RowType.from(ImmutableList.of( +// RowType.field("h", new ArrayType(VARCHAR)))))))) +// .setNullable(true) +// .build()) +// .build(); +// assertEquals(columnMetadata, ImmutableSet.copyOf(tableMetadata.getColumns())); + ImmutableSet actual = ImmutableSet.copyOf(tableMetadata.getColumns()); + System.out.println("Hello world"); + } +} diff --git a/presto-native-execution/pom.xml b/presto-native-execution/pom.xml index 200ffa6834afc..adbd01ab4b917 100644 --- a/presto-native-execution/pom.xml +++ b/presto-native-execution/pom.xml @@ -267,6 +267,16 @@ + + + + org.yaml + snakeyaml + 2.1 + + + +