1414
1515use std:: sync:: Arc ;
1616
17- use arrow_flight:: utils:: batches_to_flight_data;
1817use arrow_flight:: FlightData ;
18+ use arrow_flight:: SchemaAsIpc ;
19+ use arrow_ipc:: writer;
20+ use arrow_ipc:: writer:: IpcWriteOptions ;
21+ use arrow_schema:: Schema as ArrowSchema ;
1922use async_stream:: stream;
2023use common_exception:: ErrorCode ;
2124use common_exception:: Result ;
@@ -35,16 +38,25 @@ use crate::interpreters::InterpreterFactory;
3538use crate :: sessions:: Session ;
3639
3740impl FlightSqlServiceImpl {
38- pub ( super ) fn block_to_flight_data (
39- block : DataBlock ,
40- data_schema : & DataSchema ,
41- ) -> Result < Vec < FlightData > > {
41+ pub ( crate ) fn schema_to_flight_data ( data_schema : DataSchema ) -> FlightData {
42+ let arrow_schema = ArrowSchema :: from ( & data_schema) ;
43+ let options = IpcWriteOptions :: default ( ) ;
44+ SchemaAsIpc :: new ( & arrow_schema, & options) . into ( )
45+ }
46+
47+ pub fn block_to_flight_data ( block : DataBlock , data_schema : & DataSchema ) -> Result < FlightData > {
4248 let batch = block
4349 . to_record_batch ( data_schema)
4450 . map_err ( |e| ErrorCode :: Internal ( format ! ( "{e:?}" ) ) ) ?;
45- let schema = ( * batch. schema ( ) ) . clone ( ) ;
46- let batches = vec ! [ batch] ;
47- batches_to_flight_data ( schema, batches) . map_err ( |e| ErrorCode :: Internal ( format ! ( "{e:?}" ) ) )
51+ let options = IpcWriteOptions :: default ( ) ;
52+ let data_gen = writer:: IpcDataGenerator :: default ( ) ;
53+ let mut dictionary_tracker = writer:: DictionaryTracker :: new ( false ) ;
54+
55+ let ( _encoded_dictionaries, encoded_batch) = data_gen
56+ . encoded_batch ( & batch, & mut dictionary_tracker, & options)
57+ . map_err ( |e| ErrorCode :: Internal ( format ! ( "{e:?}" ) ) ) ?;
58+
59+ Ok ( encoded_batch. into ( ) )
4860 }
4961
5062 #[ async_backtrace:: framed]
@@ -101,18 +113,18 @@ impl FlightSqlServiceImpl {
101113 context. attach_query_str ( plan. to_string ( ) , plan_extras. stament . to_mask_sql ( ) ) ;
102114 let interpreter = InterpreterFactory :: get ( context. clone ( ) , plan) . await ?;
103115 let data_schema = interpreter. schema ( ) ;
116+ let schema_flight_data = Self :: schema_to_flight_data ( ( * data_schema) . clone ( ) ) ;
104117
105118 let mut data_stream = interpreter. execute ( context. clone ( ) ) . await ?;
106119
107120 let stream = stream ! {
121+ yield Ok ( schema_flight_data) ;
108122 while let Some ( block) = data_stream. next( ) . await {
109123 match block {
110124 Ok ( block) => {
111125 match Self :: block_to_flight_data( block, & data_schema) {
112- Ok ( data) => {
113- for d in data {
114- yield Ok ( d)
115- }
126+ Ok ( flight_data) => {
127+ yield Ok ( flight_data)
116128 }
117129 Err ( err) => {
118130 yield Err ( status!( "Could not convert batches" , err) )
0 commit comments