Skip to content

Commit d048e44

Browse files
Get integration tests to pass
1 parent cc2ab1a commit d048e44

File tree

7 files changed

+109
-64
lines changed

7 files changed

+109
-64
lines changed

etl-api/src/configs/destination.rs

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -191,14 +191,12 @@ impl Encrypt<EncryptedStoredDestinationConfig> for StoredDestinationConfig {
191191
warehouse,
192192
partition_columns,
193193
optimize_after_commits,
194-
} => {
195-
Ok(EncryptedStoredDestinationConfig::DeltaLake {
196-
base_uri,
197-
warehouse,
198-
partition_columns,
199-
optimize_after_commits,
200-
})
201-
}
194+
} => Ok(EncryptedStoredDestinationConfig::DeltaLake {
195+
base_uri,
196+
warehouse,
197+
partition_columns,
198+
optimize_after_commits,
199+
}),
202200
}
203201
}
204202
}
@@ -221,7 +219,7 @@ pub enum EncryptedStoredDestinationConfig {
221219
partition_columns: Option<Vec<String>>,
222220
optimize_after_commits: Option<u64>,
223221
},
224-
}
222+
}
225223

226224
impl Store for EncryptedStoredDestinationConfig {}
227225

@@ -262,7 +260,7 @@ impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
262260
warehouse,
263261
partition_columns,
264262
optimize_after_commits,
265-
}),
263+
}),
266264
}
267265
}
268266
}

etl-destinations/src/delta/core.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use std::collections::{HashMap, HashSet};
99
use std::num::NonZeroU64;
1010
use std::sync::Arc;
1111
use tokio::sync::RwLock;
12-
use tracing::info;
12+
use tracing::{info, trace};
1313

1414
use crate::delta::{DeltaLakeClient, TableRowEncoder};
1515

@@ -116,7 +116,6 @@ where
116116
)
117117
})?;
118118

119-
// Create or open table
120119
let table = self
121120
.client
122121
.create_table_if_missing(&table_path, &table_schema)
@@ -129,13 +128,11 @@ where
129128
)
130129
})?;
131130

132-
// Cache the table for future use
133131
{
134132
let mut cache = self.table_cache.write().await;
135133
cache.insert(table_path.clone(), table.clone());
136134
}
137135

138-
println!("✅ Delta table ready: {}", table_path);
139136
Ok(table)
140137
}
141138

@@ -334,7 +331,6 @@ where
334331
)
335332
})?;
336333

337-
// Convert to Arrow RecordBatch
338334
let record_batches = TableRowEncoder::encode_table_rows(&table_schema, table_rows.clone())
339335
.map_err(|e| {
340336
etl_error!(
@@ -344,8 +340,7 @@ where
344340
)
345341
})?;
346342

