Skip to content

Commit 257cd5a

Browse files
committed
add sql ddl website documentation
1 parent 092ea3e commit 257cd5a

File tree

13 files changed

+468
-436
lines changed

13 files changed

+468
-436
lines changed

sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergMetastore.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@
2222

2323
import java.util.HashMap;
2424
import java.util.Map;
25+
import java.util.stream.Collectors;
26+
27+
import com.fasterxml.jackson.core.type.TypeReference;
2528
import org.apache.beam.sdk.extensions.sql.TableUtils;
2629
import org.apache.beam.sdk.extensions.sql.impl.TableName;
2730
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
@@ -59,8 +62,11 @@ public void createTable(Table table) {
5962
getProvider(table.getType()).createTable(table);
6063
} else {
6164
String identifier = getIdentifier(table);
65+
Map<String, String> props = TableUtils.getObjectMapper()
66+
.convertValue(table.getProperties(), new TypeReference<Map<String, String>>() {}).entrySet()
67+
.stream().filter(p -> !p.getKey().startsWith("beam.")).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
6268
try {
63-
catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields());
69+
catalogConfig.createTable(identifier, table.getSchema(), table.getPartitionFields(), props);
6470
} catch (TableAlreadyExistsException e) {
6571
LOG.info(
6672
"Iceberg table '{}' already exists at location '{}'.", table.getName(), identifier);

sdks/java/extensions/sql/iceberg/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTable.java

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
2121

22+
import com.fasterxml.jackson.databind.JsonNode;
2223
import com.fasterxml.jackson.databind.node.ObjectNode;
2324
import java.util.HashMap;
2425
import java.util.List;
@@ -56,6 +57,8 @@ class IcebergTable extends SchemaBaseBeamTable {
5657
@VisibleForTesting static final String CATALOG_PROPERTIES_FIELD = "catalog_properties";
5758
@VisibleForTesting static final String HADOOP_CONFIG_PROPERTIES_FIELD = "config_properties";
5859
@VisibleForTesting static final String CATALOG_NAME_FIELD = "catalog_name";
60+
static final String BEAM_WRITE_PROPERTY = "beam.write.";
61+
static final String BEAM_READ_PROPERTY = "beam.read.";
5962

6063
@VisibleForTesting
6164
static final String TRIGGERING_FREQUENCY_FIELD = "triggering_frequency_seconds";
@@ -71,9 +74,21 @@ class IcebergTable extends SchemaBaseBeamTable {
7174
this.tableIdentifier = tableIdentifier;
7275
this.catalogConfig = catalogConfig;
7376
ObjectNode properties = table.getProperties();
74-
if (properties.has(TRIGGERING_FREQUENCY_FIELD)) {
75-
this.triggeringFrequency = properties.get(TRIGGERING_FREQUENCY_FIELD).asInt();
77+
for (Map.Entry<String, JsonNode> property : properties.properties()) {
78+
String name = property.getKey();
79+
if (name.startsWith(BEAM_WRITE_PROPERTY)) {
80+
String prop = name.substring(BEAM_WRITE_PROPERTY.length());
81+
if (prop.equals(TRIGGERING_FREQUENCY_FIELD)) {
82+
this.triggeringFrequency = property.getValue().asInt();
83+
} else {
84+
throw new IllegalArgumentException("Unknown Beam write property: " + name);
85+
}
86+
} else if (name.startsWith(BEAM_READ_PROPERTY)) {
87+
// none supported yet
88+
throw new IllegalArgumentException("Unknown Beam read property: " + name);
89+
}
7690
}
91+
7792
this.partitionFields = table.getPartitionFields();
7893
}
7994

sdks/java/extensions/sql/iceberg/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/PubsubToIcebergIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ public void testSimpleInsertWithPartitionedFields() throws Exception {
156156
+ ") \n"
157157
+ "TYPE 'iceberg' \n"
158158
+ "PARTITIONED BY('id', 'truncate(name, 3)') \n"
159-
+ "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
159+
+ "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'";
160160
String insertStatement =
161161
format("INSERT INTO %s \n", tableIdentifier)
162162
+ "SELECT \n"
@@ -211,7 +211,7 @@ public void testSimpleInsertFlat() throws Exception {
211211
+ " name VARCHAR \n "
212212
+ ") \n"
213213
+ "TYPE 'iceberg' \n"
214-
+ "TBLPROPERTIES '{ \"triggering_frequency_seconds\" : 10 }'";
214+
+ "TBLPROPERTIES '{ \"beam.write.triggering_frequency_seconds\" : 10 }'";
215215
String insertStatement =
216216
format("INSERT INTO %s \n", tableIdentifier)
217217
+ "SELECT \n"

sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public boolean dropNamespace(String namespace, boolean cascade) {
143143
}
144144

145145
public void createTable(
146-
String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields) {
146+
String tableIdentifier, Schema tableSchema, @Nullable List<String> partitionFields, @Nullable Map<String, String> properties) {
147147
TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier);
148148
org.apache.iceberg.Schema icebergSchema = IcebergUtils.beamSchemaToIcebergSchema(tableSchema);
149149
PartitionSpec icebergSpec = PartitionUtils.toPartitionSpec(partitionFields, tableSchema);
@@ -153,7 +153,7 @@ public void createTable(
153153
icebergIdentifier,
154154
icebergSchema,
155155
icebergSpec);
156-
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec);
156+
catalog().createTable(icebergIdentifier, icebergSchema, icebergSpec, properties);
157157
LOG.info("Successfully created table '{}'.", icebergIdentifier);
158158
} catch (AlreadyExistsException e) {
159159
throw new TableAlreadyExistsException(e);
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
---
2+
type: languages
3+
title: "Beam SQL DDL: Alter"
4+
---
5+
<!--
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
19+
# ALTER statements
20+
21+
The **ALTER** statement modifies the definition of an existing Catalog or Table.
22+
For supported tables (like Iceberg), this enables **schema and partition evolution**.
23+
24+
## ALTER CATALOG
25+
Modifies an existing catalog's properties.
26+
27+
```sql
28+
ALTER CATALOG catalog_name
29+
[ SET ( 'key' = 'val', ... ) ]
30+
[ RESET ( 'key', ... ) ]
31+
```
32+
- **SET**: Adds new properties or updates existing ones.
33+
- **RESET** / **UNSET**: Removes properties.
34+
35+
## ALTER TABLE
36+
Modifies an existing table's properties and evolves its partition and schema.
37+
38+
```sql
39+
ALTER TABLE table_name
40+
[ ADD COLUMNS ( col_def, ... ) ]
41+
[ DROP COLUMNS ( col_name, ... ) ]
42+
[ ADD PARTITIONS ( partition_field, ... ) ]
43+
[ DROP PARTITIONS ( partition_field, ... ) ]
44+
[ SET ( 'key' = 'val', ... ) ]
45+
[ ( RESET | UNSET ) ( 'key', ... ) ];
46+
```
47+
48+
*Example 1: Add or remove columns*
49+
```sql
50+
-- Add columns
51+
ALTER TABLE orders ADD COLUMNS (
52+
customer_email VARCHAR,
53+
shipping_region VARCHAR
54+
);
55+
56+
-- Drop columns
57+
ALTER TABLE orders DROP COLUMNS ( customer_email );
58+
```
59+
60+
*Example 2: Modify partition spec*
61+
```sql
62+
-- Add a partition field
63+
ALTER TABLE orders ADD PARTITIONS ( 'year(order_date)' );
64+
65+
-- Remove a partition field
66+
ALTER TABLE orders DROP PARTITIONS ( 'region_id' );
67+
```
68+
69+
*Example 3: Modify table properties*
70+
```sql
71+
ALTER TABLE orders SET (
72+
'write.format.default' = 'orc',
73+
'write.metadata.metrics.default' = 'full' );
74+
ALTER TABLE orders RESET ( 'write.target-file-size-bytes' );
75+
```
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
---
2+
type: languages
3+
title: "Beam SQL DDL: Create"
4+
---
5+
<!--
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
19+
# CREATE statements
20+
21+
The **CREATE** command serves two potential functions depending on the connector:
22+
23+
- **Registration**: By default, it registers an existing external entity in the Beam SQL session.
24+
- **Instantiation**: For supported connectors (e.g., Iceberg), it physically creates the entity
25+
(e.g. namespace or table) in the underlying storage.
26+
27+
_**Note**: Creating a catalog or database does not automatically switch to it. Remember
28+
to run [USE](TODO:LINK-TO-USE) afterwards to set it as a default._
29+
30+
## `CREATE CATALOG`
31+
Registers a new catalog instance.
32+
33+
```sql
34+
CREATE CATALOG [ IF NOT EXISTS ] catalog_name
35+
TYPE 'type_name'
36+
[ PROPERTIES ( 'key' = 'value' [, ...] ) ]
37+
```
38+
39+
_**Example**: Creating a Hadoop Catalog (Local Storage)_
40+
```sql
41+
CREATE CATALOG local_catalog
42+
TYPE iceberg
43+
PROPERTIES (
44+
'type' = 'hadoop',
45+
'warehouse' = 'file:///tmp/iceberg-warehouse'
46+
)
47+
```
48+
49+
_**Example**: Registering a BigLake Catalog (GCS)_
50+
```sql
51+
CREATE CATALOG prod_iceberg
52+
TYPE iceberg
53+
PROPERTIES (
54+
'type' = 'rest',
55+
'uri' = 'https://biglake.googleapis.com/iceberg/v1/restcatalog',
56+
'warehouse' = 'gs://my-company-bucket/warehouse',
57+
'header.x-goog-user-project' = 'my_prod_project',
58+
'rest.auth.type' = 'org.apache.iceberg.gcp.auth.GoogleAuthManager',
59+
'io-impl' = 'org.apache.iceberg.gcp.gcs.GCSFileIO',
60+
'rest-metrics-reporting-enabled' = 'false'
61+
);
62+
```
63+
64+
### `CREATE DATABASE`
65+
Creates a new Database within the current Catalog (default), or the specified Catalog.
66+
```sql
67+
CREATE DATABASE [ IF NOT EXISTS ] [ catalog_name. ]database_name;
68+
```
69+
70+
_**Example**: Create a database in the current active catalog_
71+
```sql
72+
USE CATALOG my_catalog;
73+
CREATE DATABASE sales_data;
74+
```
75+
76+
_**Example**: Create a database in a specified catalog (must be registered)_
77+
```sql
78+
CREATE DATABASE other_catalog.sales_data;
79+
```
80+
81+
### `CREATE TABLE`
82+
Creates a table within the currently active catalog and database. If the table name is fully qualified, the referenced database and catalog are used.
83+
84+
```sql
85+
CREATE EXTERNAL TABLE [ IF NOT EXISTS ] [ catalog. ][ db. ]table_name (
86+
col_name col_type [ NOT NULL ] [ COMMENT 'col_comment' ],
87+
...
88+
)
89+
TYPE 'type_name'
90+
[ PARTITIONED BY ( 'partition_field' [, ... ] ) ]
91+
[ COMMENT 'table_comment' ]
92+
[ LOCATION 'location_uri' ]
93+
[ TBLPROPERTIES 'properties_json_string' ];
94+
```
95+
- **TYPE**: the table type (e.g. `'iceberg'`, `'text'`, `'kafka'`)
96+
- **PARTITIONED BY**: an ordered list of fields describing the partition spec.
97+
- **LOCATION**: explicitly sets the location of the table (overriding the inferred `catalog.db.table_name` location)
98+
- **TBLPROPERTIES**: configuration properties used when creating the table or setting up its IO connection.
99+
100+
_**Example**: Creating an Iceberg Table_
101+
```sql
102+
CREATE EXTERNAL TABLE prod_iceberg.sales_data.orders (
103+
order_id BIGINT NOT NULL COMMENT 'Unique order identifier',
104+
amount DECIMAL(10, 2),
105+
order_date TIMESTAMP,
106+
region_id VARCHAR
107+
)
108+
TYPE 'iceberg'
109+
PARTITIONED BY ( 'region_id', 'day(order_date)' )
110+
COMMENT 'Daily sales transactions'
111+
TBLPROPERTIES '{
112+
"write.format.default": "parquet",
113+
"read.split.target-size": 268435456,
114+
"beam.write.triggering_frequency_seconds": 60
115+
}';
116+
```
117+
- This creates an Iceberg table named `orders` under the namespace `sales_data`, within the `prod_iceberg` catalog.
118+
- The table is partitioned by `region_id`, then by the day value of `order_date` (using Iceberg's [hidden partitioning](https://iceberg.apache.org/docs/latest/partitioning/#icebergs-hidden-partitioning)).
119+
- The table is created with the appropriate properties `"write.format.default"` and `"read.split.target-size"`. The Beam property `"beam.write.triggering_frequency_seconds"` is not stored on the table; it is consumed by Beam's Iceberg write connector.
120+
- Beam properties (prefixed with `"beam.write."` or `"beam.read."`) are intended for the relevant IOs.
Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
---
2+
type: languages
3+
title: "Beam SQL DDL: Drop"
4+
---
5+
<!--
6+
Licensed under the Apache License, Version 2.0 (the "License");
7+
you may not use this file except in compliance with the License.
8+
You may obtain a copy of the License at
9+
10+
http://www.apache.org/licenses/LICENSE-2.0
11+
12+
Unless required by applicable law or agreed to in writing, software
13+
distributed under the License is distributed on an "AS IS" BASIS,
14+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
See the License for the specific language governing permissions and
16+
limitations under the License.
17+
-->
18+
19+
# DROP statements
20+
The **DROP** command serves two potential functions depending on the connector:
21+
22+
- **Unregistration**: unregisters an entity from the current Beam SQL session.
23+
- **Deletion**: For supported connectors (like **Iceberg**), it **physically deletes** the entity
24+
(e.g. namespace or table) in the underlying storage.
25+
26+
**Caution:** Physical deletion can be permanent — verify the connector's behavior before dropping.
27+
28+
## DROP CATALOG
29+
Unregisters a catalog from Beam SQL. This does not destroy external data, only the link within the SQL session.
30+
31+
```sql
32+
DROP CATALOG [ IF EXISTS ] catalog_name;
33+
```
34+
35+
## DROP DATABASE
36+
Unregisters a database from the current session. For supported connectors, this
37+
will also **delete** the database from the external data source.
38+
39+
```sql
40+
DROP DATABASE [ IF EXISTS ] database_name [ RESTRICT | CASCADE ];
41+
```
42+
- **RESTRICT** (Default): Fails if the database is not empty.
43+
- **CASCADE**: Drops the database and all tables contained within it. **Use with caution.**
44+
45+
## DROP TABLE
46+
Unregisters a table from the current session. For supported connectors, this
47+
will also **delete** the table from the external data source.
48+
```sql
49+
DROP TABLE [ IF EXISTS ] table_name;
50+
```

0 commit comments

Comments
 (0)