diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 3700163b2999..5df3841d2363 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,3 +1,4 @@ { - "https://github.com/apache/beam/pull/36890": "fixing some null errors" + "comment": "Modify this file in a trivial way to cause this test suite to run ", + "modification": 3 } diff --git a/.github/trigger_files/beam_PreCommit_SQL.json b/.github/trigger_files/beam_PreCommit_SQL.json index 5abe02fc09c7..ab4daeae2349 100644 --- a/.github/trigger_files/beam_PreCommit_SQL.json +++ b/.github/trigger_files/beam_PreCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run.", - "modification": 1 + "modification": 3 } diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java index 0ca38824204b..7dee72511e85 100644 --- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java +++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog; @@ -71,6 +72,11 @@ public boolean createDatabase(String database) { return catalogConfig.createNamespace(database); } + @Override + public Collection databases() { + return catalogConfig.listNamespaces(); + } + @Override public void useDatabase(String database) { checkArgument(databaseExists(database), "Database '%s' does not exist."); diff --git a/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java new file mode 100644 index 000000000000..0b593a1b2cfb --- /dev/null +++ b/sdks/java/extensions/sql/jdbc/src/test/java/org/apache/beam/sdk/extensions/sql/jdbc/BeamSqlLineShowTest.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.jdbc; + +import static org.apache.beam.sdk.extensions.sql.jdbc.BeamSqlLineTestingUtils.buildArgs; +import static org.hamcrest.CoreMatchers.everyItem; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.oneOf; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import org.junit.Test; + +public class BeamSqlLineShowTest { + @Test + public void testShowTables() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE DATABASE other_db", + "CREATE EXTERNAL TABLE other_db.should_not_show_up (id int, name varchar) TYPE 'text'", + "CREATE CATALOG my_catalog TYPE 'local'", + "CREATE DATABASE my_catalog.my_db", + "USE DATABASE my_catalog.my_db", + "CREATE EXTERNAL TABLE my_table (id int, name varchar) TYPE 'text'", + "CREATE EXTERNAL TABLE my_other_table (col1 int, col2 timestamp) TYPE 'text'", + "CREATE EXTERNAL TABLE my_other_table_with_a_long_name (foo varchar, bar boolean) TYPE 'test'", + "SHOW TABLES"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + System.out.println(byteArrayOutputStream.toString("UTF-8")); + assertThat( + Arrays.asList( + "+------+------+", + "| NAME | TYPE |", + "+------+------+", + "| my_other_table | text |", + "| my_other_table_with_a_long_name | test |", + "| my_table | text |", + "+------+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowTablesInOtherDatabase() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE DATABASE my_db", + "USE DATABASE my_db", + "CREATE EXTERNAL TABLE should_not_show_up (id int, name varchar) TYPE 'text'", + "CREATE CATALOG other_catalog TYPE 'local'", + "CREATE DATABASE other_catalog.other_db", + "CREATE EXTERNAL TABLE other_catalog.other_db.other_table (id int, name varchar) TYPE 'text'", + "SHOW TABLES IN other_catalog.other_db"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList( + "+------+------+", + "| NAME | TYPE |", + "+------+------+", + "| other_table | text |", + "+------+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowTablesWithPattern() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE DATABASE my_db", + "USE DATABASE my_db", + "CREATE EXTERNAL TABLE my_table (id int, name varchar) TYPE 'text'", + "CREATE EXTERNAL TABLE my_table_2 (id int, name varchar) TYPE 'text'", + "CREATE EXTERNAL TABLE my_foo_table_1 (id int, name varchar) TYPE 'text'", + "CREATE EXTERNAL TABLE my_foo_table_2 (id int, name varchar) TYPE 'text'", + "SHOW TABLES LIKE '%foo%'"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList( + "+------+------+", + "| NAME | TYPE |", + "+------+------+", + "| my_foo_table_1 | text |", + "| my_foo_table_2 | text |", + "+------+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowCurrentDatabase() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE DATABASE should_not_show_up", + "CREATE CATALOG my_catalog TYPE 'local'", + "USE CATALOG my_catalog", + "CREATE DATABASE my_db", + "CREATE DATABASE my_other_db", + "CREATE DATABASE my_database_that_has_a_very_long_name", + "USE DATABASE my_other_db", + "SHOW CURRENT database"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList("+------+", "| NAME |", "+------+", "| my_other_db |", "+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowCurrentDatabaseWithNoneSet() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE DATABASE should_not_show_up", + "CREATE CATALOG my_catalog TYPE 'local'", + "USE CATALOG my_catalog", + "DROP DATABASE `default`", + "SHOW CURRENT DATABASE"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList("+------+", "| NAME |", "+------+", "+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowDatabases() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE DATABASE should_not_show_up", + "CREATE CATALOG my_catalog TYPE 'local'", + "USE CATALOG my_catalog", + "CREATE DATABASE my_db", + "CREATE DATABASE my_other_db", + "CREATE DATABASE my_database_that_has_a_very_long_name", + "SHOW DATABASES"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + System.out.println(byteArrayOutputStream.toString("UTF-8")); + assertThat( + Arrays.asList( + "+------+", + "| NAME |", + "+------+", + "| default |", + "| my_database_that_has_a_very_long_name |", + "| my_db |", + "| my_other_db |", + "+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowDatabasesInOtherCatalog() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE DATABASE should_not_show_up", + "CREATE CATALOG my_catalog TYPE 'local'", + "USE CATALOG my_catalog", + "CREATE DATABASE my_db", + "CREATE CATALOG my_other_catalog TYPE 'local'", + "CREATE DATABASE my_other_catalog.other_db", + "SHOW DATABASES FROM my_other_catalog"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList( + "+------+", "| NAME |", "+------+", "| default |", "| other_db |", "+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowDatabasesWithPattern() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE CATALOG my_catalog TYPE 'local'", + "CREATE DATABASE my_catalog.my_db", + "CREATE DATABASE my_catalog.other_db", + "CREATE DATABASE my_catalog.some_foo_db", + "CREATE DATABASE my_catalog.some_other_foo_db", + "SHOW DATABASES FROM my_catalog LIKE '%foo%'"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList( + "+------+", + "| NAME |", + "+------+", + "| some_foo_db |", + "| some_other_foo_db |", + "+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowCurrentCatalog() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE CATALOG my_catalog TYPE 'local'", + "CREATE CATALOG my_very_long_catalog_name TYPE 'local'", + "SHOW CURRENT CATALOG"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList( + "+------+------+", + "| NAME | TYPE |", + "+------+------+", + "| default | local |", + "+------+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowCatalogs() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE CATALOG my_catalog TYPE 'local'", + "CREATE CATALOG my_very_long_catalog_name TYPE 'local'", + "SHOW CATALOGS"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + System.out.println(byteArrayOutputStream.toString("UTF-8")); + assertThat( + Arrays.asList( + "+------+------+", + "| NAME | TYPE |", + "+------+------+", + "| default | local |", + "| my_catalog | local |", + "| my_very_long_catalog_name | local |", + "+------+------+"), + everyItem(is(oneOf(lines.toArray())))); + } + + @Test + public void testShowCatalogsWithPattern() throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + String[] args = + buildArgs( + "CREATE CATALOG my_catalog TYPE 'local'", + "CREATE CATALOG my_catalog_2 TYPE 'local'", + "CREATE CATALOG my_very_long_catalog_name TYPE 'local'", + "SHOW CATALOGS LIKE 'my_catalog%'"); + + BeamSqlLine.runSqlLine(args, null, byteArrayOutputStream, null); + + List lines = Arrays.asList(byteArrayOutputStream.toString("UTF-8").split("\n")); + assertThat( + Arrays.asList( + "+------+------+", + "| NAME | TYPE |", + "+------+------+", + "| my_catalog | local |", + "| my_catalog_2 | local |", + "+------+------+"), + everyItem(is(oneOf(lines.toArray())))); + } +} diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp index 77772c5858e3..73af7e18150b 100644 --- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp +++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp @@ -50,6 +50,9 @@ data: { "TBLPROPERTIES" "PROPERTIES" "PARTITIONED" + "CATALOGS" + "DATABASES" + "TABLES" "USE" ] @@ -422,6 +425,10 @@ data: { # Return type of method implementation should be 'SqlNode'. # Example: SqlShowDatabases(), SqlShowTables(). statementParserMethods: [ + "SqlShowTables(Span.of())" + "SqlShowDatabases(Span.of())" + "SqlShowCatalogs(Span.of())" + "SqlShowCurrent(Span.of())" "SqlUseCatalog(Span.of(), null)" "SqlUseDatabase(Span.of(), null)" "SqlSetOptionBeam(Span.of(), null)" diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl index 46102c7b92fe..d3bb8c2af56c 100644 --- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl +++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl @@ -264,6 +264,47 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) : } } +/** + * SHOW CATALOGS [ LIKE regex_pattern ] + */ +SqlCall SqlShowCatalogs(Span s) : +{ + SqlNode regex = null; +} +{ + { s.add(this); } + [ regex = StringLiteral() ] + { + List path = new ArrayList(); + path.add("beamsystem"); + path.add("catalogs"); + SqlNodeList selectList = SqlNodeList.of(SqlIdentifier.star(s.end(this))); + SqlIdentifier from = new SqlIdentifier(path, s.end(this)); + SqlNode where = null; + if (regex != null) { + SqlIdentifier nameIdentifier = new SqlIdentifier("NAME", s.end(this)); + where = SqlStdOperatorTable.LIKE.createCall( + s.end(this), + nameIdentifier, regex); + } + + return new SqlSelect( + s.end(this), + null, + selectList, + from, + where, + null, + null, + null, + null, + null, + null, + null); + } +} + + /** * CREATE DATABASE ( IF NOT EXISTS )? ( catalog_name '.' )? database_name */ @@ -331,6 +372,98 @@ SqlDrop SqlDropDatabase(Span s, boolean replace) : } } +/** + * SHOW DATABASES [ ( FROM | IN )? catalog_name ] [LIKE regex_pattern ] + */ +SqlCall SqlShowDatabases(Span s) : +{ + SqlIdentifier catalogName = null; + SqlNode regex = null; +} +{ + { s.add(this); } + [ ( | ) catalogName = SimpleIdentifier() ] + [ regex = StringLiteral() ] + { + List path = new ArrayList(); + path.add("beamsystem"); + path.add("databases"); + SqlNodeList selectList = SqlNodeList.of(SqlIdentifier.star(s.end(this))); + SqlNode where = null; + if (regex != null) { + SqlIdentifier nameIdentifier = new SqlIdentifier("NAME", s.end(this)); + where = SqlStdOperatorTable.LIKE.createCall( + s.end(this), + nameIdentifier, regex); + } + if (catalogName != null) { + path.add(catalogName.getSimple()); + } else { + path.add("__current_catalog__"); + } + SqlIdentifier from = new SqlIdentifier(path, s.end(this)); + + return new SqlSelect( + s.end(this), + null, + selectList, + from, + where, + null, + null, + null, + null, + null, + null, + null); + } +} + +/** + * SHOW CURRENT ( CATALOG | DATABASE ) + */ +SqlCall SqlShowCurrent(Span s) : +{ +} +{ + { s.add(this); } + { + List path = new ArrayList(); + path.add("beamsystem"); + } + ( + { + path.add("__current_catalog__"); + } + | + { + path.add("__current_database__"); + } + ) + { + if (path.size() != 2) { + throw new ParseException( + "Expected SHOW CURRENT CATALOG or SHOW CURRENT DATABASE"); + } + SqlNodeList selectList = SqlNodeList.of(SqlIdentifier.star(s.end(this))); + SqlIdentifier from = new SqlIdentifier(path, s.end(this)); + + return new SqlSelect( + s.end(this), + null, + selectList, + from, + null, + null, + null, + null, + null, + null, + null, + null); + } +} + SqlNodeList PartitionFieldList() : { @@ -456,6 +589,64 @@ SqlDrop SqlDropTable(Span s, boolean replace) : } } +/** + * SHOW TABLES [ ( FROM | IN )? [ catalog_name '.' ] database_name ] [ LIKE regex_pattern ] + */ +SqlCall SqlShowTables(Span s) : +{ + SqlIdentifier databaseCatalog = null; + SqlNode regex = null; +} +{ + { s.add(this); } + [ ( | ) databaseCatalog = CompoundIdentifier() ] + [ regex = StringLiteral() ] + { + List path = new ArrayList(); + path.add("beamsystem"); + path.add("tables"); + SqlNodeList selectList = SqlNodeList.of(SqlIdentifier.star(s.end(this))); + SqlNode where = null; + if (regex != null) { + SqlIdentifier nameIdentifier = new SqlIdentifier("NAME", s.end(this)); + where = SqlStdOperatorTable.LIKE.createCall( + s.end(this), + nameIdentifier, regex); + } + if (databaseCatalog != null) { + List components = databaseCatalog.names; + if (components.size() == 1) { + path.add("__current_catalog__"); + path.add(components.get(0)); + } else if (components.size() == 2) { + path.addAll(components); + } else { + throw new ParseException( + "SHOW TABLES FROM/IN accepts at most a catalog name and a database name."); + } + } else { + path.add("__current_catalog__"); + path.add("__current_database__"); + } + SqlIdentifier from = new SqlIdentifier(path, s.end(this)); + + return new SqlSelect( + s.end(this), + null, + selectList, + from, + where, + null, + null, + null, + null, + null, + null, + null); + } +} + + Schema.FieldType FieldType() : { final SqlTypeName collectionTypeName; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java index 6ef6e82e6a70..f7783e7c3eca 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamCalciteSchema.java @@ -129,6 +129,10 @@ public Set getTypeNames() { connection.getPipelineOptions()); } + public Collection getTables() { + return tableProvider.getTables().values(); + } + @Override public Set getFunctionNames() { return Collections.emptySet(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java new file mode 100644 index 000000000000..66c05a35313e --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemDbMetadataSchema.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.sql.meta.SystemTables; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** A Calcite {@link Schema} responsible for {@code SHOW DATABASES} requests. */ +public class BeamSystemDbMetadataSchema implements Schema { + private final CatalogManager catalogManager; + + BeamSystemDbMetadataSchema(CatalogManager catalogManager) { + this.catalogManager = catalogManager; + } + + @Override + public @Nullable Table getTable(String catalogName) { + Catalog catalog; + if (catalogName.equals("__current_catalog__")) { + catalog = catalogManager.currentCatalog(); + } else { + catalog = + checkArgumentNotNull( + catalogManager.getCatalog(catalogName), "Catalog '%s' does not exist.", catalogName); + } + + return BeamCalciteTable.of(SystemTables.databases(catalog, false)); + } + + @Override + public Set getTableNames() { + return catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet()); + } + + @Override + public @Nullable Schema getSubSchema(@Nullable String name) { + return null; + } + + @Override + public Set getSubSchemaNames() { + return Collections.emptySet(); + } + + @Override + public Set getTypeNames() { + return Collections.emptySet(); + } + + @Override + public @Nullable RelProtoDataType getType(String s) { + return null; + } + + @Override + public Collection getFunctions(String s) { + return Collections.emptySet(); + } + + @Override + public Set getFunctionNames() { + return Collections.emptySet(); + } + + @Override + public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) { + return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s, getClass()); + } + + @Override + public boolean isMutable() { + return true; + } + + @Override + public Schema snapshot(SchemaVersion schemaVersion) { + return this; + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java new file mode 100644 index 000000000000..c9f7c417ca94 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemSchema.java @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.beam.sdk.extensions.sql.meta.SystemTables; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * A Calcite {@link Schema} specialized for displaying the session's metadata. Top node that manages + * requests to {@code SHOW} {@code CATALOGS}, {@code DATABASES}, and {@code TABLES}. Used by {@link + * CatalogManagerSchema}. + * + *

{@code SHOW} requests are treated as aliases, listed below: + * + *

    + *
  • {@code SHOW CURRENT CATALOG} --> {@code SELECT * FROM `beamsystem`.`__current_catalog__`} + *
  • {@code SHOW CATALOGS} --> {@code SELECT * FROM `beamsystem`.`catalogs`} + *
  • {@code SHOW CATALOGS LIKE '{pattern}'} --> {@code SELECT * FROM `beamsystem`.`catalogs` + * WHERE NAME LIKE '{pattern}'} + *
  • {@code SHOW CURRENT DATABASE} --> {@code SELECT * FROM `beamsystem`.`__current_database__`} + *
  • {@code SHOW DATABASES} --> {@code SELECT * FROM + * `beamsystem`.`databases`.`__current_catalog__`} + *
  • {@code SHOW DATABASES FROM my_catalog} --> {@code SELECT * FROM + * `beamsystem`.`databases`.`my_catalog`} + *
  • {@code SHOW DATABASES FROM my_catalog LIKE '{pattern}'} --> {@code SELECT * FROM + * `beamsystem`.`databases`.`my_catalog` WHERE NAME LIKE '{pattern}'} + *
  • {@code SHOW TABLES} --> {@code SELECT * FROM + * `beamsystem`.`tables`.`__current_catalog__`.`__current_database__`} + *
  • {@code SHOW TABLES FROM my_db} --> {@code SELECT * FROM + * `beamsystem`.`tables`.`__current_catalog__`.`my_db`} + *
  • {@code SHOW TABLES FROM my_catalog.my_db} --> {@code SELECT * FROM + * `beamsystem`.`tables`.`my_catalog`.`my_db`} + *
  • {@code SHOW TABLES FROM my_catalog.my_db LIKE '{pattern}'} --> {@code SELECT * FROM + * `beamsystem`.`tables`.`my_catalog`.`my_db` WHERE NAME LIKE '{pattern}'} + *
+ */ +public class BeamSystemSchema implements Schema { + private final CatalogManager catalogManager; + private final BeamSystemDbMetadataSchema dbSchema; + private final BeamSystemTableMetadataSchema tableSchema; + public static final String BEAMSYSTEM = "beamsystem"; + private static final String CATALOGS = "catalogs"; + private static final String DATABASES = "databases"; + private static final String TABLES = "tables"; + + BeamSystemSchema(CatalogManager catalogManager) { + this.catalogManager = catalogManager; + this.dbSchema = new BeamSystemDbMetadataSchema(catalogManager); + this.tableSchema = new BeamSystemTableMetadataSchema(catalogManager, null); + } + + @Override + public @Nullable Table getTable(String table) { + switch (table) { + case CATALOGS: + return BeamCalciteTable.of(SystemTables.catalogs(catalogManager, false)); + case "__current_catalog__": + return BeamCalciteTable.of(SystemTables.catalogs(catalogManager, true)); + case "__current_database__": + return BeamCalciteTable.of(SystemTables.databases(catalogManager.currentCatalog(), true)); + default: + return null; + } + } + + @Override + public Set getTableNames() { + return ImmutableSet.of(CATALOGS); + } + + @Override + public @Nullable Schema getSubSchema(@Nullable String name) { + if (name == null) { + return null; + } + switch (name) { + case DATABASES: + return dbSchema; + case TABLES: + return tableSchema; + default: + return null; + } + } + + @Override + public Set getSubSchemaNames() { + return ImmutableSet.of(DATABASES, TABLES); + } + + @Override + public Set getTypeNames() { + return Collections.emptySet(); + } + + @Override + public @Nullable RelProtoDataType getType(String s) { + return null; + } + + @Override + public Collection getFunctions(String s) { + return Collections.emptySet(); + } + + @Override + public Set getFunctionNames() { + return Collections.emptySet(); + } + + @Override + public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) { + return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s, getClass()); + } + + @Override + public boolean isMutable() { + return true; + } + + @Override + public Schema snapshot(SchemaVersion schemaVersion) { + return this; + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java new file mode 100644 index 000000000000..b081a1b886c3 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/BeamSystemTableMetadataSchema.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl; + +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; + +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import org.apache.beam.sdk.extensions.sql.meta.SystemTables; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.linq4j.tree.Expression; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.rel.type.RelProtoDataType; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Function; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaPlus; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.SchemaVersion; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schemas; +import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Table; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** A Calcite {@link Schema} responsible for {@code SHOW TABLES} requests. */ +public class BeamSystemTableMetadataSchema implements Schema { + private final CatalogManager catalogManager; + private final @MonotonicNonNull String catalog; + + BeamSystemTableMetadataSchema(CatalogManager catalogManager, @Nullable String catalog) { + this.catalogManager = catalogManager; + this.catalog = catalog; + } + + @Override + public @Nullable Table getTable(String dbName) { + // returns a table if this instance has a catalog referenced + if (catalog == null) { + return null; + } + + Catalog cat = + checkArgumentNotNull( + catalogManager.getCatalog(catalog), "Catalog '%s' does not exist.", catalog); + if (dbName.equals("__current_database__")) { + dbName = + checkStateNotNull( + cat.currentDatabase(), + "Catalog '%s' has not set a default database. Please specify one."); + } + return BeamCalciteTable.of(SystemTables.tables(cat, dbName)); + } + + @Override + public Set getTableNames() { + return Collections.emptySet(); + } + + @Override + public @Nullable Schema getSubSchema(@Nullable String catalogName) { + // if this is a top instance (i.e. no catalog reference), return child schema with the specified + // catalog referenced + if (catalog == null && catalogName != null) { + if (catalogName.equals("__current_catalog__")) { + catalogName = catalogManager.currentCatalog().name(); + } + return new BeamSystemTableMetadataSchema(catalogManager, catalogName); + } + return null; + } + + @Override + public Set getSubSchemaNames() { + return Collections.emptySet(); + } + + @Override + public Set getTypeNames() { + return Collections.emptySet(); + } + + @Override + public @Nullable RelProtoDataType getType(String s) { + return null; + } + + @Override + public Collection getFunctions(String s) { + return Collections.emptySet(); + } + + @Override + public Set getFunctionNames() { + return Collections.emptySet(); + } + + @Override + public Expression getExpression(@Nullable SchemaPlus schemaPlus, String s) { + return Schemas.subSchemaExpression(checkStateNotNull(schemaPlus), s, getClass()); + } + + @Override + public boolean isMutable() { + return true; + } + + @Override + public Schema snapshot(SchemaVersion schemaVersion) { + return this; + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java index ec225efc1c39..098b72b28695 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogManagerSchema.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.impl; +import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; import static org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE; @@ -43,6 +44,7 @@ import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,11 +57,13 @@ public class CatalogManagerSchema implements Schema { private static final Logger LOG = LoggerFactory.getLogger(CatalogManagerSchema.class); private final JdbcConnection connection; private final CatalogManager catalogManager; + private final BeamSystemSchema beamSystemSchema; private final Map catalogSubSchemas = new HashMap<>(); CatalogManagerSchema(JdbcConnection jdbcConnection, CatalogManager catalogManager) { this.connection = jdbcConnection; this.catalogManager = catalogManager; + this.beamSystemSchema = new BeamSystemSchema(catalogManager); } @VisibleForTesting @@ -176,15 +180,23 @@ public Set getTableNames() { return getCurrentCatalogSchema().getTableNames(); } + /** + * Returns the {@link CatalogSchema} for the catalog referenced in this {@link TableName}. If the + * path does not reference a catalog, the currently use {@link CatalogSchema} will be returned. + */ public CatalogSchema getCatalogSchema(TableName tablePath) { - @Nullable Schema catalogSchema = getSubSchema(tablePath.catalog()); - if (catalogSchema == null) { - catalogSchema = getCurrentCatalogSchema(); - } + return tablePath.catalog() != null + ? getCatalogSchema(tablePath.catalog()) + : getCurrentCatalogSchema(); + } + + public CatalogSchema getCatalogSchema(@Nullable String catalog) { + Schema catalogSchema = + checkArgumentNotNull(getSubSchema(catalog), "Catalog '%s' not found.", catalog); Preconditions.checkState( catalogSchema instanceof CatalogSchema, "Unexpected Schema type for Catalog '%s': %s", - tablePath.catalog(), + catalog, catalogSchema.getClass()); return (CatalogSchema) catalogSchema; } @@ -202,6 +214,9 @@ public CatalogSchema getCurrentCatalogSchema() { if (name == null) { return null; } + if (name.equals(BeamSystemSchema.BEAMSYSTEM)) { + return beamSystemSchema; + } @Nullable CatalogSchema catalogSchema = catalogSubSchemas.get(name); if (catalogSchema == null) { @Nullable Catalog catalog = catalogManager.getCatalog(name); @@ -222,7 +237,14 @@ public CatalogSchema getCurrentCatalogSchema() { @Override public Set getSubSchemaNames() { - return catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet()); + return ImmutableSet.builder() + .addAll(catalogs().stream().map(Catalog::name).collect(Collectors.toSet())) + .add(BeamSystemSchema.BEAMSYSTEM) + .build(); + } + + public Collection catalogs() { + return catalogManager.catalogs(); } public void setPipelineOption(String key, String value) { diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java index ab644145b4f7..de7903897b62 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java @@ -159,7 +159,7 @@ public void execute(CalcitePrepare.Context context) { CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema; catalogManagerSchema.maybeRegisterProvider(pathOverride, SqlDdlNodes.getString(type)); - CatalogSchema catalogSchema = catalogManagerSchema.getCatalogSchema(pathOverride); + CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride); beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride); } else if (schema instanceof BeamCalciteSchema) { beamCalciteSchema = (BeamCalciteSchema) schema; diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java index c5d162ebbb68..6f4d8ee79d9c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java @@ -85,7 +85,7 @@ public static String name(SqlIdentifier id) { } } - static @Nullable String getString(SqlNode n) { + static @Nullable String getString(@Nullable SqlNode n) { if (n == null) { return null; } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java index 9d06e471dbbe..f0e3fa59ddc7 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java @@ -78,7 +78,7 @@ public void execute(CalcitePrepare.Context context) { } CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema; - CatalogSchema catalogSchema = catalogManagerSchema.getCatalogSchema(pathOverride); + CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride); // if database exists in a different catalog, we need to also switch to that catalog if (pathOverride.catalog() != null && !pathOverride diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java new file mode 100644 index 000000000000..8e91e9eb0309 --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/SystemTables.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta; + +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.values.PBegin; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.POutput; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; + +/** + * Provides {@link BeamSqlTable}s that track metadata around catalogs, databases, and tables. For + * now, it tracks the following: + * + *
    + *
  • Catalogs: Name and Type + *
  • Databases: Name + *
  • Tables: Name and Type + *
+ */ +public class SystemTables { + public static CatalogsMetaTable catalogs(CatalogManager catalogManager, boolean currentOnly) { + return new CatalogsMetaTable(catalogManager, currentOnly); + } + + public static DatabasesMetaTable databases(Catalog catalog, boolean currentOnly) { + return new DatabasesMetaTable(catalog, currentOnly); + } + + public static TablesMetaTable tables(Catalog catalog, String dbName) { + return new TablesMetaTable(catalog, dbName); + } + + public static class CatalogsMetaTable extends BaseBeamTable { + private final CatalogManager catalogManager; + private final boolean currentOnly; + + private static final Schema SCHEMA = + Schema.builder().addStringField("NAME").addStringField("TYPE").build(); + + public CatalogsMetaTable(CatalogManager catalogManager, boolean currentOnly) { + this.catalogManager = catalogManager; + this.currentOnly = currentOnly; + } + + @Override + public PCollection buildIOReader(PBegin begin) { + Collection catalogs = + currentOnly + ? ImmutableList.of(catalogManager.currentCatalog()) + : catalogManager.catalogs(); + List rows = + catalogs.stream() + .map(cat -> Row.withSchema(SCHEMA).addValues(cat.name(), cat.type()).build()) + .collect(Collectors.toList()); + + return begin.apply(Create.of(rows).withRowSchema(SCHEMA)); + } + + @Override + public POutput buildIOWriter(PCollection input) { + throw new UnsupportedOperationException("Cannot write to SHOW CATALOGS"); + } + + @Override + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.BOUNDED; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + } + + public static class DatabasesMetaTable extends BaseBeamTable { + private final Catalog catalog; + private final boolean currentOnly; + private static final Schema SCHEMA = Schema.builder().addStringField("NAME").build(); + + DatabasesMetaTable(Catalog catalog, boolean currentOnly) { + this.catalog = catalog; + this.currentOnly = currentOnly; + } + + @Override + public PCollection buildIOReader(PBegin begin) { + Collection databases; + if (currentOnly) { + @Nullable String currentDb = catalog.currentDatabase(); + databases = currentDb != null ? Collections.singleton(currentDb) : Collections.emptyList(); + } else { + databases = catalog.databases(); + } + List rows = + databases.stream() + .map(db -> Row.withSchema(SCHEMA).addValues(db).build()) + .collect(Collectors.toList()); + + return begin.apply(Create.of(rows).withRowSchema(SCHEMA)); + } + + @Override + public POutput buildIOWriter(PCollection input) { + throw new UnsupportedOperationException("Cannot write to SHOW DATABASES"); + } + + @Override + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.BOUNDED; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + } + + public static class TablesMetaTable extends BaseBeamTable { + private final Catalog catalog; + private final String dbName; + private static final Schema SCHEMA = + Schema.builder().addStringField("NAME").addStringField("TYPE").build(); + + public TablesMetaTable(Catalog catalog, String dbName) { + this.catalog = catalog; + this.dbName = dbName; + } + + @Override + public PCollection buildIOReader(PBegin begin) { + // Note: This captures the state *at the moment of planning* + List rows = + catalog.metaStore(dbName).getTables().values().stream() + .map( + table -> + Row.withSchema(SCHEMA).addValues(table.getName(), table.getType()).build()) + .collect(Collectors.toList()); + + return begin.apply(Create.of(rows).withRowSchema(SCHEMA)); + } + + @Override + public POutput buildIOWriter(PCollection input) { + throw new UnsupportedOperationException("Cannot write to SHOW TABLES"); + } + + @Override + public PCollection.IsBounded isBounded() { + return PCollection.IsBounded.BOUNDED; + } + + @Override + public Schema getSchema() { + return SCHEMA; + } + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java index db7724a4809d..c387a5ace10c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.extensions.sql.meta.catalog; +import java.util.Collection; import java.util.Map; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider; @@ -50,6 +51,9 @@ public interface Catalog { @Nullable String currentDatabase(); + /** Returns a collection of existing database names. */ + Collection databases(); + /** * Creates a database with this name. * diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java index 3c7ef5623b1b..7c0d8b9d32ea 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java @@ -20,6 +20,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; @@ -97,6 +98,11 @@ public void useDatabase(String database) { return currentDatabase; } + @Override + public Collection databases() { + return databases; + } + @Override public boolean dropDatabase(String database, boolean cascade) { checkState(!cascade, "%s does not support CASCADE.", getClass().getSimpleName()); diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java index cca1bfd93f27..588caa78a2b7 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java @@ -105,8 +105,8 @@ public void testUseDatabaseWithDeletedCatalog_notFound() { assertEquals( ImmutableSet.of("default"), catalogManager.catalogs().stream().map(Catalog::name).collect(Collectors.toSet())); - thrown.expect(CalciteContextException.class); - thrown.expectMessage("Cannot use catalog: 'my_catalog' not found."); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("Catalog 'my_catalog' not found"); cli.execute("USE DATABASE my_catalog.my_database"); }