Skip to content

Commit 0614798

Browse files
committed
refactor(flight): enhance error handling during request serialization and deserialization
1 parent 7878994 commit 0614798

File tree

2 files changed

+69
-40
lines changed

2 files changed

+69
-40
lines changed

src/query/service/src/servers/flight/v1/actions/flight_actions.rs

Lines changed: 57 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -69,54 +69,72 @@ impl FlightActions {
6969
self.actions.insert(
7070
path.clone(),
7171
Box::new(move |request| {
72-
let mut deserializer = serde_json::Deserializer::from_slice(request);
73-
deserializer.disable_recursion_limit();
74-
75-
let deserializer = serde_stacker::Deserializer {
76-
de: &mut deserializer,
77-
red_zone: recursive::get_minimum_stack_size(),
78-
stack_size: recursive::get_stack_allocation_size(),
79-
};
80-
81-
let request = Req::deserialize(deserializer).map_err(|cause| {
82-
ErrorCode::BadArguments(format!(
83-
"Cannot parse request for {}, cause: {:?}",
84-
path, cause
85-
))
86-
});
72+
let request =
73+
match catch_unwind(|| -> std::result::Result<Req, serde_json::Error> {
74+
let mut deserializer = serde_json::Deserializer::from_slice(request);
75+
deserializer.disable_recursion_limit();
76+
77+
let deserializer = serde_stacker::Deserializer {
78+
de: &mut deserializer,
79+
red_zone: recursive::get_minimum_stack_size(),
80+
stack_size: recursive::get_stack_allocation_size(),
81+
};
82+
83+
Req::deserialize(deserializer)
84+
}) {
85+
Ok(Ok(request)) => Ok(request),
86+
Ok(Err(cause)) => Err(ErrorCode::BadArguments(format!(
87+
"Cannot parse request for {}, len: {}, cause: {:?}",
88+
path,
89+
request.len(),
90+
cause
91+
))),
92+
Err(cause) => Err(cause.add_message_back(format!(
93+
"(while deserializing flight action request: action={}, len={})",
94+
path,
95+
request.len()
96+
))),
97+
};
8798

8899
let path = path.clone();
89100
let t = t.clone();
90101
Box::pin(async move {
91102
let request = request?;
92103

93-
let future = catch_unwind(move || t(request))?;
104+
let future = catch_unwind(move || t(request)).map_err(|cause| {
105+
cause.add_message_back(format!(
106+
"(while creating flight action future: action={})",
107+
path
108+
))
109+
})?;
94110

95111
let future = CatchUnwindFuture::create(future);
96-
match future
112+
let response = future
97113
.await
98-
.with_context(|| "failed to do flight action")
99-
.flatten()
100-
{
101-
Ok(v) => {
102-
let mut out = Vec::with_capacity(512);
103-
let mut serializer = serde_json::Serializer::new(&mut out);
104-
let serializer = serde_stacker::Serializer {
105-
ser: &mut serializer,
106-
red_zone: recursive::get_minimum_stack_size(),
107-
stack_size: recursive::get_stack_allocation_size(),
108-
};
109-
110-
v.serialize(serializer).map_err(|cause| {
111-
ErrorCode::BadBytes(format!(
112-
"Cannot serialize response for {}, cause: {:?}",
113-
path, cause
114-
))
115-
})?;
116-
117-
Ok(out)
118-
}
119-
Err(err) => Err(err),
114+
.with_context(|| format!("failed to do flight action, action: {}", path))
115+
.flatten()?;
116+
117+
match catch_unwind(|| -> std::result::Result<Vec<u8>, serde_json::Error> {
118+
let mut out = Vec::with_capacity(512);
119+
let mut serializer = serde_json::Serializer::new(&mut out);
120+
let serializer = serde_stacker::Serializer {
121+
ser: &mut serializer,
122+
red_zone: recursive::get_minimum_stack_size(),
123+
stack_size: recursive::get_stack_allocation_size(),
124+
};
125+
126+
response.serialize(serializer)?;
127+
Ok(out)
128+
}) {
129+
Ok(Ok(out)) => Ok(out),
130+
Ok(Err(cause)) => Err(ErrorCode::BadBytes(format!(
131+
"Cannot serialize response for {}, cause: {:?}",
132+
path, cause
133+
))),
134+
Err(cause) => Err(cause.add_message_back(format!(
135+
"(while serializing flight action response: action={})",
136+
path
137+
))),
120138
}
121139
})
122140
}),

src/query/service/src/servers/flight/v1/flight_service.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ use databend_common_exception::ErrorCode;
3434
use fastrace::func_path;
3535
use fastrace::prelude::*;
3636
use futures_util::stream;
37+
use log::error;
3738
use tokio_stream::Stream;
3839
use tonic::Request;
3940
use tonic::Response as RawResponse;
@@ -172,7 +173,17 @@ impl FlightService for DatabendQueryFlightService {
172173
.in_span(root)
173174
.await
174175
{
175-
Err(cause) => Err(cause.into()),
176+
Err(cause) => {
177+
error!(
178+
"flight do_action failed, node: {}, action: {}, body_len: {}, code: {}, error: {:?}",
179+
config.query.node_id,
180+
action.r#type,
181+
action.body.len(),
182+
cause.code(),
183+
cause
184+
);
185+
Err(cause.into())
186+
}
176187
Ok(body) => Ok(RawResponse::new(
177188
Box::pin(tokio_stream::once(Ok(FlightResult { body: body.into() })))
178189
as FlightStream<FlightResult>,

0 commit comments

Comments
 (0)