Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
[package]
name = "kafka-delta-ingest"
version = "0.4.0"
version = "0.5.0"
authors = ["R. Tyler Croy <rtyler@brokenco.de>", "Christian Williams <christianw@scribd.com>"]
edition = "2018"
rust-version = "1.81"
edition = "2024"
rust-version = "1.91"
resolver = "3"

[dependencies]
flate2 = "1.0"
Expand Down Expand Up @@ -40,8 +41,6 @@ deltalake-core = { version = "0.26.0", features = ["json", "datafusion"]}
deltalake-aws = { version = "0.9.0", optional = true }
deltalake-azure = { version = "0.9.0", optional = true }

# s3 feature enabled, helps for locking interactions with DLQ
dynamodb_lock = { version = "0.6.0", optional = true }
# sentry
sentry = { version = "0.23.0", optional = true }

Expand All @@ -54,7 +53,6 @@ azure = [
]
s3 = [
"deltalake-aws",
"dynamodb_lock",
]

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion README.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ The table below indicates what will happen with respect to the provided argument
|===


For more information, see link:https://github.com/delta-io/delta-rs/tree/dbc2994c5fddfd39fc31a8f9202df74788f59a01/dynamodb_lock[DynamoDB lock].
NOTE: As of `deltalake >= 0.17.0`, DynamoDB-based locking on S3 is handled internally by the `deltalake-aws` crate via `S3DynamoDbLogStore`. Direct `dynamodb_lock` interactions are no longer required.
== Verifying data in Azure Storage

