diff --git a/Cargo.toml b/Cargo.toml index fd29196..851e10b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,13 +14,12 @@ readme = "README.md" repository = "https://github.com/datafusion-contrib/datafusion-flight-sql-server" [workspace.dependencies] -arrow = "56" -arrow-flight = { version = "56", features = ["flight-sql"] } -arrow-json = "56" +arrow-flight = { version = "57", features = ["flight-sql"] } async-trait = "0.1.88" -datafusion = "50" -datafusion-federation = { version = "0.4.10" } -datafusion-substrait = "50" +datafusion = "51" +datafusion-federation = { version = "0.4.11" } +datafusion-substrait = "51" futures = "0.3.31" tokio = { version = "1.47", features = ["full"] } -tonic = { version = "0.13", features = ["transport", "codegen", "prost"] } +tonic = { version = "0.14", features = ["transport", "codegen"] } +prost = "0.14" \ No newline at end of file diff --git a/datafusion-flight-sql-server/Cargo.toml b/datafusion-flight-sql-server/Cargo.toml index 744626b..9aaba9d 100644 --- a/datafusion-flight-sql-server/Cargo.toml +++ b/datafusion-flight-sql-server/Cargo.toml @@ -17,18 +17,19 @@ protoc = ["datafusion-substrait/protoc"] [dependencies] arrow-flight.workspace = true -arrow.workspace = true datafusion-federation = { workspace = true, features = ["sql"] } datafusion-substrait.workspace = true datafusion.workspace = true futures.workspace = true log = "0.4" once_cell = "1.21" -prost = "0.13" +prost.workspace = true tonic.workspace = true async-trait.workspace = true -tonic-async-interceptor = "0.13" +tokio-stream = "0.1.17" +tokio = { version = "1.47", features = ["net"], default-features = false } [dev-dependencies] tokio.workspace = true datafusion-flight-sql-table-provider = { path = "../datafusion-flight-sql-table-provider" } +tonic-async-interceptor = "0.14" diff --git a/datafusion-flight-sql-server/src/service.rs b/datafusion-flight-sql-server/src/service.rs index 150bb98..dbaebc4 100644 --- a/datafusion-flight-sql-server/src/service.rs +++ b/datafusion-flight-sql-server/src/service.rs @@ -1,15 +1,5 @@ use std::{collections::BTreeMap, pin::Pin, sync::Arc}; -use arrow::{ - array::{ArrayRef, RecordBatch, StringArray}, - compute::concat_batches, - datatypes::{DataType, Field, SchemaBuilder, SchemaRef}, - error::ArrowError, - ipc::{ - reader::StreamReader, - writer::{IpcWriteOptions, StreamWriter}, - }, -}; use arrow_flight::{ decode::{DecodedPayload, FlightDataDecoder}, sql::{ @@ -35,6 +25,16 @@ use arrow_flight::{ Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse, IpcMessage, SchemaAsIpc, Ticket, }; +use datafusion::arrow::{ + array::{ArrayRef, RecordBatch, StringArray}, + compute::concat_batches, + datatypes::{DataType, Field, SchemaBuilder, SchemaRef}, + error::ArrowError, + ipc::{ + reader::StreamReader, + writer::{IpcWriteOptions, StreamWriter}, + }, +}; use datafusion::{ common::{arrow::datatypes::Schema, ParamValues}, dataframe::DataFrame, @@ -115,6 +115,21 @@ impl FlightSqlService { Ok(Server::builder().add_service(svc).serve(addr).await?) } + pub async fn serve_with_listener( + self, + listener: std::net::TcpListener, + ) -> Result<(), Box> { + info!("Listening on {}", listener.local_addr()?); + + let svc = FlightServiceServer::new(self); + let listener = tokio::net::TcpListener::from_std(listener)?; + + Ok(Server::builder() + .add_service(svc) + .serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener)) + .await?) + } + async fn new_context( &self, request: Request, @@ -1051,7 +1066,7 @@ fn get_schema_for_plan(logical_plan: &LogicalPlan, with_metadata: bool) -> Schem df_schema.as_ref().metadata().clone(), )) } else { - Arc::new(Schema::from(logical_plan.schema().as_ref())) + Arc::new(logical_plan.schema().as_arrow().clone()) }; // Use an empty FlightDataEncoder to determine the schema of the encoded flight data. @@ -1126,9 +1141,7 @@ async fn decode_schema(decoder: &mut FlightDataDecoder) -> Result, -) -> Result, arrow::error::ArrowError> { +fn decode_param_values(parameters: Option<&[u8]>) -> Result, ArrowError> { parameters .map(|parameters| { let decoder = StreamReader::try_new(parameters, None)?; diff --git a/datafusion-flight-sql-server/tests/integration_test.rs b/datafusion-flight-sql-server/tests/integration_test.rs index 9ce4795..72b9f00 100644 --- a/datafusion-flight-sql-server/tests/integration_test.rs +++ b/datafusion-flight-sql-server/tests/integration_test.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use arrow::{ +use arrow_flight::sql::client::FlightSqlServiceClient; +use datafusion::arrow::{ array::{Int32Array, RecordBatch, StringArray}, datatypes::{DataType, Field, Schema}, }; -use arrow_flight::sql::client::FlightSqlServiceClient; use datafusion::{ datasource::MemTable, execution::context::{SessionContext, SessionState}, diff --git a/datafusion-flight-sql-server/tests/schema_metadata_test.rs b/datafusion-flight-sql-server/tests/schema_metadata_test.rs index 8018539..877d30b 100644 --- a/datafusion-flight-sql-server/tests/schema_metadata_test.rs +++ b/datafusion-flight-sql-server/tests/schema_metadata_test.rs @@ -1,10 +1,10 @@ use std::sync::Arc; -use arrow::{ +use arrow_flight::sql::client::FlightSqlServiceClient; +use datafusion::arrow::{ array::{Int32Array, RecordBatch, StringArray}, datatypes::{DataType, Field, Schema}, }; -use arrow_flight::sql::client::FlightSqlServiceClient; use datafusion::{ datasource::MemTable, execution::context::{SessionContext, SessionState}, diff --git a/datafusion-flight-sql-table-provider/Cargo.toml b/datafusion-flight-sql-table-provider/Cargo.toml index 79a1311..51f51aa 100644 --- a/datafusion-flight-sql-table-provider/Cargo.toml +++ b/datafusion-flight-sql-table-provider/Cargo.toml @@ -7,7 +7,6 @@ readme.workspace = true [dependencies] arrow-flight.workspace = true -arrow.workspace = true async-trait.workspace = true datafusion-federation = { workspace = true, features = ["sql"] } datafusion.workspace = true diff --git a/datafusion-flight-sql-table-provider/src/lib.rs b/datafusion-flight-sql-table-provider/src/lib.rs index 0bcf432..bcfea8e 100644 --- a/datafusion-flight-sql-table-provider/src/lib.rs +++ b/datafusion-flight-sql-table-provider/src/lib.rs @@ -1,8 +1,8 @@ use std::sync::Arc; -use arrow::{datatypes::SchemaRef, error::ArrowError}; use arrow_flight::sql::client::FlightSqlServiceClient; use async_trait::async_trait; +use datafusion::arrow::{datatypes::SchemaRef, error::ArrowError}; use datafusion::{ error::{DataFusionError, Result}, physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},