
Commit 479eb37

wip
1 parent cf65d69 commit 479eb37

7 files changed (+188 -0)

Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -38,6 +38,7 @@ constant_time_eq = { version = "0.4.2" }
 fail = { version = "0.5.1", default-features = false }
 futures = { version = "0.3.31", default-features = false }
 gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", default-features = false, rev = "391a162642bd0af6d55e2310ef46adf96f21228c" }
+deltalake = { version = "0.27.0", default-features = false, features = ["rustls"] }
 iceberg = { version = "0.6.0", default-features = false }
 iceberg-catalog-rest = { version = "0.6.0", default-features = false }
 insta = { version = "1.43.1", default-features = false }

etl-destinations/Cargo.toml

Lines changed: 2 additions & 0 deletions
@@ -13,6 +13,7 @@ bigquery = [
     "dep:tokio",
 ]
 iceberg = ["dep:iceberg", "dep:iceberg-catalog-rest"]
+deltalake = ["dep:deltalake"]

 [dependencies]
 etl = { workspace = true }
@@ -26,6 +27,7 @@ gcp-bigquery-client = { workspace = true, optional = true, features = [
 ] }
 iceberg = { workspace = true, optional = true }
 iceberg-catalog-rest = { workspace = true, optional = true }
+deltalake = { workspace = true, optional = true, features = ["rustls"] }
 metrics = { workspace = true }
 prost = { workspace = true, optional = true }
 rustls = { workspace = true, optional = true, features = [
etl-destinations/src/delta/client.rs

Lines changed: 49 additions & 0 deletions
@@ -0,0 +1,49 @@
use std::sync::Arc;

use delta_kernel::engine::arrow_conversion::TryFromArrow;
use deltalake::kernel::StructField;
use deltalake::{open_table, DeltaOps, DeltaResult, DeltaTable};
use etl::types::TableSchema;

use super::schema::postgres_to_arrow_schema;

/// Client for connecting to Delta Lake tables.
#[derive(Clone)]
pub struct DeltaLakeClient {}

impl DeltaLakeClient {
    /// Creates a new client.
    pub fn new() -> Self {
        Self {}
    }

    /// Returns true if a Delta table exists at the given URI/path.
    pub async fn table_exists(&self, table_uri: &str) -> bool {
        open_table(table_uri).await.is_ok()
    }

    /// Creates a Delta table at `table_uri` if it doesn't exist, deriving its
    /// columns from the provided Postgres schema.
    pub async fn create_table_if_missing(
        &self,
        table_uri: &str,
        table_schema: &TableSchema,
    ) -> DeltaResult<Arc<DeltaTable>> {
        // Fast path: the table already exists, so just return it.
        if let Ok(table) = open_table(table_uri).await {
            return Ok(Arc::new(table));
        }

        let arrow_schema = postgres_to_arrow_schema(table_schema);

        // Convert each Arrow field to a Delta `StructField`, failing early if
        // any field cannot be represented.
        let columns = arrow_schema
            .fields()
            .iter()
            .map(|field| StructField::try_from_arrow(field.as_ref()))
            .collect::<Result<Vec<_>, _>>()?;

        let ops = DeltaOps::try_from_uri(table_uri).await?;
        let table = ops.create().with_columns(columns).await?;

        Ok(Arc::new(table))
    }

    /// Opens the Delta table at `table_uri`.
    pub async fn open_table(&self, table_uri: &str) -> DeltaResult<Arc<DeltaTable>> {
        let table = open_table(table_uri).await?;
        Ok(Arc::new(table))
    }
}
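
A minimal usage sketch (not part of the commit) showing how the client's three methods compose; the `table_uri` value and the source of the `TableSchema` are assumptions here:

// Hypothetical helper: open the Delta table for a replicated Postgres
// table, creating it from the Postgres schema if it does not exist yet.
async fn ensure_delta_table(
    client: &DeltaLakeClient,
    table_uri: &str,
    table_schema: &TableSchema,
) -> DeltaResult<Arc<DeltaTable>> {
    if client.table_exists(table_uri).await {
        // The table is already there; just open it.
        client.open_table(table_uri).await
    } else {
        // Derive the Delta schema from the Postgres schema and create the table.
        client.create_table_if_missing(table_uri, table_schema).await
    }
}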

etl-destinations/src/delta/core.rs

Lines changed: 21 additions & 0 deletions
@@ -0,0 +1,21 @@
use etl::error::EtlError;
use etl::store::schema::SchemaStore;
use etl::store::state::StateStore;
use etl::types::{Event, TableId, TableRow};
use etl::Destination;

use crate::delta::DeltaLakeClient;

/// Delta Lake destination for the ETL pipeline (work in progress).
struct DeltaLakeDestination<S> {
    client: DeltaLakeClient,
    store: S,
}

impl<S> DeltaLakeDestination<S>
where
    S: StateStore + SchemaStore,
{
}
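
A hedged sketch of how this skeleton might be wired together; the constructor below is not in the commit and only uses the two fields declared above:

impl<S> DeltaLakeDestination<S>
where
    S: StateStore + SchemaStore,
{
    /// Hypothetical constructor: pairs the Delta Lake client with the
    /// pipeline's combined state/schema store.
    fn new(client: DeltaLakeClient, store: S) -> Self {
        Self { client, store }
    }
}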

etl-destinations/src/delta/mod.rs

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
mod client;
mod core;
mod schema;

pub use client::DeltaLakeClient;
etl-destinations/src/delta/schema.rs

Lines changed: 107 additions & 0 deletions
@@ -0,0 +1,107 @@
use std::sync::Arc;

use deltalake::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
use etl::types::{TableSchema, Type};
use etl_postgres::types::is_array_type;

/// Converts a Postgres scalar type to an equivalent Arrow `DataType`.
fn postgres_scalar_type_to_arrow(typ: &Type) -> DataType {
    match typ {
        &Type::BOOL => DataType::Boolean,
        &Type::CHAR | &Type::BPCHAR | &Type::VARCHAR | &Type::NAME | &Type::TEXT => {
            DataType::Utf8
        }
        &Type::INT2 => DataType::Int16,
        &Type::INT4 => DataType::Int32,
        &Type::INT8 => DataType::Int64,
        &Type::FLOAT4 => DataType::Float32,
        &Type::FLOAT8 => DataType::Float64,
        // Without precision/scale information, map NUMERIC to Utf8 for now.
        &Type::NUMERIC => DataType::Utf8,
        &Type::DATE => DataType::Date32,
        &Type::TIME => DataType::Time64(TimeUnit::Microsecond),
        &Type::TIMESTAMP => DataType::Timestamp(TimeUnit::Microsecond, None),
        &Type::TIMESTAMPTZ => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
        // Arrow has no native UUID type; represent it as a string.
        &Type::UUID => DataType::Utf8,
        // Represent JSON as a string.
        &Type::JSON | &Type::JSONB => DataType::Utf8,
        // OID is a 32-bit unsigned integer in Postgres.
        &Type::OID => DataType::UInt32,
        &Type::BYTEA => DataType::Binary,
        _ => DataType::Utf8,
    }
}

/// Converts a Postgres array type to an Arrow `List` type.
fn postgres_array_type_to_arrow(typ: &Type) -> DataType {
    let element_type = match typ {
        &Type::BOOL_ARRAY => DataType::Boolean,
        &Type::CHAR_ARRAY | &Type::BPCHAR_ARRAY | &Type::VARCHAR_ARRAY | &Type::NAME_ARRAY
        | &Type::TEXT_ARRAY => DataType::Utf8,
        &Type::INT2_ARRAY => DataType::Int16,
        &Type::INT4_ARRAY => DataType::Int32,
        &Type::INT8_ARRAY => DataType::Int64,
        &Type::FLOAT4_ARRAY => DataType::Float32,
        &Type::FLOAT8_ARRAY => DataType::Float64,
        // Map NUMERIC arrays to string arrays until precision/scale is available.
        &Type::NUMERIC_ARRAY => DataType::Utf8,
        &Type::DATE_ARRAY => DataType::Date32,
        &Type::TIME_ARRAY => DataType::Time64(TimeUnit::Microsecond),
        &Type::TIMESTAMP_ARRAY => DataType::Timestamp(TimeUnit::Microsecond, None),
        &Type::TIMESTAMPTZ_ARRAY => {
            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
        }
        &Type::UUID_ARRAY => DataType::Utf8,
        &Type::JSON_ARRAY | &Type::JSONB_ARRAY => DataType::Utf8,
        &Type::OID_ARRAY => DataType::UInt32,
        &Type::BYTEA_ARRAY => DataType::Binary,
        _ => DataType::Utf8,
    };

    DataType::List(Arc::new(Field::new("item", element_type, true)))
}

/// Converts a Postgres `TableSchema` to an Arrow `Schema`.
pub fn postgres_to_arrow_schema(schema: &TableSchema) -> Arc<Schema> {
    let fields: Vec<Field> = schema
        .column_schemas
        .iter()
        .map(|col| {
            let data_type = if is_array_type(&col.typ) {
                postgres_array_type_to_arrow(&col.typ)
            } else {
                postgres_scalar_type_to_arrow(&col.typ)
            };
            Field::new(&col.name, data_type, col.nullable)
        })
        .collect();

    Arc::new(Schema::new(fields))
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_scalar_mappings() {
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::BOOL), DataType::Boolean));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::TEXT), DataType::Utf8));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::INT2), DataType::Int16));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::INT4), DataType::Int32));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::INT8), DataType::Int64));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::FLOAT4), DataType::Float32));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::FLOAT8), DataType::Float64));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::DATE), DataType::Date32));
        assert!(matches!(postgres_scalar_type_to_arrow(&Type::BYTEA), DataType::Binary));
    }

    #[test]
    fn test_array_mappings() {
        let dt = postgres_array_type_to_arrow(&Type::INT4_ARRAY);
        let DataType::List(inner) = dt else {
            panic!("expected a List data type for INT4_ARRAY");
        };
        assert_eq!(inner.name(), "item");
    }
}
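
For context, an illustrative sketch (not part of the commit) of the value shape the array mapping implies. It uses only `arrow` APIs re-exported through `deltalake`; `ListArray::from_iter_primitive` happens to produce the same nullable "item" element field that `postgres_array_type_to_arrow` constructs:

use deltalake::arrow::array::{Array, ListArray};
use deltalake::arrow::datatypes::Int32Type;

#[test]
fn int4_array_value_shape() {
    // Three rows of int4[]: a two-element array, a NULL array, and an
    // array containing a NULL element. The resulting Arrow type is
    // List(Field::new("item", Int32, true)), matching the mapping for
    // Type::INT4_ARRAY above.
    let rows = ListArray::from_iter_primitive::<Int32Type, _, _>(vec![
        Some(vec![Some(1), Some(2)]),
        None,
        Some(vec![None, Some(3)]),
    ]);
    assert_eq!(rows.len(), 3);
    assert!(rows.is_null(1));
}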

etl-destinations/src/lib.rs

Lines changed: 2 additions & 0 deletions
@@ -7,4 +7,6 @@
 pub mod bigquery;
 #[cfg(feature = "iceberg")]
 pub mod iceberg;
+#[cfg(feature = "deltalake")]
+pub mod delta;
 mod metrics;
