diff --git a/datafusion-postgres/src/pg_catalog.rs b/datafusion-postgres/src/pg_catalog.rs
index b4a46b3..90e3a31 100644
--- a/datafusion-postgres/src/pg_catalog.rs
+++ b/datafusion-postgres/src/pg_catalog.rs
@@ -10,13 +10,11 @@ use datafusion::arrow::array::{
 use datafusion::arrow::datatypes::{DataType, Field, SchemaRef};
 use datafusion::arrow::ipc::reader::FileReader;
 use datafusion::catalog::streaming::StreamingTable;
-use datafusion::catalog::{CatalogProviderList, SchemaProvider};
+use datafusion::catalog::{CatalogProviderList, MemTable, SchemaProvider};
 use datafusion::common::utils::SingleRowListArrayBuilder;
 use datafusion::datasource::{TableProvider, ViewTable};
 use datafusion::error::{DataFusionError, Result};
-use datafusion::execution::{SendableRecordBatchStream, TaskContext};
 use datafusion::logical_expr::{ColumnarValue, ScalarUDF, Volatility};
-use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
 use datafusion::physical_plan::streaming::PartitionStream;
 use datafusion::prelude::{create_udf, SessionContext};
 use postgres_types::Oid;
@@ -394,19 +392,10 @@ impl ArrowTable {
             data: batches,
         })
     }
-}
-
-impl PartitionStream for ArrowTable {
-    fn schema(&self) -> &SchemaRef {
-        &self.schema
-    }
-
-    fn execute(&self, _ctx: Arc<TaskContext>) -> SendableRecordBatchStream {
-        let data = self.data.clone();
-        Box::pin(RecordBatchStreamAdapter::new(
-            self.schema.clone(),
-            futures::stream::iter(data.into_iter().map(Ok)),
-        ))
+
+    /// Convert the arrow data into datafusion MemTable
+    pub fn try_into_memtable(self) -> Result<MemTable> {
+        MemTable::try_new(self.schema, vec![self.data])
     }
 }

@@ -664,8 +653,8 @@ impl PgCatalogStaticTables {

     /// Create table from dumped arrow data
     fn create_arrow_table(data_bytes: Vec<u8>) -> Result<Arc<dyn TableProvider>> {
         let table = ArrowTable::from_ipc_data(data_bytes)?;
-        let streaming_table = StreamingTable::try_new(table.schema.clone(), vec![Arc::new(table)])?;
-        Ok(Arc::new(streaming_table))
+        let mem_table = table.try_into_memtable()?;
+        Ok(Arc::new(mem_table))
     }
 }
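
The change swaps the hand-rolled `PartitionStream`/`StreamingTable` plumbing for DataFusion's stock `MemTable`, which already implements `TableProvider` over in-memory record batches. A minimal standalone sketch of the same conversion path follows; the schema, column, sample values, and table name are invented for illustration, and the `MemTable` import path assumes the DataFusion version used in this diff:

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::catalog::MemTable; // same import path as in the diff
use datafusion::error::Result;
use datafusion::prelude::SessionContext;

#[tokio::main]
async fn main() -> Result<()> {
    // A single RecordBatch standing in for the batches that
    // ArrowTable::from_ipc_data decodes from the dumped IPC bytes.
    // The schema and values here are made up for the example.
    let schema = Arc::new(Schema::new(vec![Field::new(
        "oid",
        DataType::Int32,
        false,
    )]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
    )?;

    // The same call try_into_memtable makes: all decoded batches
    // become a single partition.
    let mem_table = MemTable::try_new(schema, vec![vec![batch]])?;

    // MemTable implements TableProvider, so it registers and queries
    // like any other table.
    let ctx = SessionContext::new();
    ctx.register_table("pg_static_demo", Arc::new(mem_table))?;
    ctx.sql("SELECT oid FROM pg_static_demo")
        .await?
        .show()
        .await?;

    Ok(())
}
```

Besides deleting code, this avoids the clone of all batches that the old `execute` performed on every scan, since `MemTable` owns the batches once and serves them directly.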