Skip to content

Commit aae42f9

Browse files
authored
bridge: Error refactoring (#1342)
Preparation for adding kafka `ReceiverOutput`. Part of svix/monorepo-private#8508.
2 parents 1568915 + 5c7c42a commit aae42f9

File tree

12 files changed

+43
-63
lines changed

12 files changed

+43
-63
lines changed

bridge/.config/nextest.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
[profile.default]
2+
slow-timeout = { period = "30s", terminate-after = 2 }

bridge/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bridge/svix-bridge-plugin-kafka/src/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl SenderInput for KafkaConsumer {
199199
self.transformer_tx = tx;
200200
}
201201

202-
async fn run(&self) -> std::io::Result<()> {
202+
async fn run(&self) {
203203
let mut fails: u64 = 0;
204204
let mut last_fail = Instant::now();
205205

bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ async fn test_consume_ok() {
173173
let plugin = get_test_plugin(mock_server.uri(), topic, None);
174174

175175
let handle = tokio::spawn(async move {
176-
plugin.run().await.unwrap();
176+
plugin.run().await;
177177
});
178178
// Wait for the consumer to connect
179179
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -245,7 +245,7 @@ async fn test_consume_transformed_json_ok() {
245245
plugin.set_transformer(Some(transformer_tx));
246246

247247
let handle = tokio::spawn(async move {
248-
plugin.run().await.unwrap();
248+
plugin.run().await;
249249
});
250250
// Wait for the consumer to connect
251251
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -330,7 +330,7 @@ async fn test_consume_transformed_string_ok() {
330330
plugin.set_transformer(Some(transformer_tx));
331331

332332
let handle = tokio::spawn(async move {
333-
plugin.run().await.unwrap();
333+
plugin.run().await;
334334
});
335335
// Wait for the consumer to connect
336336
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -365,7 +365,7 @@ async fn test_missing_app_id_nack() {
365365
let plugin = get_test_plugin(mock_server.uri(), topic, None);
366366

367367
let handle = tokio::spawn(async move {
368-
plugin.run().await.unwrap();
368+
plugin.run().await;
369369
});
370370

371371
// Wait for the consumer to connect
@@ -415,7 +415,7 @@ async fn test_missing_event_type_nack() {
415415
let plugin = get_test_plugin(mock_server.uri(), topic, None);
416416

417417
let handle = tokio::spawn(async move {
418-
plugin.run().await.unwrap();
418+
plugin.run().await;
419419
});
420420

421421
// Wait for the consumer to connect
@@ -467,7 +467,7 @@ async fn test_consume_svix_503() {
467467
let plugin = get_test_plugin(mock_server.uri(), topic, None);
468468

469469
let handle = tokio::spawn(async move {
470-
plugin.run().await.unwrap();
470+
plugin.run().await;
471471
});
472472
// Wait for the consumer to connect
473473
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -510,7 +510,7 @@ async fn test_consume_svix_offline() {
510510
drop(mock_server);
511511

512512
let handle = tokio::spawn(async move {
513-
plugin.run().await.unwrap();
513+
plugin.run().await;
514514
});
515515
// Wait for the consumer to connect
516516
tokio::time::sleep(CONNECT_WAIT_TIME).await;

bridge/svix-bridge-plugin-queue/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ edition = "2021"
99
serde_json = "1.0"
1010
serde = { version = "1.0", features = ["derive"] }
1111
svix-bridge-types = { path = "../svix-bridge-types" }
12+
thiserror = "1.0.61"
1213
tokio = { version = "1", features = ["full"] }
1314
tokio-executor-trait = "2.1"
1415
tokio-reactor-trait = "1.1"

bridge/svix-bridge-plugin-queue/src/error.rs

Lines changed: 10 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,23 @@
11
pub use omniqueue::QueueError;
22
use svix_bridge_types::svix;
3+
use thiserror::Error;
34

5+
#[derive(Debug, Error)]
46
pub enum Error {
5-
Payload(String),
6-
Json(serde_json::Error),
7-
Queue(QueueError),
8-
Svix(svix::error::Error),
7+
#[error("json error: {0}")]
8+
Json(#[from] serde_json::Error),
9+
#[error("queue error: {0}")]
10+
Queue(#[from] QueueError),
11+
#[error("svix API error: {0}")]
12+
Svix(#[from] svix::error::Error),
13+
#[error("{0}")]
914
Generic(String),
1015
}
11-
pub type Result<T> = std::result::Result<T, Error>;
12-
13-
impl From<svix::error::Error> for Error {
14-
fn from(value: svix::error::Error) -> Self {
15-
Error::Svix(value)
16-
}
17-
}
18-
19-
impl From<serde_json::Error> for Error {
20-
fn from(value: serde_json::Error) -> Self {
21-
Error::Json(value)
22-
}
23-
}
24-
25-
impl From<QueueError> for Error {
26-
fn from(value: QueueError) -> Self {
27-
Error::Queue(value)
28-
}
29-
}
30-
31-
impl From<String> for Error {
32-
fn from(value: String) -> Self {
33-
Self::Generic(value)
34-
}
35-
}
16+
pub type Result<T, E = Error> = std::result::Result<T, E>;
3617

3718
impl From<Error> for std::io::Error {
3819
fn from(value: Error) -> Self {
3920
match value {
40-
Error::Payload(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
4121
Error::Json(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
4222
Error::Queue(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
4323
Error::Svix(e) => std::io::Error::new(std::io::ErrorKind::Other, e),

bridge/svix-bridge-plugin-queue/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ trait Consumer {
197197
}
198198
}
199199

200-
async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> std::io::Result<()> {
200+
async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> ! {
201201
let mut fails: u64 = 0;
202202
let mut last_fail = Instant::now();
203203
let system_name = consumer.system();

bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use std::sync::Arc;
22

33
use omniqueue::DynProducer;
4-
use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput};
4+
use svix_bridge_types::{async_trait, BoxError, ForwardRequest, ReceiverOutput};
55

66
use crate::{config::QueueOutputOpts, error::Result};
77

@@ -42,11 +42,9 @@ impl ReceiverOutput for QueueForwarder {
4242
fn name(&self) -> &str {
4343
&self.name
4444
}
45-
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()> {
46-
Ok(self
47-
.sender
48-
.send_serde_json(&request.payload)
49-
.await
50-
.map_err(crate::Error::from)?)
45+
46+
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError> {
47+
self.sender.send_serde_json(&request.payload).await?;
48+
Ok(())
5149
}
5250
}

bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,12 @@ impl SenderInput for QueueSender {
101101
fn name(&self) -> &str {
102102
&self.name
103103
}
104+
104105
fn set_transformer(&mut self, tx: Option<TransformerTx>) {
105106
self.transformer_tx = tx;
106107
}
107-
async fn run(&self) -> std::io::Result<()> {
108+
109+
async fn run(&self) {
108110
run_inner(self).await
109111
}
110112
}

bridge/svix-bridge-types/src/lib.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,17 +137,19 @@ pub trait SenderInput: Send {
137137
/// For plugins that want to run JS transformations on payloads.
138138
/// Giving them a sender lets them pass messages to the JS executor.
139139
fn set_transformer(&mut self, _tx: Option<TransformerTx>) {}
140-
async fn run(&self) -> std::io::Result<()>;
140+
async fn run(&self);
141141
}
142142

143+
pub type BoxError = Box<dyn std::error::Error + Send + Sync>;
144+
143145
/// Represents something we can hand a webhook payload to.
144146
/// Aka a "forwarder."
145147
///
146148
/// To start, we're only using this in conjunction with an HTTP server "owned" by the bridge binary.
147149
#[async_trait]
148150
pub trait ReceiverOutput: Send + Sync {
149151
fn name(&self) -> &str;
150-
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()>;
152+
async fn handle(&self, request: ForwardRequest) -> Result<(), BoxError>;
151153
}
152154

153155
#[derive(Deserialize, Debug, Clone, Default)]

0 commit comments

Comments
 (0)