diff --git a/Cargo.toml b/Cargo.toml index ca3276b..60da6a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,9 +1,10 @@ [package] name = "kafka-delta-ingest" -version = "0.4.0" +version = "0.5.0" authors = ["R. Tyler Croy ", "Christian Williams "] -edition = "2018" -rust-version = "1.81" +edition = "2024" +rust-version = "1.91" +resolver = "3" [dependencies] flate2 = "1.0" @@ -40,8 +41,6 @@ deltalake-core = { version = "0.26.0", features = ["json", "datafusion"]} deltalake-aws = { version = "0.9.0", optional = true } deltalake-azure = { version = "0.9.0", optional = true } -# s3 feature enabled, helps for locking interactions with DLQ -dynamodb_lock = { version = "0.6.0", optional = true } # sentry sentry = { version = "0.23.0", optional = true } @@ -54,7 +53,6 @@ azure = [ ] s3 = [ "deltalake-aws", - "dynamodb_lock", ] [dev-dependencies] diff --git a/README.adoc b/README.adoc index 6eb787d..5497872 100644 --- a/README.adoc +++ b/README.adoc @@ -252,7 +252,7 @@ The table below indicates what will happen with respect to the provided argument |=== -For more information, see link:https://github.com/delta-io/delta-rs/tree/dbc2994c5fddfd39fc31a8f9202df74788f59a01/dynamodb_lock[DynamoDB lock]. +NOTE: As of `deltalake >= 0.17.0`, DynamoDB-based locking on S3 is handled internally by the `deltalake-aws` crate via `S3DynamoDbLogStore`. Direct `dynamodb_lock` interactions are no longer required. == Verifying data in Azure Storage Use the Azure Portal to browse the file system: diff --git a/src/coercions.rs b/src/coercions.rs index 262469a..f63c2b5 100644 --- a/src/coercions.rs +++ b/src/coercions.rs @@ -85,19 +85,19 @@ fn apply_coercion(value: &mut Value, node: &CoercionNode) { } } CoercionNode::Coercion(Coercion::ToTimestamp) => { - if let Some(as_str) = value.as_str() { - if let Some(parsed) = string_to_timestamp(as_str) { - *value = parsed - } + if let Some(as_str) = value.as_str() + && let Some(parsed) = string_to_timestamp(as_str) + { + *value = parsed } } CoercionNode::Tree(tree) => { for (name, node) in tree.root.iter() { let fields = value.as_object_mut(); - if let Some(fields) = fields { - if let Some(value) = fields.get_mut(name) { - apply_coercion(value, node); - } + if let Some(fields) = fields + && let Some(value) = fields.get_mut(name) + { + apply_coercion(value, node); } } } diff --git a/src/dead_letters.rs b/src/dead_letters.rs index 9feb0f5..1f19880 100644 --- a/src/dead_letters.rs +++ b/src/dead_letters.rs @@ -4,11 +4,7 @@ use chrono::prelude::*; use core::fmt::Debug; use deltalake_core::parquet::errors::ParquetError; use deltalake_core::{DeltaTable, DeltaTableError}; -#[cfg(feature = "s3")] -use dynamodb_lock::dynamo_lock_options; -use log::{error, info, warn}; -#[cfg(feature = "s3")] -use maplit::hashmap; +use log::{info, warn}; use rdkafka::message::BorrowedMessage; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -16,12 +12,6 @@ use std::collections::HashMap; use crate::{transforms::TransformError, writer::*}; -#[cfg(feature = "s3")] -mod env_vars { - pub(crate) const DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE: &str = - "DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE"; -} - /// Struct that represents a dead letter record. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct DeadLetter { @@ -250,16 +240,8 @@ impl DeltaSinkDeadLetterQueue { ) -> Result { match &options.delta_table_uri { Some(table_uri) => { - #[cfg(feature = "s3")] - let opts = hashmap! { - dynamo_lock_options::DYNAMO_LOCK_PARTITION_KEY_VALUE.to_string() => std::env::var(env_vars::DEAD_LETTER_DYNAMO_LOCK_PARTITION_KEY_VALUE) - .unwrap_or_else(|_| "kafka_delta_ingest-dead_letters".to_string()), - }; - #[cfg(all(feature = "azure", not(feature = "s3")))] - let opts = HashMap::default(); - - let table = crate::delta_helpers::load_table(table_uri, opts.clone()).await?; - let delta_writer = DataWriter::for_table(&table, opts)?; + let table = crate::delta_helpers::load_table(table_uri, HashMap::new()).await?; + let delta_writer = DataWriter::for_table(&table, HashMap::new())?; Ok(Self { table, diff --git a/src/lib.rs b/src/lib.rs index 7296e04..f87ce65 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -26,11 +26,11 @@ use futures::stream::StreamExt; use log::{debug, error, info, warn}; use rdkafka::consumer::BaseConsumer; use rdkafka::{ + ClientContext, Message, Offset, TopicPartitionList, config::ClientConfig, consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer}, error::KafkaError, util::Timeout, - ClientContext, Message, Offset, TopicPartitionList, }; use serde_json::Value; use serialization::{MessageDeserializer, MessageDeserializerFactory}; @@ -155,7 +155,7 @@ pub enum IngestError { /// Error returned when a delta write fails. /// Ending Kafka offsets and counts for each partition are included to help identify the Kafka buffer that caused the write to fail. #[error( - "Delta write failed: ending_offsets: {ending_offsets}, partition_counts: {partition_counts}, source: {source}" + "Delta write failed: ending_offsets: {ending_offsets}, partition_counts: {partition_counts}, source: {source}" )] DeltaWriteFailed { /// Ending offsets for each partition that failed to be written to delta. @@ -163,7 +163,7 @@ pub enum IngestError { /// Message counts for each partition that failed to be written to delta. partition_counts: String, /// The underlying DataWriterError. - source: DataWriterError, + source: Box, }, /// Error returned when a message is received from Kafka that has already been processed. @@ -194,7 +194,9 @@ pub enum IngestError { DeltaSchemaChanged, /// Error returned if the committed Delta table version does not match the version specified by the commit attempt. - #[error("Committed delta version {actual_version} does not match the version specified in the commit attempt {expected_version}")] + #[error( + "Committed delta version {actual_version} does not match the version specified in the commit attempt {expected_version}" + )] UnexpectedVersionMismatch { /// The version specified in the commit attempt expected_version: i64, @@ -338,11 +340,13 @@ pub async fn start_ingest( "Ingesting messages from {} Kafka topic to {} Delta table", topic, table_uri ); - info!("Using options: [allowed_latency={},max_messages_per_batch={},min_bytes_per_file={},write_checkpoints={}]", + info!( + "Using options: [allowed_latency={},max_messages_per_batch={},min_bytes_per_file={},write_checkpoints={}]", opts.allowed_latency, opts.max_messages_per_batch, opts.min_bytes_per_file, - opts.write_checkpoints); + opts.write_checkpoints + ); // Initialize a RebalanceSignal to share between threads so it can be set when rebalance events are sent from Kafka and checked or cleared in the run loop. // We use an RwLock so we can quickly skip past the typical case in the run loop where the rebalance signal is a None without starving the writer. @@ -436,16 +440,19 @@ pub async fn start_ingest( let message = message?; - if let Some(offset_map) = &_max_offsets { - if end_of_partition_reached(&message, offset_map) { - unassign_partition(cancellation_token.clone(), consumer.clone(), &message)?; - } + if let Some(offset_map) = &_max_offsets + && end_of_partition_reached(&message, offset_map) + { + unassign_partition(cancellation_token.clone(), consumer.clone(), &message)?; } // Process the message if there wasn't a rebalance signal if let Err(e) = ingest_processor.process_message(message).await { match e { IngestError::AlreadyProcessedPartitionOffset { partition, offset } => { - debug!("Skipping message with partition {}, offset {} on topic {} because it was already processed", partition, offset, topic); + debug!( + "Skipping message with partition {}, offset {} on topic {} because it was already processed", + partition, offset, topic + ); continue; } _ => return Err(e), @@ -792,11 +799,7 @@ impl IngestProcessor { fn consume_timeout_duration(&self) -> Duration { let elapsed_secs = self.latency_timer.elapsed().as_secs(); - let timeout_secs = if elapsed_secs >= self.opts.allowed_latency { - 0 - } else { - self.opts.allowed_latency - elapsed_secs - }; + let timeout_secs = self.opts.allowed_latency.saturating_sub(elapsed_secs); Duration::from_secs(timeout_secs) } @@ -924,7 +927,7 @@ impl IngestProcessor { return Err(IngestError::DeltaWriteFailed { ending_offsets: serde_json::to_string(&partition_offsets).unwrap(), partition_counts: serde_json::to_string(&partition_counts).unwrap(), - source: *e, + source: Box::new(*e), }); } } @@ -1010,7 +1013,10 @@ impl IngestProcessor { } Err(e) => match e { DeltaTableError::VersionAlreadyExists(_) => { - error!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing", DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS); + error!( + "Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing", + DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS + ); return Err(e.into()); } // if store == "DeltaS3ObjectStore" @@ -1057,7 +1063,10 @@ impl IngestProcessor { match offset { Some(o) if *o == 0 => { // MARK: workaround for rdkafka error when attempting seek to offset 0 - info!("Seeking consumer to beginning for partition {}. Delta log offset is 0, but seek to zero is not possible.", p); + info!( + "Seeking consumer to beginning for partition {}. Delta log offset is 0, but seek to zero is not possible.", + p + ); self.consumer .seek(&self.topic, *p, Offset::Beginning, Timeout::Never)?; } @@ -1069,12 +1078,18 @@ impl IngestProcessor { } None => match self.opts.auto_offset_reset { AutoOffsetReset::Earliest => { - info!("Seeking consumer to beginning for partition {}. Partition has no stored offset but 'auto.offset.reset' is earliest", p); + info!( + "Seeking consumer to beginning for partition {}. Partition has no stored offset but 'auto.offset.reset' is earliest", + p + ); self.consumer .seek(&self.topic, *p, Offset::Beginning, Timeout::Never)?; } AutoOffsetReset::Latest => { - info!("Seeking consumer to end for partition {}. Partition has no stored offset but 'auto.offset.reset' is latest", p); + info!( + "Seeking consumer to end for partition {}. Partition has no stored offset but 'auto.offset.reset' is latest", + p + ); self.consumer .seek(&self.topic, *p, Offset::End, Timeout::Never)?; } @@ -1089,14 +1104,14 @@ impl IngestProcessor { /// Returns a boolean indicating whether a message with `partition` and `offset` should be processed given current state. fn should_process_offset(&self, partition: DataTypePartition, offset: DataTypeOffset) -> bool { - if let Some(Some(written_offset)) = self.delta_partition_offsets.get(&partition) { - if offset <= *written_offset { - debug!( - "Message with partition {} offset {} on topic {} is already in delta log so skipping.", - partition, offset, self.topic - ); - return false; - } + if let Some(Some(written_offset)) = self.delta_partition_offsets.get(&partition) + && offset <= *written_offset + { + debug!( + "Message with partition {} offset {} on topic {} is already in delta log so skipping.", + partition, offset, self.topic + ); + return false; } true @@ -1219,13 +1234,10 @@ impl PartitionAssignment { /// Returns a copy of the current partition offsets as a [`HashMap`] for all partitions that have an offset stored in memory. /// Partitions that do not have an offset stored in memory (offset is [`None`]) are **not** included in the returned HashMap. fn nonempty_partition_offsets(&self) -> HashMap { - let partition_offsets = self - .assignment + self.assignment .iter() .filter_map(|(k, v)| v.as_ref().map(|o| (*k, *o))) - .collect(); - - partition_offsets + .collect() } } diff --git a/src/main.rs b/src/main.rs index d66460b..ac5cc2a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -33,10 +33,10 @@ use chrono::Local; use clap::{Arg, ArgAction, ArgGroup, ArgMatches, Command}; use kafka_delta_ingest::{ - start_ingest, AutoOffsetReset, DataTypeOffset, DataTypePartition, IngestOptions, MessageFormat, - SchemaSource, + AutoOffsetReset, DataTypeOffset, DataTypePartition, IngestOptions, MessageFormat, SchemaSource, + start_ingest, }; -use log::{error, info, LevelFilter}; +use log::{LevelFilter, error, info}; use std::collections::HashMap; use std::io::prelude::*; use std::path::PathBuf; @@ -262,13 +262,17 @@ fn init_logger(app_id: String) { } #[derive(thiserror::Error, Debug)] -#[error("'{value}' - Each transform argument must be colon delimited and match the pattern 'PROPERTY: SOURCE'")] +#[error( + "'{value}' - Each transform argument must be colon delimited and match the pattern 'PROPERTY: SOURCE'" +)] struct TransformSyntaxError { value: String, } #[derive(thiserror::Error, Debug)] -#[error("'{value}' - Each Kafka setting must be delimited by an '=' and match the pattern 'PROPERTY_NAME=PROPERTY_VALUE'")] +#[error( + "'{value}' - Each Kafka setting must be delimited by an '=' and match the pattern 'PROPERTY_NAME=PROPERTY_VALUE'" +)] struct KafkaPropertySyntaxError { value: String, } @@ -515,7 +519,7 @@ mod test { use kafka_delta_ingest::{MessageFormat, SchemaSource}; use crate::{ - build_app, convert_matches_to_message_format, parse_seek_offsets, SchemaSourceError, + SchemaSourceError, build_app, convert_matches_to_message_format, parse_seek_offsets, }; const SCHEMA_REGISTRY_ADDRESS: &str = "http://localhost:8081"; diff --git a/src/offsets.rs b/src/offsets.rs index 00ae9ac..ba3c274 100644 --- a/src/offsets.rs +++ b/src/offsets.rs @@ -1,8 +1,8 @@ use crate::delta_helpers::*; use crate::{DataTypeOffset, DataTypePartition}; +use deltalake_core::kernel::Action; use deltalake_core::kernel::transaction::CommitBuilder; use deltalake_core::kernel::transaction::TableReference; -use deltalake_core::kernel::Action; use deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::OutputMode; use deltalake_core::{DeltaTable, DeltaTableError}; @@ -139,7 +139,10 @@ async fn commit_partition_offsets( } Err(e) => match e { DeltaTableError::VersionAlreadyExists(_) => { - error!("Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing", crate::DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS); + error!( + "Transaction attempt failed. Attempts exhausted beyond max_retry_commit_attempts of {} so failing", + crate::DEFAULT_DELTA_MAX_RETRY_COMMIT_ATTEMPTS + ); Err(e) } _ => Err(e), diff --git a/src/serialization.rs b/src/serialization.rs index 1832440..294d648 100644 --- a/src/serialization.rs +++ b/src/serialization.rs @@ -1,4 +1,4 @@ -use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat}; +use crate::{MessageDeserializationError, MessageFormat, dead_letters::DeadLetter}; use async_trait::async_trait; use dashmap::DashMap; use flate2::read::GzDecoder; @@ -8,7 +8,7 @@ use schema_registry_converter::async_impl::{ use serde_json::Value; // use crate::avro_canonical_schema_workaround::parse_into_canonical_form; -use apache_avro::{rabin::Rabin, GenericSingleObjectReader, Schema}; +use apache_avro::{GenericSingleObjectReader, Schema, rabin::Rabin}; use std::{ borrow::BorrowMut, convert::{TryFrom, TryInto}, @@ -350,9 +350,9 @@ impl AvroSchemaDeserializer { match std::fs::read_to_string(file) { Ok(content) => match apache_avro::Schema::parse_str(&content) { Ok(s) => Ok(AvroSchemaDeserializer { schema: Some(s) }), - Err(e) => Err(anyhow::format_err!("{}", e.to_string())), + Err(e) => Err(anyhow::format_err!("{}", e)), }, - Err(e) => Err(anyhow::format_err!("{}", e.to_string())), + Err(e) => Err(anyhow::format_err!("{}", e)), } } } @@ -408,20 +408,20 @@ impl SoeAvroDeserializer { Err(e) => Err(anyhow::format_err!( "Schema file '{:?}'; Error: {}", path, - e.to_string() + e )), } } Err(e) => Err(anyhow::format_err!( "Schema file '{:?}'; Error: {}", path, - e.to_string() + e )), }, Err(e) => Err(anyhow::format_err!( "Schema file '{:?}'; Error: {}", path, - e.to_string() + e )), } } diff --git a/src/transforms.rs b/src/transforms.rs index 98a4549..49774fc 100644 --- a/src/transforms.rs +++ b/src/transforms.rs @@ -1,7 +1,7 @@ use chrono::prelude::*; use jmespatch::{ - functions::{ArgumentType, CustomFunction, Signature}, Context, ErrorReason, Expression, JmespathError, Rcvar, Runtime, RuntimeError, Variable, + functions::{ArgumentType, CustomFunction, Signature}, }; use rdkafka::Message; use serde_json::{Map, Value}; diff --git a/src/writer.rs b/src/writer.rs index 5ba1fab..91fef0d 100644 --- a/src/writer.rs +++ b/src/writer.rs @@ -2,8 +2,8 @@ #[allow(deprecated)] use deltalake_core::arrow::{ array::{ - as_boolean_array, as_primitive_array, as_struct_array, make_array, Array, ArrayData, - StructArray, + Array, ArrayData, StructArray, as_boolean_array, as_primitive_array, as_struct_array, + make_array, }, buffer::MutableBuffer, datatypes::Schema as ArrowSchema, @@ -23,14 +23,14 @@ use deltalake_core::parquet::{ }; use deltalake_core::protocol::DeltaOperation; use deltalake_core::protocol::SaveMode; -use deltalake_core::{kernel::transaction::TableReference, parquet::format::FileMetaData}; use deltalake_core::{ + DeltaTable, DeltaTableError, ObjectStoreError, kernel::{Action, Add, Schema}, logstore::ObjectStoreRef, protocol::{ColumnCountStat, ColumnValueStat, Stats}, - DeltaTable, DeltaTableError, ObjectStoreError, }; -use log::{error, info, warn}; +use deltalake_core::{kernel::transaction::TableReference, parquet::format::FileMetaData}; +use log::*; use serde_json::{Number, Value}; use std::collections::HashMap; use std::convert::TryFrom; @@ -58,7 +58,9 @@ pub enum DataWriterError { MissingPartitionColumn(String), /// The Arrow RecordBatch schema does not match the expected schema. - #[error("Arrow RecordBatch schema does not match: RecordBatch schema: {record_batch_schema}, {expected_schema}")] + #[error( + "Arrow RecordBatch schema does not match: RecordBatch schema: {record_batch_schema}, {expected_schema}" + )] SchemaMismatch { /// The record batch schema. record_batch_schema: SchemaRef, @@ -231,7 +233,9 @@ impl DataArrowWriter { json_buffer: Vec, parquet_error: ParquetError, ) -> Result<(), Box> { - warn!("Failed with parquet error while writing record batch. Attempting quarantine of bad records."); + warn!( + "Failed with parquet error while writing record batch. Attempting quarantine of bad records." + ); let (good, bad) = quarantine_failed_parquet_rows(arrow_schema.clone(), json_buffer)?; let record_batch = record_batch_from_json(arrow_schema, good.as_slice())?; self.write_record_batch(partition_columns, record_batch) @@ -623,7 +627,8 @@ fn quarantine_failed_parquet_rows( let mut bad: Vec = Vec::new(); for value in values { - let record_batch = record_batch_from_json(arrow_schema.clone(), &[value.clone()])?; + let record_batch = + record_batch_from_json(arrow_schema.clone(), std::slice::from_ref(&value))?; let cursor = InMemoryWriteableCursor::default(); let mut writer = ArrowWriter::try_new(cursor.clone(), arrow_schema.clone(), None)?; diff --git a/tests/buffer_flush_tests.rs b/tests/buffer_flush_tests.rs index 71fb987..be8cf31 100644 --- a/tests/buffer_flush_tests.rs +++ b/tests/buffer_flush_tests.rs @@ -5,7 +5,7 @@ use log::info; use serde::{Deserialize, Serialize}; use serde_json::json; use serial_test::serial; -use tokio::time::{sleep, Duration}; +use tokio::time::{Duration, sleep}; use kafka_delta_ingest::IngestOptions; diff --git a/tests/dead_letter_tests.rs b/tests/dead_letter_tests.rs index 0b9082a..280d983 100644 --- a/tests/dead_letter_tests.rs +++ b/tests/dead_letter_tests.rs @@ -2,8 +2,8 @@ use kafka_delta_ingest::IngestOptions; use log::info; use serde::{Deserialize, Serialize}; -use serde_json::json; use serde_json::Value; +use serde_json::json; use uuid::Uuid; #[allow(dead_code)] @@ -167,9 +167,11 @@ async fn test_dlq() { .collect(); assert_eq!(bad_null_struct_records.len(), 2); - assert!(dlq_content - .iter() - .all(|v| v.get("date").unwrap().as_str() == Some(expected_date.as_str()))); + assert!( + dlq_content + .iter() + .all(|v| v.get("date").unwrap().as_str() == Some(expected_date.as_str())) + ); } fn create_table() -> String { diff --git a/tests/delta_partitions_tests.rs b/tests/delta_partitions_tests.rs index efa4326..0d0be8b 100644 --- a/tests/delta_partitions_tests.rs +++ b/tests/delta_partitions_tests.rs @@ -1,14 +1,14 @@ #[allow(dead_code)] mod helpers; +use deltalake_core::DeltaTableError; use deltalake_core::kernel::transaction::CommitBuilder; use deltalake_core::kernel::transaction::TableReference; use deltalake_core::kernel::{Action, Add}; use deltalake_core::protocol::{DeltaOperation, SaveMode}; -use deltalake_core::DeltaTableError; use kafka_delta_ingest::writer::*; use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use std::collections::HashMap; #[derive(Debug, Serialize, Deserialize)] diff --git a/tests/deserialization_tests.rs b/tests/deserialization_tests.rs index e54ff21..89d46de 100644 --- a/tests/deserialization_tests.rs +++ b/tests/deserialization_tests.rs @@ -7,7 +7,7 @@ use schema_registry_converter::{ async_impl::{ easy_avro::EasyAvroEncoder, easy_json::EasyJsonEncoder, - schema_registry::{post_schema, SrSettings}, + schema_registry::{SrSettings, post_schema}, }, error::SRCError, schema_registry_common::{RegisteredSchema, SchemaType, SubjectNameStrategy, SuppliedSchema}, diff --git a/tests/emails_azure_blob_tests.rs b/tests/emails_azure_blob_tests.rs index 1d8efb0..f8f851a 100644 --- a/tests/emails_azure_blob_tests.rs +++ b/tests/emails_azure_blob_tests.rs @@ -115,18 +115,21 @@ fn create_partitions_app_ids(num_p: i32) -> Vec { } fn create_options() -> IngestOptions { - env::set_var("AZURE_STORAGE_USE_EMULATOR", "true"); - env::set_var("AZURE_ACCOUNT_NAME", "devstoreaccount1"); - env::set_var( - "AZURE_ACCESS_KEY", - "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", - ); - env::set_var("AZURE_STORAGE_CONTAINER_NAME", "tests"); - env::set_var("AZURE_STORAGE_ALLOW_HTTP", "1"); - env::set_var("AZURITE_BLOB_STORAGE_URL", "http://127.0.0.1:10000"); - env::set_var( - "AZURE_STORAGE_CONNECTION_STRING", - "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;"); + unsafe { + env::set_var("AZURE_STORAGE_USE_EMULATOR", "true"); + env::set_var("AZURE_ACCOUNT_NAME", "devstoreaccount1"); + env::set_var( + "AZURE_ACCESS_KEY", + "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==", + ); + env::set_var("AZURE_STORAGE_CONTAINER_NAME", "tests"); + env::set_var("AZURE_STORAGE_ALLOW_HTTP", "1"); + env::set_var("AZURITE_BLOB_STORAGE_URL", "http://127.0.0.1:10000"); + env::set_var( + "AZURE_STORAGE_CONNECTION_STRING", + "DefaultEndpointsProtocol=http;AccountName=devstoreaccount1;AccountKey=Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==;BlobEndpoint=http://localhost:10000/devstoreaccount1;QueueEndpoint=http://localhost:10001/devstoreaccount1;", + ); + } let mut additional_kafka_settings = HashMap::new(); additional_kafka_settings.insert("auto.offset.reset".to_string(), "earliest".to_string()); diff --git a/tests/emails_s3_tests.rs b/tests/emails_s3_tests.rs index 8d5e652..1b97453 100644 --- a/tests/emails_s3_tests.rs +++ b/tests/emails_s3_tests.rs @@ -77,16 +77,18 @@ async fn run_emails_s3_tests(initiate_rebalance: bool) { } fn create_options(name: &str) -> IngestOptions { - env::set_var("AWS_ENDPOINT_URL", helpers::test_aws_endpoint()); - env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb"); - env::set_var("AWS_REGION", "us-east-2"); - env::set_var("AWS_STORAGE_ALLOW_HTTP", "true"); - env::set_var("DELTA_DYNAMO_TABLE_NAME", "locks"); - env::set_var("DYNAMO_LOCK_OWNER_NAME", name); - env::set_var("DYNAMO_LOCK_PARTITION_KEY_VALUE", "emails_s3_tests"); - env::set_var("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100"); - env::set_var("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100"); - env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2"); + unsafe { + env::set_var("AWS_ENDPOINT_URL", helpers::test_aws_endpoint()); + env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb"); + env::set_var("AWS_REGION", "us-east-2"); + env::set_var("AWS_STORAGE_ALLOW_HTTP", "true"); + env::set_var("DELTA_DYNAMO_TABLE_NAME", "locks"); + env::set_var("DYNAMO_LOCK_OWNER_NAME", name); + env::set_var("DYNAMO_LOCK_PARTITION_KEY_VALUE", "emails_s3_tests"); + env::set_var("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100"); + env::set_var("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100"); + env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2"); + } let mut additional_kafka_settings = HashMap::new(); additional_kafka_settings.insert("auto.offset.reset".to_string(), "earliest".to_string()); @@ -115,15 +117,16 @@ fn create_options(name: &str) -> IngestOptions { } async fn prepare_table(topic: &str) -> String { - match env::var("AWS_ACCESS_KEY_ID") { - Err(_) => env::set_var("AWS_ACCESS_KEY_ID", "test"), - Ok(_) => {} - } - match env::var("AWS_SECRET_ACCESS_KEY") { - Err(_) => env::set_var("AWS_SECRET_ACCESS_KEY", "test"), - Ok(_) => {} + unsafe { + match env::var("AWS_ACCESS_KEY_ID") { + Err(_) => env::set_var("AWS_ACCESS_KEY_ID", "test"), + Ok(_) => {} + } + match env::var("AWS_SECRET_ACCESS_KEY") { + Err(_) => env::set_var("AWS_SECRET_ACCESS_KEY", "test"), + Ok(_) => {} + } } - let s3 = rusoto_s3::S3Client::new(Region::Custom { name: "custom".to_string(), endpoint: helpers::test_aws_endpoint(), diff --git a/tests/helpers/mod.rs b/tests/helpers/mod.rs index c14a9ae..358e58e 100644 --- a/tests/helpers/mod.rs +++ b/tests/helpers/mod.rs @@ -16,14 +16,14 @@ use deltalake_core::parquet::{ record::RowAccessor, }; use deltalake_core::{DeltaTable, Path}; -use kafka_delta_ingest::{start_ingest, IngestOptions}; +use kafka_delta_ingest::{IngestOptions, start_ingest}; +use rdkafka::ClientConfig; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; use rdkafka::client::DefaultClientContext; use rdkafka::producer::{FutureProducer, FutureRecord}; use rdkafka::util::{DefaultRuntime, Timeout}; -use rdkafka::ClientConfig; use serde::de::DeserializeOwned; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use tokio::runtime::Runtime; use tokio::task::JoinHandle; use tokio_util::sync::CancellationToken; @@ -271,15 +271,17 @@ pub fn create_kdi_with( let app_id = options.app_id.to_string(); let worker_name = worker_name.unwrap_or(app_id.clone()); - env::set_var("AWS_ENDPOINT_URL", test_aws_endpoint()); - env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb"); - env::set_var("AWS_REGION", "us-east-2"); - env::set_var("DYNAMO_LOCK_TABLE_NAME", "locks"); - env::set_var("DYNAMO_LOCK_OWNER_NAME", Uuid::new_v4().to_string()); - env::set_var("DYNAMO_LOCK_PARTITION_KEY_VALUE", app_id.clone()); - env::set_var("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100"); - env::set_var("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100"); - env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2"); + unsafe { + env::set_var("AWS_ENDPOINT_URL", test_aws_endpoint()); + env::set_var("AWS_S3_LOCKING_PROVIDER", "dynamodb"); + env::set_var("AWS_REGION", "us-east-2"); + env::set_var("DYNAMO_LOCK_TABLE_NAME", "locks"); + env::set_var("DYNAMO_LOCK_OWNER_NAME", Uuid::new_v4().to_string()); + env::set_var("DYNAMO_LOCK_PARTITION_KEY_VALUE", app_id.clone()); + env::set_var("DYNAMO_LOCK_REFRESH_PERIOD_MILLIS", "100"); + env::set_var("DYNAMO_LOCK_ADDITIONAL_TIME_TO_WAIT_MILLIS", "100"); + env::set_var("DYNAMO_LOCK_LEASE_DURATION", "2"); + } let rt = create_runtime(&worker_name); let token = Arc::new(CancellationToken::new()); @@ -320,7 +322,9 @@ pub fn create_runtime(name: &str) -> Runtime { pub fn init_logger() { // Any time the test_aws_endpoint() is being used the ability to hit HTTP hosts // needs to be enabled - env::set_var("AWS_ALLOW_HTTP", "true"); + unsafe { + env::set_var("AWS_ALLOW_HTTP", "true"); + } let _ = env_logger::Builder::new() .format(|buf, record| { diff --git a/tests/offset_tests.rs b/tests/offset_tests.rs index ef06fd8..e4db585 100644 --- a/tests/offset_tests.rs +++ b/tests/offset_tests.rs @@ -1,5 +1,5 @@ -use deltalake_core::protocol::Stats; use deltalake_core::DeltaTable; +use deltalake_core::protocol::Stats; use log::*; use rdkafka::{producer::Producer, util::Timeout}; use serde::{Deserialize, Serialize}; diff --git a/tests/schema_update_tests.rs b/tests/schema_update_tests.rs index beb3355..9777f45 100644 --- a/tests/schema_update_tests.rs +++ b/tests/schema_update_tests.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use serde_json::{json, Value}; +use serde_json::{Value, json}; use serial_test::serial; use std::fs::File; use std::io::prelude::*;