Use the Azure Portal to browse the file system:
Expand Down
16 changes: 8 additions & 8 deletions src/coercions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,19 @@ fn apply_coercion(value: &mut Value, node: &CoercionNode) {
}
}
CoercionNode::Coercion(Coercion::ToTimestamp) => {
if let Some(as_str) = value.as_str() {
if let Some(parsed) = string_to_timestamp(as_str) {
*value = parsed
}
if let Some(as_str) = value.as_str()
&& let Some(parsed) = string_to_timestamp(as_str)
{
*value = parsed
}
}
CoercionNode::Tree(tree) => {
for (name, node) in tree.root.iter() {
let fields = value.as_object_mut();
if let Some(fields) = fields {
if let Some(value) = fields.get_mut(name) {
apply_coercion(value, node);
}
if let Some(fields) = fields
&& let Some(value) = fields.get_mut(name)
{
apply_coercion(value, node);
}
}
}
Expand Down
24 changes: 3 additions & 21 deletions src/dead_letters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,14 @@ use chrono::prelude::*;
use core::fmt::Debug;
use deltalake_core::parquet::errors::ParquetError;
use deltalake_core::{DeltaTable, DeltaTableError};
#[cfg(feature = "s3")]
use dynamodb_lock::dynamo_lock_options;
use log::{error, info, warn};
#[cfg(feature = "s3")]
use maplit::hashmap;
use log::{info, warn};
use rdkafka::message::BorrowedMessage;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;

use crate::{transforms::TransformError, writer::*};

#[cfg(feature = "s3")]
mod env_vars {
pub(crate) const DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE: &str =
"DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE";
}

/// Struct that represents a dead letter record.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DeadLetter {
Expand Down Expand Up @@ -250,16 +240,8 @@ impl DeltaSinkDeadLetterQueue {
) -> Result<Self, DeadLetterQueueError> {
match &options.delta_table_uri {
Some(table_uri) => {
#[cfg(feature = "s3")]
let opts = hashmap! {
dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE)
.unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()),
};
#[cfg(all(feature = "azure", not(feature = "s3")))]
let opts = HashMap::default();

let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?;
let delta_writer = DataWriter::for_table(&table, opts)?;
let table = crate::delta_helpers::load_table(table_uri, HashMap::new()).await?;
let delta_writer = DataWriter::for_table(&table, HashMap::new())?;

Ok(Self {
table,
Expand Down
80 changes: 46 additions & 34 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ use futures::stream::StreamExt;
use log::{debug, error, info, warn};
use rdkafka::consumer::BaseConsumer;
use rdkafka::{
ClientContext, Message, Offset, TopicPartitionList,
config::ClientConfig,
consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer},
error::KafkaError,
util::Timeout,
ClientContext, Message, Offset, TopicPartitionList,
};
use serde_json::Value;
use serialization::{MessageDeserializer, MessageDeserializerFactory};
Expand Down Expand Up @@ -155,15 +155,15 @@ pub enum IngestError {
/// Error returned when a delta write fails.
/// Ending Kafka offsets and counts for each partition are included to help identify the Kafka buffer that caused the write to fail.
#[error(
"Delta write failed: ending_offsets: {ending_offsets}, partition_counts: {partition_counts}, source: {source}"
"Delta write failed: ending_offsets: {ending_offsets}, partition_counts: {partition_counts}, source: {source}"
)]
DeltaWriteFailed {
/// Ending offsets for each partition that failed to be written to delta.
ending_offsets: String,
/// Message counts for each partition that failed to be written to delta.
partition_counts: String,
/// The underlying DataWriterError.
source: DataWriterError,
source: Box<DataWriterError>,
},

/// Error returned when a message is received from Kafka that has already been processed.
Expand Down Expand Up @@ -194,7 +194,9 @@ pub enum IngestError {
DeltaSchemaChanged,

/// Error returned if the committed Delta table version does not match the version specified by the commit attempt.
#[error("Committed delta version {actual_version} does not match the version specified in the commit attempt {expected_version}")]
#[error(
"Committed delta version {actual_version} does not match the version specified in the commit attempt {expected_version}"
)]
UnexpectedVersionMismatch {
/// The version specified in the commit attempt
expected_version: i64,
Expand Down Expand Up @@ -338,11 +340,13 @@ pub async fn start_ingest(
"Ingesting messages from {} Kafka topic to {} Delta table",
topic, table_uri
);
info!("Using options: [allowed_latency={},max_messages_per_batch={},min_bytes_per_file={},write_checkpoints={}]",
info!(
"Using options: [allowed_latency={},max_messages_per_batch={},min_bytes_per_file={},write_checkpoints={}]",
opts.allowed_latency,
opts.max_messages_per_batch,
opts.min_bytes_per_file,
opts.write_checkpoints);
opts.write_checkpoints
);

// Initialize a RebalanceSignal to share between threads so it can be set when rebalance events are sent from Kafka and checked or cleared in the run loop.
// We use an RwLock so we can quickly skip past the typical case in the run loop where the rebalance signal is a None without starving the writer.
Expand Down Expand Up @@ -436,16 +440,19 @@ pub async fn start_ingest(

let message = message?;

if let Some(offset_map) = &_max_offsets {
if end_of_partition_reached(&message, offset_map) {
unassign_partition(cancellation_token.clone(), consumer.clone(), &message)?;
}
if let Some(offset_map) = &_max_offsets
&& end_of_partition_reached(&message, offset_map)
{
unassign_partition(cancellation_token.clone(), consumer.clone(), &message)?;
}
// Process the message if there wasn't a rebalance signal
if let Err(e) = ingest_processor.process_message(message).await {
match e {
IngestError::AlreadyProcessedPartitionOffset { partition, offset } => {
debug!("Skipping message with partition {}, offset {} on topic {} because it was already processed", partition, offset, topic);
debug!(
"Skipping message with partition {}, offset {} on topic {} because it was already processed",
partition, offset, topic
);
continue;
}
_ => return Err(e),
Expand Down Expand Up @@ -792,11 +799,7 @@ impl IngestProcessor {
fn consume_timeout_duration(&self) -> Duration {
let elapsed_secs = self.latency_timer.elapsed().as_secs();

let timeout_secs = if elapsed_secs >= self.opts.allowed_latency {
0
} else {
self.opts.allowed_latency - elapsed_secs
};
let timeout_secs = self.opts.allowed_latency.saturating_sub(elapsed_secs);

Duration::from_secs(timeout_secs)
}
Expand Down Expand Up @@ -924,7 +927,7 @@ impl IngestProcessor {
return Err(IngestError::DeltaWriteFailed {
ending_offsets: serde_json::to_string(&partition_offsets).unwrap(),
partition_counts: serde_json::to_string(&partition_counts).unwrap(),
source: *e,
source: Box::new(*e),
});
}
}
Expand Down Expand Up @@ -1010,7 +1013,10 @@ impl IngestProcessor {
}
Err(e) => match e {
DeltaTableError::VersionAlreadyExists(_) => {
error!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing", DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS);
error!(
"Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing",
DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS
);
return Err(e.into());
}
// if store == "DeltaS3ObjectStore"
Expand Down Expand Up @@ -1057,7 +1063,10 @@ impl IngestProcessor {
match offset {
Some(o) if *o == 0 => {
// MARK: workaround for rdkafka error when attempting seek to offset 0
info!("Seeking consumer to beginning for partition {}. Delta log offset is 0, but seek to zero is not possible.", p);
info!(
"Seeking consumer to beginning for partition {}. Delta log offset is 0, but seek to zero is not possible.",
p
);
self.consumer
.seek(&self.topic, *p, Offset::Beginning, Timeout::Never)?;
}
Expand All @@ -1069,12 +1078,18 @@ impl IngestProcessor {
}
None => match self.opts.auto_offset_reset {
AutoOffsetReset::Earliest => {
info!("Seeking consumer to beginning for partition {}. Partition has no stored offset but 'auto.offset.reset' is earliest", p);
info!(
"Seeking consumer to beginning for partition {}. Partition has no stored offset but 'auto.offset.reset' is earliest",
p
);
self.consumer
.seek(&self.topic, *p, Offset::Beginning, Timeout::Never)?;
}
AutoOffsetReset::Latest => {
info!("Seeking consumer to end for partition {}. Partition has no stored offset but 'auto.offset.reset' is latest", p);
info!(
"Seeking consumer to end for partition {}. Partition has no stored offset but 'auto.offset.reset' is latest",
p
);
self.consumer
.seek(&self.topic, *p, Offset::End, Timeout::Never)?;
}
Expand All @@ -1089,14 +1104,14 @@ impl IngestProcessor {

/// Returns a boolean indicating whether a message with `partition` and `offset` should be processed given current state.
fn should_process_offset(&self, partition: DataTypePartition, offset: DataTypeOffset) -> bool {
if let Some(Some(written_offset)) = self.delta_partition_offsets.get(&partition) {
if offset <= *written_offset {
debug!(
"Message with partition {} offset {} on topic {} is already in delta log so skipping.",
partition, offset, self.topic
);
return false;
}
if let Some(Some(written_offset)) = self.delta_partition_offsets.get(&partition)
&& offset <= *written_offset
{
debug!(
"Message with partition {} offset {} on topic {} is already in delta log so skipping.",
partition, offset, self.topic
);
return false;
}

true
Expand Down Expand Up @@ -1219,13 +1234,10 @@ impl PartitionAssignment {
/// Returns a copy of the current partition offsets as a [`HashMap`] for all partitions that have an offset stored in memory.
/// Partitions that do not have an offset stored in memory (offset is [`None`]) are **not** included in the returned HashMap.
fn nonempty_partition_offsets(&self) -> HashMap<DataTypePartition, DataTypeOffset> {
let partition_offsets = self
.assignment
self.assignment
.iter()
.filter_map(|(k, v)| v.as_ref().map(|o| (*k, *o)))
.collect();

partition_offsets
.collect()
}
}

Expand Down
16 changes: 10 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
use chrono::Local;
use clap::{Arg, ArgAction, ArgGroup, ArgMatches, Command};
use kafka_delta_ingest::{
start_ingest, AutoOffsetReset, DataTypeOffset, DataTypePartition, IngestOptions, MessageFormat,
SchemaSource,
AutoOffsetReset, DataTypeOffset, DataTypePartition, IngestOptions, MessageFormat, SchemaSource,
start_ingest,
};
use log::{error, info, LevelFilter};
use log::{LevelFilter, error, info};
use std::collections::HashMap;
use std::io::prelude::*;
use std::path::PathBuf;
Expand Down Expand Up @@ -262,13 +262,17 @@ fn init_logger(app_id: String) {
}

#[derive(thiserror::Error, Debug)]
#[error("'{value}' - Each transform argument must be colon delimited and match the pattern 'PROPERTY: SOURCE'")]
#[error(
"'{value}' - Each transform argument must be colon delimited and match the pattern 'PROPERTY: SOURCE'"
)]
struct TransformSyntaxError {
value: String,
}

#[derive(thiserror::Error, Debug)]
#[error("'{value}' - Each Kafka setting must be delimited by an '=' and match the pattern 'PROPERTY_NAME=PROPERTY_VALUE'")]
#[error(
"'{value}' - Each Kafka setting must be delimited by an '=' and match the pattern 'PROPERTY_NAME=PROPERTY_VALUE'"
)]
struct KafkaPropertySyntaxError {
value: String,
}
Expand Down Expand Up @@ -515,7 +519,7 @@ mod test {
use kafka_delta_ingest::{MessageFormat, SchemaSource};

use crate::{
build_app, convert_matches_to_message_format, parse_seek_offsets, SchemaSourceError,
SchemaSourceError, build_app, convert_matches_to_message_format, parse_seek_offsets,
};

const SCHEMA_REGISTRY_ADDRESS: &str = "http://localhost:8081";
Expand Down
7 changes: 5 additions & 2 deletions src/offsets.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use crate::delta_helpers::*;
use crate::{DataTypeOffset, DataTypePartition};
use deltalake_core::kernel::Action;
use deltalake_core::kernel::transaction::CommitBuilder;
use deltalake_core::kernel::transaction::TableReference;
use deltalake_core::kernel::Action;
use deltalake_core::protocol::DeltaOperation;
use deltalake_core::protocol::OutputMode;
use deltalake_core::{DeltaTable, DeltaTableError};
Expand Down Expand Up @@ -139,7 +139,10 @@ async fn commit_partition_offsets(
}
Err(e) => match e {
DeltaTableError::VersionAlreadyExists(_) => {
error!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing", crate::DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS);
error!(
"Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing",
crate::DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS
);
Err(e)
}
_ => Err(e),
Expand Down
Loading
Loading