
Commit 6d73a03

feat: add support to write scalar columns to iceberg (supabase#357)
1 parent 98eaf81 commit 6d73a03

File tree

11 files changed, +1946 -14 lines


Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -32,6 +32,7 @@ actix-web = { version = "4.11.0", default-features = false }
 actix-web-httpauth = { version = "0.8.2", default-features = false }
 actix-web-metrics = { version = "0.3.0", default-features = false }
 anyhow = { version = "1.0.98", default-features = false }
+arrow = { version = "55.0", default-features = false }
 async-trait = { version = "0.1.88" }
 aws-lc-rs = { version = "1.13.3", default-features = false }
 base64 = { version = "0.22.1", default-features = false }
@@ -52,6 +53,7 @@ k8s-openapi = { version = "0.25.0", default-features = false }
 kube = { version = "1.1.0", default-features = false }
 metrics = { version = "0.24.2", default-features = false }
 metrics-exporter-prometheus = { version = "0.17.2", default-features = false }
+parquet = { version = "55.0", default-features = false }
 pg_escape = { version = "0.1.1", default-features = false }
 pin-project-lite = { version = "0.2.16", default-features = false }
 postgres-replication = { git = "https://github.com/MaterializeInc/rust-postgres", default-features = false, rev = "c4b473b478b3adfbf8667d2fbe895d8423f1290b" }

etl-destinations/Cargo.toml

Lines changed: 11 additions & 1 deletion
@@ -15,32 +15,42 @@ bigquery = [
     "dep:tracing",
     "dep:tokio",
 ]
-iceberg = ["dep:iceberg", "dep:iceberg-catalog-rest"]
+iceberg = [
+    "dep:iceberg",
+    "dep:iceberg-catalog-rest",
+    "dep:arrow",
+    "dep:parquet",
+    "dep:uuid",
+]
 
 [dependencies]
 etl = { workspace = true }
 chrono = { workspace = true }
 
+arrow = { workspace = true, optional = true }
 gcp-bigquery-client = { workspace = true, optional = true, features = [
     "rust-tls",
     "aws-lc-rs",
 ] }
 iceberg = { workspace = true, optional = true }
 iceberg-catalog-rest = { workspace = true, optional = true }
+parquet = { workspace = true, optional = true, features = ["async", "arrow"] }
 prost = { workspace = true, optional = true }
 rustls = { workspace = true, optional = true, features = [
     "aws-lc-rs",
     "logging",
 ] }
 tokio = { workspace = true, optional = true, features = ["sync"] }
 tracing = { workspace = true, optional = true, default-features = true }
+uuid = { workspace = true, optional = true, features = ["v4"] }
 
 [dev-dependencies]
 etl = { workspace = true, features = ["test-utils"] }
 etl-telemetry = { workspace = true }
 
 base64 = { workspace = true }
 chrono = { workspace = true }
+futures = { workspace = true }
 rand = { workspace = true, features = ["thread_rng"] }
 reqwest = { workspace = true }
 serde = { workspace = true }
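
For context, a small sketch (not part of this diff) of how the new optional dependencies are expected to stay behind the `iceberg` feature flag; the module declaration below is an assumption based on the etl-destinations/src/iceberg/ path touched by this commit.

// In etl-destinations/src/lib.rs (assumed layout): the iceberg module, and with
// it the optional arrow/parquet/uuid dependencies, is compiled only when the
// `iceberg` feature is enabled.
#[cfg(feature = "iceberg")]
pub mod iceberg;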

etl-destinations/src/iceberg/client.rs

Lines changed: 132 additions & 4 deletions
@@ -1,10 +1,31 @@
 use std::{collections::HashMap, sync::Arc};
 
-use etl::types::TableSchema;
-use iceberg::{Catalog, NamespaceIdent, TableCreation, TableIdent};
+use arrow::array::RecordBatch;
+use etl::{
+    error::EtlResult,
+    types::{TableRow, TableSchema},
+};
+use iceberg::{
+    Catalog, NamespaceIdent, TableCreation, TableIdent,
+    table::Table,
+    transaction::{ApplyTransactionAction, Transaction},
+    writer::{
+        IcebergWriter, IcebergWriterBuilder,
+        base_writer::data_file_writer::DataFileWriterBuilder,
+        file_writer::{
+            ParquetWriterBuilder,
+            location_generator::{DefaultFileNameGenerator, DefaultLocationGenerator},
+        },
+    },
+};
 use iceberg_catalog_rest::{RestCatalog, RestCatalogConfig};
+use parquet::{basic::Compression, file::properties::WriterProperties};
 
-use crate::iceberg::schema::postgres_to_iceberg_schema;
+use crate::iceberg::{
+    encoding::rows_to_record_batch,
+    error::{arrow_error_to_etl_error, iceberg_error_to_etl_error},
+    schema::postgres_to_iceberg_schema,
+};
 
 /// Client for connecting to Iceberg data lakes.
 #[derive(Clone)]
@@ -14,10 +35,15 @@ pub struct IcebergClient {
 
 impl IcebergClient {
     /// Creates a new [IcebergClient] from a REST catalog URI and a warehouse name.
-    pub fn new_with_rest_catalog(catalog_uri: String, warehouse_name: String) -> Self {
+    pub fn new_with_rest_catalog(
+        catalog_uri: String,
+        warehouse_name: String,
+        props: HashMap<String, String>,
+    ) -> Self {
         let catalog_config = RestCatalogConfig::builder()
             .uri(catalog_uri)
             .warehouse(warehouse_name)
+            .props(props)
             .build();
         let catalog = RestCatalog::new(catalog_config);
         IcebergClient {
@@ -92,4 +118,106 @@ impl IcebergClient {
         let namespace_ident = NamespaceIdent::from_strs(namespace.split('.'))?;
         self.catalog.drop_namespace(&namespace_ident).await
     }
+
+    /// Load a table
+    pub async fn load_table(
+        &self,
+        namespace: String,
+        table_name: String,
+    ) -> Result<iceberg::table::Table, iceberg::Error> {
+        let namespace_ident = NamespaceIdent::new(namespace);
+        let table_ident = TableIdent::new(namespace_ident, table_name);
+        self.catalog.load_table(&table_ident).await
+    }
+
+    /// Insert table rows into the table in the destination
+    pub async fn insert_rows(
+        &self,
+        namespace: String,
+        table_name: String,
+        table_rows: &[TableRow],
+    ) -> EtlResult<()> {
+        let namespace_ident = NamespaceIdent::new(namespace);
+        let table_ident = TableIdent::new(namespace_ident, table_name);
+
+        let table = self
+            .catalog
+            .load_table(&table_ident)
+            .await
+            .map_err(iceberg_error_to_etl_error)?;
+        let table_metadata = table.metadata();
+        let iceberg_schema = table_metadata.current_schema();
+
+        // Convert the actual Iceberg schema to Arrow schema using iceberg-rust's built-in converter
+        // This preserves field IDs properly for transaction-based writes
+        let arrow_schema = iceberg::arrow::schema_to_arrow_schema(iceberg_schema)
+            .map_err(iceberg_error_to_etl_error)?;
+        let record_batch =
+            rows_to_record_batch(table_rows, arrow_schema).map_err(arrow_error_to_etl_error)?;
+
+        self.write_record_batch(&table, record_batch)
+            .await
+            .map_err(iceberg_error_to_etl_error)?;
+
+        Ok(())
+    }
+
+    async fn write_record_batch(
+        &self,
+        table: &Table,
+        record_batch: RecordBatch,
+    ) -> Result<(), iceberg::Error> {
+        // Create Parquet writer properties
+        let writer_props = WriterProperties::builder()
+            .set_compression(Compression::SNAPPY)
+            .build();
+
+        // Create location and file name generators
+        let location_gen = DefaultLocationGenerator::new(table.metadata().clone())?;
+        let file_name_gen = DefaultFileNameGenerator::new(
+            "data".to_string(),
+            Some(uuid::Uuid::new_v4().to_string()), // Add unique UUID for each file
+            iceberg::spec::DataFileFormat::Parquet,
+        );
+
+        // Create Parquet writer builder
+        let parquet_writer_builder = ParquetWriterBuilder::new(
+            writer_props,
+            table.metadata().current_schema().clone(),
+            table.file_io().clone(),
+            location_gen,
+            file_name_gen,
+        );
+
+        // Create data file writer with empty partition (unpartitioned table)
+        let data_file_writer_builder = DataFileWriterBuilder::new(
+            parquet_writer_builder,
+            None, // No partition value for unpartitioned tables
+            table.metadata().default_partition_spec_id(),
+        );
+
+        // Build the writer
+        let mut data_file_writer = data_file_writer_builder.build().await?;
+
+        // Write the record batch using Iceberg writer
+        data_file_writer.write(record_batch.clone()).await?;
+
+        // Close writer and get data files
+        let data_files = data_file_writer.close().await?;
+
+        // Create transaction and fast append action
+        let transaction = Transaction::new(table);
+        let append_action = transaction
+            .fast_append()
+            .with_check_duplicate(false) // Don't check duplicates for performance
+            .add_data_files(data_files);
+
+        // Apply the append action to create updated transaction
+        let updated_transaction = append_action.apply(transaction)?;
+
+        // Commit the transaction to the catalog
+        let _updated_table = updated_transaction.commit(&*self.catalog).await?;
+
+        Ok(())
+    }
 }
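
For orientation, a minimal usage sketch of the client API added in this commit. It is not part of the diff: the import path, catalog URI, warehouse name, namespace, and table name are all assumed placeholders, chosen only to show the signatures of `new_with_rest_catalog` and `insert_rows`.

use std::collections::HashMap;

use etl::{error::EtlResult, types::TableRow};
// Assumed re-export path; the type is defined in etl-destinations/src/iceberg/client.rs.
use etl_destinations::iceberg::IcebergClient;

// Hypothetical caller (not part of this commit): append already-converted rows
// to an existing, unpartitioned Iceberg table through the REST catalog.
async fn append_rows(rows: &[TableRow]) -> EtlResult<()> {
    let client = IcebergClient::new_with_rest_catalog(
        "http://localhost:8181".to_string(), // assumed REST catalog URI
        "warehouse".to_string(),             // assumed warehouse name
        HashMap::new(),                      // extra catalog properties, if any
    );

    client
        .insert_rows("public".to_string(), "users".to_string(), rows)
        .await
}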
