Skip to content

Commit 19b2bc9

Browse files
Clippy
1 parent: 75bf091 · commit: 19b2bc9

File tree

4 files changed

+24
-52
lines changed

4 files changed

+24
-52
lines changed

etl-destinations/src/delta/client.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ impl DeltaLakeClient {
124124
table: Arc<DeltaTable>,
125125
delete_predicate: Option<&str>,
126126
record_batches: Vec<RecordBatch>,
127-
app_transaction_id: Option<&str>,
127+
_app_transaction_id: Option<&str>,
128128
) -> DeltaResult<Arc<DeltaTable>> {
129129
// todo(abhi): Implement atomic delete+append transaction
130130
// todo(abhi): Use Delta transaction features for atomicity
@@ -162,6 +162,7 @@ impl DeltaLakeClient {
162162
}
163163

164164
/// Run OPTIMIZE operation on the table
165+
#[allow(unused)]
165166
pub async fn optimize_table(
166167
&self,
167168
table: Arc<DeltaTable>,

etl-destinations/src/delta/core.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ where
250250
&self,
251251
table_id: TableId,
252252
upserts_by_pk: &HashMap<String, TableRow>,
253-
delete_pks: &HashSet<String>,
253+
_delete_pks: &HashSet<String>,
254254
) -> EtlResult<()> {
255255
// todo(abhi): Implement the transaction logic from PLAN.md
256256
// todo(abhi): Delete rows with PK in affected set
@@ -281,6 +281,7 @@ where
281281
}
282282

283283
/// Run table optimization (OPTIMIZE)
284+
#[allow(unused)]
284285
async fn optimize_table(&self, _table_path: &str) -> EtlResult<()> {
285286
// todo(abhi): Implement OPTIMIZE operation using delta-rs
286287
// todo(abhi): Small file compaction and Z-ordering

etl-destinations/src/delta/encoding.rs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,11 @@ impl TableRowEncoder {
4444

4545
let arrow_schema = Schema::new(fields);
4646

47-
let result = RecordBatch::try_new(Arc::new(arrow_schema), arrays);
48-
49-
result
47+
RecordBatch::try_new(Arc::new(arrow_schema), arrays)
5048
}
5149

5250
/// Convert Delta schema to Arrow schema
51+
#[allow(unused)]
5352
fn delta_schema_to_arrow(
5453
delta_schema: &deltalake::kernel::StructType,
5554
) -> Result<Schema, ArrowError> {
@@ -60,24 +59,24 @@ impl TableRowEncoder {
6059
.fields()
6160
.map(|field| {
6261
// Convert Delta DataType to Arrow DataType
63-
let arrow_type = match field.data_type() {
64-
&deltalake::kernel::DataType::BOOLEAN => DataType::Boolean,
65-
&deltalake::kernel::DataType::STRING => DataType::Utf8,
66-
&deltalake::kernel::DataType::INTEGER => DataType::Int32,
67-
&deltalake::kernel::DataType::LONG => DataType::Int64,
68-
&deltalake::kernel::DataType::SHORT => DataType::Int16,
69-
&deltalake::kernel::DataType::FLOAT => DataType::Float32,
70-
&deltalake::kernel::DataType::DOUBLE => DataType::Float64,
71-
&deltalake::kernel::DataType::DATE => DataType::Date32,
72-
&deltalake::kernel::DataType::TIMESTAMP => DataType::Timestamp(
62+
let arrow_type = match *field.data_type() {
63+
deltalake::kernel::DataType::BOOLEAN => DataType::Boolean,
64+
deltalake::kernel::DataType::STRING => DataType::Utf8,
65+
deltalake::kernel::DataType::INTEGER => DataType::Int32,
66+
deltalake::kernel::DataType::LONG => DataType::Int64,
67+
deltalake::kernel::DataType::SHORT => DataType::Int16,
68+
deltalake::kernel::DataType::FLOAT => DataType::Float32,
69+
deltalake::kernel::DataType::DOUBLE => DataType::Float64,
70+
deltalake::kernel::DataType::DATE => DataType::Date32,
71+
deltalake::kernel::DataType::TIMESTAMP => DataType::Timestamp(
7372
deltalake::arrow::datatypes::TimeUnit::Microsecond,
7473
Some("UTC".into()),
7574
),
76-
&deltalake::kernel::DataType::TIMESTAMP_NTZ => DataType::Timestamp(
75+
deltalake::kernel::DataType::TIMESTAMP_NTZ => DataType::Timestamp(
7776
deltalake::arrow::datatypes::TimeUnit::Microsecond,
7877
None,
7978
),
80-
&deltalake::kernel::DataType::BINARY => DataType::Binary,
79+
deltalake::kernel::DataType::BINARY => DataType::Binary,
8180
// Default to string for complex/unsupported types
8281
_ => DataType::Utf8,
8382
};
@@ -184,6 +183,7 @@ impl TableRowEncoder {
184183
}
185184

186185
/// Convert Cell values to specific Arrow array types
186+
#[allow(unused)]
187187
fn convert_bool_column(cells: Vec<&Cell>) -> Result<ArrayRef, ArrowError> {
188188
// todo(abhi): Extract boolean values from cells, handle nulls
189189
let values: Vec<Option<bool>> = cells
@@ -198,6 +198,7 @@ impl TableRowEncoder {
198198
Ok(Arc::new(BooleanArray::from(values)))
199199
}
200200

201+
#[allow(unused)]
201202
fn convert_string_column(cells: Vec<&Cell>) -> Result<ArrayRef, ArrowError> {
202203
// todo(abhi): Extract string values from cells, handle nulls and conversions
203204
let values: Vec<Option<String>> = cells
@@ -213,6 +214,7 @@ impl TableRowEncoder {
213214
Ok(Arc::new(StringArray::from(values)))
214215
}
215216

217+
#[allow(unused)]
216218
fn convert_int32_column(cells: Vec<&Cell>) -> Result<ArrayRef, ArrowError> {
217219
// todo(abhi): Extract i32 values from cells, handle nulls and conversions
218220
let values: Vec<Option<i32>> = cells
@@ -228,6 +230,7 @@ impl TableRowEncoder {
228230
Ok(Arc::new(Int32Array::from(values)))
229231
}
230232

233+
#[allow(unused)]
231234
fn convert_array_column(cells: Vec<&Cell>) -> Result<ArrayRef, ArrowError> {
232235
// todo(abhi): Convert ArrayCell variants to Arrow ListArray
233236
// todo(abhi): Handle nested arrays properly with element type detection

etl-destinations/tests/common/delta.rs

Lines changed: 2 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ fn random_warehouse_path() -> String {
3434
///
3535
/// Provides a unified interface for Delta Lake operations in tests, automatically
3636
/// handling setup of test warehouse locations using minio as the object storage backend.
37+
#[allow(unused)]
3738
pub struct DeltaLakeDatabase {
3839
warehouse_path: String,
3940
s3_base_uri: String,
@@ -43,6 +44,7 @@ pub struct DeltaLakeDatabase {
4344
bucket: String,
4445
}
4546

47+
#[allow(unused)]
4648
impl DeltaLakeDatabase {
4749
/// Creates a new Delta Lake database instance.
4850
///
@@ -144,38 +146,3 @@ impl Drop for DeltaLakeDatabase {
144146
pub async fn setup_delta_connection() -> DeltaLakeDatabase {
145147
DeltaLakeDatabase::new().await
146148
}
147-
148-
/// Test data structures for Delta Lake integration tests
149-
/// These mirror the BigQuery test structures but are designed for Delta Lake
150-
151-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
152-
pub struct DeltaUser {
153-
pub id: i32,
154-
pub name: String,
155-
pub age: i32,
156-
}
157-
158-
impl DeltaUser {
159-
pub fn new(id: i32, name: &str, age: i32) -> Self {
160-
Self {
161-
id,
162-
name: name.to_owned(),
163-
age,
164-
}
165-
}
166-
}
167-
168-
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)]
169-
pub struct DeltaOrder {
170-
pub id: i32,
171-
pub description: String,
172-
}
173-
174-
impl DeltaOrder {
175-
pub fn new(id: i32, description: &str) -> Self {
176-
Self {
177-
id,
178-
description: description.to_owned(),
179-
}
180-
}
181-
}

0 commit comments

Comments (0)