Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 58 additions & 39 deletions ballista/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,28 +125,29 @@ pub struct BallistaLogicalExtensionCodec {
}

impl BallistaLogicalExtensionCodec {
// looks for a codec which can operate on this node
// returns a position of codec in the list.
//
// position is important with encoding process
// as there is a need to remember which codec
// in the list was used to encode message,
// so we can use it for decoding as well

fn try_any<T>(
/// looks for a codec which can operate on this node
/// returns a position of codec in the list and result.
///
/// position is important with encoding process
/// as position of used codecs is needed
/// so the same codec can be used for decoding

fn try_any<R>(
&self,
mut f: impl FnMut(&dyn LogicalExtensionCodec) -> Result<T>,
) -> Result<(u8, T)> {
mut f: impl FnMut(&dyn LogicalExtensionCodec) -> Result<R>,
) -> Result<(u32, R)> {
let mut last_err = None;
for (position, codec) in self.file_format_codecs.iter().enumerate() {
match f(codec.as_ref()) {
Ok(node) => return Ok((position as u8, node)),
Ok(result) => return Ok((position as u32, result)),
Err(err) => last_err = Some(err),
}
}

Err(last_err.unwrap_or_else(|| {
DataFusionError::NotImplemented("Empty list of composed codecs".to_owned())
DataFusionError::Internal(
"List of provided extended logical codecs is empty".to_owned(),
)
}))
}
}
Expand All @@ -155,10 +156,12 @@ impl Default for BallistaLogicalExtensionCodec {
fn default() -> Self {
Self {
default_codec: Arc::new(DefaultLogicalExtensionCodec {}),
// Position in this list is important as it will be used for decoding.
// If new codec is added it should go to last position.
file_format_codecs: vec![
Arc::new(ParquetLogicalExtensionCodec {}),
Arc::new(CsvLogicalExtensionCodec {}),
Arc::new(JsonLogicalExtensionCodec {}),
Arc::new(ParquetLogicalExtensionCodec {}),
Arc::new(ArrowLogicalExtensionCodec {}),
Arc::new(AvroLogicalExtensionCodec {}),
],
Expand Down Expand Up @@ -210,38 +213,35 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec {
buf: &[u8],
ctx: &datafusion::prelude::SessionContext,
) -> Result<Arc<dyn datafusion::datasource::file_format::FileFormatFactory>> {
if !buf.is_empty() {
// gets codec id from input buffer
let codec_number = buf[0];
let codec = self.file_format_codecs.get(codec_number as usize).ok_or(
DataFusionError::NotImplemented("Can't find required codex".to_owned()),
)?;

codec.try_decode_file_format(&buf[1..], ctx)
} else {
Err(DataFusionError::NotImplemented(
"File format blob should have more than 0 bytes".to_owned(),
))
}
let proto = FileFormatProto::decode(buf)
.map_err(|e| DataFusionError::Internal(e.to_string()))?;

let codec = self
.file_format_codecs
.get(proto.encoder_position as usize)
.ok_or(DataFusionError::Internal(
"Can't find required codec in file codec list".to_owned(),
))?;

codec.try_decode_file_format(&proto.blob, ctx)
}

fn try_encode_file_format(
&self,
buf: &mut Vec<u8>,
node: Arc<dyn datafusion::datasource::file_format::FileFormatFactory>,
) -> Result<()> {
let mut encoded_format = vec![];
let (codec_number, _) = self.try_any(|codec| {
codec.try_encode_file_format(&mut encoded_format, node.clone())
})?;
// we need to remember which codec in the list was used to
// encode this node.
buf.push(codec_number);

// save actual encoded node
buf.append(&mut encoded_format);

Ok(())
let mut blob = vec![];
let (encoder_position, _) =
self.try_any(|codec| codec.try_encode_file_format(&mut blob, node.clone()))?;

let proto = FileFormatProto {
encoder_position,
blob,
};
proto
.encode(buf)
.map_err(|e| DataFusionError::Internal(e.to_string()))
}
}

Expand Down Expand Up @@ -429,6 +429,25 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
}
}

/// FileFormatProto captures data encoded by file format codecs
///
/// it captures position of codec used to encode FileFormat
/// and actual encoded value.
///
/// capturing codec position is required, as same codec can decode
/// blobs encoded by different encoders (probability is low but it
/// happened in the past)
///
#[derive(Clone, PartialEq, prost::Message)]
struct FileFormatProto {
/// encoder id used to encode blob
/// (to be used for decoding)
#[prost(uint32, tag = 1)]
pub encoder_position: u32,
#[prost(bytes, tag = 2)]
pub blob: Vec<u8>,
}

#[cfg(test)]
mod test {
use datafusion::{
Expand Down
Loading