|
| 1 | +use std::iter; |
1 | 2 | use std::str::FromStr; |
2 | 3 | use std::sync::Arc; |
3 | 4 |
|
@@ -649,24 +650,29 @@ pub(crate) async fn encode_dataframe<'a>( |
649 | 650 | let fields_ref = fields.clone(); |
650 | 651 | let pg_row_stream = recordbatch_stream |
651 | 652 | .map(move |rb: datafusion::error::Result<RecordBatch>| { |
652 | | - let rb = rb.unwrap(); |
653 | | - let rows = rb.num_rows(); |
654 | | - let cols = rb.num_columns(); |
655 | | - |
656 | | - let fields = fields_ref.clone(); |
657 | | - |
658 | | - let row_stream = (0..rows).map(move |row| { |
659 | | - let mut encoder = DataRowEncoder::new(fields.clone()); |
660 | | - for col in 0..cols { |
661 | | - let array = rb.column(col); |
662 | | - if array.is_null(row) { |
663 | | - encoder.encode_field(&None::<i8>).unwrap(); |
664 | | - } else { |
665 | | - encode_value(&mut encoder, array, row).unwrap(); |
666 | | - } |
| 653 | + let row_stream: Box<dyn Iterator<Item = _> + Send> = match rb { |
| 654 | + Ok(rb) => { |
| 655 | + let rows = rb.num_rows(); |
| 656 | + let cols = rb.num_columns(); |
| 657 | + |
| 658 | + let fields = fields_ref.clone(); |
| 659 | + |
| 660 | + let row_stream = (0..rows).map(move |row| { |
| 661 | + let mut encoder = DataRowEncoder::new(fields.clone()); |
| 662 | + for col in 0..cols { |
| 663 | + let array = rb.column(col); |
| 664 | + if array.is_null(row) { |
| 665 | + encoder.encode_field(&None::<i8>).unwrap(); |
| 666 | + } else { |
| 667 | + encode_value(&mut encoder, array, row).unwrap(); |
| 668 | + } |
| 669 | + } |
| 670 | + encoder.finish() |
| 671 | + }); |
| 672 | + Box::new(row_stream) |
667 | 673 | } |
668 | | - encoder.finish() |
669 | | - }); |
| 674 | + Err(e) => Box::new(iter::once(Err(PgWireError::ApiError(e.into())))), |
| 675 | + }; |
670 | 676 |
|
671 | 677 | stream::iter(row_stream) |
672 | 678 | }) |
|
0 commit comments