Skip to content

Commit 0a52b70

Browse files
mjgartonsunng87
authored andcommitted
Avoid unwrapping Result<RecordBatch>
Avoid unwrapping `Result<RecordBatch>` because it can sometimes be an Err variant. If so, instead of iterating over the records in the batch, iterator over a single error value, so the error will end up being handled by the calling code.
1 parent 7431970 commit 0a52b70

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

datafusion-postgres/src/datatypes.rs

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::iter;
12
use std::str::FromStr;
23
use std::sync::Arc;
34

@@ -649,24 +650,29 @@ pub(crate) async fn encode_dataframe<'a>(
649650
let fields_ref = fields.clone();
650651
let pg_row_stream = recordbatch_stream
651652
.map(move |rb: datafusion::error::Result<RecordBatch>| {
652-
let rb = rb.unwrap();
653-
let rows = rb.num_rows();
654-
let cols = rb.num_columns();
655-
656-
let fields = fields_ref.clone();
657-
658-
let row_stream = (0..rows).map(move |row| {
659-
let mut encoder = DataRowEncoder::new(fields.clone());
660-
for col in 0..cols {
661-
let array = rb.column(col);
662-
if array.is_null(row) {
663-
encoder.encode_field(&None::<i8>).unwrap();
664-
} else {
665-
encode_value(&mut encoder, array, row).unwrap();
666-
}
653+
let row_stream: Box<dyn Iterator<Item = _> + Send> = match rb {
654+
Ok(rb) => {
655+
let rows = rb.num_rows();
656+
let cols = rb.num_columns();
657+
658+
let fields = fields_ref.clone();
659+
660+
let row_stream = (0..rows).map(move |row| {
661+
let mut encoder = DataRowEncoder::new(fields.clone());
662+
for col in 0..cols {
663+
let array = rb.column(col);
664+
if array.is_null(row) {
665+
encoder.encode_field(&None::<i8>).unwrap();
666+
} else {
667+
encode_value(&mut encoder, array, row).unwrap();
668+
}
669+
}
670+
encoder.finish()
671+
});
672+
Box::new(row_stream)
667673
}
668-
encoder.finish()
669-
});
674+
Err(e) => Box::new(iter::once(Err(PgWireError::ApiError(e.into())))),
675+
};
670676

671677
stream::iter(row_stream)
672678
})

0 commit comments

Comments
 (0)