Skip to content

Commit cc2ab1a

Browse files
Get stuff working
1 parent 8c1b479 commit cc2ab1a

File tree

15 files changed

+1627
-12
lines changed

15 files changed

+1627
-12
lines changed

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ clap = { version = "4.5.42", default-features = false }
3535
config = { version = "0.14", default-features = false }
3636
const-oid = { version = "0.9.6", default-features = false }
3737
constant_time_eq = { version = "0.4.2" }
38-
deltalake = { version = "0.27.0", default-features = false, features = ["rustls"] }
38+
deltalake = { version = "0.27.0", default-features = false, features = ["rustls", "datafusion", "s3"] }
3939
fail = { version = "0.5.1", default-features = false }
4040
futures = { version = "0.3.31", default-features = false }
4141
gcp-bigquery-client = { git = "https://github.com/lquerel/gcp-bigquery-client.git", default-features = false, rev = "391a162642bd0af6d55e2310ef46adf96f21228c" }

etl-api/src/configs/destination.rs

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ const DEFAULT_MAX_CONCURRENT_STREAMS: usize = 8;
1414

1515
#[derive(Debug, Clone, Serialize, Deserialize, ToSchema)]
1616
#[serde(rename_all = "snake_case")]
17+
#[non_exhaustive]
1718
pub enum FullApiDestinationConfig {
1819
Memory,
1920
BigQuery {
@@ -30,6 +31,16 @@ pub enum FullApiDestinationConfig {
3031
#[serde(skip_serializing_if = "Option::is_none")]
3132
max_concurrent_streams: Option<usize>,
3233
},
34+
DeltaLake {
35+
#[schema(example = "s3://my-bucket/my-path")]
36+
base_uri: String,
37+
#[schema(example = "s3://my-bucket/my-path")]
38+
warehouse: Option<String>,
39+
#[schema(example = "[\"date\"]")]
40+
partition_columns: Option<Vec<String>>,
41+
#[schema(example = 100)]
42+
optimize_after_commits: Option<u64>,
43+
},
3344
}
3445

3546
impl From<StoredDestinationConfig> for FullApiDestinationConfig {
@@ -49,6 +60,17 @@ impl From<StoredDestinationConfig> for FullApiDestinationConfig {
4960
max_staleness_mins,
5061
max_concurrent_streams: Some(max_concurrent_streams),
5162
},
63+
StoredDestinationConfig::DeltaLake {
64+
base_uri,
65+
warehouse,
66+
partition_columns,
67+
optimize_after_commits,
68+
} => Self::DeltaLake {
69+
base_uri,
70+
warehouse,
71+
partition_columns,
72+
optimize_after_commits,
73+
},
5274
}
5375
}
5476
}
@@ -64,6 +86,12 @@ pub enum StoredDestinationConfig {
6486
max_staleness_mins: Option<u16>,
6587
max_concurrent_streams: usize,
6688
},
89+
DeltaLake {
90+
base_uri: String,
91+
warehouse: Option<String>,
92+
partition_columns: Option<Vec<String>>,
93+
optimize_after_commits: Option<u64>,
94+
},
6795
}
6896

6997
impl StoredDestinationConfig {
@@ -83,6 +111,17 @@ impl StoredDestinationConfig {
83111
max_staleness_mins,
84112
max_concurrent_streams,
85113
},
114+
Self::DeltaLake {
115+
base_uri,
116+
warehouse,
117+
partition_columns,
118+
optimize_after_commits,
119+
} => DestinationConfig::DeltaLake {
120+
base_uri,
121+
warehouse,
122+
partition_columns,
123+
optimize_after_commits,
124+
},
86125
}
87126
}
88127
}
@@ -105,6 +144,17 @@ impl From<FullApiDestinationConfig> for StoredDestinationConfig {
105144
max_concurrent_streams: max_concurrent_streams
106145
.unwrap_or(DEFAULT_MAX_CONCURRENT_STREAMS),
107146
},
147+
FullApiDestinationConfig::DeltaLake {
148+
base_uri,
149+
warehouse,
150+
partition_columns,
151+
optimize_after_commits,
152+
} => Self::DeltaLake {
153+
base_uri,
154+
warehouse,
155+
partition_columns,
156+
optimize_after_commits,
157+
},
108158
}
109159
}
110160
}
@@ -136,12 +186,26 @@ impl Encrypt<EncryptedStoredDestinationConfig> for StoredDestinationConfig {
136186
max_concurrent_streams,
137187
})
138188
}
189+
Self::DeltaLake {
190+
base_uri,
191+
warehouse,
192+
partition_columns,
193+
optimize_after_commits,
194+
} => {
195+
Ok(EncryptedStoredDestinationConfig::DeltaLake {
196+
base_uri,
197+
warehouse,
198+
partition_columns,
199+
optimize_after_commits,
200+
})
201+
}
139202
}
140203
}
141204
}
142205

