Skip to content

Commit 7dd7789

Browse files
committed
refactor(flight): enhance error handling during request serialization and deserialization
1 parent 9a6e6f7 commit 7dd7789

File tree

3 files changed

+71
-8
lines changed

3 files changed

+71
-8
lines changed

src/common/exception/src/exception_into.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -392,7 +392,10 @@ impl From<tonic::Status> for ErrorCode {
392392
.set_span(serialized_error.span),
393393
}
394394
}
395-
_ => ErrorCode::Unimplemented(status.to_string()),
395+
_ => {
396+
let debug = format!("{:?}", status);
397+
ErrorCode::Unimplemented(debug)
398+
}
396399
}
397400
}
398401
}

src/query/service/src/servers/flight/flight_client.rs

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
use std::str::FromStr;
1616
use std::sync::Arc;
17+
use std::time::Instant;
1718

1819
use arrow_flight::Action;
1920
use arrow_flight::FlightData;
@@ -115,16 +116,55 @@ impl FlightClient {
115116
AsciiMetadataKey::from_str("secret").unwrap(),
116117
AsciiMetadataValue::from_str(&secret).unwrap(),
117118
);
119+
request.metadata_mut().insert(
120+
AsciiMetadataKey::from_str("x-request-id").unwrap(),
121+
AsciiMetadataValue::from_str(&uuid).unwrap(),
122+
);
123+
124+
let start = Instant::now();
125+
debug!(
126+
"[{}]FlightClient::do_action rpc start: path={:?}, timeout={}s",
127+
&uuid, &path, timeout
128+
);
129+
130+
let response = self.inner.do_action(request).await.map_err(|status| {
131+
error!(
132+
"[{}]FlightClient::do_action rpc failed: path={:?}, elapsed_ms={}, status={}",
133+
&uuid,
134+
&path,
135+
start.elapsed().as_millis(),
136+
status
137+
);
138+
ErrorCode::from(status)
139+
})?;
118140

119-
let response = self.inner.do_action(request).await?;
141+
debug!(
142+
"[{}]FlightClient::do_action rpc headers received: path={:?}, elapsed_ms={}",
143+
&uuid,
144+
&path,
145+
start.elapsed().as_millis()
146+
);
120147

121-
match response.into_inner().message().await? {
148+
let mut response_stream = response.into_inner();
149+
debug!("[{}]FlightClient::wait first response message: path={:?}", &uuid, &path);
150+
match response_stream.message().await.map_err(|status| {
151+
error!(
152+
"[{}]FlightClient::receive response failed: path={:?}, elapsed_ms={}, status={}",
153+
&uuid,
154+
&path,
155+
start.elapsed().as_millis(),
156+
status
157+
);
158+
ErrorCode::from(status)
159+
})? {
122160
Some(response) => {
123161
let response_type = std::any::type_name::<Res>();
124162
let response_len = response.body.len();
125163
debug!(
126-
"[{}]FlightClient::receive response: path={:?}",
127-
&uuid, &path
164+
"[{}]FlightClient::receive response: path={:?}, elapsed_ms={}",
165+
&uuid,
166+
&path,
167+
start.elapsed().as_millis()
128168
);
129169
let after_deserd = match catch_unwind(
130170
|| -> std::result::Result<Res, serde_json::Error> {

src/query/service/src/servers/flight/v1/flight_service.rs

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
use std::pin::Pin;
16+
use std::time::Instant;
1617

1718
use arrow_flight::Action;
1819
use arrow_flight::ActionType;
@@ -167,7 +168,17 @@ impl FlightService for DatabendQueryFlightService {
167168
#[async_backtrace::framed]
168169
async fn do_action(&self, request: Request<Action>) -> Response<Self::DoActionStream> {
169170
let uuid = uuid::Uuid::new_v4().to_string();
170-
debug!("[{}]FlightService::do_action", &uuid);
171+
let request_id = request
172+
.metadata()
173+
.get("x-request-id")
174+
.and_then(|value| value.to_str().ok())
175+
.unwrap_or("unknown")
176+
.to_string();
177+
let start = Instant::now();
178+
debug!(
179+
"[{}]FlightService::do_action: request_id={}",
180+
&uuid, &request_id
181+
);
171182
let root = databend_common_tracing::start_trace_for_remote_request(func_path!(), &request);
172183

173184
let secret = request.get_metadata("secret")?;
@@ -189,18 +200,27 @@ impl FlightService for DatabendQueryFlightService {
189200
{
190201
Err(cause) => {
191202
error!(
192-
"[{}]flight do_action failed, node: {}, action: {}, body_len: {}, code: {}, error: {:?}",
203+
"[{}]flight do_action failed, request_id: {}, node: {}, action: {}, body_len: {}, elapsed_ms: {}, code: {}, error: {:?}",
193204
uuid,
205+
request_id,
194206
config.query.node_id,
195207
action.r#type,
196208
action.body.len(),
209+
start.elapsed().as_millis(),
197210
cause.code(),
198211
cause
199212
);
200213
Err(cause.into())
201214
}
202215
Ok(body) => {
203-
debug!("[{}]FlightService: finish", &uuid);
216+
debug!(
217+
"[{}]FlightService: finish, request_id: {}, action: {}, body_len: {}, elapsed_ms={}",
218+
&uuid,
219+
&request_id,
220+
action.r#type,
221+
body.len(),
222+
start.elapsed().as_millis()
223+
);
204224
Ok(RawResponse::new(
205225
Box::pin(tokio_stream::once(Ok(FlightResult { body: body.into() })))
206226
as FlightStream<FlightResult>,

0 commit comments

Comments
 (0)