Skip to content

Commit cbe071b

Browse files
committed
add more tests
1 parent 5e9f462 commit cbe071b

File tree

7 files changed

+219
-18
lines changed

7 files changed

+219
-18
lines changed

sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2121
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
2222

23+
import com.fasterxml.jackson.core.type.TypeReference;
2324
import java.util.HashMap;
2425
import java.util.Map;
2526
import org.apache.beam.sdk.extensions.sql.TableUtils;
@@ -60,7 +61,12 @@ public void createTable(Table table) {
6061
} else {
6162
String identifier = getIdentifier(table);
6263
try {
63-
catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
64+
Map<String, String> properties =
65+
TableUtils.getObjectMapper()
66+
.convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {});
67+
;
68+
catalogConfig.createTable(
69+
identifier, table.getSchema(), table.getPartitionFields(), properties);
6470
} catch (TableAlreadyExistsException e) {
6571
LOG.info(
6672
"Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier);

sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergAlterTest.java

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,14 @@
2020
import static java.lang.String.format;
2121
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2222
import static org.apache.iceberg.types.Types.NestedField.optional;
23+
import static org.hamcrest.MatcherAssert.assertThat;
24+
import static org.hamcrest.Matchers.allOf;
25+
import static org.hamcrest.Matchers.hasEntry;
2326
import static org.junit.Assert.assertEquals;
2427
import static org.junit.Assert.assertTrue;
2528

2629
import java.io.File;
2730
import java.io.IOException;
28-
import java.util.HashMap;
2931
import java.util.Map;
3032
import java.util.UUID;
3133
import org.apache.beam.sdk.extensions.sql.BeamSqlCli;
@@ -88,24 +90,22 @@ public void testAlterTableProps() {
8890
cli.execute(createCatalog("my_catalog"));
8991
cli.execute("CREATE DATABASE my_catalog.my_db");
9092
cli.execute("USE DATABASE my_catalog.my_db");
91-
cli.execute("CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg'");
93+
cli.execute(
94+
"CREATE EXTERNAL TABLE my_table(col1 VARCHAR, col2 INTEGER) TYPE 'iceberg' TBLPROPERTIES '{ \"prop1\" : \"123\", \"prop2\" : \"abc\"}'");
9295
IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
9396
Table table =
9497
catalog.catalogConfig.catalog().loadTable(TableIdentifier.parse("my_db.my_table"));
95-
Map<String, String> baseProps = table.properties();
98+
assertThat(table.properties(), allOf(hasEntry("prop1", "123"), hasEntry("prop2", "abc")));
9699

97-
cli.execute("ALTER TABLE my_table SET('prop1'='123', 'prop2'='abc', 'prop3'='foo')");
100+
cli.execute("ALTER TABLE my_table SET('prop3'='foo')");
98101
table.refresh();
99-
Map<String, String> expectedProps = new HashMap<>(baseProps);
100-
expectedProps.putAll(ImmutableMap.of("prop1", "123", "prop2", "abc", "prop3", "foo"));
101-
102-
assertEquals(expectedProps, table.properties());
102+
assertThat(
103+
table.properties(),
104+
allOf(hasEntry("prop1", "123"), hasEntry("prop2", "abc"), hasEntry("prop3", "foo")));
103105

104106
cli.execute("ALTER TABLE my_table RESET ('prop1') SET ('prop2'='xyz')");
105-
expectedProps.put("prop2", "xyz");
106-
expectedProps.remove("prop1");
107107
table.refresh();
108-
assertEquals(expectedProps, table.properties());
108+
assertThat(table.properties(), allOf(hasEntry("prop2", "xyz"), hasEntry("prop3", "foo")));
109109
}
110110

111111
@Test

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlAlterCatalog.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,10 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.impl.parser;
1919

20-
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2120
import static org.apache.beam.vendor.calcite.v1_40_0.org.apache.calcite.util.Static.RESOURCE;
2221
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
2322

2423
import java.util.Collection;
25-
import java.util.Collections;
26-
import java.util.HashMap;
2724
import java.util.List;
2825
import java.util.Map;
2926
import org.apache.beam.sdk.extensions.sql.impl.CatalogManagerSchema;
Lines changed: 98 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
118
package org.apache.beam.sdk.extensions.sql.meta.provider.test;
219

3-
public class AlterTestTableOps {
20+
import com.fasterxml.jackson.databind.node.ObjectNode;
21+
import java.util.Collection;
22+
import java.util.HashMap;
23+
import java.util.List;
24+
import java.util.Map;
25+
import java.util.concurrent.CopyOnWriteArrayList;
26+
import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
27+
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider.TableWithRows;
28+
import org.apache.beam.sdk.schemas.Schema;
29+
import org.apache.beam.sdk.schemas.Schema.Field;
30+
import org.apache.beam.sdk.values.Row;
31+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
32+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
33+
34+
public class AlterTestTableOps implements AlterTableOps {
35+
private final TableWithRows tableWithRows;
36+
37+
AlterTestTableOps(TableWithRows tableWithRows) {
38+
this.tableWithRows = tableWithRows;
39+
}
40+
41+
@Override
42+
public void updateTableProperties(Map<String, String> setProps, List<String> resetProps) {
43+
ObjectNode props = tableWithRows.getTable().getProperties();
44+
resetProps.forEach(props::remove);
45+
setProps.forEach(props::put);
46+
tableWithRows.setTable(tableWithRows.getTable().toBuilder().properties(props).build());
47+
}
48+
49+
@Override
50+
public void updateSchema(List<Field> columnsToAdd, Collection<String> columnsToDrop) {
51+
if (!columnsToAdd.isEmpty() && !tableWithRows.getRows().isEmpty()) {
52+
ImmutableList.Builder<String> requiredFields = ImmutableList.builder();
53+
for (Field f : columnsToAdd) {
54+
if (!f.getType().getNullable()) {
55+
requiredFields.add(f.getName());
56+
}
57+
}
58+
Preconditions.checkArgument(
59+
requiredFields.build().isEmpty(),
60+
"Cannot add required fields %s because table '%s' already contains rows.",
61+
requiredFields.build(),
62+
tableWithRows.getTable().getName());
63+
}
64+
65+
// update the schema
66+
List<Field> schemaFields = tableWithRows.getTable().getSchema().getFields();
67+
ImmutableList.Builder<Field> newSchemaFields = ImmutableList.builder();
68+
// remove dropped fields
69+
schemaFields.stream()
70+
.filter(f -> !columnsToDrop.contains(f.getName()))
71+
.forEach(newSchemaFields::add);
72+
// add new fields
73+
newSchemaFields.addAll(columnsToAdd);
74+
Schema newSchema = Schema.of(newSchemaFields.build().toArray(new Field[0]));
75+
tableWithRows.setTable(tableWithRows.getTable().toBuilder().schema(newSchema).build());
76+
77+
// update existing rows
78+
List<Row> rows = tableWithRows.getRows();
79+
List<Row> newRows = new CopyOnWriteArrayList<>();
80+
for (Row row : rows) {
81+
Map<String, Object> values = new HashMap<>();
82+
// add existing values, minus dropped columns
83+
for (Field field : schemaFields) {
84+
String name = field.getName();
85+
if (!columnsToDrop.contains(name)) {
86+
values.put(name, row.getValue(name));
87+
}
88+
}
89+
Row newRow = Row.withSchema(newSchema).withFieldValues(values).build();
90+
newRows.add(newRow);
91+
}
92+
tableWithRows.setRows(newRows);
93+
}
94+
95+
@Override
96+
public void updatePartitionSpec(
97+
List<String> partitionsToAdd, Collection<String> partitionsToDrop) {
98+
throw new UnsupportedOperationException(
99+
TestTableProvider.class.getSimpleName() + " does not support partitions.");
100+
}
4101
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/test/TestTableProvider.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.apache.beam.sdk.extensions.sql.meta.DefaultTableFilter;
3737
import org.apache.beam.sdk.extensions.sql.meta.ProjectSupport;
3838
import org.apache.beam.sdk.extensions.sql.meta.Table;
39+
import org.apache.beam.sdk.extensions.sql.meta.provider.AlterTableOps;
3940
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
4041
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
4142
import org.apache.beam.sdk.options.PipelineOptions;
@@ -50,6 +51,7 @@
5051
import org.apache.beam.sdk.transforms.PTransform;
5152
import org.apache.beam.sdk.transforms.ParDo;
5253
import org.apache.beam.sdk.transforms.SerializableFunction;
54+
import org.apache.beam.sdk.util.Preconditions;
5355
import org.apache.beam.sdk.values.PBegin;
5456
import org.apache.beam.sdk.values.PCollection;
5557
import org.apache.beam.sdk.values.PDone;
@@ -107,6 +109,13 @@ public Map<String, Table> getTables() {
107109
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().table));
108110
}
109111

112+
@Override
113+
public AlterTableOps alterTable(String name) {
114+
TableWithRows table =
115+
Preconditions.checkArgumentNotNull(tables().get(name), "Could not find table '%s'", name);
116+
return new AlterTestTableOps(table);
117+
}
118+
110119
@Override
111120
public synchronized BeamSqlTable buildBeamSqlTable(Table table) {
112121
return new InMemoryTable(tables().get(table.getName()));
@@ -133,9 +142,21 @@ public TableWithRows(long tableProviderInstanceId, Table table) {
133142
this.rows = new CopyOnWriteArrayList<>();
134143
}
135144

145+
public Table getTable() {
146+
return table;
147+
}
148+
149+
void setTable(Table table) {
150+
this.table = table;
151+
}
152+
136153
public List<Row> getRows() {
137154
return rows;
138155
}
156+
157+
void setRows(List<Row> rows) {
158+
this.rows = rows;
159+
}
139160
}
140161

141162
private static class InMemoryTable extends BaseBeamTable {

sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliTest.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,22 @@
2424
import static org.apache.beam.sdk.schemas.Schema.toSchema;
2525
import static org.hamcrest.MatcherAssert.assertThat;
2626
import static org.hamcrest.Matchers.equalTo;
27+
import static org.hamcrest.Matchers.everyItem;
28+
import static org.hamcrest.Matchers.is;
29+
import static org.hamcrest.Matchers.oneOf;
2730
import static org.junit.Assert.assertEquals;
31+
import static org.junit.Assert.assertFalse;
2832
import static org.junit.Assert.assertNotNull;
2933
import static org.junit.Assert.assertNull;
3034

3135
import java.time.LocalDate;
3236
import java.time.LocalTime;
37+
import java.util.Arrays;
38+
import java.util.List;
3339
import java.util.stream.Stream;
3440
import org.apache.beam.sdk.extensions.sql.impl.ParseException;
3541
import org.apache.beam.sdk.extensions.sql.meta.Table;
42+
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
3643
import org.apache.beam.sdk.extensions.sql.meta.provider.test.TestTableProvider;
3744
import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider;
3845
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
@@ -300,4 +307,74 @@ public void test_time_types() throws Exception {
300307
// test TIMESTAMP field
301308
assertEquals(parseTimestampWithUTCTimeZone("2018-07-01 21:26:07.123"), row.getDateTime("f_ts"));
302309
}
310+
311+
@Test
312+
public void testAlterTableSchema() {
313+
InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
314+
TestTableProvider provider = new TestTableProvider();
315+
catalogManager.registerTableProvider(provider);
316+
BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
317+
318+
cli.execute(
319+
"CREATE EXTERNAL TABLE test_table(id integer not null, str varchar not null, fl float) type 'test'");
320+
cli.execute("INSERT INTO test_table VALUES (1, 'a', 0.1), (2, 'b', 0.2), (3, 'c', 0.3)");
321+
TestTableProvider.TableWithRows tableWithRows = provider.tables().get("test_table");
322+
assertNotNull(tableWithRows);
323+
Schema initialSchema =
324+
Schema.builder()
325+
.addInt32Field("id")
326+
.addStringField("str")
327+
.addNullableFloatField("fl")
328+
.build();
329+
assertEquals(initialSchema, tableWithRows.getTable().getSchema());
330+
List<Row> initialRows =
331+
Arrays.asList(
332+
Row.withSchema(initialSchema).addValues(1, "a", 0.1f).build(),
333+
Row.withSchema(initialSchema).addValues(2, "b", 0.2f).build(),
334+
Row.withSchema(initialSchema).addValues(3, "c", 0.3f).build());
335+
assertThat(initialRows, everyItem(is(oneOf(tableWithRows.getRows().toArray(new Row[0])))));
336+
337+
cli.execute(
338+
"ALTER TABLE test_table DROP COLUMNS (str, fl) ADD COLUMNS (newBool boolean, newLong bigint)");
339+
cli.execute("INSERT INTO test_table VALUES (4, true, 4), (5, false, 5), (6, false, 6)");
340+
Schema newSchema =
341+
Schema.builder()
342+
.addInt32Field("id")
343+
.addNullableBooleanField("newBool")
344+
.addNullableInt64Field("newLong")
345+
.build();
346+
assertEquals(newSchema, tableWithRows.getTable().getSchema());
347+
348+
// existing rows should have the corresponding values dropped
349+
List<Row> newRows =
350+
Arrays.asList(
351+
Row.withSchema(newSchema).addValues(1, null, null).build(),
352+
Row.withSchema(newSchema).addValues(2, null, null).build(),
353+
Row.withSchema(newSchema).addValues(3, null, null).build(),
354+
Row.withSchema(newSchema).addValues(4, true, 4L).build(),
355+
Row.withSchema(newSchema).addValues(5, false, 5L).build(),
356+
Row.withSchema(newSchema).addValues(6, false, 6L).build());
357+
assertThat(newRows, everyItem(is(oneOf(tableWithRows.getRows().toArray(new Row[0])))));
358+
}
359+
360+
@Test
361+
public void testAlterTableProperties() {
362+
InMemoryCatalogManager catalogManager = new InMemoryCatalogManager();
363+
TestTableProvider provider = new TestTableProvider();
364+
catalogManager.registerTableProvider(provider);
365+
BeamSqlCli cli = new BeamSqlCli().catalogManager(catalogManager);
366+
367+
cli.execute(
368+
"CREATE EXTERNAL TABLE test_table(id integer, str varchar) type 'test' "
369+
+ "TBLPROPERTIES '{ \"foo\" : \"123\", \"bar\" : \"abc\"}'");
370+
TestTableProvider.TableWithRows tableWithRows = provider.tables().get("test_table");
371+
assertNotNull(tableWithRows);
372+
assertEquals("123", tableWithRows.getTable().getProperties().get("foo").asText());
373+
assertEquals("abc", tableWithRows.getTable().getProperties().get("bar").asText());
374+
375+
cli.execute("ALTER TABLE test_table RESET('bar') SET('foo'='456', 'baz'='xyz')");
376+
assertEquals("456", tableWithRows.getTable().getProperties().get("foo").asText());
377+
assertEquals("xyz", tableWithRows.getTable().getProperties().get("baz").asText());
378+
assertFalse(tableWithRows.getTable().getProperties().has("bar"));
379+
}
303380
}

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,10 @@ public boolean dropNamespace(String namespace, boolean cascade) {
149149
}
150150

151151
public void createTable(
152-
String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
152+
String tableIdentifier,
153+
Schema tableSchema,
154+
@Nullable List<String> partitionFields,
155+
Map<String, String> properties) {
153156
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
154157
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
155158
PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
@@ -159,7 +162,7 @@ public void createTable(
159162
icebergIdentifier,
160163
icebergSchema,
161164
icebergSpec);
162-
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
165+
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
163166
LOG.info("Successfully created table '{}'.", icebergIdentifier);
164167
} catch (AlreadyExistsException e) {
165168
throw new TableAlreadyExistsException(e);

0 commit comments

Comments
 (0)