
Commit 8bd3c63

chore: clippy
1 parent ff817c8 commit 8bd3c63

File tree

src/sources/delta_lake_cdf/checkpoint.rs
src/sources/delta_lake_cdf/config.rs
src/sources/delta_lake_cdf/event.rs
src/sources/delta_lake_cdf/integration_tests.rs
src/sources/delta_lake_cdf/source.rs

5 files changed: +63 −72 lines changed

src/sources/delta_lake_cdf/checkpoint.rs

Lines changed: 6 additions & 12 deletions
@@ -138,8 +138,7 @@ mod tests {
     #[test]
     fn test_checkpoint_roundtrip() {
         let temp_dir = TempDir::new().unwrap();
-        let checkpointer =
-            DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table");
+        let checkpointer = DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table");

         // Initially no checkpoint
         assert!(checkpointer.read_checkpoint().is_none());
@@ -160,21 +159,18 @@
         let temp_dir = TempDir::new().unwrap();

         // Write checkpoint for one table
-        let checkpointer1 =
-            DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table1");
+        let checkpointer1 = DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table1");
         checkpointer1.write_checkpoint(42).unwrap();

         // Try to read with different table URI
-        let checkpointer2 =
-            DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table2");
+        let checkpointer2 = DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table2");
         assert!(checkpointer2.read_checkpoint().is_none());
     }

     #[test]
     fn test_checkpoint_file_format() {
         let temp_dir = TempDir::new().unwrap();
-        let checkpointer =
-            DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table");
+        let checkpointer = DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table");

         checkpointer.write_checkpoint(42).unwrap();

@@ -190,8 +186,7 @@ mod tests {
     #[test]
     fn test_checkpoint_corrupt_file() {
         let temp_dir = TempDir::new().unwrap();
-        let checkpointer =
-            DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table");
+        let checkpointer = DeltaLakeCdfCheckpointer::new(temp_dir.path(), "s3://bucket/table");

         // Write corrupt data
         std::fs::write(checkpointer.checkpoint_path(), "not valid json").unwrap();
@@ -204,8 +199,7 @@
     fn test_checkpoint_missing_dir() {
         let temp_dir = TempDir::new().unwrap();
         let nested_path = temp_dir.path().join("nested").join("dir");
-        let checkpointer =
-            DeltaLakeCdfCheckpointer::new(&nested_path, "s3://bucket/table");
+        let checkpointer = DeltaLakeCdfCheckpointer::new(&nested_path, "s3://bucket/table");

         // Should create parent directories
         checkpointer.write_checkpoint(42).unwrap();
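
These hunks only re-wrap long constructor calls onto one line, but the tests they touch pin down a simple contract: a small per-table JSON checkpoint file that records the last processed commit version, returns nothing for a different table URI or a corrupt file, and creates missing parent directories on write. A rough, self-contained sketch of that contract follows; the struct name, field layout, file naming, and the use of serde_json are illustrative assumptions, not the actual checkpoint.rs implementation.

    use std::collections::hash_map::DefaultHasher;
    use std::fs;
    use std::hash::{Hash, Hasher};
    use std::path::{Path, PathBuf};

    // Illustrative stand-in for DeltaLakeCdfCheckpointer; names and layout are assumptions.
    struct FileCheckpointer {
        path: PathBuf,
    }

    impl FileCheckpointer {
        fn new(data_dir: &Path, table_uri: &str) -> Self {
            // One file per table URI, hashed so "s3://bucket/table1" and "s3://bucket/table2"
            // in the same data_dir never collide (cf. the different-table test above).
            let mut hasher = DefaultHasher::new();
            table_uri.hash(&mut hasher);
            Self {
                path: data_dir.join(format!("checkpoint-{:016x}.json", hasher.finish())),
            }
        }

        fn checkpoint_path(&self) -> &Path {
            &self.path
        }

        fn write_checkpoint(&self, version: i64) -> std::io::Result<()> {
            // Create parent directories on demand, as test_checkpoint_missing_dir expects.
            if let Some(parent) = self.path.parent() {
                fs::create_dir_all(parent)?;
            }
            fs::write(&self.path, format!(r#"{{"version":{version}}}"#))
        }

        fn read_checkpoint(&self) -> Option<i64> {
            // A missing or corrupt file ("not valid json") simply yields None.
            let raw = fs::read_to_string(&self.path).ok()?;
            let value: serde_json::Value = serde_json::from_str(&raw).ok()?;
            value["version"].as_i64()
        }
    }

    fn main() {
        let dir = std::env::temp_dir().join("cdf-checkpoint-demo");
        let cp = FileCheckpointer::new(&dir, "s3://bucket/table");
        cp.write_checkpoint(42).unwrap();
        assert_eq!(cp.read_checkpoint(), Some(42));
    }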

src/sources/delta_lake_cdf/config.rs

Lines changed: 29 additions & 27 deletions
@@ -279,33 +279,35 @@ impl SourceConfig for DeltaLakeCdfConfig {
         let log_namespace = global_log_namespace.merge(self.log_namespace);

         // Define the schema for CDF events
-        let schema_definition = vector_lib::schema::Definition::default_for_namespace(
-            &[log_namespace].into(),
-        )
-        .with_standard_vector_source_metadata()
-        .with_source_metadata(
-            DeltaLakeCdfConfig::NAME,
-            Some(LegacyKey::Overwrite(owned_value_path!("_change_type"))),
-            &owned_value_path!("change_type"),
-            Kind::bytes(),
-            Some("change_type"),
-        )
-        .with_source_metadata(
-            DeltaLakeCdfConfig::NAME,
-            Some(LegacyKey::Overwrite(owned_value_path!("_commit_version"))),
-            &owned_value_path!("commit_version"),
-            Kind::integer(),
-            Some("commit_version"),
-        )
-        .with_source_metadata(
-            DeltaLakeCdfConfig::NAME,
-            Some(LegacyKey::Overwrite(owned_value_path!("_commit_timestamp"))),
-            &owned_value_path!("commit_timestamp"),
-            Kind::timestamp(),
-            Some("commit_timestamp"),
-        );
-
-        vec![SourceOutput::new_maybe_logs(DataType::Log, schema_definition)]
+        let schema_definition =
+            vector_lib::schema::Definition::default_for_namespace(&[log_namespace].into())
+                .with_standard_vector_source_metadata()
+                .with_source_metadata(
+                    DeltaLakeCdfConfig::NAME,
+                    Some(LegacyKey::Overwrite(owned_value_path!("_change_type"))),
+                    &owned_value_path!("change_type"),
+                    Kind::bytes(),
+                    Some("change_type"),
+                )
+                .with_source_metadata(
+                    DeltaLakeCdfConfig::NAME,
+                    Some(LegacyKey::Overwrite(owned_value_path!("_commit_version"))),
+                    &owned_value_path!("commit_version"),
+                    Kind::integer(),
+                    Some("commit_version"),
+                )
+                .with_source_metadata(
+                    DeltaLakeCdfConfig::NAME,
+                    Some(LegacyKey::Overwrite(owned_value_path!("_commit_timestamp"))),
+                    &owned_value_path!("commit_timestamp"),
+                    Kind::timestamp(),
+                    Some("commit_timestamp"),
+                );

+        vec![SourceOutput::new_maybe_logs(
+            DataType::Log,
+            schema_definition,
+        )]
     }

     fn can_acknowledge(&self) -> bool {
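
The builder chain itself is unchanged apart from line breaks; it still declares three pieces of CDF source metadata. Per the arguments above, under the legacy log namespace the LegacyKey::Overwrite paths overwrite top-level _change_type, _commit_version, and _commit_timestamp fields, while under the Vector namespace the same values are registered as source metadata under change_type, commit_version, and commit_timestamp. A hedged illustration of the legacy-namespace event shape (the row columns and values are invented for the example, and serde_json is assumed available):

    fn main() {
        // Hypothetical event emitted for an inserted row, legacy namespace.
        // The three underscore-prefixed keys match the LegacyKey::Overwrite paths above;
        // "id" and "name" stand in for whatever columns the Delta table actually holds.
        let event = serde_json::json!({
            "id": 1,
            "name": "example",
            "_change_type": "insert",                    // Kind::bytes()
            "_commit_version": 42,                       // Kind::integer()
            "_commit_timestamp": "2024-01-01T00:00:00Z", // Kind::timestamp()
        });
        assert_eq!(event["_change_type"], "insert");
        assert_eq!(event["_commit_version"], 42);
    }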

src/sources/delta_lake_cdf/event.rs

Lines changed: 13 additions & 18 deletions
@@ -4,16 +4,14 @@ use bytes::Bytes;
 use chrono::{DateTime, TimeZone, Utc};
 use deltalake::arrow::array::{
     Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Float32Array,
-    Float64Array, Int16Array, Int32Array, Int64Array, Int8Array, LargeBinaryArray,
+    Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, LargeBinaryArray,
     LargeStringArray, StringArray, TimestampMicrosecondArray, TimestampMillisecondArray,
-    TimestampNanosecondArray, TimestampSecondArray, UInt16Array, UInt32Array, UInt64Array,
-    UInt8Array,
+    TimestampNanosecondArray, TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array,
+    UInt64Array,
 };
 use deltalake::arrow::datatypes::DataType;
 use deltalake::arrow::record_batch::RecordBatch;
-use deltalake::delta_datafusion::cdf::{
-    CHANGE_TYPE_COL, COMMIT_TIMESTAMP_COL, COMMIT_VERSION_COL,
-};
+use deltalake::delta_datafusion::cdf::{CHANGE_TYPE_COL, COMMIT_TIMESTAMP_COL, COMMIT_VERSION_COL};
 use vector_lib::config::{LegacyKey, LogNamespace};
 use vector_lib::event::{Event, LogEvent};
 use vector_lib::lookup::path;
@@ -72,12 +70,11 @@ fn convert_batch_to_events(
             .ok_or_else(|| format!("Null change_type at row {}", row_idx))?;

         // Filter by change type if specified
-        if !config.change_types.is_empty() {
-            if let Some(ct) = ChangeType::from_cdf_string(&change_type_str) {
-                if !config.change_types.contains(&ct) {
-                    continue;
-                }
-            }
+        if !config.change_types.is_empty()
+            && let Some(ct) = ChangeType::from_cdf_string(&change_type_str)
+            && !config.change_types.contains(&ct)
+        {
+            continue;
         }

         let mut log = LogEvent::default();
@@ -306,7 +303,7 @@ fn arrow_value_to_vrl(

         // Timestamps
         DataType::Timestamp(unit, _tz) => {
-            let ts = extract_timestamp_by_unit(column, row_idx, unit)
+            let ts = extract_timestamp_by_unit(column, row_idx, *unit)
                 .ok_or("Failed to extract timestamp")?;
             Ok(Value::Timestamp(ts))
         }
@@ -399,7 +396,7 @@ fn extract_timestamp(column: &ArrayRef, row_idx: usize) -> Option<DateTime<Utc>>
     }

     match column.data_type() {
-        DataType::Timestamp(unit, _) => extract_timestamp_by_unit(column, row_idx, unit),
+        DataType::Timestamp(unit, _) => extract_timestamp_by_unit(column, row_idx, *unit),
         DataType::Int64 => {
             // Assume milliseconds if stored as plain Int64
             let arr = column.as_any().downcast_ref::<Int64Array>()?;
@@ -413,7 +410,7 @@ fn extract_timestamp(column: &ArrayRef, row_idx: usize) -> Option<DateTime<Utc>>
 fn extract_timestamp_by_unit(
     column: &ArrayRef,
     row_idx: usize,
-    unit: &deltalake::arrow::datatypes::TimeUnit,
+    unit: deltalake::arrow::datatypes::TimeUnit,
 ) -> Option<DateTime<Utc>> {
     use deltalake::arrow::datatypes::TimeUnit;

@@ -438,9 +435,7 @@
             Utc.timestamp_opt(secs, nanos).single()
         }
         TimeUnit::Nanosecond => {
-            let arr = column
-                .as_any()
-                .downcast_ref::<TimestampNanosecondArray>()?;
+            let arr = column.as_any().downcast_ref::<TimestampNanosecondArray>()?;
             let nanos = arr.value(row_idx);
             let secs = nanos / 1_000_000_000;
             let subsec_nanos = (nanos % 1_000_000_000) as u32;
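
Two substantive clippy-driven patterns show up in this file: the nested change-type filter collapses into a let-chain (a single if with && let, available on the 2024 edition, and likely what clippy's collapsible_if nudges toward on recent toolchains), and TimeUnit, a Copy type, is now passed by value (hence the *unit derefs), in the spirit of lints such as clippy::trivially_copy_pass_by_ref. A small self-contained sketch of the filter rewrite, with simplified stand-in types rather than the module's real ChangeType and config:

    // Simplified stand-ins; the real ChangeType and filter config live in this module.
    #[derive(Clone, Copy, PartialEq)]
    enum ChangeType {
        Insert,
        Delete,
    }

    fn from_cdf_string(s: &str) -> Option<ChangeType> {
        match s {
            "insert" => Some(ChangeType::Insert),
            "delete" => Some(ChangeType::Delete),
            _ => None,
        }
    }

    // Returns true when a row should be skipped by the change-type filter.
    fn should_skip(filter: &[ChangeType], raw: &str) -> bool {
        // Before: three nested ifs. After: one let-chain, one level of nesting,
        // identical behavior (an empty filter or an unknown change type never skips).
        if !filter.is_empty()
            && let Some(ct) = from_cdf_string(raw)
            && !filter.contains(&ct)
        {
            return true;
        }
        false
    }

    fn main() {
        assert!(should_skip(&[ChangeType::Insert], "delete"));
        assert!(!should_skip(&[], "delete"));
        assert!(!should_skip(&[ChangeType::Insert], "insert"));
    }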

src/sources/delta_lake_cdf/integration_tests.rs

Lines changed: 2 additions & 2 deletions
@@ -9,14 +9,14 @@ use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::Duration;

+use deltalake::DeltaTable;
 use deltalake::arrow::array::{Int64Builder, StringBuilder};
 use deltalake::arrow::datatypes::{DataType, Field, Schema};
 use deltalake::arrow::record_batch::RecordBatch;
 use deltalake::kernel::StructType;
 use deltalake::kernel::engine::arrow_conversion::TryFromArrow;
 use deltalake::operations::create::CreateBuilder;
 use deltalake::protocol::SaveMode;
-use deltalake::DeltaTable;

 use super::config::{DeltaLakeCdfConfig, StartPosition};

@@ -167,8 +167,8 @@ async fn test_auto_recovery_after_vacuum() {

     use super::checkpoint::DeltaLakeCdfCheckpointer;
     use super::source::run_cdf_source;
-    use crate::shutdown::ShutdownSignal;
     use crate::SourceSender;
+    use crate::shutdown::ShutdownSignal;
     use vector_lib::config::LogNamespace;

     // Create unique table path

src/sources/delta_lake_cdf/source.rs

Lines changed: 13 additions & 13 deletions
@@ -1,20 +1,20 @@
 //! Main source logic for Delta Lake CDF streaming.

-use deltalake::datafusion::prelude::SessionContext;
-use deltalake::delta_datafusion::DeltaCdfTableProvider;
 use deltalake::DeltaTable;
 use deltalake::DeltaTableError;
+use deltalake::datafusion::prelude::SessionContext;
+use deltalake::delta_datafusion::DeltaCdfTableProvider;
 use std::sync::Arc;
 use tokio::time::interval;
+use vector_lib::EstimatedJsonEncodedSizeOf;
 use vector_lib::config::LogNamespace;
 use vector_lib::internal_event::{
     ByteSize, BytesReceived, CountByteSize, InternalEventHandle as _, Protocol,
 };
-use vector_lib::EstimatedJsonEncodedSizeOf;

+use crate::SourceSender;
 use crate::internal_events::{EventsReceived, StreamClosedError};
 use crate::shutdown::ShutdownSignal;
-use crate::SourceSender;

 use super::checkpoint::DeltaLakeCdfCheckpointer;
 use super::config::DeltaLakeCdfConfig;
@@ -82,14 +82,14 @@ pub async fn run_cdf_source(
         let latest_version = table.version().unwrap_or(0);

         // Check for bounded read completion
-        if let Some(end_version) = config.ending_version {
-            if current_version > end_version {
-                info!(
-                    message = "Reached ending version, stopping",
-                    ending_version = end_version,
-                );
-                return Ok(());
-            }
+        if let Some(end_version) = config.ending_version
+            && current_version > end_version
+        {
+            info!(
+                message = "Reached ending version, stopping",
+                ending_version = end_version,
+            );
+            return Ok(());
         }

         // Skip if no new versions
@@ -143,7 +143,7 @@ pub async fn run_cdf_source(
         events_received.emit(CountByteSize(event_count, json_size));

         // Send events downstream
-        if let Err(_) = out.send_batch(events).await {
+        if out.send_batch(events).await.is_err() {
             emit!(StreamClosedError { count: event_count });
             return Err(());
         }
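
Besides the import reordering and the same let-chain collapse used in event.rs, the last hunk replaces `if let Err(_) = ...` with `.is_err()`, which is the rewrite clippy's redundant_pattern_matching lint suggests when the error value itself is never used. A minimal standalone sketch of that send-loop shape, where a tokio mpsc channel stands in for SourceSender (names here are illustrative, not the real API):

    use tokio::sync::mpsc;

    // Stand-in for the real send path: forward a batch and report a closed stream.
    async fn forward_batch(tx: &mpsc::Sender<u64>, events: Vec<u64>) -> Result<(), ()> {
        let count = events.len();
        for event in events {
            // The SendError payload is never inspected, so `.is_err()` is clearer
            // (and clippy-clean) than `if let Err(_) = tx.send(event).await`.
            if tx.send(event).await.is_err() {
                eprintln!("stream closed, dropping a batch of {count} events");
                return Err(());
            }
        }
        Ok(())
    }

    #[tokio::main]
    async fn main() {
        let (tx, rx) = mpsc::channel::<u64>(8);
        drop(rx); // simulate the downstream consumer going away
        assert!(forward_batch(&tx, vec![1, 2, 3]).await.is_err());
    }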
