Skip to content

Commit dfe6e40

Browse files
Pass storage_options directly instead of reading from env
1 parent 3b8e6b0 commit dfe6e40

File tree

6 files changed

+76
-52
lines changed

6 files changed

+76
-52
lines changed

etl-api/src/configs/destination.rs

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
13
use etl_config::SerializableSecretString;
24
use etl_config::shared::DestinationConfig;
35
use secrecy::ExposeSecret;
@@ -34,10 +36,10 @@ pub enum FullApiDestinationConfig {
3436
DeltaLake {
3537
#[schema(example = "s3://my-bucket/my-path")]
3638
base_uri: String,
37-
#[schema(example = "s3://my-bucket/my-path")]
38-
warehouse: Option<String>,
39-
#[schema(example = "[\"date\"]")]
40-
partition_columns: Option<Vec<String>>,
39+
#[schema(example = "{\"aws_access_key_id\": \"https://my-endpoint.com\"}")]
40+
storage_options: Option<HashMap<String, String>>,
41+
#[schema(example = "{\"my_table\": [\"date\"]}")]
42+
partition_columns: Option<HashMap<String, Vec<String>>>,
4143
#[schema(example = 100)]
4244
optimize_after_commits: Option<u64>,
4345
},
@@ -62,12 +64,12 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
6264
},
6365
StoredDestinationConfig::DeltaLake {
6466
base_uri,
65-
warehouse,
67+
storage_options,
6668
partition_columns,
6769
optimize_after_commits,
6870
} => Self::DeltaLake {
6971
base_uri,
70-
warehouse,
72+
storage_options,
7173
partition_columns,
7274
optimize_after_commits,
7375
},
@@ -88,8 +90,8 @@ pub enum StoredDestinationConfig {
8890
},
8991
DeltaLake {
9092
base_uri: String,
91-
warehouse: Option<String>,
92-
partition_columns: Option<Vec<String>>,
93+
storage_options: Option<HashMap<String, String>>,
94+
partition_columns: Option<HashMap<String, Vec<String>>>,
9395
optimize_after_commits: Option<u64>,
9496
},
9597
}
@@ -113,12 +115,12 @@ impl StoredDestinationConfig {
113115
},
114116
Self::DeltaLake {
115117
base_uri,
116-
warehouse,
118+
storage_options,
117119
partition_columns,
118120
optimize_after_commits,
119121
} => DestinationConfig::DeltaLake {
120122
base_uri,
121-
warehouse,
123+
storage_options,
122124
partition_columns,
123125
optimize_after_commits,
124126
},
@@ -146,12 +148,12 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
146148
},
147149
FullApiDestinationConfig::DeltaLake {
148150
base_uri,
149-
warehouse,
151+
storage_options,
150152
partition_columns,
151153
optimize_after_commits,
152154
} => Self::DeltaLake {
153155
base_uri,
154-
warehouse,
156+
storage_options,
155157
partition_columns,
156158
optimize_after_commits,
157159
},
@@ -188,12 +190,12 @@ impl Encrypt<EncryptedStoredDestinationConfig> for StoredDestinationConfig {
188190
}
189191
Self::DeltaLake {
190192
base_uri,
191-
warehouse,
193+
storage_options,
192194
partition_columns,
193195
optimize_after_commits,
194196
} => Ok(EncryptedStoredDestinationConfig::DeltaLake {
195197
base_uri,
196-
warehouse,
198+
storage_options,
197199
partition_columns,
198200
optimize_after_commits,
199201
}),
@@ -215,8 +217,8 @@ pub enum EncryptedStoredDestinationConfig {
215217
},
216218
DeltaLake {
217219
base_uri: String,
218-
warehouse: Option<String>,
219-
partition_columns: Option<Vec<String>>,
220+
storage_options: Option<HashMap<String, String>>,
221+
partition_columns: Option<HashMap<String, Vec<String>>>,
220222
optimize_after_commits: Option<u64>,
221223
},
222224
}
@@ -252,12 +254,12 @@ impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
252254
}
253255
Self::DeltaLake {
254256
base_uri,
255-
warehouse,
257+
storage_options,
256258
partition_columns,
257259
optimize_after_commits,
258260
} => Ok(StoredDestinationConfig::DeltaLake {
259261
base_uri,
260-
warehouse,
262+
storage_options,
261263
partition_columns,
262264
optimize_after_commits,
263265
}),

etl-config/src/shared/destination.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
13
use serde::{Deserialize, Serialize};
24