143206
#[derive(Debug, Clone, Serialize, Deserialize)]
144207
#[serde(rename_all = "snake_case")]
208+
#[non_exhaustive]
145209
pub enum EncryptedStoredDestinationConfig {
146210
Memory,
147211
BigQuery {
@@ -151,7 +215,13 @@ pub enum EncryptedStoredDestinationConfig {
151215
max_staleness_mins: Option<u16>,
152216
max_concurrent_streams: usize,
153217
},
154-
}
218+
DeltaLake {
219+
base_uri: String,
220+
warehouse: Option<String>,
221+
partition_columns: Option<Vec<String>>,
222+
optimize_after_commits: Option<u64>,
223+
},
224+
}
155225

156226
impl Store for EncryptedStoredDestinationConfig {}
157227

@@ -182,6 +252,17 @@ impl Decrypt<StoredDestinationConfig> for EncryptedStoredDestinationConfig {
182252
max_concurrent_streams,
183253
})
184254
}
255+
Self::DeltaLake {
256+
base_uri,
257+
warehouse,
258+
partition_columns,
259+
optimize_after_commits,
260+
} => Ok(StoredDestinationConfig::DeltaLake {
261+
base_uri,
262+
warehouse,
263+
partition_columns,
264+
optimize_after_commits,
265+
}),
185266
}
186267
}
187268
}

etl-config/src/shared/destination.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::SerializableSecretString;
88
/// Each variant corresponds to a different supported destination system.
99
#[derive(Debug, Clone, Serialize, Deserialize)]
1010
#[serde(rename_all = "snake_case")]
11+
#[non_exhaustive]
1112
pub enum DestinationConfig {
1213
/// In-memory destination for ephemeral or test data.
1314
Memory,
@@ -41,4 +42,12 @@ pub enum DestinationConfig {
4142
/// - and the configured batch size.
4243
max_concurrent_streams: usize,
4344
},
45+
DeltaLake {
46+
base_uri: String,
47+
warehouse: Option<String>,
48+
#[serde(skip_serializing_if = "Option::is_none")]
49+
partition_columns: Option<Vec<String>>,
50+
#[serde(skip_serializing_if = "Option::is_none")]
51+
optimize_after_commits: Option<u64>,
52+
},
4453
}

etl-destinations/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ bigquery = [
1313
"dep:tokio",
1414
]
1515
iceberg = ["dep:iceberg", "dep:iceberg-catalog-rest"]
16-
deltalake = ["dep:deltalake"]
16+
deltalake = ["dep:deltalake", "dep:tokio", "dep:tracing"]
1717

1818
[dependencies]
1919
etl = { workspace = true }
2020
etl-postgres = { workspace = true }
2121
chrono = { workspace = true }
2222

23-
deltalake = { workspace = true, optional = true, features = ["rustls"] }
23+
deltalake = { workspace = true, optional = true, features = ["rustls", "datafusion"] }
2424
futures = { workspace = true, optional = true }
2525
gcp-bigquery-client = { workspace = true, optional = true, features = [
2626
"rust-tls",
@@ -40,6 +40,7 @@ tracing = { workspace = true, optional = true, default-features = true }
4040
[dev-dependencies]
4141
etl = { workspace = true, features = ["test-utils"] }
4242
etl-telemetry = { workspace = true }
43+
deltalake = { workspace = true, features = ["rustls", "datafusion", "s3"] }
4344

4445
base64 = { workspace = true }
4546
chrono = { workspace = true }

0 commit comments

Comments
 (0)