diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json
index 5df3841d2363..e584718ac8fb 100644
--- a/.github/trigger_files/beam_PostCommit_SQL.json
+++ b/.github/trigger_files/beam_PostCommit_SQL.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run ",
-  "modification": 3
+  "modification": 0
 }
diff --git a/.github/trigger_files/beam_PreCommit_SQL.json b/.github/trigger_files/beam_PreCommit_SQL.json
index ab4daeae2349..07d1fb889961 100644
--- a/.github/trigger_files/beam_PreCommit_SQL.json
+++ b/.github/trigger_files/beam_PreCommit_SQL.json
@@ -1,4 +1,4 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 3
+  "modification": 0
 }
diff --git a/sdks/java/extensions/sql/iceberg/build.gradle b/sdks/java/extensions/sql/iceberg/build.gradle
index d5f9e74c53bd..1e319c97a8e2 100644
--- a/sdks/java/extensions/sql/iceberg/build.gradle
+++ b/sdks/java/extensions/sql/iceberg/build.gradle
@@ -31,6 +31,8 @@ dependencies {
     implementation project(":sdks:java:core")
     implementation project(":sdks:java:managed")
     implementation project(":sdks:java:io:iceberg")
+    implementation library.java.jackson_databind
+    implementation library.java.jackson_core
     runtimeOnly project(":sdks:java:io:iceberg:bqms")
     runtimeOnly project(":sdks:java:io:iceberg:hive")
     // TODO(https://github.com/apache/beam/issues/21156): Determine how to build without this dependency
diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java
new file mode 100644
index 000000000000..0c8a5519ee40
--- /dev/null
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergAlterTableOps.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig.IcebergTableInfo;
+import org.apache.beam.sdk.schemas.Schema;
+
+/** {@link AlterTableOps} for Iceberg tables. */
+public class IcebergAlterTableOps implements AlterTableOps {
+  private final IcebergTableInfo table;
+
+  IcebergAlterTableOps(IcebergTableInfo table) {
+    this.table = table;
+  }
+
+  @Override
+  public void updateTableProperties(Map<String, String> setProps, List<String> resetProps) {
+    table.updateTableProps(setProps, resetProps);
+  }
+
+  @Override
+  public void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> columnsToDrop) {
+    table.updateSchema(columnsToAdd, columnsToDrop);
+  }
+
+  @Override
+  public void updatePartitionSpec(
+      List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+    table.updatePartitionSpec(partitionsToAdd, partitionsToDrop);
+  }
+}
diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
index 7dee72511e85..6330f550a2b3 100644
--- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java
@@ -32,28 +32,11 @@ public class IcebergCatalog extends InMemoryCatalog {
   // other SDKs can make use of it too
   private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop";
   private final Map<String, IcebergMetastore> metaStores = new HashMap<>();
-  @VisibleForTesting final IcebergCatalogConfig catalogConfig;
+  @VisibleForTesting IcebergCatalogConfig catalogConfig;
 
-  public IcebergCatalog(String name, Map<String, String> properties) {
-    super(name, properties);
-
-    ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
-    ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder();
-
-    for (Map.Entry<String, String> entry : properties.entrySet()) {
-      if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) {
-        hadoopProps.put(entry.getKey(), entry.getValue());
-      } else {
-        catalogProps.put(entry.getKey(), entry.getValue());
-      }
-    }
-
-    catalogConfig =
-        IcebergCatalogConfig.builder()
-            .setCatalogName(name)
-            .setCatalogProperties(catalogProps.build())
-            .setConfigProperties(hadoopProps.build())
-            .build();
+  public IcebergCatalog(String name, Map<String, String> props) {
+    super(name, props);
+    catalogConfig = initConfig(name, props);
   }
 
   @Override
@@ -67,6 +50,12 @@ public String type() {
     return "iceberg";
   }
 
+  @Override
+  public void updateProperties(Map<String, String> setProps, Collection<String> resetProps) {
+    super.updateProperties(setProps, resetProps);
+    catalogConfig = initConfig(name(), properties());
+  }
+
   @Override
   public boolean createDatabase(String database) {
     return catalogConfig.createNamespace(database);
@@ -97,4 +86,23 @@ public boolean dropDatabase(String database, boolean cascade) {
     }
     return removed;
   }
+
+  private static IcebergCatalogConfig initConfig(String name, Map<String, String> properties) {
+    ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
+    ImmutableMap.Builder<String, String> hadoopProps = ImmutableMap.builder();
+
+    for (Map.Entry<String, String> entry : properties.entrySet()) {
+      if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) {
+        hadoopProps.put(entry.getKey(), entry.getValue());
+      } else {
+        catalogProps.put(entry.getKey(), entry.getValue());
+      }
+    }
+
+    return IcebergCatalogConfig.builder()
+        .setCatalogName(name)
+        .setCatalogProperties(catalogProps.build())
+        .setConfigProperties(hadoopProps.build())
+        .build();
+  }
 }
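For orientation (not part of the patch; the property values below are hypothetical), a minimal sketch of how the extracted initConfig() routes properties: keys carrying the beam.catalog.hadoop prefix go to the Hadoop configuration properties, everything else becomes an Iceberg catalog property.

    // Hypothetical input for an IcebergCatalog named "my_catalog":
    Map<String, String> props =
        ImmutableMap.of(
            "type", "hadoop",
            "warehouse", "file:/tmp/warehouse",
            "beam.catalog.hadoop.fs.defaultFS", "file:///");
    // initConfig("my_catalog", props) then splits this into:
    //   catalog properties: {type=hadoop, warehouse=file:/tmp/warehouse}
    //   config (Hadoop) properties: {beam.catalog.hadoop.fs.defaultFS=file:///}

Because ALTER CATALOG can now change these properties at any time, the field is no longer final and the config is rebuilt from initConfig() on every update.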
diff --git a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
index b73aa25c7a2b..8e8bef0c87ab 100644
--- a/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
+++ b/sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java
@@ -20,6 +20,7 @@
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
 import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.TableUtils;
@@ -60,7 +61,11 @@ public void createTable(Table table) {
     } else {
       String identifier = getIdentifier(table);
       try {
-        catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
+        Map<String, String> properties =
+            TableUtils.getObjectMapper()
+                .convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {});
+        catalogConfig.createTable(
+            identifier, table.getSchema(), table.getPartitionFields(), properties);
       } catch (TableAlreadyExistsException e) {
         LOG.info(
             "Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier);
@@ -147,6 +152,15 @@ public boolean supportsPartitioning(Table table) {
     return getProvider(table.getType()).supportsPartitioning(table);
   }
 
+  @Override
+  public IcebergAlterTableOps alterTable(String name) {
+    IcebergTableInfo table =
+        checkStateNotNull(
+            catalogConfig.loadTable(getIdentifier(name)), "Could not find table '%s'", name);
+
+    return new IcebergAlterTableOps(table);
+  }
+
   @Override
   public void registerProvider(TableProvider provider) {
     super.registerProvider(provider);
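A minimal, self-contained sketch of the Jackson conversion createTable now performs (plain Jackson, outside the Beam helpers; the property values are illustrative):

    import com.fasterxml.jackson.core.type.TypeReference;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.fasterxml.jackson.databind.node.ObjectNode;
    import java.util.Map;

    ObjectMapper mapper = new ObjectMapper();
    // Stand-in for Table#getProperties(), which holds the parsed TBLPROPERTIES JSON.
    ObjectNode tblProperties =
        mapper.createObjectNode().put("prop1", "123").put("prop2", "abc");
    Map<String, String> properties =
        mapper.convertValue(tblProperties, new TypeReference<Map<String, String>>() {});
    // properties => {prop1=123, prop2=abc}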
diff --git a/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java
new file mode 100644
index 000000000000..b6fd2b14a5e7
--- /dev/null
+++ b/sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
+
+import static java.lang.String.format;
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.allOf;
+import static org.hamcrest.Matchers.hasEntry;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.types.Types;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+
+/** Unit tests specifically for ALTERing Iceberg catalogs and tables. */
+public class BeamSqlCliIcebergAlterTest {
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+  private InMemoryCatalogManager catalogManager;
+  private BeamSqlCli cli;
+  private String warehouse;
+  @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
+
+  @Before
+  public void setup() throws IOException {
+    catalogManager = new InMemoryCatalogManager();
+    cli = new BeamSqlCli().catalogManager(catalogManager);
+    File warehouseFile = TEMPORARY_FOLDER.newFolder();
+    assertTrue(warehouseFile.delete());
+    warehouse = "file:" + warehouseFile + "/" + UUID.randomUUID();
+  }
+
+  private String createCatalog(String name) {
+    return format("CREATE CATALOG %s \n", name)
+        + "TYPE iceberg \n"
+        + "PROPERTIES (\n"
+        + "  'type' = 'hadoop', \n"
+        + format("  'warehouse' = '%s')", warehouse);
+  }
+
+  @Test
+  public void testAlterCatalog() {
+    cli.execute(createCatalog("my_catalog"));
+    IcebergCatalog catalog =
+        (IcebergCatalog) checkStateNotNull(catalogManager.getCatalog("my_catalog"));
+    Map<String, String> expected = ImmutableMap.of("type", "hadoop", "warehouse", warehouse);
+    assertEquals(expected, catalog.properties());
+    assertEquals(expected, catalog.catalogConfig.getCatalogProperties());
+
+    cli.execute("ALTER CATALOG my_catalog SET ('abc'='123', 'foo'='bar') RESET ('type')");
+    expected = ImmutableMap.of("warehouse", warehouse, "abc", "123", "foo", "bar");
+    assertEquals(expected, catalog.properties());
+    assertEquals(expected, catalog.catalogConfig.getCatalogProperties());
+  }
+
+  @Test
+  public void testAlterTableProps() {
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute(
+        "CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg' TBLPROPERTIES '{ \"prop1\" : \"123\", \"prop2\" : \"abc\"}'");
+    IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+    Table table =
+        catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+    assertThat(table.properties(), allOf(hasEntry("prop1", "123"), hasEntry("prop2", "abc")));
+
+    cli.execute("ALTER TABLE my_table SET('prop3'='foo')");
+    table.refresh();
+    assertThat(
+        table.properties(),
+        allOf(hasEntry("prop1", "123"), hasEntry("prop2", "abc"), hasEntry("prop3", "foo")));
+
+    cli.execute("ALTER TABLE my_table RESET ('prop1') SET ('prop2'='xyz')");
+    table.refresh();
+    assertThat(table.properties(), allOf(hasEntry("prop2", "xyz"), hasEntry("prop3", "foo")));
+  }
+
+  @Test
+  public void testAlterTableSchema() {
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute("CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg'");
+    IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+    Table table =
+        catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+    Schema actualSchema = table.schema();
+    Schema expectedSchema =
+        new Schema(
+            optional(1, "col1", Types.StringType.get()),
+            optional(2, "col2", Types.IntegerType.get()));
+    assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", expectedSchema, actualSchema),
+        expectedSchema.sameSchema(actualSchema));
+
+    // add some columns
+    cli.execute(
+        "ALTER TABLE my_table ADD COLUMNS (col3 BOOLEAN COMMENT 'col3-comment', col4 FLOAT COMMENT 'col4-comment')");
+    table.refresh();
+    actualSchema = table.schema();
+    expectedSchema =
+        new Schema(
+            optional(1, "col1", Types.StringType.get()),
+            optional(2, "col2", Types.IntegerType.get()),
+            optional(3, "col3", Types.BooleanType.get(), "col3-comment"),
+            optional(4, "col4", Types.FloatType.get(), "col4-comment"));
+    assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", expectedSchema, actualSchema),
+        expectedSchema.sameSchema(actualSchema));
+
+    // remove some columns and add other columns
+    cli.execute(
+        "ALTER TABLE my_table DROP COLUMNS (col1, col2, col3) ADD COLUMNS (colA VARCHAR, colB INTEGER)");
+    table.refresh();
+    actualSchema = table.schema();
+    expectedSchema =
+        new Schema(
+            optional(4, "col4", Types.FloatType.get(), "col4-comment"),
+            optional(5, "colA", Types.StringType.get()),
+            optional(6, "colB", Types.IntegerType.get()));
+    assertTrue(
+        String.format("Unequal schemas.\nExpected: %s\nActual: %s", expectedSchema, actualSchema),
+        expectedSchema.sameSchema(actualSchema));
+  }
+
+  @Test
+  public void testAlterTableSchemaFailsHelpfullyWhenAddingRequiredColumns() {
+    // Adding required columns is not yet supported because Beam Schemas do not
+    // allow specifying a 'default value' for fields. This concept is required when adding a new
+    // Iceberg column because it allows previously written rows to default to this value.
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute("CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg'");
+
+    thrown.expect(UnsupportedOperationException.class);
+    thrown.expectMessage(
+        "Adding required columns is not yet supported. Encountered required columns: [col3]");
+    cli.execute("ALTER TABLE my_table ADD COLUMNS (col3 BOOLEAN NOT NULL)");
+  }
+
+  @Test
+  public void testAlterTablePartitionSpec() {
+    cli.execute(createCatalog("my_catalog"));
+    cli.execute("CREATE DATABASE my_catalog.my_db");
+    cli.execute("USE DATABASE my_catalog.my_db");
+    cli.execute(
+        "CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER, col3 FLOAT, col4 BOOLEAN, col5 TIMESTAMP) "
+            + "TYPE 'iceberg' PARTITIONED BY ('col3', 'bucket(col2, 3)')");
+    IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
+    Table table =
+        catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
+    PartitionSpec actualSpec = table.spec();
+    PartitionSpec expectedSpec =
+        PartitionSpec.builderFor(table.schema()).identity("col3").bucket("col2", 3).build();
+    assertTrue(
+        String.format(
+            "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+            expectedSpec, actualSpec),
+        expectedSpec.compatibleWith(actualSpec));
+
+    // add some partitions
+    cli.execute("ALTER TABLE my_table ADD PARTITIONS ('col4', 'month(col5)')");
+    table.refresh();
+    actualSpec = table.spec();
+    expectedSpec =
+        PartitionSpec.builderFor(table.schema())
+            .identity("col3")
+            .bucket("col2", 3)
+            .identity("col4")
+            .month("col5")
+            .withSpecId(table.spec().specId())
+            .build();
+    assertTrue(
+        String.format(
+            "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+            expectedSpec, actualSpec),
+        expectedSpec.compatibleWith(actualSpec));
+
+    // remove some partitions and add other partitions
+    cli.execute(
+        "ALTER TABLE my_table DROP PARTITIONS ('month(col5)', 'bucket(col2, 3)') ADD PARTITIONS ('hour(col5)')");
+    table.refresh();
+    actualSpec = table.spec();
+    expectedSpec =
+        PartitionSpec.builderFor(table.schema())
+            .identity("col3")
+            .identity("col4")
+            .hour("col5")
+            .withSpecId(table.spec().specId())
+            .build();
+    assertTrue(
+        String.format(
+            "Partition specs are not compatible.\nExpected: %s\nActual: %s",
+            expectedSpec, actualSpec),
+        expectedSpec.compatibleWith(actualSpec));
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
index 73af7e18150b..c3692430610f 100644
--- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp
+++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp
@@ -36,6 +36,8 @@ data: {
       "org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseCatalog"
       "org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseDatabase"
       "org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam"
+      "org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterCatalog"
+      "org.apache.beam.sdk.extensions.sql.impl.parser.SqlAlterTable"
       "org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils"
       "org.apache.beam.sdk.schemas.Schema"
    ]
@@ -53,7 +55,10 @@ data: {
       "CATALOGS"
       "DATABASES"
       "TABLES"
+      "COLUMNS"
+      "PARTITIONS"
       "USE"
+      "UNSET"
    ]

    # List of keywords from "keywords" section that are not reserved.
@@ -432,6 +437,8 @@ data: {
      "SqlUseCatalog(Span.of(), null)"
      "SqlUseDatabase(Span.of(), null)"
      "SqlSetOptionBeam(Span.of(), null)"
+     "SqlAlterCatalog(Span.of(), null)"
+     "SqlAlterTable(Span.of(), null)"
    ]

    # List of methods for parsing custom literals.
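For quick reference, the statements these grammar registrations enable look like the following (catalog and property names are illustrative, mirroring the tests above; UNSET is accepted as a synonym for RESET):

    ALTER CATALOG my_catalog
    SET ('abc' = '123', 'foo' = 'bar')
    RESET ('type')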
diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
index d3bb8c2af56c..74b8a2fe19e8 100644
--- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
+++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl
@@ -178,6 +178,21 @@ SqlNode Property() :
     }
 }
 
+SqlNodeList ArgList() :
+{
+    SqlNodeList list = new SqlNodeList(getPos());
+    SqlNode property;
+}
+{
+    <LPAREN> property = StringLiteral() { list.add(property); }
+    (
+        <COMMA> property = StringLiteral() { list.add(property); }
+    )*
+    <RPAREN>
+    {
+        return list;
+    }
+}
+
 /**
  * CREATE CATALOG ( IF NOT EXISTS )? catalog_name
  * TYPE type_name
@@ -247,6 +262,41 @@ SqlCall SqlUseCatalog(Span s, String scope) :
     }
 }
 
+/**
+ * ALTER CATALOG catalog_name
+ *   [ SET (key1=val1, key2=val2, ...) ]
+ *   [ (RESET | UNSET) (key1, key2, ...) ]
+ */
+SqlCall SqlAlterCatalog(Span s, String scope) :
+{
+    final SqlNode catalogName;
+    SqlNodeList setProps = null;
+    SqlNodeList resetProps = null;
+}
+{
+    <CATALOG> {
+        s.add(this);
+    }
+
+    (
+        catalogName = CompoundIdentifier()
+        |
+        catalogName = StringLiteral()
+    )
+    [ <SET> setProps = PropertyList() ]
+    [ ( <RESET> | <UNSET> ) resetProps = ArgList() ]
+
+    {
+        return new SqlAlterCatalog(
+            s.end(this),
+            scope,
+            catalogName,
+            setProps,
+            resetProps);
+    }
+}
+
 SqlDrop SqlDropCatalog(Span s, boolean replace) :
 {
     final boolean ifExists;
@@ -464,6 +514,18 @@ SqlCall SqlShowCurrent(Span s) :
     }
 }
 
+SqlNodeList PartitionFieldsParens() :
+{
+    final SqlNodeList partitions;
+}
+{
+    <LPAREN>
+    partitions = PartitionFieldList()
+    <RPAREN>
+    {
+        return partitions;
+    }
+}
 
 SqlNodeList PartitionFieldList() :
 {
@@ -517,7 +579,7 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
     |
         type = SimpleIdentifier()
     )
-    [ <PARTITIONED> <BY> partitionFields = PartitionFieldList() ]
+    [ <PARTITIONED> <BY> partitionFields = PartitionFieldsParens() ]
     [ <COMMENT> comment = StringLiteral() ]
     [ <LOCATION> location = StringLiteral() ]
     [ <TBLPROPERTIES> tblProperties = StringLiteral() ]
@@ -537,6 +599,61 @@ SqlCreate SqlCreateExternalTable(Span s, boolean replace) :
     }
 }
 
+/**
+ * ALTER TABLE table_name
+ *   [ ADD COLUMNS (col1 type1, col2 type2, ...) ]
+ *   [ DROP COLUMNS (col1, col2, ...) ]
+ *   [ ADD PARTITIONS ('partition_1', 'partition_2', ...) ]
+ *   [ DROP PARTITIONS ('partition_1', 'partition_2', ...) ]
+ *   [ SET (key1=val1, key2=val2, ...) ]
+ *   [ (RESET | UNSET) (key1, key2, ...) ]
+ */
+SqlCall SqlAlterTable(Span s, String scope) :
+{
+    final SqlNode tableName;
+    SqlNodeList columnsToDrop = null;
+    List<Schema.Field> columnsToAdd = null;
+    SqlNodeList partitionsToDrop = null;
+    SqlNodeList partitionsToAdd = null;
+    SqlNodeList setProps = null;
+    SqlNodeList resetProps = null;
+}
+{
+    <TABLE> {
+        s.add(this);
+    }
+
+    tableName = CompoundIdentifier()
+
+    [ <DROP> (
+        <COLUMNS> columnsToDrop = ParenthesizedSimpleIdentifierList()
+        |
+        <PARTITIONS> partitionsToDrop = ParenthesizedLiteralOptionCommaList()
+    ) ]
+
+    [ <ADD> (
+        <COLUMNS> columnsToAdd = FieldListParens()
+        |
+        <PARTITIONS> partitionsToAdd = ParenthesizedLiteralOptionCommaList()
+    ) ]
+
+    [ ( <RESET> | <UNSET> ) resetProps = ArgList() ]
+    [ <SET> setProps = PropertyList() ]
+
+    {
+        return new SqlAlterTable(
+            s.end(this),
+            scope,
+            tableName,
+            columnsToAdd,
+            columnsToDrop,
+            partitionsToAdd,
+            partitionsToDrop,
+            setProps,
+            resetProps);
+    }
+}
+
 SqlCreate SqlCreateFunction(Span s, boolean replace) :
 {
     boolean isAggregate = false;
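Concretely, the production above accepts statements of this shape (identifiers are illustrative, drawn from the tests earlier in this patch). Note the fixed clause order the grammar imposes: DROP before ADD, and RESET/UNSET before SET:

    ALTER TABLE my_table DROP COLUMNS (col1, col2) ADD COLUMNS (colA VARCHAR, colB INTEGER)
    ALTER TABLE my_table DROP PARTITIONS ('bucket(col2, 3)') ADD PARTITIONS ('hour(col5)')
    ALTER TABLE my_table RESET ('prop1') SET ('prop2' = 'xyz')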
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
index 792e5b98bcd3..e532355d8568 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/CatalogSchema.java
@@ -68,6 +68,10 @@ public Catalog getCatalog() {
     return catalog;
   }
 
+  public void updateProperties(Map<String, String> setProps, Collection<String> resetProps) {
+    catalog.updateProperties(setProps, resetProps);
+  }
+
   public @Nullable BeamCalciteSchema getCurrentDatabaseSchema() {
     return getSubSchema(catalog.currentDatabase());
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java
new file mode 100644
index 000000000000..e0abaa1e70aa
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterCatalog extends SqlAlter implements BeamSqlParser.ExecutableStatement {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("ALTER CATALOG", SqlKind.OTHER_DDL);
+  private final SqlIdentifier name;
+  private final @Nullable SqlNodeList setProps;
+  private final @Nullable SqlNodeList resetProps;
+
+  public SqlAlterCatalog(
+      SqlParserPos pos,
+      @Nullable String scope,
+      SqlNode name,
+      @Nullable SqlNodeList setProps,
+      @Nullable SqlNodeList resetProps) {
+    super(pos, scope);
+    this.name = SqlDdlNodes.getIdentifier(name, pos);
+    this.setProps = setProps;
+    this.resetProps = resetProps;
+  }
+
+  @Override
+  public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true, name);
+    Schema schema = pair.left.schema;
+
+    if (!(schema instanceof CatalogManagerSchema)) {
+      throw SqlUtil.newContextException(
+          name.getParserPosition(),
+          RESOURCE.internal(
+              "Attempting to alter catalog '"
+                  + SqlDdlNodes.name(name)
+                  + "' with unexpected Calcite Schema of type "
+                  + schema.getClass()));
+    }
+
+    CatalogSchema catalogSchema =
+        ((CatalogManagerSchema) schema).getCatalogSchema(SqlDdlNodes.getString(name));
+
+    Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+    Collection<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+
+    ImmutableList.Builder<String> overlappingPropsBuilder = ImmutableList.builder();
+    resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+    List<String> overlappingProps = overlappingPropsBuilder.build();
+    checkState(
+        overlappingProps.isEmpty(),
+        "Invalid %s call: Found overlapping properties between SET and RESET: %s.",
+        OPERATOR,
+        overlappingProps);
+
+    catalogSchema.updateProperties(setPropsMap, resetPropsList);
+  }
+
+  @Override
+  public void unparseAlterOperation(SqlWriter writer, int left, int right) {
+    writer.keyword("CATALOG");
+    name.unparse(writer, left, right);
+    if (setProps != null && !setProps.isEmpty()) {
+      writer.keyword("SET");
+      writer.keyword("(");
+      for (int i = 0; i < setProps.size(); i++) {
+        if (i > 0) {
+          writer.keyword(",");
+        }
+        SqlNode property = setProps.get(i);
+        checkState(
+            property instanceof SqlNodeList,
+            String.format(
+                "Unexpected properties entry '%s' of class '%s'", property, property.getClass()));
+        SqlNodeList kv = ((SqlNodeList) property);
+
+        kv.get(0).unparse(writer, left, right); // key
+        writer.keyword("=");
+        kv.get(1).unparse(writer, left, right); // value
+      }
+      writer.keyword(")");
+    }
+
+    if (resetProps != null) {
+      writer.keyword("RESET");
+      writer.sep("(");
+      for (int i = 0; i < resetProps.size(); i++) {
+        if (i > 0) {
+          writer.sep(",");
+        }
+        SqlNode field = resetProps.get(i);
+        field.unparse(writer, 0, 0);
+      }
+      writer.sep(")");
+    }
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    ImmutableList.Builder<SqlNode> operands = ImmutableList.builder();
+    operands.add(name);
+    if (setProps != null) {
+      operands.add(setProps);
+    }
+    if (resetProps != null) {
+      operands.add(resetProps);
+    }
+    return operands.build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java
new file mode 100644
index 000000000000..20d91591c8b9
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterTable.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.impl.parser;
+
+import static org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects.firstNonNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
+import org.apache.beam.sdk.extensions.sql.impl.CatalogSchema;
+import org.apache.beam.sdk.extensions.sql.impl.TableName;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.schema.Schema;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlAlter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlKind;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlUtil;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlWriter;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+public class SqlAlterTable extends SqlAlter implements BeamSqlParser.ExecutableStatement {
+  private static final SqlOperator OPERATOR =
+      new SqlSpecialOperator("ALTER TABLE", SqlKind.ALTER_TABLE);
+  private final SqlIdentifier name;
+  private final @Nullable List<Field> columnsToAdd;
+  private final @Nullable SqlNodeList columnsToDrop;
+  private final @Nullable SqlNodeList partitionsToAdd;
+  private final @Nullable SqlNodeList partitionsToDrop;
+  private final @Nullable SqlNodeList setProps;
+  private final @Nullable SqlNodeList resetProps;
+
+  public SqlAlterTable(
+      SqlParserPos pos,
+      @Nullable String scope,
+      SqlNode name,
+      @Nullable List<Field> columnsToAdd,
+      @Nullable SqlNodeList columnsToDrop,
+      @Nullable SqlNodeList partitionsToAdd,
+      @Nullable SqlNodeList partitionsToDrop,
+      @Nullable SqlNodeList setProps,
+      @Nullable SqlNodeList resetProps) {
+    super(pos, scope);
+    this.name = SqlDdlNodes.getIdentifier(name, pos);
+    this.columnsToAdd = columnsToAdd;
+    this.columnsToDrop = columnsToDrop;
+    this.partitionsToAdd = partitionsToAdd;
+    this.partitionsToDrop = partitionsToDrop;
+    this.setProps = setProps;
+    this.resetProps = resetProps;
+  }
+
+  @Override
+  public void execute(CalcitePrepare.Context context) {
+    final Pair<CalciteSchema, String> pair = SqlDdlNodes.schema(context, true, name);
+    TableName pathOverride = TableName.create(name.toString());
+    Schema schema = pair.left.schema;
+
+    BeamCalciteSchema beamCalciteSchema;
+    if (schema instanceof CatalogManagerSchema) {
+      CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
+      CatalogSchema catalogSchema =
+          pathOverride.catalog() != null
+              ? catalogManagerSchema.getCatalogSchema(pathOverride)
+              : catalogManagerSchema.getCurrentCatalogSchema();
+      beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
+    } else if (schema instanceof BeamCalciteSchema) {
+      beamCalciteSchema = (BeamCalciteSchema) schema;
+    } else {
+      throw SqlUtil.newContextException(
+          name.getParserPosition(),
+          RESOURCE.internal(
+              "Attempting to alter a table using unexpected Calcite Schema of type "
+                  + schema.getClass()));
+    }
+
+    if (beamCalciteSchema.getTable(pair.right) == null) {
+      // Table does not exist.
+      throw SqlUtil.newContextException(
+          name.getParserPosition(), RESOURCE.tableNotFound(name.toString()));
+    }
+
+    Map<String, String> setPropsMap = SqlDdlNodes.getStringMap(setProps);
+    List<String> resetPropsList = SqlDdlNodes.getStringList(resetProps);
+    List<String> columnsToDropList = SqlDdlNodes.getStringList(columnsToDrop);
+    List<String> partitionsToAddList = SqlDdlNodes.getStringList(partitionsToAdd);
+    List<String> partitionsToDropList = SqlDdlNodes.getStringList(partitionsToDrop);
+
+    AlterTableOps alterOps =
+        beamCalciteSchema.getTableProvider().alterTable(SqlDdlNodes.name(name));
+
+    if (!setPropsMap.isEmpty() || !resetPropsList.isEmpty()) {
+      validateNonOverlappingProps(setPropsMap, resetPropsList);
+
+      alterOps.updateTableProperties(setPropsMap, resetPropsList);
+    }
+    if (!columnsToDropList.isEmpty() || (columnsToAdd != null && !columnsToAdd.isEmpty())) {
+      alterOps.updateSchema(firstNonNull(columnsToAdd, Collections.emptyList()), columnsToDropList);
+    }
+    if (!partitionsToDropList.isEmpty() || !partitionsToAddList.isEmpty()) {
+      alterOps.updatePartitionSpec(partitionsToAddList, partitionsToDropList);
+    }
+  }
+
+  private void validateNonOverlappingProps(
+      Map<String, String> setPropsMap, Collection<String> resetPropsList) {
+    ImmutableList.Builder<String> overlappingPropsBuilder = ImmutableList.builder();
+
+    resetPropsList.stream().filter(setPropsMap::containsKey).forEach(overlappingPropsBuilder::add);
+
+    List<String> overlappingProps = overlappingPropsBuilder.build();
+    checkState(
+        overlappingProps.isEmpty(),
+        "Invalid '%s' call: Found overlapping properties between SET and RESET: %s.",
+        OPERATOR,
+        overlappingProps);
+  }
+
+  @Override
+  public void unparseAlterOperation(SqlWriter writer, int left, int right) {
+    writer.keyword("TABLE");
+    name.unparse(writer, left, right);
+
+    if (columnsToDrop != null && !columnsToDrop.isEmpty()) {
+      writer.keyword("DROP COLUMNS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (String colName : SqlDdlNodes.getStringList(columnsToDrop)) {
+        writer.sep(",");
+        writer.identifier(colName, false);
+      }
+      writer.endList(frame);
+    }
+
+    if (columnsToAdd != null && !columnsToAdd.isEmpty()) {
+      writer.keyword("ADD COLUMNS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      columnsToAdd.forEach(column -> unparseColumn(writer, column));
+      writer.endList(frame);
+    }
+
+    if (partitionsToDrop != null && !partitionsToDrop.isEmpty()) {
+      writer.keyword("DROP PARTITIONS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (String partition : SqlDdlNodes.getStringList(partitionsToDrop)) {
+        writer.sep(",");
+        writer.identifier(partition, true);
+      }
+      writer.endList(frame);
+    }
+
+    if (partitionsToAdd != null && !partitionsToAdd.isEmpty()) {
+      writer.keyword("ADD PARTITIONS");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (String partition : SqlDdlNodes.getStringList(partitionsToAdd)) {
+        writer.sep(",");
+        writer.identifier(partition, true);
+      }
+      writer.endList(frame);
+    }
+
+    if (resetProps != null && !resetProps.isEmpty()) {
+      writer.keyword("RESET");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (SqlNode resetProp : resetProps) {
+        writer.sep(",");
+        resetProp.unparse(writer, 0, 0);
+      }
+      writer.endList(frame);
+    }
+
+    if (setProps != null && !setProps.isEmpty()) {
+      writer.keyword("SET");
+      SqlWriter.Frame frame = writer.startList("(", ")");
+      for (SqlNode setProp : setProps) {
+        writer.sep(",");
+        SqlNodeList kv = (SqlNodeList) setProp;
+        kv.get(0).unparse(writer, left, right); // key
+        writer.keyword("=");
+        kv.get(1).unparse(writer, left, right); // value
+      }
+      writer.endList(frame);
+    }
+  }
+
+  private void unparseColumn(SqlWriter writer, Field column) {
+    writer.sep(",");
+    writer.identifier(column.getName(), false);
+    writer.identifier(CalciteUtils.toSqlTypeName(column.getType()).name(), false);
+
+    if (column.getType().getNullable() != null && !column.getType().getNullable()) {
+      writer.keyword("NOT NULL");
+    }
+
+    if (column.getDescription() != null) {
+      writer.keyword("COMMENT");
+      writer.literal(column.getDescription());
+    }
+  }
+
+  @Override
+  public SqlOperator getOperator() {
+    return OPERATOR;
+  }
+
+  @Override
+  public List<SqlNode> getOperandList() {
+    ImmutableList.Builder<SqlNode> operands = ImmutableList.builder();
+    operands.add(name);
+    if (setProps != null) {
+      operands.add(setProps);
+    }
+    if (resetProps != null) {
+      operands.add(resetProps);
+    }
+    return operands.build();
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
index de7903897b62..ab644145b4f7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java
@@ -159,7 +159,7 @@ public void execute(CalcitePrepare.Context context) {
       CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
       catalogManagerSchema.maybeRegisterProvider(pathOverride, SqlDdlNodes.getString(type));
 
-      CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+      CatalogSchema catalogSchema = catalogManagerSchema.getCatalogSchema(pathOverride);
       beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
     } else if (schema instanceof BeamCalciteSchema) {
       beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
index 6f4d8ee79d9c..6d6be5d5a127 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDdlNodes.java
@@ -19,18 +19,24 @@
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalcitePrepare;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.jdbc.CalciteSchema;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlDataTypeSpec;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlIdentifier;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlLiteral;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNode;
+import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.SqlNodeList;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.sql.parser.SqlParserPos;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.NlsString;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Pair;
 import org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Util;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Utilities concerning {@link SqlNode} for DDL. */
@@ -97,6 +103,41 @@ public static String name(SqlIdentifier id) {
     return literalValue == null ? null : literalValue.getValue();
   }
 
+  static List<String> getStringList(@Nullable SqlNodeList l) {
+    if (l == null || l.isEmpty()) {
+      return Collections.emptyList();
+    }
+    ImmutableList.Builder<String> resetPropsList = ImmutableList.builder();
+    for (SqlNode propNode : l) {
+      @Nullable String prop = SqlDdlNodes.getString(propNode);
+      if (prop != null) {
+        resetPropsList.add(prop);
+      }
+    }
+    return resetPropsList.build();
+  }
+
+  static Map<String, String> getStringMap(@Nullable SqlNodeList nodeList) {
+    if (nodeList == null || nodeList.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    ImmutableMap.Builder<String, String> builder = ImmutableMap.builder();
+    for (SqlNode property : nodeList) {
+      checkState(
+          property instanceof SqlNodeList,
+          String.format(
+              "Unexpected properties entry '%s' of class '%s'", property, property.getClass()));
+      SqlNodeList kv = ((SqlNodeList) property);
+      checkState(kv.size() == 2, "Expected 2 items in properties entry, but got %s", kv.size());
+      String key = checkStateNotNull(SqlDdlNodes.getString(kv.get(0)));
+      String value = checkStateNotNull(SqlDdlNodes.getString(kv.get(1)));
+      builder.put(key, value);
+    }
+
+    return builder.build();
+  }
+
   static SqlIdentifier getIdentifier(SqlNode n, SqlParserPos pos) {
     if (n instanceof SqlIdentifier) {
       return (SqlIdentifier) n;
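A sketch of the node shape getStringMap consumes, under the assumption that PropertyList() emits one two-element SqlNodeList (key literal, value literal) per property — the construction below is hypothetical and only mirrors what the parser produces for ('warehouse' = 'file:/tmp/wh'):

    SqlNodeList entry = new SqlNodeList(SqlParserPos.ZERO);
    entry.add(SqlLiteral.createCharString("warehouse", SqlParserPos.ZERO)); // key
    entry.add(SqlLiteral.createCharString("file:/tmp/wh", SqlParserPos.ZERO)); // value
    SqlNodeList props = new SqlNodeList(SqlParserPos.ZERO);
    props.add(entry);
    // SqlDdlNodes.getStringMap(props) => {warehouse=file:/tmp/wh}
    // SqlDdlNodes.getStringList(...) similarly flattens ('a', 'b') into ["a", "b"].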
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
index 4b838c9f4182..4d4d0343c736 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java
@@ -84,7 +84,11 @@ public void execute(CalcitePrepare.Context context) {
     List<String> components = Lists.newArrayList(Splitter.on(".").split(databaseName.toString()));
     TableName pathOverride = TableName.create(components, "");
 
-    CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+    CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
+    CatalogSchema catalogSchema =
+        pathOverride.catalog() != null
+            ? catalogManagerSchema.getCatalogSchema(pathOverride)
+            : catalogManagerSchema.getCurrentCatalogSchema();
 
     catalogSchema.dropDatabase(databaseName, cascade, ifExists);
   }
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
index 0bc5cd911614..5d0cf2234062 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropTable.java
@@ -53,7 +53,11 @@ public void execute(CalcitePrepare.Context context) {
     BeamCalciteSchema beamCalciteSchema;
     if (schema instanceof CatalogManagerSchema) {
-      CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+      CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
+      CatalogSchema catalogSchema =
+          pathOverride.catalog() != null
+              ? catalogManagerSchema.getCatalogSchema(pathOverride)
+              : catalogManagerSchema.getCurrentCatalogSchema();
       beamCalciteSchema = catalogSchema.getDatabaseSchema(pathOverride);
     } else if (schema instanceof BeamCalciteSchema) {
       beamCalciteSchema = (BeamCalciteSchema) schema;
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
index f0e3fa59ddc7..9d06e471dbbe 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java
@@ -78,7 +78,7 @@ public void execute(CalcitePrepare.Context context) {
     }
 
     CatalogManagerSchema catalogManagerSchema = (CatalogManagerSchema) schema;
-    CatalogSchema catalogSchema = ((CatalogManagerSchema) schema).getCatalogSchema(pathOverride);
+    CatalogSchema catalogSchema = catalogManagerSchema.getCatalogSchema(pathOverride);
 
     // if database exists in a different catalog, we need to also switch to that catalog
     if (pathOverride.catalog() != null
         && !pathOverride
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
index c387a5ace10c..cbf1b45c31e7 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java
@@ -88,6 +88,9 @@ public interface Catalog {
   /** User-specified configuration properties. */
   Map<String, String> properties();
 
+  /** Updates catalog properties: {@code setProps} entries are set, overriding any existing
+   * values, and {@code resetProps} keys are removed. */
+  void updateProperties(Map<String, String> setProps, Collection<String> resetProps);
+
   /** Registers this {@link TableProvider} and propagates it to underlying {@link MetaStore}s. */
   void registerTableProvider(TableProvider provider);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
index 7c0d8b9d32ea..cdee6c930224 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java
@@ -34,7 +34,7 @@ public class InMemoryCatalog implements Catalog {
   private final String name;
-  private final Map<String, String> properties;
+  protected final Map<String, String> properties;
   protected final Map<String, TableProvider> tableProviders = new HashMap<>();
   private final Map<String, InMemoryMetaStore> metaStores = new HashMap<>();
   private final HashSet<String> databases = new HashSet<>(Collections.singleton(DEFAULT));
@@ -77,6 +77,12 @@ public Map<String, String> properties() {
     return Preconditions.checkStateNotNull(properties, "InMemoryCatalog has not been initialized");
   }
 
+  @Override
+  public void updateProperties(Map<String, String> setProps, Collection<String> resetProps) {
+    properties.putAll(setProps);
+    resetProps.forEach(properties::remove);
+  }
+
   @Override
   public boolean createDatabase(String database) {
     return databases.add(database);
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java
new file mode 100644
index 000000000000..3f1ea1721684
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/AlterTableOps.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * An interface that handles ALTER TABLE operations.
+ *
+ * <p>An instance is created and used when {@link TableProvider#alterTable(String)} is called.
+ */
+public interface AlterTableOps {
+  /**
+   * Updates a table's properties. Includes setting properties (which overwrites existing values)
+   * and/or resetting properties (which removes the values of the given keys).
+   */
+  void updateTableProperties(Map<String, String> setProps, List<String> resetProps);
+
+  /** Updates a table's schema. Includes adding new columns and/or dropping existing columns. */
+  void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> columnsToDrop);
+
+  /**
+   * Updates a table's partition spec, if applicable. Includes adding new partitions and/or
+   * dropping existing partitions.
+   */
+  void updatePartitionSpec(List<String> partitionsToAdd, Collection<String> partitionsToDrop);
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
index 9be8c96b7c99..595d9cba52fa 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java
@@ -80,4 +80,9 @@ default Set<TableProvider> getSubProviders() {
   default boolean supportsPartitioning(Table table) {
     return false;
   }
+
+  default AlterTableOps alterTable(String name) {
+    throw new UnsupportedOperationException(
+        String.format("ALTER is not supported for table '%s' of type '%s'.", name, getTableType()));
+  }
 }
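For a provider that only supports property updates, a hypothetical minimal override of the new hook might look like the following (the propsByTable field and all names are invented for illustration; unsupported operations fail loudly, matching the default method's behavior):

    // Inside a custom TableProvider implementation:
    @Override
    public AlterTableOps alterTable(String name) {
      Map<String, String> props = propsByTable.computeIfAbsent(name, k -> new HashMap<>());
      return new AlterTableOps() {
        @Override
        public void updateTableProperties(Map<String, String> setProps, List<String> resetProps) {
          props.putAll(setProps);
          resetProps.forEach(props::remove);
        }

        @Override
        public void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> columnsToDrop) {
          throw new UnsupportedOperationException("Schema changes are not supported.");
        }

        @Override
        public void updatePartitionSpec(
            List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
          throw new UnsupportedOperationException("Partitioning is not supported.");
        }
      };
    }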
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.test;
+
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
+import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.TableWithRows;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.Schema.Field;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
+
+public class AlterTestTableOps implements AlterTableOps {
+  private final TableWithRows tableWithRows;
+
+  AlterTestTableOps(TableWithRows tableWithRows) {
+    this.tableWithRows = tableWithRows;
+  }
+
+  @Override
+  public void updateTableProperties(Map<String, String> setProps, List<String> resetProps) {
+    ObjectNode props = tableWithRows.getTable().getProperties();
+    resetProps.forEach(props::remove);
+    setProps.forEach(props::put);
+    tableWithRows.setTable(tableWithRows.getTable().toBuilder().properties(props).build());
+  }
+
+  @Override
+  public void updateSchema(List<Field> columnsToAdd, Collection<String> columnsToDrop) {
+    if (!columnsToAdd.isEmpty() && !tableWithRows.getRows().isEmpty()) {
+      ImmutableList.Builder<String> requiredFields = ImmutableList.builder();
+      for (Field f : columnsToAdd) {
+        if (!f.getType().getNullable()) {
+          requiredFields.add(f.getName());
+        }
+      }
+      Preconditions.checkArgument(
+          requiredFields.build().isEmpty(),
+          "Cannot add required fields %s because table '%s' already contains rows.",
+          requiredFields.build(),
+          tableWithRows.getTable().getName());
+    }
+
+    // update the schema
+    List<Field> schemaFields = tableWithRows.getTable().getSchema().getFields();
+    ImmutableList.Builder<Field> newSchemaFields = ImmutableList.builder();
+    // remove dropped fields
+    schemaFields.stream()
+        .filter(f -> !columnsToDrop.contains(f.getName()))
+        .forEach(newSchemaFields::add);
+    // add new fields
+    newSchemaFields.addAll(columnsToAdd);
+    Schema newSchema = Schema.of(newSchemaFields.build().toArray(new Field[0]));
+    tableWithRows.setTable(tableWithRows.getTable().toBuilder().schema(newSchema).build());
+
+    // update existing rows
+    List<Row> rows = tableWithRows.getRows();
+    List<Row> newRows = new CopyOnWriteArrayList<>();
+    for (Row row : rows) {
+      Map<String, Object> values = new HashMap<>();
+      // add existing values, minus dropped columns
+      for (Field field : schemaFields) {
+        String name = field.getName();
+        if (!columnsToDrop.contains(name)) {
+          values.put(name, row.getValue(name));
+        }
+      }
+      Row newRow = Row.withSchema(newSchema).withFieldValues(values).build();
+      newRows.add(newRow);
+    }
+    tableWithRows.setRows(newRows);
+  }
+
+  @Override
+  public void updatePartitionSpec(
+      List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+    throw new UnsupportedOperationException(
+        TestTableProvider.class.getSimpleName() + " does not support partitions.");
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
index 375cb42c4900..365ed9fbc2cf 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java
@@ -36,6 +36,7 @@
 import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
 import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
 import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,6 +51,7 @@
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.util.Preconditions;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PDone;
@@ -107,6 +109,13 @@ public Map<String, Table> getTables() {
         .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().table));
   }
 
+  @Override
+  public AlterTableOps alterTable(String name) {
+    TableWithRows table =
+        Preconditions.checkArgumentNotNull(tables().get(name), "Could not find table '%s'", name);
+    return new AlterTestTableOps(table);
+  }
+
   @Override
   public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
     return new InMemoryTable(tables().get(table.getName()));
@@ -133,9 +142,21 @@ public TableWithRows(long tableProviderInstanceId, Table table) {
       this.rows = new CopyOnWriteArrayList<>();
     }
 
+    public Table getTable() {
+      return table;
+    }
+
+    void setTable(Table table) {
+      this.table = table;
+    }
+
     public List<Row> getRows() {
       return rows;
     }
+
+    void setRows(List<Row> rows) {
+      this.rows = rows;
+    }
   }
 
   private static class InMemoryTable extends BaseBeamTable {
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
index 83b8685c3fe9..8892cd889fd1 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java
@@ -21,6 +21,7 @@
 import java.util.Map;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.checkerframework.checker.nullness.qual.Nullable;
@@ -150,4 +151,14 @@ public TableProvider getProvider(String type) {
     throw new IllegalStateException("No TableProvider registered for table type: " + type);
   }
+
+  @Override
+  public AlterTableOps alterTable(String name) {
+    if (!tables.containsKey(name)) {
+      throw new IllegalArgumentException("No such table: " + name);
+    }
+
+    Table table = tables.get(name);
+    return getProvider(table.getType()).alterTable(name);
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
index 0164c634814b..4875122e81e5 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliCatalogTest.java
@@ -330,4 +330,18 @@ public void testCreateWriteDropTableWithOtherCatalogScope() {
     cli.execute("DROP TABLE catalog_1.db_1.person");
     assertNull(metastoreDb1.getTable("person"));
   }
+
+  @Test
+  public void testAlterCatalog() {
+    cli.execute("CREATE CATALOG my_catalog TYPE 'local' PROPERTIES('foo'='abc', 'bar'='xyz')");
+    cli.execute("USE CATALOG my_catalog");
+    assertEquals(
+        ImmutableMap.of("foo", "abc", "bar", "xyz"), catalogManager.currentCatalog().properties());
+    cli.execute("ALTER CATALOG my_catalog SET ('foo'='123', 'new'='val')");
+    assertEquals(
+        ImmutableMap.of("foo", "123", "bar", "xyz", "new", "val"),
+        catalogManager.currentCatalog().properties());
+    cli.execute("ALTER CATALOG my_catalog RESET ('foo', 'bar')");
+    assertEquals(ImmutableMap.of("new", "val"), catalogManager.currentCatalog().properties());
+  }
 }
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
index ffbdeb84f136..f29d0c28a90b 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java
@@ -24,15 +24,22 @@
 import static org.apache.beam.sdk.schemas.Schema.toSchema;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.everyItem;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.oneOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 
 import java.time.LocalDate;
 import java.time.LocalTime;
+import java.util.Arrays;
+import java.util.List;
 import java.util.stream.Stream;
 import org.apache.beam.sdk.extensions.sql.impl.ParseException;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
+import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
 import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
@@ -300,4 +307,74 @@ public void test_time_types() throws Exception {
     // test TIMESTAMP field
     assertEquals(parseTimestampWithUTCTimeZone("2018-07-01 21:26:07.123"), row.getDateTime("f_ts"));
   }
+
+  @Test
+  public void testAlterTableSchema() {
+    InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
+    TestTableProvider provider = new TestTableProvider();
+    catalogManager.registerTableProvider(provider);
+    BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
+
+    cli.execute(
+        "CREATE EXTERNAL TABLE test_table(id integer not null, str varchar not null, fl float) type 'test'");
+    cli.execute("INSERT INTO test_table VALUES (1, 'a', 0.1), (2, 'b', 0.2), (3, 'c', 0.3)");
+    TestTableProvider.TableWithRows tableWithRows = provider.tables().get("test_table");
+    assertNotNull(tableWithRows);
+    Schema initialSchema =
+        Schema.builder()
+            .addInt32Field("id")
+            .addStringField("str")
+            .addNullableFloatField("fl")
+            .build();
+    assertEquals(initialSchema, tableWithRows.getTable().getSchema());
+    List<Row> initialRows =
+        Arrays.asList(
+            Row.withSchema(initialSchema).addValues(1, "a", 0.1f).build(),
+            Row.withSchema(initialSchema).addValues(2, "b", 0.2f).build(),
+            Row.withSchema(initialSchema).addValues(3, "c", 0.3f).build());
+    assertThat(initialRows, everyItem(is(oneOf(tableWithRows.getRows().toArray(new Row[0])))));
+
+    cli.execute(
+        "ALTER TABLE test_table DROP COLUMNS (str, fl) ADD COLUMNS (newBool boolean, newLong bigint)");
+    cli.execute("INSERT INTO test_table VALUES (4, true, 4), (5, false, 5), (6, false, 6)");
+    Schema newSchema =
+        Schema.builder()
+            .addInt32Field("id")
+            .addNullableBooleanField("newBool")
+            .addNullableInt64Field("newLong")
+            .build();
+    assertEquals(newSchema, tableWithRows.getTable().getSchema());
+
+    // existing rows should have the corresponding values dropped
+    List<Row> newRows =
+        Arrays.asList(
+            Row.withSchema(newSchema).addValues(1, null, null).build(),
+            Row.withSchema(newSchema).addValues(2, null, null).build(),
+            Row.withSchema(newSchema).addValues(3, null, null).build(),
+            Row.withSchema(newSchema).addValues(4, true, 4L).build(),
+            Row.withSchema(newSchema).addValues(5, false, 5L).build(),
+            Row.withSchema(newSchema).addValues(6, false, 6L).build());
+    assertThat(newRows, everyItem(is(oneOf(tableWithRows.getRows().toArray(new Row[0])))));
+  }
+
+  @Test
+  public void testAlterTableProperties() {
+    InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
+    TestTableProvider provider = new TestTableProvider();
+    catalogManager.registerTableProvider(provider);
+    BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
+
+    cli.execute(
+        "CREATE EXTERNAL TABLE test_table(id integer, str varchar) type 'test' "
+            + "TBLPROPERTIES '{ \"foo\" : \"123\", \"bar\" : \"abc\"}'");
+    TestTableProvider.TableWithRows tableWithRows = provider.tables().get("test_table");
+    assertNotNull(tableWithRows);
+    assertEquals("123", tableWithRows.getTable().getProperties().get("foo").asText());
+    assertEquals("abc", tableWithRows.getTable().getProperties().get("bar").asText());
+
+    cli.execute("ALTER TABLE test_table RESET('bar') SET('foo'='456', 'baz'='xyz')");
+    assertEquals("456", tableWithRows.getTable().getProperties().get("foo").asText());
+    assertEquals("xyz", tableWithRows.getTable().getProperties().get("baz").asText());
+    assertFalse(tableWithRows.getTable().getProperties().has("bar"));
+  }
 }
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
index 7603e2c6259f..748dd319c076 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java
@@ -19,6 +19,7 @@
 
 import com.google.auto.value.AutoValue;
 import java.io.Serializable;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,18 +28,23 @@
 import org.apache.beam.sdk.util.ReleaseInfo;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.iceberg.CatalogUtil;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdatePartitionSpec;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NoSuchTableException;
+import org.apache.iceberg.types.Type;
 import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 import org.checkerframework.checker.nullness.qual.Nullable;
 import org.checkerframework.dataflow.qual.Pure;
@@ -143,7 +149,10 @@ public boolean dropNamespace(String namespace, boolean cascade) {
 
   public void createTable(
-      String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
+      String tableIdentifier,
+      Schema tableSchema,
+      @Nullable List<String> partitionFields,
+      Map<String, String> properties) {
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
     PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
@@ -153,7 +162,7 @@ public void createTable(
           icebergIdentifier,
           icebergSchema,
           icebergSpec);
-      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
+      catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
       LOG.info("Successfully created table '{}'.", icebergIdentifier);
     } catch (AlreadyExistsException e) {
       throw new TableAlreadyExistsException(e);
@@ -164,28 +173,92 @@
     TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
     try {
       Table table = catalog().loadTable(icebergIdentifier);
-      return IcebergTableInfo.create(
-          tableIdentifier,
-          IcebergUtils.icebergSchemaToBeamSchema(table.schema()),
-          table.properties());
+      return new IcebergTableInfo(tableIdentifier, table);
     } catch (NoSuchTableException ignored) {
       return null;
     }
   }
 
   // Helper class to pass information to Beam SQL module without relying on Iceberg deps
-  @AutoValue
-  public abstract static class IcebergTableInfo {
-    public abstract String getIdentifier();
+  public static class IcebergTableInfo {
+    private final String identifier;
+    private final Table table;
 
-    public abstract Schema getSchema();
+    IcebergTableInfo(String identifier, Table table) {
+      this.identifier = identifier;
+      this.table = table;
+    }
+
+    public String getIdentifier() {
+      return identifier;
+    }
+
+    public Schema getSchema() {
+      return IcebergUtils.icebergSchemaToBeamSchema(table.schema());
+    }
+
+    public Map<String, String> getProperties() {
+      return table.properties();
+    }
+
+    public void updateTableProps(Map<String, String> setProps, List<String> resetProps) {
+      if (setProps.isEmpty() && resetProps.isEmpty()) {
+        return;
+      }
+
+      UpdateProperties update = table.updateProperties();
+      resetProps.forEach(update::remove);
+      setProps.forEach(update::set);
+
+      update.commit();
+    }
 
-    public abstract Map<String, String> getProperties();
+    public void updateSchema(List<Schema.Field> columnsToAdd, Collection<String> columnsToDrop) {
+      if (columnsToAdd.isEmpty() && columnsToDrop.isEmpty()) {
+        return;
+      }
+      UpdateSchema update = table.updateSchema();
+      ImmutableList.Builder<String> requiredColumns = ImmutableList.builder();
+
+      for (Schema.Field col : columnsToAdd) {
+        String name = col.getName();
+        Type type = IcebergUtils.beamFieldTypeToIcebergFieldType(col.getType(), 0).type;
+        String desc = col.getDescription();
+
+        if (col.getType().getNullable()) {
+          if (desc.isEmpty()) {
+            update.addColumn(name, type);
+          } else {
+            update.addColumn(name, type, desc);
+          }
+        } else {
+          requiredColumns.add(name);
+        }
+      }
+      if (!requiredColumns.build().isEmpty()) {
+        throw new UnsupportedOperationException(
+            "Adding required columns is not yet supported. "
+                + "Encountered required columns: "
+                + requiredColumns.build());
+      }
+
+      columnsToDrop.forEach(update::deleteColumn);
 
-    static IcebergTableInfo create(
-        String identifier, Schema schema, Map<String, String> properties) {
-      return new AutoValue_IcebergCatalogConfig_IcebergTableInfo(identifier, schema, properties);
-    };
+      update.commit();
+    }
+
+    public void updatePartitionSpec(
+        List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
+      if (partitionsToAdd.isEmpty() && partitionsToDrop.isEmpty()) {
+        return;
+      }
+      UpdatePartitionSpec update = table.updateSpec();
+
+      partitionsToDrop.stream().map(PartitionUtils::toIcebergTerm).forEach(update::removeField);
+      partitionsToAdd.stream().map(PartitionUtils::toIcebergTerm).forEach(update::addField);
+
+      update.commit();
+    }
   }
 
   public boolean dropTable(String tableIdentifier) {
diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
index 4b94663c64c5..2b3117f8bf84 100644
--- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
+++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/PartitionUtils.java
@@ -22,11 +22,14 @@
 import java.util.List;
 import java.util.Map;
 import java.util.function.BiFunction;
+import java.util.function.Function;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Term;
 import org.checkerframework.checker.nullness.qual.Nullable;
 
 class PartitionUtils {
@@ -90,4 +93,38 @@ static PartitionSpec toPartitionSpec(
 
     return builder.build();
   }
+
+  private static final Map<Pattern, Function<Matcher, Term>> TERMS =
+      ImmutableMap.of(
+          HOUR,
+          matcher -> Expressions.hour(checkStateNotNull(matcher.group(1))),
+          DAY,
+          matcher -> Expressions.day(checkStateNotNull(matcher.group(1))),
+          MONTH,
+          matcher -> Expressions.month(checkStateNotNull(matcher.group(1))),
+          YEAR,
+          matcher -> Expressions.year(checkStateNotNull(matcher.group(1))),
+          TRUNCATE,
+          matcher ->
+              Expressions.truncate(
+                  checkStateNotNull(matcher.group(1)),
+                  Integer.parseInt(checkStateNotNull(matcher.group(2)))),
+          BUCKET,
+          matcher ->
+              Expressions.bucket(
+                  checkStateNotNull(matcher.group(1)),
+                  Integer.parseInt(checkStateNotNull(matcher.group(2)))),
+          IDENTITY,
+          matcher -> Expressions.ref(checkStateNotNull(matcher.group(1))));
+
+  static Term toIcebergTerm(String field) {
+    for (Map.Entry<Pattern, Function<Matcher, Term>> entry : TERMS.entrySet()) {
+      Matcher matcher = entry.getKey().matcher(field);
+      if (matcher.find()) {
+        return entry.getValue().apply(matcher);
+      }
+    }
+
+    throw new IllegalArgumentException("Could not find a partition term for '" + field + "'.");
+  }
 }
diff --git a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
index 893e24b61559..591467ce0d05 100644
--- a/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
+++ b/sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/FilterUtilsTest.java
@@ -823,14 +823,8 @@ private static void checkEquals(Expression expectedExpr, Expression actualExpr)
     ImmutableSet<Operation> inOperations = ImmutableSet.of(Operation.IN, Operation.NOT_IN);
     if (inOperations.contains(expected.op())) {
-      System.out.printf(
-          "xxx op: %s, literals: %s, ref: %s%n",
-          expected.op(), expected.literals(), expected.ref().name());
       assertEquals(expected.literals(), actual.literals());
     } else {
-      System.out.printf(
-          "xxx op: %s, literal: %s, ref: %s%n",
-          expected.op(), expected.literal(), expected.ref().name());
       assertEquals(expected.literal(), actual.literal());
     }
   }
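
Note for reviewers: a minimal end-to-end sketch of the SQL surface this patch introduces, distilled from the tests above. The AlterExample class and its main method are a hypothetical harness, not part of the change; the SQL statements themselves match what BeamSqlCliTest and BeamSqlCliCatalogTest exercise.

import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;

public class AlterExample {
  public static void main(String[] args) {
    // Same setup as the tests: an in-memory catalog manager backed by TestTableProvider.
    InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
    catalogManager.registerTableProvider(new TestTableProvider());
    BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);

    // Create a table, then evolve its properties and schema through the new ALTER TABLE syntax.
    cli.execute("CREATE EXTERNAL TABLE t(id integer, str varchar) type 'test'");
    cli.execute("ALTER TABLE t RESET('bar') SET('foo'='456', 'baz'='xyz')");
    cli.execute("ALTER TABLE t DROP COLUMNS (str) ADD COLUMNS (newBool boolean)");

    // Catalog properties can be altered the same way.
    cli.execute("CREATE CATALOG my_catalog TYPE 'local' PROPERTIES('foo'='abc')");
    cli.execute("ALTER CATALOG my_catalog SET ('foo'='123', 'new'='val')");
    cli.execute("ALTER CATALOG my_catalog RESET ('foo')");
  }
}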
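Below the SQL surface, the DDL layer resolves a TableProvider and drives the new AlterTableOps hook. A sketch of that programmatic path, assuming the caller has already looked up the provider; the AlterOpsSketch class and addNullableColumn helper are illustrative names, not part of this change.

import java.util.Collections;
import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
import org.apache.beam.sdk.schemas.Schema;

class AlterOpsSketch {
  // Roughly what an "ALTER TABLE ... ADD COLUMNS (flag boolean)" boils down to.
  static void addNullableColumn(TableProvider provider, String tableName) {
    // Providers that don't override alterTable throw UnsupportedOperationException here.
    AlterTableOps ops = provider.alterTable(tableName);

    // Only nullable columns can be added to a table that already has rows;
    // both AlterTestTableOps and IcebergTableInfo reject required columns.
    Schema.Field newCol = Schema.Field.nullable("flag", Schema.FieldType.BOOLEAN);
    ops.updateSchema(Collections.singletonList(newCol), Collections.emptyList());

    // SET/RESET of table properties funnels into the same ops object.
    ops.updateTableProperties(Collections.singletonMap("owner", "sql"), Collections.emptyList());
  }
}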
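On the Iceberg side, updatePartitionSpec relies on PartitionUtils.toIcebergTerm to turn the partition-field strings from SQL into Iceberg Terms via the TERMS regex table (the HOUR/DAY/etc. Pattern constants are defined earlier in PartitionUtils and not visible in this hunk). A sketch of the expected mapping for typical inputs, using Iceberg's Expressions factory directly; the TermMappingSketch class is illustrative only.

import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.expressions.Term;

class TermMappingSketch {
  static void examples() {
    // What toIcebergTerm is expected to resolve for common spellings
    // (illustrative; the real lookup goes through the TERMS regex table):
    Term hourly = Expressions.hour("event_time");   // "hour(event_time)"
    Term bucketed = Expressions.bucket("id", 16);   // "bucket(id, 16)"
    Term truncated = Expressions.truncate("name", 4); // "truncate(name, 4)"
    Term identity = Expressions.ref("country");     // bare field name, the IDENTITY case
  }
}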