35
use crate::SerializableSecretString;
@@ -44,9 +46,9 @@ pub enum DestinationConfig {
4446
},
4547
DeltaLake {
4648
base_uri: String,
47-
warehouse: Option<String>,
49+
storage_options: Option<HashMap<String, String>>,
4850
#[serde(skip_serializing_if = "Option::is_none")]
49-
partition_columns: Option<Vec<String>>,
51+
partition_columns: Option<HashMap<String, Vec<String>>>,
5052
#[serde(skip_serializing_if = "Option::is_none")]
5153
optimize_after_commits: Option<u64>,
5254
},

etl-destinations/src/delta/client.rs

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,43 @@
1-
use std::collections::HashSet;
1+
use std::collections::{HashMap, HashSet};
22
use std::sync::Arc;
33

44
use super::schema::postgres_to_delta_schema;
55
use deltalake::arrow::record_batch::RecordBatch;
6-
use deltalake::{DeltaOps, DeltaResult, DeltaTable, open_table};
6+
use deltalake::{DeltaOps, DeltaResult, DeltaTable, DeltaTableBuilder, open_table};
77
use etl::types::TableSchema;
88

99
/// Client for connecting to Delta Lake tables.
1010
#[derive(Clone)]
11-
pub struct DeltaLakeClient {}
11+
pub struct DeltaLakeClient {
12+
storage_options: Option<HashMap<String, String>>,
13+
}
1214

1315
impl Default for DeltaLakeClient {
1416
fn default() -> Self {
15-
Self::new()
17+
Self::new(None)
1618
}
1719
}
1820

