Skip to content

Commit 1b185d4

Browse files
authored
BallistaLogicalExtensionCodec improvements (#1127)
1 parent b3685d2 commit 1b185d4

File tree

1 file changed

+58
-39
lines changed

1 file changed

+58
-39
lines changed

ballista/core/src/serde/mod.rs

Lines changed: 58 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -125,28 +125,29 @@ pub struct BallistaLogicalExtensionCodec {
125125
}
126126

127127
impl BallistaLogicalExtensionCodec {
128-
// looks for a codec which can operate on this node
129-
// returns a position of codec in the list.
130-
//
131-
// position is important with encoding process
132-
// as there is a need to remember which codec
133-
// in the list was used to encode message,
134-
// so we can use it for decoding as well
135-
136-
fn try_any<T>(
128+
/// looks for a codec which can operate on this node
129+
/// and returns the position of that codec in the list together with its result.
130+
///
131+
/// the position matters for the encoding process,
132+
/// as the position of the codec that was used is needed
133+
/// so the same codec can be used for decoding
134+
135+
fn try_any<R>(
137136
&self,
138-
mut f: impl FnMut(&dyn LogicalExtensionCodec) -> Result<T>,
139-
) -> Result<(u8, T)> {
137+
mut f: impl FnMut(&dyn LogicalExtensionCodec) -> Result<R>,
138+
) -> Result<(u32, R)> {
140139
let mut last_err = None;
141140
for (position, codec) in self.file_format_codecs.iter().enumerate() {
142141
match f(codec.as_ref()) {
143-
Ok(node) => return Ok((position as u8, node)),
142+
Ok(result) => return Ok((position as u32, result)),
144143
Err(err) => last_err = Some(err),
145144
}
146145
}
147146

148147
Err(last_err.unwrap_or_else(|| {
149-
DataFusionError::NotImplemented("Empty list of composed codecs".to_owned())
148+
DataFusionError::Internal(
149+
"List of provided extended logical codecs is empty".to_owned(),
150+
)
150151
}))
151152
}
152153
}
@@ -155,10 +156,12 @@ impl Default for BallistaLogicalExtensionCodec {
155156
fn default() -> Self {
156157
Self {
157158
default_codec: Arc::new(DefaultLogicalExtensionCodec {}),
159+
// Position in this list is important as it will be used for decoding.
160+
// If a new codec is added, it should go in the last position.
158161
file_format_codecs: vec![
162+
Arc::new(ParquetLogicalExtensionCodec {}),
159163
Arc::new(CsvLogicalExtensionCodec {}),
160164
Arc::new(JsonLogicalExtensionCodec {}),
161-
Arc::new(ParquetLogicalExtensionCodec {}),
162165
Arc::new(ArrowLogicalExtensionCodec {}),
163166
Arc::new(AvroLogicalExtensionCodec {}),
164167
],
@@ -210,38 +213,35 @@ impl LogicalExtensionCodec for BallistaLogicalExtensionCodec {
210213
buf: &[u8],
211214
ctx: &datafusion::prelude::SessionContext,
212215
) -> Result<Arc<dyn datafusion::datasource::file_format::FileFormatFactory>> {
213-
if !buf.is_empty() {
214-
// gets codec id from input buffer
215-
let codec_number = buf[0];
216-
let codec = self.file_format_codecs.get(codec_number as usize).ok_or(
217-
DataFusionError::NotImplemented("Can't find required codex".to_owned()),
218-
)?;
219-
220-
codec.try_decode_file_format(&buf[1..], ctx)
221-
} else {
222-
Err(DataFusionError::NotImplemented(
223-
"File format blob should have more than 0 bytes".to_owned(),
224-
))
225-
}
216+
let proto = FileFormatProto::decode(buf)
217+
.map_err(|e| DataFusionError::Internal(e.to_string()))?;
218+
219+
let codec = self
220+
.file_format_codecs
221+
.get(proto.encoder_position as usize)
222+
.ok_or(DataFusionError::Internal(
223+
"Can't find required codec in file codec list".to_owned(),
224+
))?;
225+
226+
codec.try_decode_file_format(&proto.blob, ctx)
226227
}
227228

228229
fn try_encode_file_format(
229230
&self,
230231
buf: &mut Vec<u8>,
231232
node: Arc<dyn datafusion::datasource::file_format::FileFormatFactory>,
232233
) -> Result<()> {
233-
let mut encoded_format = vec![];
234-
let (codec_number, _) = self.try_any(|codec| {
235-
codec.try_encode_file_format(&mut encoded_format, node.clone())
236-
})?;
237-
// we need to remember which codec in the list was used to
238-
// encode this node.
239-
buf.push(codec_number);
240-
241-
// save actual encoded node
242-
buf.append(&mut encoded_format);
243-
244-
Ok(())
234+
let mut blob = vec![];
235+
let (encoder_position, _) =
236+
self.try_any(|codec| codec.try_encode_file_format(&mut blob, node.clone()))?;
237+
238+
let proto = FileFormatProto {
239+
encoder_position,
240+
blob,
241+
};
242+
proto
243+
.encode(buf)
244+
.map_err(|e| DataFusionError::Internal(e.to_string()))
245245
}
246246
}
247247

@@ -429,6 +429,25 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
429429
}
430430
}
431431

432+
/// FileFormatProto captures data encoded by file format codecs
433+
///
434+
/// it captures the position of the codec used to encode the FileFormat
435+
/// and actual encoded value.
436+
///
437+
/// capturing the codec position is required, as the same codec can decode
438+
/// blobs encoded by different encoders (the probability is low, but it
439+
/// happened in the past)
440+
///
441+
#[derive(Clone, PartialEq, prost::Message)]
442+
struct FileFormatProto {
443+
/// encoder id used to encode blob
444+
/// (to be used for decoding)
445+
#[prost(uint32, tag = 1)]
446+
pub encoder_position: u32,
447+
#[prost(bytes, tag = 2)]
448+
pub blob: Vec<u8>,
449+
}
450+
432451
#[cfg(test)]
433452
mod test {
434453
use datafusion::{

0 commit comments

Comments
 (0)