Skip to content

Commit a50970b

Browse files
authored
[IcebergIO SQL] create real iceberg table (#35341)
* create real iceberg table * cleanup * remove unit test no longer applicable * spotless * use overarching InMemoryMetaStore * cleanup * cleanup
1 parent 2d68eff commit a50970b

File tree

14 files changed

+170
-101
lines changed

14 files changed

+170
-101
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
2-
"comment": "Modify this file in a trivial way to cause this test suite to run",
2+
"comment": "Modify this file in a trivial way to cause this test suite to run ",
33
"modification": 2
44
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateExternalTable.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -152,16 +152,17 @@ public void execute(CalcitePrepare.Context context) {
152152
}
153153

154154
BeamCalciteSchema schema = (BeamCalciteSchema) pair.left.schema;
155+
Table table = toTable();
155156
if (partitionFields != null) {
156157
checkArgument(
157-
schema.resolveMetastore().supportsPartitioning(),
158+
schema.resolveMetastore().supportsPartitioning(table),
158159
"Invalid use of 'PARTITIONED BY()': Table '%s' of type '%s' "
159160
+ "does not support partitioning.",
160161
SqlDdlNodes.name(name),
161162
SqlDdlNodes.getString(type));
162163
}
163164

164-
schema.resolveMetastore().createTable(toTable());
165+
schema.resolveMetastore().createTable(table);
165166
}
166167

