Commit a7d471e

Actually get working

1 parent c19edeb commit a7d471e
File tree: 7 files changed, +142 -92 lines changed

Cargo.toml
Lines changed: 1 addition & 1 deletion

@@ -35,10 +35,10 @@ clap = { version = "4.5.42", default-features = false }
 config = { version = "0.14", default-features = false }
 const-oid = { version = "0.9.6", default-features = false }
 constant_time_eq = { version = "0.4.2" }
+deltalake = { version = "0.27.0", default-features = false, features = ["rustls"] }
 fail = { version = "0.5.1", default-features = false }
 futures = { version = "0.3.31", default-features = false }
 gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", default-features = false, rev = "391a162642bd0af6d55e2310ef46adf96f21228c" }
-deltalake = { version = "0.27.0", default-features = false, features = ["rustls"] }
 iceberg = { version = "0.6.0", default-features = false }
 iceberg-catalog-rest = { version = "0.6.0", default-features = false }
 insta = { version = "1.43.1", default-features = false }

etl-destinations/Cargo.toml
Lines changed: 1 addition & 1 deletion

@@ -20,14 +20,14 @@ etl = { workspace = true }
 etl-postgres = { workspace = true }
 chrono = { workspace = true }
 
+deltalake = { workspace = true, optional = true, features = ["rustls"] }
 futures = { workspace = true, optional = true }
 gcp-bigquery-client = { workspace = true, optional = true, features = [
     "rust-tls",
     "aws-lc-rs",
 ] }
 iceberg = { workspace = true, optional = true }
 iceberg-catalog-rest = { workspace = true, optional = true }
-deltalake = { workspace = true, optional = true, features = ["rustls"] }
 metrics = { workspace = true }
 prost = { workspace = true, optional = true }
 rustls = { workspace = true, optional = true, features = [

etl-destinations/src/delta/client.rs
Lines changed: 8 additions & 8 deletions

@@ -1,17 +1,18 @@
 use std::sync::Arc;
 
-use deltalake::{open_table, DeltaOps, DeltaResult, DeltaTable, StructField};
+use super::schema::postgres_to_delta_schema;
+use deltalake::{DeltaOps, DeltaResult, DeltaTable, open_table};
 use etl::types::TableSchema;
-use delta_kernel::engine::arrow_conversion::TryFromArrow;
-use super::schema::postgres_to_arrow_schema;
 
 /// Client for connecting to Delta Lake tables.
 #[derive(Clone)]
 pub struct DeltaLakeClient {}
 
 impl DeltaLakeClient {
     /// Create a new client.
-    pub fn new() -> Self { Self {} }
+    pub fn new() -> Self {
+        Self {}
+    }
 
     /// Returns true if a Delta table exists at the given uri/path.
     pub async fn table_exists(&self, table_uri: &str) -> bool {
@@ -28,12 +29,13 @@ impl DeltaLakeClient {
             return Ok(Arc::new(table));
         }
 
-        let arrow_schema = postgres_to_arrow_schema(table_schema);
+        let delta_schema = postgres_to_delta_schema(table_schema)?;
 
         let ops = DeltaOps::try_from_uri(table_uri).await?;
         let table = ops
             .create()
-            .with_columns(arrow_schema.fields().iter().map(|field| StructField::try_from_arrow(field)))
+            // TODO(abhi): Figure out how to avoid the clone
+            .with_columns(delta_schema.fields().map(|field| field.clone()))
             .await?;
 
         Ok(Arc::new(table))
@@ -45,5 +47,3 @@ impl DeltaLakeClient {
         Ok(Arc::new(table))
     }
 }
-
-
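Note on the TODO(abhi) above: `Schema::fields()` yields references while `with_columns` wants owned values, which is why the diff clones each field. One possible way to drop the clone (a sketch under assumptions, not part of this commit; `postgres_to_delta_fields` is a hypothetical helper returning the same `Vec<StructField>` that `postgres_to_delta_schema` builds internally):

    // Hypothetical refactor: hand back owned fields instead of a Schema,
    // then move them into the create builder without cloning.
    let fields: Vec<StructField> = postgres_to_delta_fields(table_schema)?;
    let table = DeltaOps::try_from_uri(table_uri)
        .await?
        .create()
        .with_columns(fields) // takes ownership, so no per-field clone
        .await?;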

etl-destinations/src/delta/core.rs
Lines changed: 2 additions & 11 deletions

@@ -1,11 +1,6 @@
+use etl::destination::Destination;
 use etl::store::schema::SchemaStore;
 use etl::store::state::StateStore;
-use etl::types::{Event, TableId, TableRow};
-use etl::Destination;
-use etl::error::EtlError;
-use etl::types::{Event, TableId, TableRow};
-use etl::Destination;
-use etl::error::EtlError;
 
 use crate::delta::DeltaLakeClient;
 
@@ -14,8 +9,4 @@ struct DeltaLakeDestination<S> {
     store: S,
 }
 
-
-impl<S> DeltaLakeDestination<S>
-where
-    S: StateStore + SchemaStore,
-{}
+impl<S> DeltaLakeDestination<S> where S: StateStore + SchemaStore {}

etl-destinations/src/delta/mod.rs
Lines changed: 1 addition & 2 deletions

@@ -1,6 +1,5 @@
 mod client;
-mod schema;
 mod core;
+mod schema;
 
 pub use client::DeltaLakeClient;
-
etl-destinations/src/delta/schema.rs
Lines changed: 127 additions & 67 deletions

@@ -1,83 +1,85 @@
-use std::sync::Arc;
-
-use deltalake::arrow::datatypes::{DataType, Field, Schema, TimeUnit};
+use deltalake::kernel::{ArrayType, DataType, StructField};
+use deltalake::{DeltaResult, Schema};
 use etl::types::{TableSchema, Type};
 use etl_postgres::types::is_array_type;
 
-/// Convert a Postgres scalar type to an equivalent Arrow DataType
-fn postgres_scalar_type_to_arrow(typ: &Type) -> DataType {
+/// Convert a Postgres scalar type to an equivalent Delta DataType
+fn postgres_scalar_type_to_delta(typ: &Type) -> DataType {
     match typ {
-        &Type::BOOL => DataType::Boolean,
+        &Type::BOOL => DataType::BOOLEAN,
         &Type::CHAR | &Type::BPCHAR | &Type::VARCHAR | &Type::NAME | &Type::TEXT => {
-            DataType::Utf8
+            DataType::STRING
        }
-        &Type::INT2 => DataType::Int16,
-        &Type::INT4 => DataType::Int32,
-        &Type::INT8 => DataType::Int64,
-        &Type::FLOAT4 => DataType::Float32,
-        &Type::FLOAT8 => DataType::Float64,
-        // Without precision/scale information, map NUMERIC to Utf8 for now
-        &Type::NUMERIC => DataType::Utf8,
-        &Type::DATE => DataType::Date32,
-        &Type::TIME => DataType::Time64(TimeUnit::Microsecond),
-        &Type::TIMESTAMP => DataType::Timestamp(TimeUnit::Microsecond, None),
-        &Type::TIMESTAMPTZ => DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
-        // Arrow has no native UUID type; represent as string
-        &Type::UUID => DataType::Utf8,
+        &Type::INT2 => DataType::SHORT,
+        &Type::INT4 => DataType::INTEGER,
+        &Type::INT8 => DataType::LONG,
+        &Type::FLOAT4 => DataType::FLOAT,
+        &Type::FLOAT8 => DataType::DOUBLE,
+        // Without precision/scale information, map NUMERIC to STRING for now
+        &Type::NUMERIC => DataType::STRING,
+        &Type::DATE => DataType::DATE,
+        // Delta Lake doesn't have a separate TIME type, use TIMESTAMP_NTZ
+        &Type::TIME => DataType::TIMESTAMP_NTZ,
+        &Type::TIMESTAMP => DataType::TIMESTAMP_NTZ,
+        &Type::TIMESTAMPTZ => DataType::TIMESTAMP,
+        // Delta Lake has no native UUID type; represent as string
+        &Type::UUID => DataType::STRING,
         // Represent JSON as string
-        &Type::JSON | &Type::JSONB => DataType::Utf8,
-        // OID is 32-bit unsigned in Postgres
-        &Type::OID => DataType::UInt32,
-        &Type::BYTEA => DataType::Binary,
-        _ => DataType::Utf8,
+        &Type::JSON | &Type::JSONB => DataType::STRING,
+        // OID is 32-bit unsigned in Postgres, map to INTEGER
+        &Type::OID => DataType::INTEGER,
+        &Type::BYTEA => DataType::BINARY,
+        // Default fallback for unsupported types
+        _ => DataType::STRING,
     }
 }
 
-/// Convert a Postgres array type to an Arrow List type
-fn postgres_array_type_to_arrow(typ: &Type) -> DataType {
+/// Convert a Postgres array type to a Delta Array type
+fn postgres_array_type_to_delta(typ: &Type) -> DataType {
     let element_type = match typ {
-        &Type::BOOL_ARRAY => DataType::Boolean,
-        &Type::CHAR_ARRAY | &Type::BPCHAR_ARRAY | &Type::VARCHAR_ARRAY | &Type::NAME_ARRAY
-        | &Type::TEXT_ARRAY => DataType::Utf8,
-        &Type::INT2_ARRAY => DataType::Int16,
-        &Type::INT4_ARRAY => DataType::Int32,
-        &Type::INT8_ARRAY => DataType::Int64,
-        &Type::FLOAT4_ARRAY => DataType::Float32,
-        &Type::FLOAT8_ARRAY => DataType::Float64,
+        &Type::BOOL_ARRAY => DataType::BOOLEAN,
+        &Type::CHAR_ARRAY
+        | &Type::BPCHAR_ARRAY
+        | &Type::VARCHAR_ARRAY
+        | &Type::NAME_ARRAY
+        | &Type::TEXT_ARRAY => DataType::STRING,
+        &Type::INT2_ARRAY => DataType::SHORT,
+        &Type::INT4_ARRAY => DataType::INTEGER,
+        &Type::INT8_ARRAY => DataType::LONG,
+        &Type::FLOAT4_ARRAY => DataType::FLOAT,
+        &Type::FLOAT8_ARRAY => DataType::DOUBLE,
         // Map NUMERIC arrays to string arrays until precision/scale available
-        &Type::NUMERIC_ARRAY => DataType::Utf8,
-        &Type::DATE_ARRAY => DataType::Date32,
-        &Type::TIME_ARRAY => DataType::Time64(TimeUnit::Microsecond),
-        &Type::TIMESTAMP_ARRAY => DataType::Timestamp(TimeUnit::Microsecond, None),
-        &Type::TIMESTAMPTZ_ARRAY => {
-            DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into()))
-        }
-        &Type::UUID_ARRAY => DataType::Utf8,
-        &Type::JSON_ARRAY | &Type::JSONB_ARRAY => DataType::Utf8,
-        &Type::OID_ARRAY => DataType::UInt32,
-        &Type::BYTEA_ARRAY => DataType::Binary,
-        _ => DataType::Utf8,
+        &Type::NUMERIC_ARRAY => DataType::STRING,
+        &Type::DATE_ARRAY => DataType::DATE,
+        &Type::TIME_ARRAY => DataType::TIMESTAMP_NTZ,
+        &Type::TIMESTAMP_ARRAY => DataType::TIMESTAMP_NTZ,
+        &Type::TIMESTAMPTZ_ARRAY => DataType::TIMESTAMP,
+        &Type::UUID_ARRAY => DataType::STRING,
+        &Type::JSON_ARRAY | &Type::JSONB_ARRAY => DataType::STRING,
+        &Type::OID_ARRAY => DataType::INTEGER,
+        &Type::BYTEA_ARRAY => DataType::BINARY,
+        _ => DataType::STRING,
     };
 
-    DataType::List(Arc::new(Field::new("item", element_type, true)))
+    ArrayType::new(element_type, true).into()
 }
 
-/// Convert a Postgres `TableSchema` to an Arrow `Schema`
-pub fn postgres_to_arrow_schema(schema: &TableSchema) -> Arc<Schema> {
-    let fields: Vec<Field> = schema
+/// Convert a Postgres `TableSchema` to a Delta `Schema`
+pub fn postgres_to_delta_schema(schema: &TableSchema) -> DeltaResult<Schema> {
+    let fields: Vec<StructField> = schema
         .column_schemas
         .iter()
         .map(|col| {
             let data_type = if is_array_type(&col.typ) {
-                postgres_array_type_to_arrow(&col.typ)
+                postgres_array_type_to_delta(&col.typ)
             } else {
-                postgres_scalar_type_to_arrow(&col.typ)
+                postgres_scalar_type_to_delta(&col.typ)
             };
-            Field::new(&col.name, data_type, col.nullable)
+            StructField::new(&col.name, data_type, col.nullable)
         })
         .collect();
 
-    Arc::new(Schema::new(fields))
+    Ok(Schema::new(fields))
 }
 
 #[cfg(test)]
@@ -86,22 +88,80 @@ mod tests {
 
     #[test]
     fn test_scalar_mappings() {
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::BOOL), DataType::Boolean));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::TEXT), DataType::Utf8));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::INT2), DataType::Int16));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::INT4), DataType::Int32));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::INT8), DataType::Int64));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::FLOAT4), DataType::Float32));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::FLOAT8), DataType::Float64));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::DATE), DataType::Date32));
-        assert!(matches!(postgres_scalar_type_to_arrow(&Type::BYTEA), DataType::Binary));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::BOOL),
+            DataType::BOOLEAN
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::TEXT),
+            DataType::STRING
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::INT2),
+            DataType::SHORT
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::INT4),
+            DataType::INTEGER
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::INT8),
+            DataType::LONG
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::FLOAT4),
+            DataType::FLOAT
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::FLOAT8),
+            DataType::DOUBLE
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::DATE),
+            DataType::DATE
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::BYTEA),
+            DataType::BINARY
+        ));
     }
 
     #[test]
     fn test_array_mappings() {
-        let dt = postgres_array_type_to_arrow(&Type::INT4_ARRAY);
-        if let DataType::List(inner) = dt { assert_eq!(inner.name(), "item"); } else { panic!(); }
+        let dt = postgres_array_type_to_delta(&Type::INT4_ARRAY);
+        if let DataType::Array(array_type) = dt {
+            assert!(matches!(array_type.element_type(), &DataType::INTEGER));
+            assert!(array_type.contains_null());
+        } else {
+            panic!("Expected Array type, got: {:?}", dt);
+        }
     }
-}
 
+    #[test]
+    fn test_timestamp_mappings() {
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::TIMESTAMP),
+            DataType::TIMESTAMP_NTZ
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::TIMESTAMPTZ),
+            DataType::TIMESTAMP
+        ));
+    }
 
+    #[test]
+    fn test_string_mappings() {
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::UUID),
+            DataType::STRING
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::JSON),
+            DataType::STRING
+        ));
+        assert!(matches!(
+            postgres_scalar_type_to_delta(&Type::JSONB),
+            DataType::STRING
+        ));
+    }
+}
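For illustration, here is what the mapping above produces for a hypothetical table `(id INT8 NOT NULL, name TEXT, created_at TIMESTAMPTZ)`; a sketch built with the same kernel types the diff uses, not code from this commit:

    use deltalake::Schema;
    use deltalake::kernel::{DataType, StructField};

    // Equivalent of the Schema returned by postgres_to_delta_schema
    // for the table above; the third argument is nullability.
    let expected = Schema::new(vec![
        StructField::new("id", DataType::LONG, false), // INT8, NOT NULL
        StructField::new("name", DataType::STRING, true), // TEXT
        StructField::new("created_at", DataType::TIMESTAMP, true), // TIMESTAMPTZ
    ]);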

etl-destinations/src/lib.rs
Lines changed: 2 additions & 2 deletions

@@ -5,8 +5,8 @@
 
 #[cfg(feature = "bigquery")]
 pub mod bigquery;
-#[cfg(feature = "iceberg")]
-pub mod iceberg;
 #[cfg(feature = "deltalake")]
 pub mod delta;
+#[cfg(feature = "iceberg")]
+pub mod iceberg;
 mod metrics;
