Skip to content

Commit 5b6e585

Browse files
authored
feat: Adds support for Avro single-object encoding (#215)
* Merge branch 'soe-avro' * Removes needless return and additional line break
1 parent c1c71bd commit 5b6e585

File tree

8 files changed

+7012
-5
lines changed

8 files changed

+7012
-5
lines changed

Cargo.lock

Lines changed: 6724 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ rust-version = "1.81"
99
flate2 = "1.0"
1010
anyhow = "1"
1111
async-trait = "0.1"
12-
apache-avro = "^0.14"
12+
apache-avro = "^0.17"
1313
base64 = "0.13"
1414
bytes = "1"
1515
chrono = "0.4.31"
@@ -25,14 +25,15 @@ rdkafka = { version = "0.28", features = ["ssl"] }
2525
schema_registry_converter = { version = "3.1.0", features = ["easy", "json", "avro"] }
2626
serde = { version = "1", features = ["derive"] }
2727
serde_json = "1"
28-
strum = "0.20"
29-
strum_macros = "0.20"
28+
strum = "0.27.1"
29+
strum_macros = "0.27.1"
3030
thiserror = "1"
3131
tokio = { version = "1", features = ["full"] }
3232
tokio-stream = { version = "0", features = ["fs"] }
3333
tokio-util = "0.6.3"
3434
uuid = { version = "0.8", features = ["serde", "v4"] }
3535
url = "2.3"
36+
dashmap = "6.0.1"
3637

3738
# datafusion feature is required for writer version 2
3839
deltalake-core = { version = "0.25.0", features = ["json", "datafusion"]}

src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,9 @@ pub enum MessageFormat {
218218

219219
/// Parses avro messages using provided schema, schema registry or schema within file
220220
Avro(SchemaSource),
221+
222+
/// Parses avro messages in the single object encoding format, PathBuf can either point to a single avro schema file or a directory containing (only) multiple avro schema files
223+
SoeAvro(PathBuf),
221224
}
222225

223226
/// Source for schema

src/main.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,24 @@ fn to_schema_source(
221221
}
222222
}
223223

224+
fn to_schema_path(input: Option<&String>) -> Result<PathBuf, SchemaSourceError> {
225+
match input {
226+
None => Err(SchemaSourceError::NoFileSpecified),
227+
Some(value) => {
228+
if value.is_empty() {
229+
return Err(SchemaSourceError::NoFileSpecified);
230+
}
231+
let p = PathBuf::from_str(value)?;
232+
if !p.exists() {
233+
return Err(SchemaSourceError::FileNotFound {
234+
file_name: (*value).clone(),
235+
});
236+
}
237+
Ok(p)
238+
}
239+
}
240+
}
241+
224242
fn init_logger(app_id: String) {
225243
let app_id: &'static str = Box::leak(app_id.into_boxed_str());
226244
let log_level = std::env::var("RUST_LOG")
@@ -272,6 +290,9 @@ enum SchemaSourceError {
272290
},
273291
#[error("File not found error: {file_name}")]
274292
FileNotFound { file_name: String },
293+
294+
#[error("No file specified error")]
295+
NoFileSpecified,
275296
}
276297

277298
fn parse_kafka_property(val: &str) -> Result<(String, String), KafkaPropertySyntaxError> {
@@ -444,8 +465,12 @@ This can be used to provide TLS configuration as in:
444465
.env("AVRO_REGISTRY")
445466
.required(false)
446467
.help("Schema registry endpoint, local path, or empty string"))
468+
.arg(Arg::new("soe-avro")
469+
.long("soe-avro")
470+
.required(false)
471+
.help("Local path to either a single Avro schema file or a directory containing (only) Avro schematas"))
447472
.group(ArgGroup::new("format")
448-
.args(["json", "avro"])
473+
.args(["json", "avro","soe-avro"])
449474
.required(false))
450475
.arg(Arg::new("end")
451476
.short('e')
@@ -476,6 +501,11 @@ fn convert_matches_to_message_format(
476501
.map(MessageFormat::Avro);
477502
}
478503

504+
if ingest_matches.contains_id("soe-avro") {
505+
return to_schema_path(ingest_matches.get_one::<String>("soe-avro"))
506+
.map(MessageFormat::SoeAvro);
507+
}
508+
479509
to_schema_source(ingest_matches.get_one::<String>("json"), true).map(MessageFormat::Json)
480510
}
481511

src/serialization.rs

Lines changed: 147 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,22 @@
11
use crate::{dead_letters::DeadLetter, MessageDeserializationError, MessageFormat};
22
use async_trait::async_trait;
3+
use dashmap::DashMap;
34
use flate2::read::GzDecoder;
45
use schema_registry_converter::async_impl::{
56
easy_avro::EasyAvroDecoder, easy_json::EasyJsonDecoder, schema_registry::SrSettings,
67
};
78
use serde_json::Value;
8-
use std::{borrow::BorrowMut, convert::TryFrom, io::Cursor, io::Read, path::PathBuf};
9+
10+
// use crate::avro_canonical_schema_workaround::parse_into_canonical_form;
11+
use apache_avro::{rabin::Rabin, GenericSingleObjectReader, Schema};
12+
use std::{
13+
borrow::BorrowMut,
14+
convert::{TryFrom, TryInto},
15+
io::{Cursor, Read},
16+
path::PathBuf,
17+
};
18+
19+
use log::debug;
920

