Skip to content

Commit 2f9a910

Browse files
authored
Implement a hierarchy for Beam SQL Calcite Schemas (#35787)
* add catalog-database-table hierarchy * some java doc * spotless * spotless * cleanup * use databaseExists for efficiency; don't use LOCATION for iceberg tables; fix setOption gap; maybe register table providers from top-level CatalogManager cache * fix postcommits * reset * address comments * spotless * fix
1 parent 15b8560 commit 2f9a910

File tree

48 files changed

+1896
-843
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

48 files changed

+1896
-843
lines changed
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
{
2-
"modification": 1
2+
"modification": 2
33
}

sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,11 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
1919

20+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
21+
22+
import java.util.HashMap;
2023
import java.util.Map;
21-
import java.util.Set;
2224
import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog;
23-
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
2425
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
2526
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
2627
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
@@ -29,7 +30,7 @@ public class IcebergCatalog extends InMemoryCatalog {
2930
// TODO(ahmedabu98): extend this to the IO implementation so
3031
// other SDKs can make use of it too
3132
private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop";
32-
private final InMemoryMetaStore metaStore = new InMemoryMetaStore();
33+
private final Map<String, IcebergMetastore> metaStores = new HashMap<>();
3334
@VisibleForTesting final IcebergCatalogConfig catalogConfig;
3435

3536
public IcebergCatalog(String name, Map<String, String> properties) {
@@ -52,12 +53,12 @@ public IcebergCatalog(String name, Map<String, String> properties) {
5253
.setCatalogProperties(catalogProps.build())
5354
.setConfigProperties(hadoopProps.build())
5455
.build();
55-
metaStore.registerProvider(new IcebergTableProvider(catalogConfig));
5656
}
5757

5858
@Override
59-
public InMemoryMetaStore metaStore() {
60-
return metaStore;
59+
public IcebergMetastore metaStore(String db) {
60+
metaStores.putIfAbsent(db, new IcebergMetastore(db, catalogConfig));
61+
return metaStores.get(db);
6162
}
6263

6364
@Override
@@ -70,17 +71,24 @@ public boolean createDatabase(String database) {
7071
return catalogConfig.createNamespace(database);
7172
}
7273

74+
@Override
75+
public void useDatabase(String database) {
76+
checkArgument(databaseExists(database), "Database '%s' does not exist.");
77+
currentDatabase = database;
78+
}
79+
80+
@Override
81+
public boolean databaseExists(String db) {
82+
return catalogConfig.namespaceExists(db);
83+
}
84+
7385
@Override
7486
public boolean dropDatabase(String database, boolean cascade) {
7587
boolean removed = catalogConfig.dropNamespace(database, cascade);
88+
metaStores.remove(database);
7689
if (database.equals(currentDatabase)) {
7790
currentDatabase = null;
7891
}
7992
return removed;
8093
}
81-
82-
@Override
83-
public Set<String> listDatabases() {
84-
return catalogConfig.listNamespaces();
85-
}
8694
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,154 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
19+
20+
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
21+
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
22+
23+
import java.util.HashMap;
24+
import java.util.Map;
25+
import org.apache.beam.sdk.extensions.sql.TableUtils;
26+
import org.apache.beam.sdk.extensions.sql.impl.TableName;
27+
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
28+
import org.apache.beam.sdk.extensions.sql.meta.Table;
29+
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
30+
import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore;
31+
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig;
32+
import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig.IcebergTableInfo;
33+
import org.apache.beam.sdk.io.iceberg.TableAlreadyExistsException;
34+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
35+
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
36+
import org.checkerframework.checker.nullness.qual.Nullable;
37+
import org.slf4j.Logger;
38+
import org.slf4j.LoggerFactory;
39+
40+
public class IcebergMetastore extends InMemoryMetaStore {
41+
private static final Logger LOG = LoggerFactory.getLogger(IcebergMetastore.class);
42+
@VisibleForTesting final IcebergCatalogConfig catalogConfig;
43+
private final Map<String, Table> cachedTables = new HashMap<>();
44+
private final String database;
45+
46+
/**
 * Creates a metastore bound to a single database.
 *
 * @param db name of the database this metastore serves; used as the prefix of table identifiers
 * @param catalogConfig catalog configuration used for all table operations
 */
public IcebergMetastore(String db, IcebergCatalogConfig catalogConfig) {
  this.catalogConfig = catalogConfig;
  this.database = db;
}
50+
51+
@Override
52+
public String getTableType() {
53+
return "iceberg";
54+
}
55+
56+
@Override
57+
public void createTable(Table table) {
58+
if (!table.getType().equals("iceberg")) {
59+
getProvider(table.getType()).createTable(table);
60+
} else {
61+
String identifier = getIdentifier(table);
62+
try {
63+
catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
64+
} catch (TableAlreadyExistsException e) {
65+
LOG.info(
66+
"Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier);
67+
}
68+
}
69+
cachedTables.put(table.getName(), table);
70+
}
71+
72+
@Override
73+
public void dropTable(String tableName) {
74+
String identifier = getIdentifier(tableName);
75+
if (catalogConfig.dropTable(identifier)) {
76+
LOG.info("Dropped table '{}' (path: '{}').", tableName, identifier);
77+
} else {
78+
LOG.info(
79+
"Ignoring DROP TABLE call for '{}' (path: '{}') because it does not exist.",
80+
tableName,
81+
identifier);
82+
}
83+
cachedTables.remove(tableName);
84+
}
85+
86+
@Override
87+
public Map<String, Table> getTables() {
88+
for (String id : catalogConfig.listTables(database)) {
89+
String name = TableName.create(id).getTableName();
90+
@Nullable Table cachedTable = cachedTables.get(name);
91+
if (cachedTable == null) {
92+
Table table = checkStateNotNull(loadTable(id));
93+
cachedTables.put(name, table);
94+
}
95+
}
96+
return ImmutableMap.copyOf(cachedTables);
97+
}
98+
99+
@Override
100+
public @Nullable Table getTable(String name) {
101+
if (cachedTables.containsKey(name)) {
102+
return cachedTables.get(name);
103+
}
104+
@Nullable Table table = loadTable(getIdentifier(name));
105+
if (table != null) {
106+
cachedTables.put(name, table);
107+
}
108+
return table;
109+
}
110+
111+
private String getIdentifier(String name) {
112+
return database + "." + name;
113+
}
114+
115+
private String getIdentifier(Table table) {
116+
checkArgument(
117+
table.getLocation() == null, "Cannot create Iceberg tables using LOCATION property.");
118+
return getIdentifier(table.getName());
119+
}
120+
121+
private @Nullable Table loadTable(String identifier) {
122+
@Nullable IcebergTableInfo tableInfo = catalogConfig.loadTable(identifier);
123+
if (tableInfo == null) {
124+
return null;
125+
}
126+
return Table.builder()
127+
.type(getTableType())
128+
.name(identifier)
129+
.schema(tableInfo.getSchema())
130+
.properties(TableUtils.parseProperties(tableInfo.getProperties()))
131+
.build();
132+
}
133+
134+
@Override
135+
public BeamSqlTable buildBeamSqlTable(Table table) {
136+
if (table.getType().equals("iceberg")) {
137+
return new IcebergTable(getIdentifier(table), table, catalogConfig);
138+
}
139+
return getProvider(table.getType()).buildBeamSqlTable(table);
140+
}
141+
142+
@Override
143+
public boolean supportsPartitioning(Table table) {
144+
if (table.getType().equals("iceberg")) {
145+
return true;
146+
}
147+
return getProvider(table.getType()).supportsPartitioning(table);
148+
}
149+
150+
@Override
151+
public void registerProvider(TableProvider provider) {
152+
super.registerProvider(provider);
153+
}
154+
}

sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
*/
1818
package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg;
1919

20-
import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
2120
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2221

2322
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -66,10 +65,10 @@ class IcebergTable extends SchemaBaseBeamTable {
6665
@VisibleForTesting @Nullable Integer triggeringFrequency;
6766
@VisibleForTesting final @Nullable List<String> partitionFields;
6867

69-
IcebergTable(Table table, IcebergCatalogConfig catalogConfig) {
68+
IcebergTable(String tableIdentifier, Table table, IcebergCatalogConfig catalogConfig) {
7069
super(table.getSchema());
7170
this.schema = table.getSchema();
72-
this.tableIdentifier = checkArgumentNotNull(table.getLocation());
71+
this.tableIdentifier = tableIdentifier;
7372
this.catalogConfig = catalogConfig;
7473
ObjectNode properties = table.getProperties();
7574
if (properties.has(TRIGGERING_FREQUENCY_FIELD)) {

sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java

Lines changed: 0 additions & 96 deletions
This file was deleted.

0 commit comments

Comments
 (0)