Skip to content

Commit 8206810

Browse files
authored
feat: FlightSQL support update (DDL and insert) (#10765)
* refactor: make has_result_set a method of Plan. * refactor: Column::into_arrow_rs support Boolean. * feat: DataBlock::from_record_batch. * refactor: polish log for do_action_close_prepared_statement. * refactor: do_action_create_prepared_statement return empty schema for plan without result set. * feat: support FlightSQL support update. * feat: covert int arrays from arrow_rs. * feat: FlightSQL impl do_put_statement_update. * ci: test FlightSQL update. * feat: change FlightSQL port to 8900. * chore: clear println.
1 parent bf82c95 commit 8206810

File tree

16 files changed

+359
-75
lines changed

16 files changed

+359
-75
lines changed

scripts/ci/deploy/config/databend-query-node-1.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ http_handler_port = 8000
2929

3030
# Databend Query FlightSQL Handler.
3131
flight_sql_handler_host = "0.0.0.0"
32-
flight_sql_handler_port = 50050
32+
flight_sql_handler_port = 8900
3333

3434
tenant_id = "test_tenant"
3535
cluster_id = "test_cluster"

scripts/ci/deploy/config/databend-query-node-2.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ http_handler_port = 8002
2929

3030
# Databend Query FlightSQL Handler.
3131
flight_sql_handler_host = "0.0.0.0"
32-
flight_sql_handler_port = 7002
32+
flight_sql_handler_port = 8902
3333

3434
tenant_id = "test_tenant"
3535
cluster_id = "test_cluster"

scripts/ci/deploy/config/databend-query-node-3.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ http_handler_port = 8003
3030

3131
# Databend Query FlightSQL Handler.
3232
flight_sql_handler_host = "0.0.0.0"
33-
flight_sql_handler_port = 7003
33+
flight_sql_handler_port = 8903
3434

3535
tenant_id = "test_tenant"
3636
cluster_id = "test_cluster"

scripts/ci/deploy/config/databend-query-node-shared.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ clickhouse_http_handler_port = 58124
2727
http_handler_host = "0.0.0.0"
2828
http_handler_port = 58000
2929

30+
flight_sql_handler_host = "0.0.0.0"
31+
flight_sql_handler_port = 58900
32+
3033
tenant_id = "shared_tenant"
3134
cluster_id = "test_cluster"
3235

src/query/config/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1154,7 +1154,7 @@ pub struct QueryConfig {
11541154
#[clap(long, default_value = "127.0.0.1")]
11551155
pub flight_sql_handler_host: String,
11561156

1157-
#[clap(long, default_value = "7000")]
1157+
#[clap(long, default_value = "8900")]
11581158
pub flight_sql_handler_port: u16,
11591159

11601160
#[clap(long, default_value = "127.0.0.1:9090")]

src/query/config/src/inner.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,7 @@ impl Default for QueryConfig {
196196
http_handler_result_timeout_secs: 60,
197197
flight_api_address: "127.0.0.1:9090".to_string(),
198198
flight_sql_handler_host: "127.0.0.1".to_string(),
199-
flight_sql_handler_port: 7000,
199+
flight_sql_handler_port: 8900,
200200
admin_api_address: "127.0.0.1:8080".to_string(),
201201
metric_api_address: "127.0.0.1:7070".to_string(),
202202
api_tls_server_cert: "".to_string(),

src/query/expression/src/convert_arrow_rs/array.rs

Lines changed: 120 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,19 @@ use std::sync::Arc;
1717
use arrow_array::make_array;
1818
use arrow_array::Array;
1919
use arrow_array::ArrowPrimitiveType;
20+
use arrow_array::BooleanArray;
21+
use arrow_array::Int16Array;
22+
use arrow_array::Int32Array;
23+
use arrow_array::Int64Array;
24+
use arrow_array::Int8Array;
2025
use arrow_array::LargeStringArray;
2126
use arrow_array::NullArray;
2227
use arrow_array::PrimitiveArray;
28+
use arrow_array::StringArray;
29+
use arrow_array::UInt16Array;
30+
use arrow_array::UInt32Array;
31+
use arrow_array::UInt64Array;
32+
use arrow_array::UInt8Array;
2333
use arrow_buffer::i256;
2434
use arrow_buffer::ArrowNativeType;
2535
use arrow_buffer::Buffer;
@@ -28,12 +38,15 @@ use arrow_data::ArrayDataBuilder;
2838
use arrow_schema::ArrowError;
2939
use arrow_schema::DataType;
3040
use arrow_schema::TimeUnit;
41+
use common_arrow::arrow::bitmap::Bitmap;
3142
use common_arrow::arrow::buffer::Buffer as Buffer2;
3243
use common_arrow::arrow::Either;
3344
use ordered_float::OrderedFloat;
3445

3546
use crate::types::decimal::DecimalColumn;
47+
use crate::types::nullable::NullableColumn;
3648
use crate::types::number::NumberColumn;
49+
use crate::types::string::StringColumn;
3750
use crate::Column;
3851

3952
fn try_take_buffer<T: Clone>(buffer: Buffer2<T>) -> Vec<T> {
@@ -65,6 +78,14 @@ impl Column {
6578
Column::Null { len } => Arc::new(NullArray::new(len)),
6679
Column::EmptyArray { len } => Arc::new(NullArray::new(len)),
6780
Column::EmptyMap { len } => Arc::new(NullArray::new(len)),
81+
Column::Boolean(bitmap) => {
82+
let buf = bitmap.as_slice().0;
83+
let array_data = ArrayData::builder(DataType::Boolean)
84+
.len(bitmap.len())
85+
.add_buffer(buf.into())
86+
.build()?;
87+
Arc::new(BooleanArray::from(array_data))
88+
}
6889
Column::String(col) => {
6990
let len = col.len();
7091
let values: Vec<u8> = try_take_buffer(col.data);
@@ -160,10 +181,108 @@ impl Column {
160181
_ => {
161182
let data_type = self.data_type();
162183
Err(ArrowError::NotYetImplemented(format!(
163-
"Column::into_arrow_rs() for {data_type} not implemented yet"
184+
"Column::into_arrow_rs() for {data_type} not implemented yet"
164185
)))?
165186
}
166187
};
167188
Ok(array)
168189
}
190+
191+
pub fn from_arrow_rs(array: Arc<dyn Array>) -> Result<Self, ArrowError> {
192+
let data_type = array.data_type();
193+
let column = match data_type {
194+
DataType::Null => Column::Null { len: array.len() },
195+
DataType::LargeUtf8 => {
196+
let array = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
197+
let offsets = array.value_offsets().to_vec();
198+
let offsets = unsafe { std::mem::transmute::<Vec<i64>, Vec<u64>>(offsets) };
199+
Column::String(StringColumn {
200+
offsets: offsets.into(),
201+
data: array.value_data().to_vec().into(),
202+
})
203+
}
204+
DataType::Utf8 => {
205+
let array = array.as_any().downcast_ref::<StringArray>().unwrap();
206+
let offsets = array
207+
.value_offsets()
208+
.iter()
209+
.map(|x| *x as u64)
210+
.collect::<Vec<_>>();
211+
Column::String(StringColumn {
212+
offsets: offsets.into(),
213+
data: array.value_data().to_vec().into(),
214+
})
215+
}
216+
DataType::Boolean => {
217+
let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
218+
let bytes = array.values().clone().into_vec().map_err(|_| {
219+
ArrowError::CastError(
220+
"can not covert Buffer of BooleanArray to Vec".to_string(),
221+
)
222+
})?;
223+
let bitmap = Bitmap::try_new(bytes, array.len()).map_err(|e| {
224+
ArrowError::CastError(format!(
225+
"can not covert BooleanArray to Column::Boolean: {e:?}"
226+
))
227+
})?;
228+
Column::Boolean(bitmap)
229+
}
230+
DataType::Int8 => {
231+
let array = array.as_any().downcast_ref::<Int8Array>().unwrap();
232+
let buffer2: Buffer2<i8> = array.values().to_vec().into();
233+
Column::Number(NumberColumn::Int8(buffer2))
234+
}
235+
DataType::UInt8 => {
236+
let array = array.as_any().downcast_ref::<UInt8Array>().unwrap();
237+
let buffer2: Buffer2<u8> = array.values().to_vec().into();
238+
Column::Number(NumberColumn::UInt8(buffer2))
239+
}
240+
DataType::Int16 => {
241+
let array = array.as_any().downcast_ref::<Int16Array>().unwrap();
242+
let buffer2: Buffer2<i16> = array.values().to_vec().into();
243+
Column::Number(NumberColumn::Int16(buffer2))
244+
}
245+
DataType::UInt16 => {
246+
let array = array.as_any().downcast_ref::<UInt16Array>().unwrap();
247+
let buffer2: Buffer2<u16> = array.values().to_vec().into();
248+
Column::Number(NumberColumn::UInt16(buffer2))
249+
}
250+
DataType::Int32 => {
251+
let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
252+
let buffer2: Buffer2<i32> = array.values().to_vec().into();
253+
Column::Number(NumberColumn::Int32(buffer2))
254+
}
255+
DataType::UInt32 => {
256+
let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
257+
let buffer2: Buffer2<u32> = array.values().to_vec().into();
258+
Column::Number(NumberColumn::UInt32(buffer2))
259+
}
260+
DataType::Int64 => {
261+
let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
262+
let buffer2: Buffer2<i64> = array.values().to_vec().into();
263+
Column::Number(NumberColumn::Int64(buffer2))
264+
}
265+
DataType::UInt64 => {
266+
let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
267+
let buffer2: Buffer2<u64> = array.values().to_vec().into();
268+
Column::Number(NumberColumn::UInt64(buffer2))
269+
}
270+
271+
_ => Err(ArrowError::NotYetImplemented(format!(
272+
"Column::from_arrow_rs() for {data_type} not implemented yet"
273+
)))?,
274+
};
275+
if let Some(nulls) = array.into_data().nulls() {
276+
let validity =
277+
Bitmap::try_new(nulls.buffer().to_vec(), nulls.offset()).map_err(|e| {
278+
ArrowError::CastError(format!(
279+
"fail to cast arrow_rs::NullBuffer to arrow2::Bitmap: {e}"
280+
))
281+
})?;
282+
let column = NullableColumn { column, validity };
283+
Ok(Column::Nullable(Box::new(column)))
284+
} else {
285+
Ok(column)
286+
}
287+
}
169288
}

src/query/expression/src/convert_arrow_rs/record_batch.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717
use arrow_array::RecordBatch;
1818
use arrow_schema::ArrowError;
1919

20+
use crate::Column;
2021
use crate::DataBlock;
2122
use crate::DataSchema;
2223

@@ -30,4 +31,13 @@ impl DataBlock {
3031
let schema = Arc::new(data_schema.into());
3132
RecordBatch::try_new(schema, arrays)
3233
}
34+
35+
pub fn from_record_batch(batch: &RecordBatch) -> Result<(Self, DataSchema), ArrowError> {
36+
let mut columns = Vec::with_capacity(batch.columns().len());
37+
for array in batch.columns() {
38+
columns.push(Column::from_arrow_rs(array.clone())?)
39+
}
40+
let schema = DataSchema::try_from(&(*batch.schema()))?;
41+
Ok((DataBlock::new_from_columns(columns), schema))
42+
}
3343
}

src/query/expression/src/convert_arrow_rs/schema.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use arrow_schema::ArrowError;
1516
use arrow_schema::DataType as ArrowDataType;
1617
use arrow_schema::Field as ArrowField;
1718
use arrow_schema::Schema as ArrowSchema;
1819
use arrow_schema::TimeUnit;
1920

21+
use crate::types::decimal::DecimalSize;
2022
use crate::types::DataType;
2123
use crate::types::DecimalDataType;
2224
use crate::types::NumberDataType;
@@ -127,3 +129,73 @@ impl From<&DataSchema> for ArrowSchema {
127129
}
128130
}
129131
}
132+
133+
impl TryFrom<&ArrowField> for DataField {
134+
type Error = ArrowError;
135+
136+
fn try_from(f: &ArrowField) -> Result<Self, ArrowError> {
137+
let ty = f.data_type().try_into()?;
138+
if f.is_nullable() {
139+
Ok(DataField::new_nullable(f.name().as_str(), ty))
140+
} else {
141+
Ok(DataField::new(f.name(), ty))
142+
}
143+
}
144+
}
145+
146+
impl TryFrom<&ArrowSchema> for DataSchema {
147+
type Error = ArrowError;
148+
149+
fn try_from(schema: &ArrowSchema) -> Result<Self, ArrowError> {
150+
let mut fields = vec![];
151+
for field in &schema.fields {
152+
fields.push(DataField::try_from(field)?)
153+
}
154+
Ok(DataSchema {
155+
fields,
156+
metadata: Default::default(),
157+
})
158+
}
159+
}
160+
161+
impl TryFrom<&ArrowDataType> for DataType {
162+
type Error = ArrowError;
163+
164+
fn try_from(ty: &ArrowDataType) -> Result<Self, ArrowError> {
165+
let data_type = match ty {
166+
ArrowDataType::Null => DataType::Null,
167+
ArrowDataType::Boolean => DataType::Boolean,
168+
ArrowDataType::Int8 => DataType::Number(NumberDataType::Int8),
169+
ArrowDataType::Int16 => DataType::Number(NumberDataType::Int16),
170+
ArrowDataType::Int32 => DataType::Number(NumberDataType::Int32),
171+
ArrowDataType::Int64 => DataType::Number(NumberDataType::Int64),
172+
ArrowDataType::UInt8 => DataType::Number(NumberDataType::UInt8),
173+
ArrowDataType::UInt16 => DataType::Number(NumberDataType::UInt16),
174+
ArrowDataType::UInt32 => DataType::Number(NumberDataType::UInt32),
175+
ArrowDataType::UInt64 => DataType::Number(NumberDataType::UInt64),
176+
ArrowDataType::Float32 | ArrowDataType::Float16 => {
177+
DataType::Number(NumberDataType::Float32)
178+
}
179+
ArrowDataType::Float64 => DataType::Number(NumberDataType::Float64),
180+
ArrowDataType::Timestamp(_unit, _tz) => DataType::Timestamp,
181+
ArrowDataType::Date32 | ArrowDataType::Date64 => DataType::Date,
182+
ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => DataType::String,
183+
ArrowDataType::Decimal128(p, s) => {
184+
DataType::Decimal(DecimalDataType::Decimal128(DecimalSize {
185+
precision: *p,
186+
scale: (*s) as u8,
187+
}))
188+
}
189+
ArrowDataType::Decimal256(p, s) => {
190+
DataType::Decimal(DecimalDataType::Decimal256(DecimalSize {
191+
precision: *p,
192+
scale: (*s) as u8,
193+
}))
194+
}
195+
_ => Err(ArrowError::CastError(format!(
196+
"cast {ty} to DataType not not implemented yet"
197+
)))?,
198+
};
199+
Ok(data_type)
200+
}
201+
}

src/query/service/src/servers/flight_sql/flight_sql_service/query.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,30 @@ impl FlightSqlServiceImpl {
6161
planner.plan_sql(query).await
6262
}
6363

64-
pub(super) async fn execute_plan(
64+
pub(super) async fn execute_update(
65+
&self,
66+
session: Arc<Session>,
67+
plan: &Plan,
68+
plan_extras: &PlanExtras,
69+
) -> Result<i64> {
70+
let context = session
71+
.create_query_context()
72+
.await
73+
.map_err(|e| status!("Could not create_query_context", e))?;
74+
75+
context.attach_query_str(plan.to_string(), plan_extras.stament.to_mask_sql());
76+
let interpreter = InterpreterFactory::get(context.clone(), plan).await?;
77+
78+
let mut blocks = interpreter.execute(context.clone()).await?;
79+
while let Some(block) = blocks.next().await {
80+
block?;
81+
}
82+
83+
let affected_rows = context.get_write_progress_value().rows;
84+
Ok(affected_rows as i64)
85+
}
86+
87+
pub(super) async fn execute_query(
6588
&self,
6689
session: Arc<Session>,
6790
plan: &Plan,

0 commit comments

Comments
 (0)