1021
#[async_trait]
1122
pub(crate) trait MessageDeserializer {
@@ -48,6 +59,10 @@ impl MessageDeserializerFactory {
4859
}
4960
}
5061
},
62+
MessageFormat::SoeAvro(path) => match SoeAvroDeserializer::try_from_path(path) {
63+
Ok(s) => Ok(Box::new(s)),
64+
Err(e) => Err(e),
65+
},
5166
_ => Ok(Box::new(DefaultDeserializer::new(decompress_gzip))),
5267
}
5368
}
@@ -128,6 +143,11 @@ struct AvroDeserializer {
128143
decoder: EasyAvroDecoder,
129144
}
130145

146+
struct SoeAvroDeserializer {
147+
//Deserializer for avro single object encoding
148+
decoders: DashMap<i64, GenericSingleObjectReader>,
149+
}
150+
131151
#[derive(Default)]
132152
struct AvroSchemaDeserializer {
133153
schema: Option<apache_avro::Schema>,
@@ -137,6 +157,58 @@ struct JsonDeserializer {
137157
decoder: EasyJsonDecoder,
138158
}
139159

160+
#[async_trait]
161+
impl MessageDeserializer for SoeAvroDeserializer {
162+
async fn deserialize(
163+
&mut self,
164+
message_bytes: &[u8],
165+
) -> Result<Value, MessageDeserializationError> {
166+
let key = Self::extract_message_fingerprint(message_bytes).map_err(|e| {
167+
MessageDeserializationError::AvroDeserialization {
168+
dead_letter: DeadLetter::from_failed_deserialization(message_bytes, e.to_string()),
169+
}
170+
})?;
171+
172+
let decoder =
173+
self.decoders
174+
.get(&key)
175+
.ok_or(MessageDeserializationError::AvroDeserialization {
176+
dead_letter: DeadLetter::from_failed_deserialization(
177+
message_bytes,
178+
format!(
179+
"Unkown schema with fingerprint {}",
180+
&message_bytes[2..10]
181+
.iter()
182+
.map(|byte| format!("{:02x}", byte))
183+
.collect::<Vec<String>>()
184+
.join("")
185+
),
186+
),
187+
})?;
188+
let mut reader = Cursor::new(message_bytes);
189+
190+
match decoder.read_value(&mut reader) {
191+
Ok(drs) => match Value::try_from(drs) {
192+
Ok(v) => Ok(v),
193+
Err(e) => Err(MessageDeserializationError::AvroDeserialization {
194+
dead_letter: DeadLetter::from_failed_deserialization(
195+
message_bytes,
196+
e.to_string(),
197+
),
198+
}),
199+
},
200+
Err(e) => {
201+
return Err(MessageDeserializationError::AvroDeserialization {
202+
dead_letter: DeadLetter::from_failed_deserialization(
203+
message_bytes,
204+
e.to_string(),
205+
),
206+
});
207+
}
208+
}
209+
}
210+
}
211+
140212
#[async_trait]
141213
impl MessageDeserializer for AvroDeserializer {
142214
async fn deserialize(
@@ -293,5 +365,79 @@ impl AvroDeserializer {
293365
}
294366
}
295367

368+
impl SoeAvroDeserializer {
369+
pub(crate) fn try_from_path(path: &PathBuf) -> Result<Self, anyhow::Error> {
370+
if path.is_file() {
371+
let (key, seo_reader) = Self::read_single_schema_file(path)?;
372+
debug!(
373+
"Loaded schema {:?} with key (i64 rep of fingerprint) {:?}",
374+
path, key
375+
);
376+
let map: DashMap<i64, GenericSingleObjectReader> = DashMap::with_capacity(1);
377+
map.insert(key, seo_reader);
378+
Ok(SoeAvroDeserializer { decoders: map })
379+
} else if path.is_dir() {
380+
let decoders = path
381+
.read_dir()?
382+
.map(|file| {
383+
let file_path = file?.path();
384+
let value = Self::read_single_schema_file(&file_path)?;
385+
Ok(value)
386+
})
387+
.collect::<anyhow::Result<DashMap<_, _>>>()?;
388+
389+
Ok(SoeAvroDeserializer { decoders })
390+
} else {
391+
Err(anyhow::format_err!("Path '{:?}' does not exists", path))
392+
}
393+
}
394+
395+
fn read_single_schema_file(
396+
path: &PathBuf,
397+
) -> Result<(i64, GenericSingleObjectReader), anyhow::Error> {
398+
match std::fs::read_to_string(path) {
399+
Ok(content) => match Schema::parse_str(&content) {
400+
Ok(s) => {
401+
let fingerprint = s.fingerprint::<Rabin>().bytes;
402+
let fingerprint = fingerprint
403+
.try_into()
404+
.expect("Rabin fingerprints are 8 bytes");
405+
let key = Self::fingerprint_to_i64(fingerprint);
406+
match GenericSingleObjectReader::new(s) {
407+
Ok(decoder) => Ok((key, decoder)),
408+
Err(e) => Err(anyhow::format_err!(
409+
"Schema file '{:?}'; Error: {}",
410+
path,
411+
e.to_string()
412+
)),
413+
}
414+
}
415+
Err(e) => Err(anyhow::format_err!(
416+
"Schema file '{:?}'; Error: {}",
417+
path,
418+
e.to_string()
419+
)),
420+
},
421+
Err(e) => Err(anyhow::format_err!(
422+
"Schema file '{:?}'; Error: {}",
423+
path,
424+
e.to_string()
425+
)),
426+
}
427+
}
428+
429+
fn extract_message_fingerprint(msg: &[u8]) -> Result<i64, anyhow::Error> {
430+
msg.get(2..10)
431+
.ok_or(anyhow::anyhow!(
432+
"Message does not contain a valid fingerprint"
433+
))
434+
.map(|x| Self::fingerprint_to_i64(x.try_into().expect("Slice must be 8 bytes long")))
435+
}
436+
437+
fn fingerprint_to_i64(msg: [u8; 8]) -> i64 {
438+
i64::from_le_bytes(msg)
439+
}
440+
}
441+
296442
#[cfg(test)]
297443
mod tests {}

src/transforms.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ lazy_static! {
8484
"epoch_micros_to_iso8601",
8585
Box::new(create_epoch_micros_to_iso8601_fn()),
8686
);
87+
runtime.register_function(
88+
"epoch_millis_to_micro",
89+
Box::new(create_epoch_millis_to_micro_fn()),
90+
);
8791
runtime
8892
};
8993
}
@@ -202,6 +206,13 @@ fn create_epoch_micros_to_iso8601_fn() -> CustomFunction {
202206
)
203207
}
204208

