Skip to content

Commit 7187df1

Browse files
authored
refactor: use BlocksSerializer to replace StringBlock to simplify the serialization (#17667)
* perf: use `BlocksSerializer` to replace `StringBlock` to simplify the serialization Signed-off-by: Kould <[email protected]> * chore: codefmt Signed-off-by: Kould <[email protected]> * chore: fix ci Signed-off-by: Kould <[email protected]> * chore: fix typo Signed-off-by: Kould <[email protected]> * chore: resize remain_size for flightsql Signed-off-by: Kould <[email protected]> --------- Signed-off-by: Kould <[email protected]>
1 parent 865b150 commit 7187df1

File tree

12 files changed

+426
-240
lines changed

12 files changed

+426
-240
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/query/service/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ fastrace = { workspace = true }
120120
flatbuffers = { workspace = true }
121121
futures = { workspace = true }
122122
futures-util = { workspace = true }
123+
geozero = { workspace = true }
123124
headers = { workspace = true }
124125
hex = { workspace = true }
125126
highway = { workspace = true }

src/query/service/src/servers/flight_sql/flight_sql_service/query.rs

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::ops::Deref;
16+
use std::pin::pin;
1617
use std::sync::atomic::AtomicBool;
1718
use std::sync::atomic::Ordering;
1819
use std::sync::Arc;
@@ -21,11 +22,11 @@ use std::sync::LazyLock;
2122
use arrow_flight::FlightData;
2223
use arrow_flight::SchemaAsIpc;
2324
use arrow_ipc::writer;
24-
use arrow_ipc::writer::IpcWriteOptions;
2525
use arrow_ipc::MessageBuilder;
2626
use arrow_ipc::MessageHeader;
2727
use arrow_ipc::MetadataVersion;
2828
use arrow_schema::Schema as ArrowSchema;
29+
use async_stream::stream;
2930
use bytes::Bytes;
3031
use databend_common_base::base::tokio;
3132
use databend_common_exception::ErrorCode;
@@ -80,20 +81,43 @@ static DATA_HEADER_PROGRESS: LazyLock<Bytes> = LazyLock::new(|| {
8081
impl FlightSqlServiceImpl {
8182
pub(crate) fn schema_to_flight_data(data_schema: DataSchema) -> FlightData {
8283
let arrow_schema = ArrowSchema::from(&data_schema);
83-
let options = IpcWriteOptions::default();
84+
let options = writer::IpcWriteOptions::default();
8485
SchemaAsIpc::new(&arrow_schema, &options).into()
8586
}
8687

87-
pub fn block_to_flight_data(block: DataBlock, data_schema: &DataSchema) -> Result<FlightData> {
88+
pub fn block_to_flight_data_stream(
89+
block: DataBlock,
90+
data_schema: &DataSchema,
91+
) -> impl Stream<Item = Result<FlightData>> + '_ {
92+
stream! {
93+
let remain_size = 4 * 1024 * 1024;
94+
let options = writer::IpcWriteOptions::default();
95+
let data_gen = writer::IpcDataGenerator::default();
96+
97+
if block.memory_size() > remain_size {
98+
let row = block.num_rows() / ((block.memory_size() / remain_size) + 1);
99+
for block in block.split_by_rows_no_tail(row) {
100+
yield Self::block_to_flight_data(block, data_schema, &options, &data_gen);
101+
}
102+
} else {
103+
yield Self::block_to_flight_data(block, data_schema, &options, &data_gen);
104+
}
105+
}
106+
}
107+
108+
fn block_to_flight_data(
109+
block: DataBlock,
110+
data_schema: &DataSchema,
111+
options: &writer::IpcWriteOptions,
112+
data_gen: &writer::IpcDataGenerator,
113+
) -> Result<FlightData> {
88114
let batch = block
89115
.to_record_batch_with_dataschema(data_schema)
90116
.map_err(|e| ErrorCode::Internal(format!("{e:?}")))?;
91-
let options = IpcWriteOptions::default();
92-
let data_gen = writer::IpcDataGenerator::default();
93117
let mut dictionary_tracker = writer::DictionaryTracker::new(false);
94118

95119
let (_encoded_dictionaries, encoded_batch) = data_gen
96-
.encoded_batch(&batch, &mut dictionary_tracker, &options)
120+
.encoded_batch(&batch, &mut dictionary_tracker, options)
97121
.map_err(|e| ErrorCode::Internal(format!("{e:?}")))?;
98122

99123
Ok(encoded_batch.into())
@@ -187,13 +211,15 @@ impl FlightSqlServiceImpl {
187211
while let Some(block) = data_stream.next().await {
188212
match block {
189213
Ok(block) => {
190-
let res =
191-
match FlightSqlServiceImpl::block_to_flight_data(block, &data_schema) {
192-
Ok(flight_data) => Ok(flight_data),
193-
Err(err) => Err(status!("Could not convert batches", err)),
194-
};
195-
196-
let _ = s1.send(res).await;
214+
let mut stream = pin!(FlightSqlServiceImpl::block_to_flight_data_stream(
215+
block,
216+
&data_schema,
217+
));
218+
while let Some(res) = stream.next().await {
219+
let _ = s1
220+
.send(res.map_err(|err| status!("Could not convert batches", err)))
221+
.await;
222+
}
197223
}
198224
Err(err) => {
199225
let _ = s1

src/query/service/src/servers/http/v1/http_query_handlers.rs

Lines changed: 19 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::collections::HashMap;
16+
use std::sync::Arc;
1617

1718
use databend_common_base::base::mask_connection_info;
1819
use databend_common_base::headers::HEADER_QUERY_ID;
@@ -70,7 +71,7 @@ use crate::servers::http::v1::catalog::search_tables_handler;
7071
use crate::servers::http::v1::discovery_nodes;
7172
use crate::servers::http::v1::login_handler;
7273
use crate::servers::http::v1::logout_handler;
73-
use crate::servers::http::v1::query::string_block::StringBlock;
74+
use crate::servers::http::v1::query::blocks_serializer::BlocksSerializer;
7475
use crate::servers::http::v1::query::Progresses;
7576
use crate::servers::http::v1::refresh_handler;
7677
use crate::servers::http::v1::roles::list_roles_handler;
@@ -126,7 +127,7 @@ impl QueryResponseField {
126127
}
127128
}
128129

129-
#[derive(Serialize, Deserialize, Debug, Clone)]
130+
#[derive(Serialize, Debug, Clone)]
130131
pub struct QueryResponse {
131132
pub id: String,
132133
pub session_id: Option<String>,
@@ -142,7 +143,7 @@ pub struct QueryResponse {
142143
#[serde(skip_serializing_if = "Option::is_none")]
143144
pub has_result_set: Option<bool>,
144145
pub schema: Vec<QueryResponseField>,
145-
pub data: Vec<Vec<Option<String>>>,
146+
pub data: Arc<BlocksSerializer>,
146147
pub affect: Option<QueryAffect>,
147148
pub result_timeout_secs: Option<u64>,
148149

@@ -163,11 +164,14 @@ impl QueryResponse {
163164
) -> impl IntoResponse {
164165
let state = r.state.clone();
165166
let (data, next_uri) = if is_final {
166-
(StringBlock::empty(), None)
167+
(Arc::new(BlocksSerializer::empty()), None)
167168
} else {
168169
match state.state {
169170
ExecuteStateKind::Running | ExecuteStateKind::Starting => match r.data {
170-
None => (StringBlock::empty(), Some(make_state_uri(&id))),
171+
None => (
172+
Arc::new(BlocksSerializer::empty()),
173+
Some(make_state_uri(&id)),
174+
),
171175
Some(d) => {
172176
let uri = match d.next_page_no {
173177
Some(n) => Some(make_page_uri(&id, n)),
@@ -176,9 +180,15 @@ impl QueryResponse {
176180
(d.page.data, uri)
177181
}
178182
},
179-
ExecuteStateKind::Failed => (StringBlock::empty(), Some(make_final_uri(&id))),
183+
ExecuteStateKind::Failed => (
184+
Arc::new(BlocksSerializer::empty()),
185+
Some(make_final_uri(&id)),
186+
),
180187
ExecuteStateKind::Succeeded => match r.data {
181-
None => (StringBlock::empty(), Some(make_final_uri(&id))),
188+
None => (
189+
Arc::new(BlocksSerializer::empty()),
190+
Some(make_final_uri(&id)),
191+
),
182192
Some(d) => {
183193
let uri = match d.next_page_no {
184194
Some(n) => Some(make_page_uri(&id, n)),
@@ -199,10 +209,10 @@ impl QueryResponse {
199209
progresses: state.progresses.clone(),
200210
running_time_ms: state.running_time_ms,
201211
};
202-
let rows = data.data.len();
212+
let rows = data.num_rows();
203213

204214
Json(QueryResponse {
205-
data: data.into(),
215+
data,
206216
state: state.state,
207217
schema: state.schema.clone(),
208218
session_id: Some(session_id),

src/query/service/src/servers/http/v1/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,9 @@ pub use http_query_handlers::make_page_uri;
2828
pub use http_query_handlers::make_state_uri;
2929
pub use http_query_handlers::query_route;
3030
pub use http_query_handlers::QueryResponse;
31+
pub use http_query_handlers::QueryResponseField;
3132
pub use http_query_handlers::QueryStats;
32-
pub use query::string_block::StringBlock;
33+
pub use query::blocks_serializer::BlocksSerializer;
3334
pub use query::ExecuteStateKind;
3435
pub use query::ExpiringMap;
3536
pub use query::ExpiringState;

0 commit comments

Comments
 (0)