Skip to content

Commit df2193e

Browse files
authored
Merge pull request #38 from cloneyate/main
update dependencies and add method serve_with_listener
2 parents dfd5c34 + e1321e2 commit df2193e

File tree

7 files changed

+42
-30
lines changed

7 files changed

+42
-30
lines changed

Cargo.toml

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,12 @@ readme = "README.md"
1414
repository = "https://github.com/datafusion-contrib/datafusion-flight-sql-server"
1515

1616
[workspace.dependencies]
17-
arrow = "56"
18-
arrow-flight = { version = "56", features = ["flight-sql"] }
19-
arrow-json = "56"
17+
arrow-flight = { version = "57", features = ["flight-sql"] }
2018
async-trait = "0.1.88"
21-
datafusion = "50"
22-
datafusion-federation = { version = "0.4.10" }
23-
datafusion-substrait = "50"
19+
datafusion = "51"
20+
datafusion-federation = { version = "0.4.11" }
21+
datafusion-substrait = "51"
2422
futures = "0.3.31"
2523
tokio = { version = "1.47", features = ["full"] }
26-
tonic = { version = "0.13", features = ["transport", "codegen", "prost"] }
24+
tonic = { version = "0.14", features = ["transport", "codegen"] }
25+
prost = "0.14"

datafusion-flight-sql-server/Cargo.toml

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,19 @@ protoc = ["datafusion-substrait/protoc"]
1717

1818
[dependencies]
1919
arrow-flight.workspace = true
20-
arrow.workspace = true
2120
datafusion-federation = { workspace = true, features = ["sql"] }
2221
datafusion-substrait.workspace = true
2322
datafusion.workspace = true
2423
futures.workspace = true
2524
log = "0.4"
2625
once_cell = "1.21"
27-
prost = "0.13"
26+
prost.workspace = true
2827
tonic.workspace = true
2928
async-trait.workspace = true
30-
tonic-async-interceptor = "0.13"
29+
tokio-stream = "0.1.17"
30+
tokio = { version = "1.47", features = ["net"], default-features = false }
3131

3232
[dev-dependencies]
3333
tokio.workspace = true
3434
datafusion-flight-sql-table-provider = { path = "../datafusion-flight-sql-table-provider" }
35+
tonic-async-interceptor = "0.14"

datafusion-flight-sql-server/src/service.rs

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,5 @@
11
use std::{collections::BTreeMap, pin::Pin, sync::Arc};
22

3-
use arrow::{
4-
array::{ArrayRef, RecordBatch, StringArray},
5-
compute::concat_batches,
6-
datatypes::{DataType, Field, SchemaBuilder, SchemaRef},
7-
error::ArrowError,
8-
ipc::{
9-
reader::StreamReader,
10-
writer::{IpcWriteOptions, StreamWriter},
11-
},
12-
};
133
use arrow_flight::{
144
decode::{DecodedPayload, FlightDataDecoder},
155
sql::{
@@ -35,6 +25,16 @@ use arrow_flight::{
3525
Action, FlightDescriptor, FlightEndpoint, FlightInfo, HandshakeRequest, HandshakeResponse,
3626
IpcMessage, SchemaAsIpc, Ticket,
3727
};
28+
use datafusion::arrow::{
29+
array::{ArrayRef, RecordBatch, StringArray},
30+
compute::concat_batches,
31+
datatypes::{DataType, Field, SchemaBuilder, SchemaRef},
32+
error::ArrowError,
33+
ipc::{
34+
reader::StreamReader,
35+
writer::{IpcWriteOptions, StreamWriter},
36+
},
37+
};
3838
use datafusion::{
3939
common::{arrow::datatypes::Schema, ParamValues},
4040
dataframe::DataFrame,
@@ -115,6 +115,21 @@ impl FlightSqlService {
115115
Ok(Server::builder().add_service(svc).serve(addr).await?)
116116
}
117117

118+
pub async fn serve_with_listener(
119+
self,
120+
listener: std::net::TcpListener,
121+
) -> Result<(), Box<dyn std::error::Error>> {
122+
info!("Listening on {}", listener.local_addr()?);
123+
124+
let svc = FlightServiceServer::new(self);
125+
let listener = tokio::net::TcpListener::from_std(listener)?;
126+
127+
Ok(Server::builder()
128+
.add_service(svc)
129+
.serve_with_incoming(tokio_stream::wrappers::TcpListenerStream::new(listener))
130+
.await?)
131+
}
132+
118133
async fn new_context<T>(
119134
&self,
120135
request: Request<T>,
@@ -1051,7 +1066,7 @@ fn get_schema_for_plan(logical_plan: &LogicalPlan, with_metadata: bool) -> Schem
10511066
df_schema.as_ref().metadata().clone(),
10521067
))
10531068
} else {
1054-
Arc::new(Schema::from(logical_plan.schema().as_ref()))
1069+
Arc::new(logical_plan.schema().as_arrow().clone())
10551070
};
10561071

10571072
// Use an empty FlightDataEncoder to determine the schema of the encoded flight data.
@@ -1126,9 +1141,7 @@ async fn decode_schema(decoder: &mut FlightDataDecoder) -> Result<SchemaRef, Sta
11261141
}
11271142

11281143
// Decode parameter ipc stream as ParamValues
1129-
fn decode_param_values(
1130-
parameters: Option<&[u8]>,
1131-
) -> Result<Option<ParamValues>, arrow::error::ArrowError> {
1144+
fn decode_param_values(parameters: Option<&[u8]>) -> Result<Option<ParamValues>, ArrowError> {
11321145
parameters
11331146
.map(|parameters| {
11341147
let decoder = StreamReader::try_new(parameters, None)?;

datafusion-flight-sql-server/tests/integration_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::sync::Arc;
22

3-
use arrow::{
3+
use arrow_flight::sql::client::FlightSqlServiceClient;
4+
use datafusion::arrow::{
45
array::{Int32Array, RecordBatch, StringArray},
56
datatypes::{DataType, Field, Schema},
67
};
7-
use arrow_flight::sql::client::FlightSqlServiceClient;
88
use datafusion::{
99
datasource::MemTable,
1010
execution::context::{SessionContext, SessionState},

datafusion-flight-sql-server/tests/schema_metadata_test.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
use std::sync::Arc;
22

3-
use arrow::{
3+
use arrow_flight::sql::client::FlightSqlServiceClient;
4+
use datafusion::arrow::{
45
array::{Int32Array, RecordBatch, StringArray},
56
datatypes::{DataType, Field, Schema},
67
};
7-
use arrow_flight::sql::client::FlightSqlServiceClient;
88
use datafusion::{
99
datasource::MemTable,
1010
execution::context::{SessionContext, SessionState},

datafusion-flight-sql-table-provider/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ readme.workspace = true
77

88
[dependencies]
99
arrow-flight.workspace = true
10-
arrow.workspace = true
1110
async-trait.workspace = true
1211
datafusion-federation = { workspace = true, features = ["sql"] }
1312
datafusion.workspace = true

datafusion-flight-sql-table-provider/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
use std::sync::Arc;
22

3-
use arrow::{datatypes::SchemaRef, error::ArrowError};
43
use arrow_flight::sql::client::FlightSqlServiceClient;
54
use async_trait::async_trait;
5+
use datafusion::arrow::{datatypes::SchemaRef, error::ArrowError};
66
use datafusion::{
77
error::{DataFusionError, Result},
88
physical_plan::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream},

0 commit comments

Comments
 (0)