Skip to content

Commit 4820d68

Browse files
burmeciaCopilot
andauthored
chore(iceberg): add extra foreign table options and refactor insert batch processing (#528)
* chore(iceberg): add create_table_if_not_exists foreign table option * chore: refactor partition batch processing * add partition rows sorter * chore: add partition rows sorter * Update wrappers/src/fdw/iceberg_fdw/iceberg_fdw.rs Co-authored-by: Copilot <[email protected]> * Update wrappers/src/fdw/iceberg_fdw/mapper.rs Co-authored-by: Copilot <[email protected]> * Update wrappers/src/fdw/iceberg_fdw/README.md Co-authored-by: Copilot <[email protected]> * Update wrappers/src/fdw/iceberg_fdw/sorter.rs Co-authored-by: Copilot <[email protected]> * modify as per Copilot suggestion --------- Co-authored-by: Copilot <[email protected]>
1 parent 8443e16 commit 4820d68

File tree

12 files changed

+825
-251
lines changed

12 files changed

+825
-251
lines changed

docs/catalog/iceberg.md

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ For any server options need to be stored in Vault, you can add a prefix `vault_`
162162

163163
#### Additional Server Options
164164

165-
- `batch_size` - Controls the batch size of records read from Iceberg (default: 4096)
165+
- `batch_size` - Controls the batch size of records read from Iceberg (value range: 1 - 65536, default: 8192)
166166

167167
### Create a schema
168168

@@ -178,6 +178,8 @@ The full list of foreign table options are below:
178178

179179
- `table` - Fully qualified source table name with all namespaces in Iceberg, required.
180180
- `rowid_column` - The column to use as the row identifier for INSERT operations, required for data insertion.
181+
- `create_table_if_not_exists` - Boolean option (true/false) to automatically create the Iceberg table if it doesn't exist when inserting data, optional (default: false).
182+
- `partition_buffer_size` - Controls the buffer size for partitioned data during insertion operations, determining how many rows are batched together before being written to Iceberg (value range: 1 - 65536, default: 8192).
181183

182184
## Entities
183185

@@ -332,12 +334,43 @@ values (123, 99.99, '2025-01-15');
332334
- **Partition Awareness**: When possible, insert data in partition order to optimize file organization
333335
- **Transaction Size**: Consider breaking very large inserts into smaller transactions
334336

335-
### Limitations for Inserts
337+
### Automatic Table Creation
338+
339+
When using the `create_table_if_not_exists` option, the Iceberg FDW will automatically create the target table in Iceberg if it doesn't exist when inserting data. This is useful for ad-hoc data insertion scenarios.
340+
341+
```sql
342+
create foreign table iceberg.new_table (
343+
id bigint,
344+
name text,
345+
created_at timestamp
346+
)
347+
server iceberg_server
348+
options (
349+
table 'docs_example.new_table',
350+
rowid_column 'id',
351+
create_table_if_not_exists 'true'
352+
);
353+
354+
-- when data is inserted, if the 'docs_example.new_table' doesn't exist in Iceberg,
355+
-- it will be automatically created with a schema matching the foreign table definition
356+
insert into iceberg.new_table (id, name, created_at)
357+
values (1, 'New Record', now());
358+
```
359+
360+
### Limitations for Insertion
336361

337362
- Schema evolution during insert is not supported
338363
- Only append operations are supported (no upserts)
339364
- Complex data types (nested structs, arrays, maps) have limited support
340365

366+
#### Automatic Table Creation Limitations
367+
368+
When using the `create_table_if_not_exists` option, please be aware of the following additional limitations:
369+
370+
- **Type Support**: Only primitive types are supported (such as boolean, integer, text, etc.). Complex types like arrays, structs, and maps are not supported for automatic table creation.
371+
- **Partitioning**: The automatically created table will use default partitioning settings. You cannot specify custom partition or sort specifications during automatic creation.
372+
- **Identifier Fields**: The automatically created table will not have any identifier fields specified. If you need identifier fields, you must create the Iceberg table manually beforehand.
373+
341374
## Limitations
342375

343376
This section describes important limitations and considerations when using this FDW:

supabase-wrappers/src/instance.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ use std::ffi::CStr;
33

44
use crate::prelude::*;
55
use pgrx::pg_sys::panic::ErrorReport;
6-
use pgrx::prelude::*;
6+
use pgrx::{pg_sys::Oid, prelude::*};
77

88
#[derive(Debug, Clone, Default)]
99
pub struct ForeignServer {
10+
pub server_oid: Oid,
1011
pub server_name: String,
1112
pub server_type: Option<String>,
1213
pub server_version: Option<String>,
@@ -38,6 +39,7 @@ pub(super) unsafe fn create_fdw_instance_from_server_id<
3839
};
3940
let fserver = pg_sys::GetForeignServer(fserver_id);
4041
let server = ForeignServer {
42+
server_oid: fserver_id,
4143
server_name: to_string((*fserver).servername).unwrap(),
4244
server_type: to_string((*fserver).servertype),
4345
server_version: to_string((*fserver).serverversion),

supabase-wrappers/src/modify.rs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,9 +232,20 @@ pub(super) extern "C-unwind" fn plan_foreign_modify<
232232
// use NoLock here.
233233
let rel = PgRelation::with_lock((*rte).relid, pg_sys::NoLock as _);
234234

235-
// get rowid column name from table options
236235
let ftable = pg_sys::GetForeignTable(rel.oid());
237-
let opts = options_to_hashmap((*ftable).options).report_unwrap();
236+
let mut opts = options_to_hashmap((*ftable).options).report_unwrap();
237+
238+
// add additional metadata to the options
239+
opts.insert(
240+
"wrappers.fserver_oid".into(),
241+
(*ftable).serverid.to_u32().to_string(),
242+
);
243+
opts.insert(
244+
"wrappers.ftable_oid".into(),
245+
(*ftable).relid.to_u32().to_string(),
246+
);
247+
248+
// check if the rowid column name is specified in table options
238249
let rowid_name = opts.get("rowid_column");
239250
if rowid_name.is_none() {
240251
report_error(

supabase-wrappers/src/scan.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,16 @@ pub(super) extern "C-unwind" fn get_foreign_rel_size<
187187
// get foreign table options
188188
let ftable = pg_sys::GetForeignTable(foreigntableid);
189189
state.opts = options_to_hashmap((*ftable).options).report_unwrap();
190+
191+
// add additional metadata to the options
192+
state.opts.insert(
193+
"wrappers.fserver_oid".into(),
194+
(*ftable).serverid.to_u32().to_string(),
195+
);
196+
state.opts.insert(
197+
"wrappers.ftable_oid".into(),
198+
(*ftable).relid.to_u32().to_string(),
199+
);
190200
});
191201

192202
// get estimate row count and mean row width

wrappers/dockerfiles/s3/iceberg_seed.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,12 @@ def create_asks_table(catalog, namespace):
253253
identifier_field_ids=[1],
254254
)
255255

256+
partition_spec = PartitionSpec(
257+
PartitionField(
258+
source_id=1, field_id=1000, transform=DayTransform(), name="datetime_day"
259+
),
260+
)
261+
256262
sort_order = SortOrder(SortField(source_id=2, transform=IdentityTransform()))
257263

258264
if catalog.table_exists(tblname):
@@ -261,6 +267,7 @@ def create_asks_table(catalog, namespace):
261267
identifier=tblname,
262268
schema=schema,
263269
#location="s3://iceberg",
270+
partition_spec=partition_spec,
264271
sort_order=sort_order,
265272
)
266273
table = catalog.load_table(tblname)

wrappers/src/fdw/iceberg_fdw/README.md

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ This is a foreign data wrapper for [Apache Iceberg](https://iceberg.apache.org/)
88

99
## Changelog
1010

11-
| Version | Date | Notes |
12-
| ------- | ---------- | ---------------------------------------------------- |
13-
| 0.1.3 | 2025-09-20 | Add data insertion support |
14-
| 0.1.2 | 2025-07-30 | Large data set query performance improvement |
15-
| 0.1.1 | 2025-05-15 | Refactor server options passdown |
16-
| 0.1.0 | 2025-05-07 | Initial version |
11+
| Version | Date | Notes |
12+
| ------- | ---------- | ---------------------------------------------------------------------- |
13+
| 0.1.4 | 2025-11-21 | Add create_table_if_not_exists option and improve insertion performance |
14+
| 0.1.3 | 2025-09-20 | Add data insertion support |
15+
| 0.1.2 | 2025-07-30 | Large data set query performance improvement |
16+
| 0.1.1 | 2025-05-15 | Refactor server options passdown |
17+
| 0.1.0 | 2025-05-07 | Initial version |
1718

0 commit comments

Comments
 (0)