1921
impl DeltaLakeClient {
2022
/// Create a new client.
21-
pub fn new() -> Self {
22-
Self {}
23+
pub fn new(storage_options: Option<HashMap<String, String>>) -> Self {
24+
Self { storage_options }
25+
}
26+
27+
fn get_table_with_storage_options(&self, table_uri: &str) -> DeltaResult<DeltaTableBuilder> {
28+
let mut builder = DeltaTableBuilder::from_valid_uri(table_uri)?;
29+
if let Some(storage_options) = &self.storage_options {
30+
builder = builder.with_storage_options(storage_options.clone());
31+
}
32+
Ok(builder)
2333
}
2434

2535
/// Returns true if a Delta table exists at the given uri/path.
2636
pub async fn table_exists(&self, table_uri: &str) -> bool {
27-
open_table(table_uri).await.is_ok()
37+
let Ok(builder) = self.get_table_with_storage_options(table_uri) else {
38+
return false;
39+
};
40+
builder.load().await.is_ok()
2841
}
2942

3043
/// Create a Delta table at `table_uri` if it doesn't exist, using the provided Postgres schema.
@@ -39,7 +52,12 @@ impl DeltaLakeClient {
3952

4053
let delta_schema = postgres_to_delta_schema(table_schema)?;
4154

42-
let ops = DeltaOps::try_from_uri(table_uri).await?;
55+
let ops = if let Some(storage_options) = &self.storage_options {
56+
DeltaOps::try_from_uri_with_storage_options(table_uri, storage_options.clone()).await?
57+
} else {
58+
DeltaOps::try_from_uri(table_uri).await?
59+
};
60+
4361
let table = ops
4462
.create()
4563
// TODO(abhi): Figure out how to avoid the clone

etl-destinations/src/delta/core.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ use crate::delta::{DeltaLakeClient, TableRowEncoder};
1818
pub struct DeltaDestinationConfig {
1919
/// Base URI for Delta table storage (e.g., "s3://bucket/warehouse", "file:///tmp/delta")
2020
pub base_uri: String,
21-
/// Optional warehouse path for organizing tables
22-
pub warehouse: Option<String>,
21+
/// Optional storage options passed to underlying object store
22+
pub storage_options: Option<HashMap<String, String>>,
2323
/// Columns to use for partitioning (per table)
24-
pub partition_columns: Option<Vec<String>>,
24+
pub partition_columns: Option<HashMap<String, Vec<String>>>,
2525
/// Run OPTIMIZE every N commits (None = disabled)
2626
pub optimize_after_commits: Option<NonZeroU64>,
2727
}
@@ -30,7 +30,7 @@ impl Default for DeltaDestinationConfig {
3030
fn default() -> Self {
3131
Self {
3232
base_uri: "file:///tmp/delta".to_string(),
33-
warehouse: None,
33+
storage_options: None,
3434
partition_columns: None,
3535
optimize_after_commits: None,
3636
}
@@ -56,7 +56,7 @@ where
5656
/// Create a new Delta Lake destination
5757
pub fn new(store: S, config: DeltaDestinationConfig) -> Self {
5858
Self {
59-
client: DeltaLakeClient::new(),
59+
client: DeltaLakeClient::new(config.storage_options.clone()),
6060
store,
6161
config,
6262
table_cache: Arc::new(RwLock::new(HashMap::new())),

etl-destinations/tests/common/delta.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ use etl::store::schema::SchemaStore;
22
use etl::store::state::StateStore;
33
use etl::types::TableName;
44
use etl_destinations::delta::{DeltaDestinationConfig, DeltaLakeDestination};
5+
use std::collections::HashMap;
56
use std::env;
67
use uuid::Uuid;
78

@@ -62,18 +63,6 @@ impl DeltaLakeDatabase {
6263
let warehouse_path = random_warehouse_path();
6364
let s3_base_uri = format!("s3://{}/{}", bucket, warehouse_path);
6465

65-
// Set up AWS environment for delta-rs to use minio
66-
unsafe {
67-
env::set_var("AWS_ENDPOINT_URL", &endpoint);
68-
env::set_var("AWS_ACCESS_KEY_ID", &access_key);
69-
env::set_var("AWS_SECRET_ACCESS_KEY", &secret_key);
70-
env::set_var("AWS_REGION", "local-01");
71-
env::set_var("AWS_S3_ALLOW_UNSAFE_RENAME", "true");
72-
env::set_var("AWS_S3_PATH_STYLE_ACCESS", "true");
73-
env::set_var("AWS_USE_HTTPS", "false");
74-
env::set_var("AWS_ALLOW_HTTP", "true");
75-
}
76-
7766
Self {
7867
warehouse_path,
7968
s3_base_uri,
@@ -87,14 +76,27 @@ impl DeltaLakeDatabase {
8776
/// Creates a [`DeltaLakeDestination`] configured for this database instance.
8877
///
8978
/// Returns a destination suitable for ETL operations, configured with
90-
/// the test warehouse location.
79+
/// the test warehouse location and appropriate storage options for MinIO.
9180
pub async fn build_destination<S>(&self, store: S) -> DeltaLakeDestination<S>
9281
where
9382
S: StateStore + SchemaStore + Send + Sync,
9483
{
84+
// Create storage options HashMap with AWS-compatible settings for MinIO
85+
let mut storage_options = HashMap::new();
86+
storage_options.insert("endpoint".to_string(), self.endpoint.clone());
87+
storage_options.insert("access_key_id".to_string(), self.access_key.clone());
88+
storage_options.insert("secret_access_key".to_string(), self.secret_key.clone());
89+
storage_options.insert("region".to_string(), "local-01".to_string());
90+
storage_options.insert("allow_http".to_string(), "true".to_string());
91+
// Use path-style requests for MinIO compatibility (opposite of virtual hosted style)
92+
storage_options.insert(
93+
"virtual_hosted_style_request".to_string(),
94+
"false".to_string(),
95+
);
96+
9597
let config = DeltaDestinationConfig {
9698
base_uri: self.s3_base_uri.clone(),
97-
warehouse: Some(self.warehouse_path.clone()),
99+
storage_options: Some(storage_options),
98100
partition_columns: None,
99101
optimize_after_commits: None,
100102
};

etl-replicator/src/core.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,15 +70,15 @@ pub async fn start_replicator_with_config(
7070
}
7171
DestinationConfig::DeltaLake {
7272
base_uri,
73-
warehouse,
73+
storage_options,
7474
partition_columns,
7575
optimize_after_commits,
7676
} => {
7777
let destination = DeltaLakeDestination::new(
7878
state_store.clone(),
7979
DeltaDestinationConfig {
8080
base_uri: base_uri.clone(),
81-
warehouse: warehouse.clone(),
81+
storage_options: storage_options.clone(),
8282
partition_columns: partition_columns.clone(),
8383
optimize_after_commits: optimize_after_commits.map(|n| n.try_into().unwrap()),
8484
},
@@ -121,12 +121,12 @@ fn log_destination_config(config: &DestinationConfig) {
121121
)
122122
}
123123
DestinationConfig::DeltaLake {
124-
base_uri: _,
125-
warehouse: _,
124+
base_uri,
125+
storage_options: _,
126126
partition_columns: _,
127127
optimize_after_commits: _,
128128
} => {
129-
debug!("using delta lake destination config");
129+
debug!(base_uri = base_uri, "using delta lake destination config");
130130
}
131131
_ => unimplemented!("destination config not implemented"),
132132
}

0 commit comments

Comments (0)