diff --git a/datafusion-postgres/src/datatypes.rs b/datafusion-postgres/src/datatypes.rs index ca199b1..0ace6bb 100644 --- a/datafusion-postgres/src/datatypes.rs +++ b/datafusion-postgres/src/datatypes.rs @@ -1,3 +1,4 @@ +use std::iter; use std::str::FromStr; use std::sync::Arc; @@ -649,24 +650,29 @@ pub(crate) async fn encode_dataframe<'a>( let fields_ref = fields.clone(); let pg_row_stream = recordbatch_stream .map(move |rb: datafusion::error::Result| { - let rb = rb.unwrap(); - let rows = rb.num_rows(); - let cols = rb.num_columns(); - - let fields = fields_ref.clone(); - - let row_stream = (0..rows).map(move |row| { - let mut encoder = DataRowEncoder::new(fields.clone()); - for col in 0..cols { - let array = rb.column(col); - if array.is_null(row) { - encoder.encode_field(&None::).unwrap(); - } else { - encode_value(&mut encoder, array, row).unwrap(); - } + let row_stream: Box + Send> = match rb { + Ok(rb) => { + let rows = rb.num_rows(); + let cols = rb.num_columns(); + + let fields = fields_ref.clone(); + + let row_stream = (0..rows).map(move |row| { + let mut encoder = DataRowEncoder::new(fields.clone()); + for col in 0..cols { + let array = rb.column(col); + if array.is_null(row) { + encoder.encode_field(&None::).unwrap(); + } else { + encode_value(&mut encoder, array, row).unwrap(); + } + } + encoder.finish() + }); + Box::new(row_stream) } - encoder.finish() - }); + Err(e) => Box::new(iter::once(Err(PgWireError::ApiError(e.into())))), + }; stream::iter(row_stream) })