167168
private void unparseColumn(SqlWriter writer, Schema.Field column) {

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/TableProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ default TableProvider getSubProvider(String name) {
8080
return null;
8181
}
8282

83-
default boolean supportsPartitioning() {
83+
default boolean supportsPartitioning(Table table) {
8484
return false;
8585
}
8686
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,18 +19,18 @@
1919

2020
import java.util.Map;
2121
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
22-
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
22+
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
2323

2424
public class IcebergCatalog extends InMemoryCatalog {
25-
private final IcebergMetastore metaStore = new IcebergMetastore();
25+
private final InMemoryMetaStore metaStore = new InMemoryMetaStore();
2626

2727
public IcebergCatalog(String name, Map<String, String> properties) {
2828
super(name, properties);
2929
metaStore.registerProvider(new IcebergTableProvider(name, properties));
3030
}
3131

3232
@Override
33-
public MetaStore metaStore() {
33+
public InMemoryMetaStore metaStore() {
3434
return metaStore;
3535
}
3636

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,19 +17,32 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
1919

20+
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
21+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
22+
23+
import java.util.HashMap;
2024
import java.util.Map;
2125
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
2226
import org.apache.beam.sdk.extensions.sql.meta.Table;
23-
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
27+
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
2428
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
29+
import org.apache.beam.sdk.io.iceberg.TableAlreadyExistsException;
2530
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
2631
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
32+
import org.slf4j.Logger;
33+
import org.slf4j.LoggerFactory;
2734

28-
public class IcebergTableProvider extends InMemoryMetaTableProvider {
35+
/**
36+
* A table provider for Iceberg tables. CREATE and DROP operations are performed on real external
37+
* tables.
38+
*/
39+
public class IcebergTableProvider implements TableProvider {
40+
private static final Logger LOG = LoggerFactory.getLogger(IcebergTableProvider.class);
2941
// TODO(ahmedabu98): extend this to the IO implementation so
3042
// other SDKs can make use of it too
3143
private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop";
3244
@VisibleForTesting final IcebergCatalogConfig catalogConfig;
45+
private final Map<String, Table> tables = new HashMap<>();
3346

3447
public IcebergTableProvider(String name, Map<String, String> properties) {
3548
ImmutableMap.Builder<String, String> catalogProps = ImmutableMap.builder();
@@ -56,13 +69,48 @@ public String getTableType() {
5669
return "iceberg";
5770
}
5871

72+
@Override
73+
public void createTable(Table table) {
74+
try {
75+
catalogConfig.createTable(
76+
checkStateNotNull(table.getLocation()), table.getSchema(), table.getPartitionFields());
77+
} catch (TableAlreadyExistsException e) {
78+
LOG.info(
79+
"Iceberg table '{}' already exists at location '{}'.",
80+
table.getName(),
81+
table.getLocation());
82+
}
83+
tables.put(table.getName(), table);
84+
}
85+
86+
@Override
87+
public void dropTable(String tableName) {
88+
Table table =
89+
checkArgumentNotNull(getTable(tableName), "Table '%s' is not registered.", tableName);
90+
String location = checkStateNotNull(table.getLocation());
91+
if (catalogConfig.dropTable(location)) {
92+
LOG.info("Dropped table '{}' (location: '{}').", tableName, location);
93+
} else {
94+
LOG.info(
95+
"Ignoring DROP TABLE call for '{}' (location: '{}') because it does not exist.",
96+
tableName,
97+
location);
98+
}
99+
tables.remove(tableName);
100+
}
101+
102+
@Override
103+
public Map<String, Table> getTables() {
104+
return tables;
105+
}
106+
59107
@Override
60108
public BeamSqlTable buildBeamSqlTable(Table table) {
61109
return new IcebergTable(table, catalogConfig);
62110
}
63111

64112
@Override
65-
public boolean supportsPartitioning() {
113+
public boolean supportsPartitioning(Table table) {
66114
return true;
67115
}
68116
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/store/InMemoryMetaStore.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.meta.store;
1919

20+
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
21+
2022
import java.util.HashMap;
2123
import java.util.Map;
2224
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -114,7 +116,18 @@ Map<String, TableProvider> getProviders() {
114116
return providers;
115117
}
116118

117-
public boolean hasProvider(TableProvider provider) {
118-
return providers.containsKey(provider.getTableType());
119+
@Override
120+
public boolean supportsPartitioning(Table table) {
121+
TableProvider provider = providers.get(table.getType());
122+
if (provider == null) {
123+
throw new IllegalArgumentException(
124+
"No TableProvider registered for table type: " + table.getType());
125+
}
126+
return provider.supportsPartitioning(table);
127+
}
128+
129+
public TableProvider getProvider(String type) {
130+
return checkArgumentNotNull(
131+
providers.get(type), "No TableProvider registered for table type: " + type);
119132
}
120133
}

sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/PubsubToIcebergIT.java

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import org.apache.iceberg.catalog.TableIdentifier;
5555
import org.joda.time.Duration;
5656
import org.joda.time.Instant;
57+
import org.junit.After;
5758
import org.junit.AfterClass;
5859
import org.junit.Before;
5960
import org.junit.BeforeClass;
@@ -76,6 +77,7 @@ public class PubsubToIcebergIT implements Serializable {
7677
"org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog";
7778
static final String DATASET = "sql_pubsub_to_iceberg_it_" + System.nanoTime();
7879
static String warehouse;
80+
private static Catalog icebergCatalog;
7981
protected static final GcpOptions OPTIONS =
8082
TestPipeline.testingPipelineOptions().as(GcpOptions.class);
8183
@Rule public TestName testName = new TestName();
@@ -99,6 +101,16 @@ public static void createDataset() throws IOException, InterruptedException {
99101
+ format(" 'gcp_project' = '%s', \n", OPTIONS.getProject())
100102
+ " 'gcp_region' = 'us-central1')";
101103
setCatalogDdl = "USE CATALOG my_catalog";
104+
icebergCatalog =
105+
CatalogUtil.loadCatalog(
106+
BQMS_CATALOG,
107+
"my_catalog",
108+
ImmutableMap.<String, String>builder()
109+
.put("gcp_project", OPTIONS.getProject())
110+
.put("gcp_location", "us-central1")
111+
.put("warehouse", warehouse)
112+
.build(),
113+
null);
102114
}
103115

104116
private String tableIdentifier;
@@ -110,13 +122,18 @@ public void setup() {
110122
tableIdentifier = DATASET + "." + testName.getMethodName();
111123
}
112124

125+
@After
126+
public void cleanup() {
127+
icebergCatalog.dropTable(TableIdentifier.parse(tableIdentifier));
128+
}
129+
113130
@AfterClass
114131
public static void deleteDataset() {
115132
BQ_CLIENT.deleteDataset(OPTIONS.getProject(), DATASET);
116133
}
117134

118135
@Test
119-
public void testSimpleInsert() throws Exception {
136+
public void testSimpleInsertWithPartitionedFields() throws Exception {
120137
String pubsubTableString =
121138
"CREATE EXTERNAL TABLE pubsub_topic (\n"
122139
+ "event_timestamp TIMESTAMP, \n"
@@ -168,16 +185,6 @@ public void testSimpleInsert() throws Exception {
168185
validateRowsWritten();
169186

170187
// verify the table was created with the right partition spec
171-
Catalog icebergCatalog =
172-
CatalogUtil.loadCatalog(
173-
BQMS_CATALOG,
174-
"my_catalog",
175-
ImmutableMap.<String, String>builder()
176-
.put("gcp_project", OPTIONS.getProject())
177-
.put("gcp_location", "us-central1")
178-
.put("warehouse", warehouse)
179-
.build(),
180-
null);
181188
PartitionSpec expectedSpec =
182189
PartitionSpec.builderFor(IcebergUtils.beamSchemaToIcebergSchema(SOURCE_SCHEMA))
183190
.identity("id")

sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergReadWriteIT.java

Lines changed: 35 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,14 @@
2828
import static org.apache.beam.sdk.schemas.Schema.FieldType.STRING;
2929
import static org.apache.beam.sdk.schemas.Schema.FieldType.array;
3030
import static org.apache.beam.sdk.schemas.Schema.FieldType.row;
31+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
3132
import static org.hamcrest.MatcherAssert.assertThat;
3233
import static org.hamcrest.Matchers.containsInAnyOrder;
3334
import static org.hamcrest.Matchers.equalTo;
3435
import static org.hamcrest.Matchers.instanceOf;
3536
import static org.junit.Assert.assertEquals;
37+
import static org.junit.Assert.assertFalse;
38+
import static org.junit.Assert.assertTrue;
3639

3740
import com.google.api.services.bigquery.model.TableRow;
3841
import java.io.IOException;
@@ -45,6 +48,7 @@
4548
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamRelNode;
4649
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
4750
import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
51+
import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager;
4852
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager;
4953
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryUtils;
5054
import org.apache.beam.sdk.io.gcp.testing.BigqueryClient;
@@ -55,8 +59,6 @@
5559
import org.apache.beam.sdk.testing.TestPipeline;
5660
import org.apache.beam.sdk.values.PCollection;
5761
import org.apache.beam.sdk.values.Row;
58-
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
59-
import org.apache.iceberg.CatalogUtil;
6062
import org.apache.iceberg.PartitionSpec;
6163
import org.apache.iceberg.Table;
6264
import org.apache.iceberg.catalog.Catalog;
@@ -132,8 +134,9 @@ public void testSqlWriteWithPartitionFieldsAndRead() throws IOException, Interru
132134

133135
public void runSqlWriteAndRead(boolean withPartitionFields)
134136
throws IOException, InterruptedException {
137+
CatalogManager catalogManager = new InMemoryCatalogManager();
135138
BeamSqlEnv sqlEnv =
136-
BeamSqlEnv.builder(new InMemoryCatalogManager())
139+
BeamSqlEnv.builder(catalogManager)
137140
.setPipelineOptions(PipelineOptionsFactory.create())
138141
.build();
139142
String tableIdentifier = DATASET + "." + testName.getMethodName();
@@ -179,7 +182,30 @@ public void runSqlWriteAndRead(boolean withPartitionFields)
179182
+ "'";
180183
sqlEnv.executeDdl(createTableStatement);
181184

182-
// 3) write to underlying Iceberg table
185+
// 3) verify a real Iceberg table was created, with the right partition spec
186+
IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog();
187+
IcebergTableProvider provider =
188+
(IcebergTableProvider) catalog.metaStore().getProvider("iceberg");
189+
Catalog icebergCatalog = provider.catalogConfig.catalog();
190+
PartitionSpec expectedSpec = PartitionSpec.unpartitioned();
191+
if (withPartitionFields) {
192+
expectedSpec =
193+
PartitionSpec.builderFor(IcebergUtils.beamSchemaToIcebergSchema(SOURCE_SCHEMA))
194+
.bucket("c_integer", 5)
195+
.identity("c_boolean")
196+
.hour("c_timestamp")
197+
.truncate("c_varchar", 3)
198+
.build();
199+
}
200+
Table icebergTable = icebergCatalog.loadTable(TableIdentifier.parse(tableIdentifier));
201+
assertEquals(expectedSpec, icebergTable.spec());
202+
assertEquals("my_catalog." + tableIdentifier, icebergTable.name());
203+
assertTrue(icebergTable.location().startsWith(warehouse));
204+
assertEquals(expectedSpec, icebergTable.spec());
205+
Schema expectedSchema = checkStateNotNull(provider.getTable("TEST")).getSchema();
206+
assertEquals(expectedSchema, IcebergUtils.icebergSchemaToBeamSchema(icebergTable.schema()));
207+
208+
// 4) write to underlying Iceberg table
183209
String insertStatement =
184210
"INSERT INTO TEST VALUES ("
185211
+ "9223372036854775807, "
@@ -199,7 +225,7 @@ public void runSqlWriteAndRead(boolean withPartitionFields)
199225
BeamSqlRelUtils.toPCollection(writePipeline, sqlEnv.parseQuery(insertStatement));
200226
writePipeline.run().waitUntilFinish();
201227

202-
// 4) run external query on Iceberg table (hosted on BQ) to verify correct row was written
228+
// 5) run external query on Iceberg table (hosted on BQ) to verify correct row was written
203229
String query = format("SELECT * FROM `%s.%s`", OPTIONS.getProject(), tableIdentifier);
204230
TableRow returnedRow =
205231
BQ_CLIENT.queryUnflattened(query, OPTIONS.getProject(), true, true).get(0);
@@ -223,37 +249,17 @@ public void runSqlWriteAndRead(boolean withPartitionFields)
223249
.build();
224250
assertEquals(expectedRow, beamRow);
225251

226-
// 5) read using Beam SQL and verify
252+
// 6) read using Beam SQL and verify
227253
String selectTableStatement = "SELECT * FROM TEST";
228254
PCollection<Row> output =
229255
BeamSqlRelUtils.toPCollection(readPipeline, sqlEnv.parseQuery(selectTableStatement));
230256
PAssert.that(output).containsInAnyOrder(expectedRow);
231257
PipelineResult.State state = readPipeline.run().waitUntilFinish();
232258
assertThat(state, equalTo(PipelineResult.State.DONE));
233259

234-
// 6) verify the table was created with the right partition spec
235-
Catalog icebergCatalog =
236-
CatalogUtil.loadCatalog(
237-
BQMS_CATALOG,
238-
"my_catalog",
239-
ImmutableMap.<String, String>builder()
240-
.put("gcp_project", OPTIONS.getProject())
241-
.put("gcp_location", "us-central1")
242-
.put("warehouse", warehouse)
243-
.build(),
244-
null);
245-
PartitionSpec expectedSpec = PartitionSpec.unpartitioned();
246-
if (withPartitionFields) {
247-
expectedSpec =
248-
PartitionSpec.builderFor(IcebergUtils.beamSchemaToIcebergSchema(SOURCE_SCHEMA))
249-
.bucket("c_integer", 5)
250-
.identity("c_boolean")
251-
.hour("c_timestamp")
252-
.truncate("c_varchar", 3)
253-
.build();
254-
}
255-
Table table = icebergCatalog.loadTable(TableIdentifier.parse(tableIdentifier));
256-
assertEquals(expectedSpec, table.spec());
260+
// 7) cleanup
261+
sqlEnv.executeDdl("DROP TABLE TEST");
262+
assertFalse(icebergCatalog.tableExists(TableIdentifier.parse(tableIdentifier)));
257263
}
258264

259265
@Test

0 commit comments

Comments
 (0)