347-
// Write the data to Delta table
348-
println!(
343+
trace!(
349344
"Writing {} rows ({} batches) to Delta table",
350345
table_rows.len(),
351346
record_batches.len()

etl-destinations/src/delta/encoding.rs

Lines changed: 90 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,6 @@ use deltalake::arrow::error::ArrowError;
44
use deltalake::arrow::record_batch::RecordBatch;
55
use etl::types::{Cell, TableRow, TableSchema};
66
use std::sync::Arc;
7-
8-
use crate::delta::schema::postgres_to_delta_schema;
9-
107
/// Converts TableRows to Arrow RecordBatch for Delta Lake writes
118
pub struct TableRowEncoder;
129

@@ -29,17 +26,27 @@ impl TableRowEncoder {
2926
table_schema: &TableSchema,
3027
table_rows: Vec<TableRow>,
3128
) -> Result<RecordBatch, ArrowError> {
32-
// Create Arrow schema from TableSchema
33-
let delta_schema = postgres_to_delta_schema(table_schema)
34-
.map_err(|e| ArrowError::ExternalError(Box::new(e)))?;
29+
let arrays = Self::convert_columns_to_arrays(table_schema, &table_rows)?;
3530

36-
// Convert Delta schema to Arrow schema
37-
let arrow_schema = Self::delta_schema_to_arrow(&delta_schema)?;
31+
// Create Arrow schema that MATCHES the actual array types we generated
32+
let fields: Vec<Field> = table_schema
33+
.column_schemas
34+
.iter()
35+
.zip(arrays.iter())
36+
.map(|(col_schema, array)| {
37+
Field::new(
38+
&col_schema.name,
39+
array.data_type().clone(),
40+
col_schema.nullable,
41+
)
42+
})
43+
.collect();
3844

39-
// Convert each column's data to Arrow arrays
40-
let arrays = Self::convert_columns_to_arrays(table_schema, &table_rows)?;
45+
let arrow_schema = Schema::new(fields);
4146

42-
RecordBatch::try_new(Arc::new(arrow_schema), arrays)
47+
let result = RecordBatch::try_new(Arc::new(arrow_schema), arrays);
48+
49+
result
4350
}
4451

4552
/// Convert Delta schema to Arrow schema
@@ -100,38 +107,80 @@ impl TableRowEncoder {
100107
Ok(arrays)
101108
}
102109

103-
/// Convert a column of Cells to an Arrow array
110+
/// Convert a column of Cells to an Arrow array based on the first non-null value's type
104111
fn convert_cell_column_to_array(cells: Vec<&Cell>) -> Result<ArrayRef, ArrowError> {
105-
// todo(abhi): Implement proper type detection and conversion
106-
// todo(abhi): Handle all Cell variants: Null, Bool, String, I16, I32, U32, I64, F32, F64,
107-
// Numeric, Date, Time, Timestamp, TimestampTz, Uuid, Json, Bytes, Array
108-
109-
// For now, convert everything to string as a stub
110-
let string_values: Vec<Option<String>> = cells
111-
.iter()
112-
.map(|cell| match cell {
113-
Cell::Null => None,
114-
Cell::Bool(b) => Some(b.to_string()),
115-
Cell::String(s) => Some(s.clone()),
116-
Cell::I16(i) => Some(i.to_string()),
117-
Cell::I32(i) => Some(i.to_string()),
118-
Cell::U32(i) => Some(i.to_string()),
119-
Cell::I64(i) => Some(i.to_string()),
120-
Cell::F32(f) => Some(f.to_string()),
121-
Cell::F64(f) => Some(f.to_string()),
122-
Cell::Numeric(n) => Some(n.to_string()),
123-
Cell::Date(d) => Some(d.to_string()),
124-
Cell::Time(t) => Some(t.to_string()),
125-
Cell::Timestamp(ts) => Some(ts.to_string()),
126-
Cell::TimestampTz(ts) => Some(ts.to_string()),
127-
Cell::Uuid(u) => Some(u.to_string()),
128-
Cell::Json(j) => Some(j.to_string()),
129-
Cell::Bytes(b) => Some(format!("{b:?}")),
130-
Cell::Array(a) => Some(format!("{a:?}")),
131-
})
132-
.collect();
112+
if cells.is_empty() {
113+
return Ok(Arc::new(StringArray::from(Vec::<Option<String>>::new())));
114+
}
133115

134-
Ok(Arc::new(StringArray::from(string_values)))
116+
// Determine the column type from the first non-null cell
117+
let first_non_null = cells.iter().find(|cell| !matches!(cell, Cell::Null));
118+
119+
match first_non_null {
120+
Some(Cell::Bool(_)) => {
121+
let bool_values: Vec<Option<bool>> = cells
122+
.iter()
123+
.map(|cell| match cell {
124+
Cell::Null => None,
125+
Cell::Bool(b) => Some(*b),
126+
_ => None, // Invalid conversion, treat as null
127+
})
128+
.collect();
129+
Ok(Arc::new(BooleanArray::from(bool_values)))
130+
}
131+
Some(Cell::I32(_)) => {
132+
let int_values: Vec<Option<i32>> = cells
133+
.iter()
134+
.map(|cell| match cell {
135+
Cell::Null => None,
136+
Cell::I32(i) => Some(*i),
137+
Cell::I16(i) => Some(*i as i32),
138+
Cell::U32(i) => Some(*i as i32),
139+
_ => None,
140+
})
141+
.collect();
142+
Ok(Arc::new(Int32Array::from(int_values)))
143+
}
144+
Some(Cell::I16(_)) => {
145+
let int_values: Vec<Option<i32>> = cells
146+
.iter()
147+
.map(|cell| match cell {
148+
Cell::Null => None,
149+
Cell::I16(i) => Some(*i as i32),
150+
Cell::I32(i) => Some(*i),
151+
_ => None,
152+
})
153+
.collect();
154+
Ok(Arc::new(Int32Array::from(int_values)))
155+
}
156+
_ => {
157+
// For all other types (String, Numeric, etc.), convert to string
158+
let string_values: Vec<Option<String>> = cells
159+
.iter()
160+
.map(|cell| match cell {
161+
Cell::Null => None,
162+
Cell::Bool(b) => Some(b.to_string()),
163+
Cell::String(s) => Some(s.clone()),
164+
Cell::I16(i) => Some(i.to_string()),
165+
Cell::I32(i) => Some(i.to_string()),
166+
Cell::U32(i) => Some(i.to_string()),
167+
Cell::I64(i) => Some(i.to_string()),
168+
Cell::F32(f) => Some(f.to_string()),
169+
Cell::F64(f) => Some(f.to_string()),
170+
Cell::Numeric(n) => Some(n.to_string()),
171+
Cell::Date(d) => Some(d.to_string()),
172+
Cell::Time(t) => Some(t.to_string()),
173+
Cell::Timestamp(ts) => Some(ts.to_string()),
174+
Cell::TimestampTz(ts) => Some(ts.to_string()),
175+
Cell::Uuid(u) => Some(u.to_string()),
176+
Cell::Json(j) => Some(j.to_string()),
177+
Cell::Bytes(b) => Some(format!("{b:?}")),
178+
Cell::Array(a) => Some(format!("{a:?}")),
179+
})
180+
.collect();
181+
Ok(Arc::new(StringArray::from(string_values)))
182+
}
183+
}
135184
}
136185

137186
/// Convert Cell values to specific Arrow array types

etl-destinations/tests/common/delta.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ const MINIO_BUCKET_ENV_NAME: &str = "TESTS_MINIO_BUCKET";
1818
const DEFAULT_MINIO_ENDPOINT: &str = "http://localhost:9010";
1919
const DEFAULT_MINIO_ACCESS_KEY: &str = "minio-admin";
2020
const DEFAULT_MINIO_SECRET_KEY: &str = "minio-admin-password";
21-
const DEFAULT_MINIO_BUCKET: &str = "dev-and-test";
21+
const DEFAULT_MINIO_BUCKET: &str = "delta-dev-and-test";
2222

2323
/// Generates a unique warehouse path for test isolation.
2424
///
@@ -70,6 +70,8 @@ impl DeltaLakeDatabase {
7070
env::set_var("AWS_REGION", "local-01");
7171
env::set_var("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
7272
env::set_var("AWS_S3_PATH_STYLE_ACCESS", "true");
73+
env::set_var("AWS_USE_HTTPS", "false");
74+
env::set_var("AWS_ALLOW_HTTP", "true");
7375
}
7476

7577
Self {

etl-destinations/tests/common/lakekeeper.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ impl Default for CreateWarehouseRequest {
8080
r#type: Type::S3,
8181
},
8282
storage_profile: StorageProfile {
83-
bucket: "dev-and-test".to_string(),
83+
bucket: "iceberg-dev-and-test".to_string(),
8484
region: "local-01".to_string(),
8585
sts_enabled: false,
8686
r#type: Type::S3,

scripts/docker-compose.yaml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,9 @@ services:
7979
condition: service_healthy
8080
entrypoint: >
8181
/bin/sh -c "
82-
mc alias set iceberg http://minio:9000 minio-admin minio-admin-password;
83-
mc mb iceberg/dev-and-test;
82+
mc alias set minio http://minio:9000 minio-admin minio-admin-password;
83+
mc mb minio/iceberg-dev-and-test;
84+
mc mb minio/delta-dev-and-test;
8485
exit 0;
8586
"
8687

scripts/warehouse.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
{
2-
"warehouse-name": "dev-and-test-warehouse",
2+
"warehouse-name": "iceberg-dev-and-test-warehouse",
33
"project-id": "00000000-0000-0000-0000-000000000000",
44
"storage-profile": {
55
"type": "s3",
6-
"bucket": "dev-and-test",
6+
"bucket": "iceberg-dev-and-test",
77
"key-prefix": "initial-warehouse",
88
"assume-role-arn": null,
99
"endpoint": "http://minio:9000",

0 commit comments

Comments (0)