Skip to content

Commit 7dcdc94

Browse files
blah
1 parent f161d84 commit 7dcdc94

File tree

2 files changed

+173
-120
lines changed

2 files changed

+173
-120
lines changed

etl-destinations/src/delta/client.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::sync::Arc;
44
use super::schema::postgres_to_delta_schema;
55
use deltalake::arrow::record_batch::RecordBatch;
66
use deltalake::{DeltaOps, DeltaResult, DeltaTable, DeltaTableBuilder, open_table};
7-
use etl::types::{TableSchema, TableRow, Cell};
7+
use etl::types::{Cell, TableRow, TableSchema};
88

99
/// Client for connecting to Delta Lake tables.
1010
#[derive(Clone)]
@@ -246,7 +246,7 @@ impl DeltaLakeClient {
246246
// Malformed composite key, skip
247247
return "false".to_string();
248248
}
249-
249+
250250
let conditions: Vec<String> = pk_column_names
251251
.iter()
252252
.zip(key_parts.iter())
@@ -258,7 +258,7 @@ impl DeltaLakeClient {
258258
)
259259
})
260260
.collect();
261-
261+
262262
format!("({})", conditions.join(" AND "))
263263
})
264264
.filter(|cond| cond != "false") // Remove malformed conditions
@@ -386,7 +386,7 @@ impl DeltaLakeClient {
386386
Cell::Bytes(b) => {
387387
let hex_string: String = b.iter().map(|byte| format!("{:02x}", byte)).collect();
388388
format!("\\x{}", hex_string)
389-
},
389+
}
390390
Cell::Array(_) => "[ARRAY]".to_string(), // Arrays shouldn't be PKs
391391
}
392392
}
@@ -414,7 +414,7 @@ impl DeltaLakeClient {
414414
let mut parts = Vec::new();
415415
let mut current_part = String::new();
416416
let mut chars = composite_key.chars().peekable();
417-
417+
418418
while let Some(ch) = chars.next() {
419419
if ch == ':' {
420420
if chars.peek() == Some(&':') {
@@ -437,12 +437,12 @@ impl DeltaLakeClient {
437437
current_part.push(ch);
438438
}
439439
}
440-
440+
441441
// Add the final part
442442
if !current_part.is_empty() || !parts.is_empty() {
443443
parts.push(current_part);
444444
}
445-
445+
446446
parts
447447
}
448448

@@ -472,7 +472,7 @@ impl DeltaLakeClient {
472472
#[cfg(test)]
473473
mod tests {
474474
use super::*;
475-
use etl::types::{ColumnSchema, TableName, Type, Cell, TableId, TableRow, TableSchema};
475+
use etl::types::{Cell, ColumnSchema, TableId, TableName, TableRow, TableSchema, Type};
476476

477477
fn create_test_schema() -> TableSchema {
478478
TableSchema::new(
@@ -486,10 +486,7 @@ mod tests {
486486
}
487487

488488
fn create_test_row(id: i32, name: &str) -> TableRow {
489-
TableRow::new(vec![
490-
Cell::I32(id),
491-
Cell::String(name.to_string()),
492-
])
489+
TableRow::new(vec![Cell::I32(id), Cell::String(name.to_string())])
493490
}
494491

495492
#[test]
@@ -509,7 +506,7 @@ mod tests {
509506
let mut schema = create_test_schema();
510507
// Make both columns primary keys
511508
schema.column_schemas[1].primary = true;
512-
509+
513510
let row = create_test_row(42, "test");
514511

515512
let result = client.extract_primary_key(&row, &schema);
@@ -523,10 +520,10 @@ mod tests {
523520
let mut keys = HashSet::new();
524521
keys.insert("42".to_string());
525522
keys.insert("43".to_string());
526-
523+
527524
let pk_columns = vec!["id".to_string()];
528525
let predicate = client.build_pk_predicate(&keys, &pk_columns);
529-
526+
530527
// Should be `id` IN ('42', '43') - order may vary
531528
assert!(predicate.contains("`id` IN"));
532529
assert!(predicate.contains("'42'"));
@@ -539,10 +536,10 @@ mod tests {
539536
let mut keys = HashSet::new();
540537
keys.insert("42::test".to_string());
541538
keys.insert("43::hello".to_string());
542-
539+
543540
let pk_columns = vec!["id".to_string(), "name".to_string()];
544541
let predicate = client.build_pk_predicate(&keys, &pk_columns);
545-
542+
546543
// Should be (`id` = '42' AND `name` = 'test') OR (`id` = '43' AND `name` = 'hello')
547544
assert!(predicate.contains("`id` = '42' AND `name` = 'test'"));
548545
assert!(predicate.contains("`id` = '43' AND `name` = 'hello'"));
@@ -554,7 +551,7 @@ mod tests {
554551
let client = DeltaLakeClient::new(None);
555552
let keys = HashSet::new();
556553
let pk_columns = vec!["id".to_string()];
557-
554+
558555
let predicate = client.build_pk_predicate(&keys, &pk_columns);
559556
assert_eq!(predicate, "false");
560557
}
@@ -564,20 +561,26 @@ mod tests {
564561
let parts = vec!["value::with::delimiter".to_string(), "normal".to_string()];
565562
let composite = DeltaLakeClient::join_composite_key(&parts);
566563
assert_eq!(composite, "value::::with::::delimiter::normal");
567-
564+
568565
let split_parts = DeltaLakeClient::split_composite_key(&composite);
569566
assert_eq!(split_parts, parts);
570567
}
571568

572569
#[test]
573570
fn test_escape_identifier() {
574571
assert_eq!(DeltaLakeClient::escape_identifier("normal"), "`normal`");
575-
assert_eq!(DeltaLakeClient::escape_identifier("with`backtick"), "`with``backtick`");
572+
assert_eq!(
573+
DeltaLakeClient::escape_identifier("with`backtick"),
574+
"`with``backtick`"
575+
);
576576
}
577577

578578
#[test]
579579
fn test_escape_string_literal() {
580580
assert_eq!(DeltaLakeClient::escape_string_literal("normal"), "'normal'");
581-
assert_eq!(DeltaLakeClient::escape_string_literal("with'quote"), "'with''quote'");
581+
assert_eq!(
582+
DeltaLakeClient::escape_string_literal("with'quote"),
583+
"'with''quote'"
584+
);
582585
}
583586
}

0 commit comments

Comments
 (0)