Skip to content

Commit 1fbd0f5

Browse files
authored
[IcebergIO] Integrate with Beam SQL (#34799)
* add Iceberg table provider and tests * properties go in the tableprovider initialization * tobuilder * streaming integration test * spotless * extend test to include multi nested types; fix iceberg <-> conversion logic * add to changes.md * spotless * fix tests * clean * update CHANGES * add projection pushdown and column pruning * spotless * fixes * fixes * updates * sync with HEAD and use new Catalog implementation * mark new interfaces @internal * spotless * fix unparse method
1 parent f07e3e1 commit 1fbd0f5

File tree

26 files changed

+1256
-35
lines changed

26 files changed

+1256
-35
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run.",
3-
"modification": 4
3+
"modification": 1
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 4
3+
"modification": 5
44
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
11
{
22
"comment": "Modify this file in a trivial way to cause this test suite to run",
3-
"modification": 1
3+
"modification": 2
44
}

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,12 @@
7575
## I/Os
7676

7777
* Support for X source added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
78+
* [IcebergIO] Now available with Beam SQL! ([#34799](https://github.com/apache/beam/pull/34799))
7879
* [IcebergIO] Support reading with column pruning ([#34856](https://github.com/apache/beam/pull/34856))
7980
* [IcebergIO] Support reading with pushdown filtering ([#34827](https://github.com/apache/beam/pull/34827))
8081

8182
## New Features / Improvements
83+
* [Beam SQL] Introducing Beam Catalogs ([#35223](https://github.com/apache/beam/pull/35223))
8284
* Adding Google Storage Requests Pays feature (Golang)([#30747](https://github.com/apache/beam/issues/30747)).
8385
* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
8486
* [Python] Prism runner now auto-enabled for some Python pipelines using the direct runner ([#34921](https://github.com/apache/beam/pull/34921)).

sdks/java/extensions/sql/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,10 @@ dependencies {
7474
fmppTask "org.freemarker:freemarker:2.3.31"
7575
fmppTemplates library.java.vendored_calcite_1_28_0
7676
implementation project(path: ":sdks:java:core", configuration: "shadow")
77+
implementation project(":sdks:java:managed")
78+
implementation project(":sdks:java:io:iceberg")
79+
runtimeOnly project(":sdks:java:io:iceberg:bqms")
80+
runtimeOnly project(":sdks:java:io:iceberg:hive")
7781
implementation project(":sdks:java:extensions:avro")
7882
implementation project(":sdks:java:extensions:join-library")
7983
permitUnusedDeclared project(":sdks:java:extensions:join-library") // BEAM-11761

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateCatalog.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -99,16 +99,16 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
9999
if (i > 0) {
100100
writer.keyword(",");
101101
}
102-
properties.get(i).unparse(writer, leftPrec, rightPrec);
103-
}
104-
105-
for (int i = 0; i < properties.size(); i += 2) {
106-
if (i > 0) {
107-
writer.keyword(",");
108-
}
109-
properties.get(i).unparse(writer, leftPrec, rightPrec); // key
102+
SqlNode property = properties.get(i);
103+
checkState(
104+
property instanceof SqlNodeList,
105+
String.format(
106+
"Unexpected properties entry '%s' of class '%s'", property, property.getClass()));
107+
SqlNodeList kv = ((SqlNodeList) property);
108+
109+
kv.get(0).unparse(writer, leftPrec, rightPrec); // key
110110
writer.keyword("=");
111-
properties.get(i + 1).unparse(writer, leftPrec, rightPrec); // value
111+
kv.get(1).unparse(writer, leftPrec, rightPrec); // value
112112
}
113113
writer.keyword(")");
114114
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,15 @@
1818
package org.apache.beam.sdk.extensions.sql.meta.catalog;
1919

2020
import java.util.Map;
21+
import org.apache.beam.sdk.annotations.Internal;
2122
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
2223

2324
/**
2425
* Represents a named and configurable container for managing tables. Is defined with a type and
2526
* configuration properties. Uses an underlying {@link MetaStore} to manage tables and table
2627
* providers.
2728
*/
29+
@Internal
2830
public interface Catalog {
2931
/** A type that defines this catalog. */
3032
String type();

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.beam.sdk.extensions.sql.meta.catalog;
1919

2020
import java.util.Map;
21+
import org.apache.beam.sdk.annotations.Internal;
2122
import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema;
2223
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
2324
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -32,6 +33,7 @@
3233
* <p>When {@link #registerTableProvider(String, TableProvider)} is called, the provider should
3334
* become available for all catalogs.
3435
*/
36+
@Internal
3537
public interface CatalogManager {
3638
/** Creates and stores a catalog of a particular type. */
3739
void createCatalog(String name, String type, Map<String, String> properties);

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/CatalogRegistrar.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.meta.catalog;
1919

20+
import org.apache.beam.sdk.annotations.Internal;
21+
2022
/**
2123
* Over-arching registrar to capture available {@link Catalog}s. Implementations should be marked
2224
* with {@link com.google.auto.service.AutoService} to be available to {@link
2325
* java.util.ServiceLoader}s.
2426
*/
27+
@Internal
2528
public interface CatalogRegistrar {
2629
Iterable<Class<? extends Catalog>> getCatalogs();
2730
}

sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,15 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.meta.catalog;
1919

20-
import com.google.auto.service.AutoService;
2120
import java.util.Map;
2221
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
2322
import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore;
2423
import org.apache.beam.sdk.util.Preconditions;
2524

26-
@AutoService(Catalog.class)
2725
public class InMemoryCatalog implements Catalog {
2826
private final String name;
2927
private final Map<String, String> properties;
30-
private final InMemoryMetaStore metaStore = new InMemoryMetaStore();
28+
protected final InMemoryMetaStore metaStore = new InMemoryMetaStore();
3129

3230
public InMemoryCatalog(String name, Map<String, String> properties) {
3331
this.name = name;
@@ -41,7 +39,8 @@ public String type() {
4139

4240
@Override
4341
public String name() {
44-
return Preconditions.checkStateNotNull(name, "InMemoryCatalog has not been initialized");
42+
return Preconditions.checkStateNotNull(
43+
name, getClass().getSimpleName() + " has not been initialized");
4544
}
4645

4746
@Override

0 commit comments

Comments
 (0)