209+
fn create_epoch_millis_to_micro_fn() -> CustomFunction {
210+
CustomFunction::new(
211+
Signature::new(vec![ArgumentType::Number], None),
212+
Box::new(jmespath_epoch_millis_to_micro),
213+
)
214+
}
215+
205216
fn substr(args: &[Rcvar], context: &mut Context) -> Result<Rcvar, JmespathError> {
206217
let s = args[0].as_string().ok_or_else(|| {
207218
InvalidTypeError::new(context, "string", args[0].get_type().to_string(), 0)
@@ -268,6 +279,14 @@ fn jmespath_epoch_micros_to_iso8601(
268279
let variable = Variable::try_from(value)?;
269280
Ok(Arc::new(variable))
270281
}
282+
fn jmespath_epoch_millis_to_micro(
283+
args: &[Rcvar],
284+
context: &mut Context,
285+
) -> Result<Rcvar, JmespathError> {
286+
let millis = i64_from_args(args, context, 0)?;
287+
let variable = Variable::Number((millis * 1000).into());
288+
Ok(Arc::new(variable))
289+
}
271290

272291
fn i64_from_args(
273292
args: &[Rcvar],
@@ -586,6 +605,41 @@ mod tests {
586605
assert_eq!(expected_iso, dt);
587606
}
588607

608+
#[test]
609+
fn test_epoch_millis_to_micro() {
610+
let mut test_value = json!({
611+
"name": "A",
612+
"modified": 1732279537028u64,
613+
});
614+
615+
let test_message = OwnedMessage::new(
616+
Some(test_value.to_string().into_bytes()),
617+
None,
618+
"test".to_string(),
619+
rdkafka::Timestamp::NotAvailable,
620+
0,
621+
0,
622+
None,
623+
);
624+
625+
let mut transforms = HashMap::new();
626+
627+
transforms.insert(
628+
"modified_micros".to_string(),
629+
"epoch_millis_to_micro(modified)".to_string(),
630+
);
631+
632+
let transformer = Transformer::from_transforms(&transforms).unwrap();
633+
634+
transformer
635+
.transform(&mut test_value, Some(&test_message))
636+
.unwrap();
637+
638+
let modified_date = test_value.get("modified_micros").unwrap().as_u64().unwrap();
639+
640+
assert_eq!(1732279537028000u64, modified_date);
641+
}
642+
589643
#[test]
590644
fn test_transforms_with_epoch_seconds_to_iso8601() {
591645
let expected_iso = "2021-07-20T23:18:18Z";

src/writer.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -584,6 +584,7 @@ impl DataWriter {
584584
let mut adds = self.write_parquet_files(&table.table_uri()).await?;
585585
let actions = adds.drain(..).map(Action::Add).collect();
586586
let commit = deltalake_core::operations::transaction::CommitBuilder::default()
587+
.with_max_retries(100) //We increase this from the default 15 because (at least for Azure) commits may fail under too-frequent writes (which happen if many messages arrive in the dead letter queue)
587588
.with_actions(actions)
588589
.build(
589590
table.state.as_ref().map(|s| s as &dyn TableReference),

0 commit comments

Comments
 (0)