Skip to content

Commit be9d7b2

Browse files
authored
feat: error handling overhaul (#65)
1 parent 5d8a12a commit be9d7b2

File tree

15 files changed

+530
-364
lines changed

15 files changed

+530
-364
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ url = "2.3"
3030
warp = { version = "0.3", default-features = false }
3131
serde_json = "1.0"
3232
rand = "0.8.5"
33+
futures-util = "0.3"
34+
once_cell = "1.19"
3335

3436
[[example]]
3537
name = "websocket_client"

examples/websocket_client.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
relay_client::{
3-
error::Error,
3+
error::ClientError,
44
websocket::{Client, CloseFrame, ConnectionHandler, PublishedMessage},
55
ConnectionOptions,
66
},
@@ -49,11 +49,11 @@ impl ConnectionHandler for Handler {
4949
);
5050
}
5151

52-
fn inbound_error(&mut self, error: Error) {
52+
fn inbound_error(&mut self, error: ClientError) {
5353
println!("[{}] inbound error: {error}", self.name);
5454
}
5555

56-
fn outbound_error(&mut self, error: Error) {
56+
fn outbound_error(&mut self, error: ClientError) {
5757
println!("[{}] outbound error: {error}", self.name);
5858
}
5959
}

relay_client/src/error.rs

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use relay_rpc::rpc::{self, error::ServiceError};
2+
13
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
24

35
/// Errors generated while parsing
@@ -23,7 +25,7 @@ pub enum RequestBuildError {
2325

2426
/// Possible Relay client errors.
2527
#[derive(Debug, thiserror::Error)]
26-
pub enum Error {
28+
pub enum ClientError {
2729
#[error("Failed to build connection request: {0}")]
2830
RequestBuilder(#[from] RequestBuildError),
2931

@@ -42,15 +44,69 @@ pub enum Error {
4244
#[error("Invalid response ID")]
4345
InvalidResponseId,
4446

47+
#[error("Invalid error response")]
48+
InvalidErrorResponse,
49+
4550
#[error("Serialization failed: {0}")]
4651
Serialization(serde_json::Error),
4752

4853
#[error("Deserialization failed: {0}")]
4954
Deserialization(serde_json::Error),
5055

51-
#[error("RPC error ({code}): {message}")]
52-
Rpc { code: i32, message: String },
56+
#[error("RPC error: code={code} data={data:?} message={message}")]
57+
Rpc {
58+
code: i32,
59+
message: String,
60+
data: Option<String>,
61+
},
5362

5463
#[error("Invalid request type")]
5564
InvalidRequestType,
5665
}
66+
67+
impl From<rpc::ErrorData> for ClientError {
68+
fn from(err: rpc::ErrorData) -> Self {
69+
Self::Rpc {
70+
code: err.code,
71+
message: err.message,
72+
data: err.data,
73+
}
74+
}
75+
}
76+
77+
#[derive(Debug, thiserror::Error)]
78+
pub enum Error<T> {
79+
/// Client errors encountered while performing the request.
80+
#[error(transparent)]
81+
Client(ClientError),
82+
83+
/// Error response received from the relay.
84+
#[error(transparent)]
85+
Response(#[from] rpc::Error<T>),
86+
}
87+
88+
impl<T: ServiceError> From<ClientError> for Error<T> {
89+
fn from(err: ClientError) -> Self {
90+
match err {
91+
ClientError::Rpc {
92+
code,
93+
message,
94+
data,
95+
} => {
96+
let err = rpc::ErrorData {
97+
code,
98+
message,
99+
data,
100+
};
101+
102+
match rpc::Error::try_from(err) {
103+
Ok(err) => Error::Response(err),
104+
105+
Err(_) => Error::Client(ClientError::InvalidErrorResponse),
106+
}
107+
}
108+
109+
_ => Error::Client(err),
110+
}
111+
}
112+
}

relay_client/src/http.rs

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
crate::{
3-
error::{BoxError, Error},
3+
error::{BoxError, ClientError, Error},
44
ConnectionOptions,
55
MessageIdGenerator,
66
},
@@ -9,15 +9,15 @@ use {
99
auth::ed25519_dalek::SigningKey,
1010
domain::{DecodedClientId, SubscriptionId, Topic},
1111
jwt::{self, JwtError, VerifyableClaims},
12-
rpc::{self, Receipt, RequestPayload},
12+
rpc::{self, Receipt, ServiceRequest},
1313
},
1414
std::{sync::Arc, time::Duration},
1515
url::Url,
1616
};
1717

1818
pub type TransportError = reqwest::Error;
19-
pub type Response<T> = Result<<T as RequestPayload>::Response, Error>;
20-
pub type EmptyResponse = Result<(), Error>;
19+
pub type Response<T> = Result<<T as ServiceRequest>::Response, Error<<T as ServiceRequest>::Error>>;
20+
pub type EmptyResponse<T> = Result<(), Error<<T as ServiceRequest>::Error>>;
2121

2222
#[derive(Debug, thiserror::Error)]
2323
pub enum RequestParamsError {
@@ -41,9 +41,6 @@ pub enum HttpClientError {
4141

4242
#[error("JWT error: {0}")]
4343
Jwt(#[from] JwtError),
44-
45-
#[error("RPC error: code={} message={}", .0.code, .0.message)]
46-
RpcError(rpc::ErrorData),
4744
}
4845

4946
#[derive(Debug, Clone)]
@@ -82,7 +79,7 @@ pub struct Client {
8279
}
8380

8481
impl Client {
85-
pub fn new(opts: &ConnectionOptions) -> Result<Self, Error> {
82+
pub fn new(opts: &ConnectionOptions) -> Result<Self, ClientError> {
8683
let mut headers = HeaderMap::new();
8784
opts.update_request_headers(&mut headers)?;
8885

@@ -111,11 +108,14 @@ impl Client {
111108
tag: u32,
112109
ttl: Duration,
113110
prompt: bool,
114-
) -> EmptyResponse {
111+
) -> EmptyResponse<rpc::Publish> {
115112
let ttl_secs = ttl
116113
.as_secs()
117114
.try_into()
118-
.map_err(|_| HttpClientError::InvalidRequest(RequestParamsError::InvalidTtl.into()))?;
115+
.map_err(|_| {
116+
HttpClientError::InvalidRequest(RequestParamsError::InvalidTtl.into()).into()
117+
})
118+
.map_err(Error::Client)?;
119119

120120
self.request(rpc::Publish {
121121
topic,
@@ -175,7 +175,8 @@ impl Client {
175175
.ttl
176176
.as_secs()
177177
.try_into()
178-
.map_err(|err| HttpClientError::InvalidRequest(Box::new(err)))?;
178+
.map_err(|err| HttpClientError::InvalidRequest(Box::new(err)).into())
179+
.map_err(Error::Client)?;
179180
let exp = iat + ttl_sec;
180181

181182
let claims = rpc::WatchRegisterClaims {
@@ -194,7 +195,11 @@ impl Client {
194195
};
195196

196197
let payload = rpc::WatchRegister {
197-
register_auth: claims.encode(keypair).map_err(HttpClientError::Jwt)?,
198+
register_auth: claims
199+
.encode(keypair)
200+
.map_err(HttpClientError::Jwt)
201+
.map_err(ClientError::from)
202+
.map_err(Error::Client)?,
198203
};
199204

200205
self.request(payload).await
@@ -230,7 +235,11 @@ impl Client {
230235
};
231236

232237
let payload = rpc::WatchUnregister {
233-
unregister_auth: claims.encode(keypair).map_err(HttpClientError::Jwt)?,
238+
unregister_auth: claims
239+
.encode(keypair)
240+
.map_err(HttpClientError::Jwt)
241+
.map_err(ClientError::from)
242+
.map_err(Error::Client)?,
234243
};
235244

236245
self.request(payload).await
@@ -299,45 +308,50 @@ impl Client {
299308

300309
pub(crate) async fn request<T>(&self, payload: T) -> Response<T>
301310
where
302-
T: RequestPayload,
311+
T: ServiceRequest,
303312
{
304313
let payload = rpc::Payload::Request(rpc::Request {
305314
id: self.id_generator.next(),
306315
jsonrpc: rpc::JSON_RPC_VERSION.clone(),
307316
params: payload.into_params(),
308317
});
309318

310-
let result = self
311-
.client
312-
.post(self.url.clone())
313-
.json(&payload)
314-
.send()
315-
.await
316-
.map_err(HttpClientError::Transport)?;
319+
let response = async {
320+
let result = self
321+
.client
322+
.post(self.url.clone())
323+
.json(&payload)
324+
.send()
325+
.await
326+
.map_err(HttpClientError::Transport)?;
317327

318-
let status = result.status();
328+
let status = result.status();
319329

320-
if !status.is_success() {
321-
let body = result.text().await;
322-
return Err(HttpClientError::InvalidHttpCode(status, body).into());
323-
}
330+
if !status.is_success() {
331+
let body = result.text().await;
332+
return Err(HttpClientError::InvalidHttpCode(status, body));
333+
}
324334

325-
let response = result
326-
.json::<rpc::Payload>()
327-
.await
328-
.map_err(|_| HttpClientError::InvalidResponse)?;
335+
result
336+
.json::<rpc::Payload>()
337+
.await
338+
.map_err(|_| HttpClientError::InvalidResponse)
339+
}
340+
.await
341+
.map_err(ClientError::from)
342+
.map_err(Error::Client)?;
329343

330344
match response {
331345
rpc::Payload::Response(rpc::Response::Success(response)) => {
332346
serde_json::from_value(response.result)
333-
.map_err(|_| HttpClientError::InvalidResponse.into())
347+
.map_err(|_| Error::Client(HttpClientError::InvalidResponse.into()))
334348
}
335349

336350
rpc::Payload::Response(rpc::Response::Error(response)) => {
337-
Err(HttpClientError::RpcError(response.error).into())
351+
Err(ClientError::from(response.error).into())
338352
}
339353

340-
_ => Err(HttpClientError::InvalidResponse.into()),
354+
_ => Err(Error::Client(HttpClientError::InvalidResponse.into())),
341355
}
342356
}
343357
}

relay_client/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use {
2-
crate::error::{Error, RequestBuildError},
2+
crate::error::{ClientError, RequestBuildError},
33
::http::HeaderMap,
44
relay_rpc::{
55
auth::{SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS},

relay_client/src/websocket.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use {
22
self::connection::{connection_event_loop, ConnectionControl},
3-
crate::{error::Error, ConnectionOptions},
3+
crate::{error::ClientError, ConnectionOptions},
44
relay_rpc::{
55
domain::{MessageId, SubscriptionId, Topic},
66
rpc::{
@@ -114,11 +114,11 @@ pub trait ConnectionHandler: Send + 'static {
114114

115115
/// Called when an inbound error occurs, such as data deserialization
116116
/// failure, or an unknown response message ID.
117-
fn inbound_error(&mut self, _error: Error) {}
117+
fn inbound_error(&mut self, _error: ClientError) {}
118118

119119
/// Called when an outbound error occurs, i.e. failed to write to the
120120
/// websocket stream.
121-
fn outbound_error(&mut self, _error: Error) {}
121+
fn outbound_error(&mut self, _error: ClientError) {}
122122
}
123123

124124
/// The Relay WebSocket RPC client.
@@ -291,7 +291,7 @@ impl Client {
291291
}
292292

293293
/// Opens a connection to the Relay.
294-
pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), Error> {
294+
pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), ClientError> {
295295
let (tx, rx) = oneshot::channel();
296296
let request = opts.as_ws_request()?;
297297

@@ -300,24 +300,24 @@ impl Client {
300300
.send(ConnectionControl::Connect { request, tx })
301301
.is_ok()
302302
{
303-
rx.await.map_err(|_| Error::ChannelClosed)?
303+
rx.await.map_err(|_| ClientError::ChannelClosed)?
304304
} else {
305-
Err(Error::ChannelClosed)
305+
Err(ClientError::ChannelClosed)
306306
}
307307
}
308308

309309
/// Closes the Relay connection.
310-
pub async fn disconnect(&self) -> Result<(), Error> {
310+
pub async fn disconnect(&self) -> Result<(), ClientError> {
311311
let (tx, rx) = oneshot::channel();
312312

313313
if self
314314
.control_tx
315315
.send(ConnectionControl::Disconnect { tx })
316316
.is_ok()
317317
{
318-
rx.await.map_err(|_| Error::ChannelClosed)?
318+
rx.await.map_err(|_| ClientError::ChannelClosed)?
319319
} else {
320-
Err(Error::ChannelClosed)
320+
Err(ClientError::ChannelClosed)
321321
}
322322
}
323323

@@ -330,7 +330,7 @@ impl Client {
330330
unreachable!();
331331
};
332332

333-
request.tx.send(Err(Error::ChannelClosed)).ok();
333+
request.tx.send(Err(ClientError::ChannelClosed)).ok();
334334
}
335335
}
336336
}

0 commit comments

Comments
 (0)