Skip to content

Commit cdab887

Browse files
authored
dict layout writer to allow empty input stream (#3652)
Fix #3600 --------- Signed-off-by: Onur Satici <[email protected]>
1 parent dae960a commit cdab887

File tree

2 files changed

+55
-25
lines changed

2 files changed

+55
-25
lines changed

vortex-file/src/tests.rs

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -375,6 +375,33 @@ async fn write_chunked() {
375375
assert_eq!(array_len, 48);
376376
}
377377

378+
#[tokio::test]
379+
#[cfg_attr(miri, ignore)]
380+
async fn test_empty_varbin_array_roundtrip() {
381+
let empty = VarBinArray::from(Vec::<&str>::new()).into_array();
382+
383+
let st = StructArray::from_fields(&[("a", empty)]).unwrap();
384+
385+
let buf = VortexWriteOptions::default()
386+
.write(ByteBufferMut::empty(), st.to_array_stream())
387+
.await
388+
.unwrap();
389+
390+
let file = VortexOpenOptions::in_memory().open(buf).await.unwrap();
391+
392+
let result = file
393+
.scan()
394+
.unwrap()
395+
.into_array_stream()
396+
.unwrap()
397+
.read_all()
398+
.await
399+
.unwrap();
400+
401+
assert_eq!(result.len(), 0);
402+
assert_eq!(result.dtype(), st.dtype());
403+
}
404+
378405
#[tokio::test]
379406
#[cfg_attr(miri, ignore)]
380407
async fn filter_string() {

vortex-layout/src/layouts/dict/writer/mod.rs

Lines changed: 28 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -93,14 +93,18 @@ impl LayoutStrategy for DictStrategy {
9393
let executor = self.executor.clone();
9494
Box::pin(async move {
9595
// 0. decide if chunks are eligible for dict encoding
96-
let (stream, is_dict_encoding) = call_for_first_item(stream, |chunk| {
97-
let compressed = BtrBlocksCompressor.compress(chunk)?;
98-
Ok(compressed.is_encoding(DictEncoding.id()))
99-
})
100-
.await?;
96+
let (stream, first_chunk) = peek_first_chunk(stream).await?;
10197
let stream = SequentialStreamAdapter::new(dtype.clone(), stream).sendable();
102-
if !is_dict_encoding? {
103-
// first chunk did not compress to dict, skip dict layout
98+
99+
let should_fallback = match first_chunk {
100+
None => true, // empty stream
101+
Some(chunk) => {
102+
let compressed = BtrBlocksCompressor.compress(&chunk)?;
103+
!compressed.is_encoding(DictEncoding.id())
104+
}
105+
};
106+
if should_fallback {
107+
// first chunk did not compress to dict, or did not exist. Skip dict layout
104108
return fallback
105109
.write_stream(&ctx, sequence_writer.clone(), stream)
106110
.await;
@@ -133,14 +137,12 @@ impl LayoutStrategy for DictStrategy {
133137
let mut children = Vec::new();
134138
let mut runs = DictEncodedRuns::new(Box::pin(encoded_rx));
135139
while let Some((codes_stream, values_future)) = runs.next_run().await {
136-
let (codes_stream, codes_dtype) =
137-
call_for_first_item(codes_stream.boxed(), |chunk| {
138-
Ok(chunk.dtype().clone())
139-
})
140-
.await?;
141-
let Ok(codes_dtype) = codes_dtype else {
140+
let (codes_stream, first_chunk) =
141+
peek_first_chunk(codes_stream.boxed()).await?;
142+
let codes_dtype = match first_chunk {
142143
// codes_stream is empty, this would happen if the parent stream end coincided with a dict run end
143-
break;
144+
None => break,
145+
Some(chunk) => chunk.dtype().clone(),
144146
};
145147
let codes_layout = codes
146148
.write_stream(
@@ -415,18 +417,19 @@ impl Drop for DictEncodedRunStream {
415417
}
416418
}
417419

418-
async fn call_for_first_item<T>(
420+
async fn peek_first_chunk(
419421
mut stream: BoxStream<'static, SequencedChunk>,
420-
func: impl Fn(&ArrayRef) -> VortexResult<T>,
421-
) -> VortexResult<(BoxStream<'static, SequencedChunk>, VortexResult<T>)> {
422-
let Some(result) = stream.next().await else {
423-
return Ok((stream.boxed(), Err(vortex_err!("empty stream"))));
424-
};
425-
let (sequence_id, first_chunk) = result?;
426-
let res = func(&first_chunk);
427-
// reconstruct the stream
428-
let stream = once(async { Ok((sequence_id, first_chunk)) }).chain(stream);
429-
Ok((stream.boxed(), res))
422+
) -> VortexResult<(BoxStream<'static, SequencedChunk>, Option<ArrayRef>)> {
423+
match stream.next().await {
424+
None => Ok((stream.boxed(), None)),
425+
Some(Err(e)) => Err(e),
426+
Some(Ok((sequence_id, chunk))) => {
427+
let chunk_clone = chunk.clone();
428+
let reconstructed_stream =
429+
once(async move { Ok((sequence_id, chunk_clone)) }).chain(stream);
430+
Ok((reconstructed_stream.boxed(), Some(chunk)))
431+
}
432+
}
430433
}
431434

432435
pub fn dict_layout_supported(dtype: &DType) -> bool {

0 commit comments

Comments
 (0)