Skip to content

Commit 0ffd319

Browse files
committed
bridge: Remove pointless return type from SenderInput::run
1 parent fa1851a commit 0ffd319

File tree

6 files changed

+16
-21
lines changed

6 files changed

+16
-21
lines changed

bridge/svix-bridge-plugin-kafka/src/input.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ impl SenderInput for KafkaConsumer {
199199
self.transformer_tx = tx;
200200
}
201201

202-
async fn run(&self) -> std::io::Result<()> {
202+
async fn run(&self) {
203203
let mut fails: u64 = 0;
204204
let mut last_fail = Instant::now();
205205

bridge/svix-bridge-plugin-kafka/tests/it/kafka_consumer.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ async fn test_consume_ok() {
173173
let plugin = get_test_plugin(mock_server.uri(), topic, None);
174174

175175
let handle = tokio::spawn(async move {
176-
plugin.run().await.unwrap();
176+
plugin.run().await;
177177
});
178178
// Wait for the consumer to connect
179179
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -245,7 +245,7 @@ async fn test_consume_transformed_json_ok() {
245245
plugin.set_transformer(Some(transformer_tx));
246246

247247
let handle = tokio::spawn(async move {
248-
plugin.run().await.unwrap();
248+
plugin.run().await;
249249
});
250250
// Wait for the consumer to connect
251251
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -330,7 +330,7 @@ async fn test_consume_transformed_string_ok() {
330330
plugin.set_transformer(Some(transformer_tx));
331331

332332
let handle = tokio::spawn(async move {
333-
plugin.run().await.unwrap();
333+
plugin.run().await;
334334
});
335335
// Wait for the consumer to connect
336336
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -365,7 +365,7 @@ async fn test_missing_app_id_nack() {
365365
let plugin = get_test_plugin(mock_server.uri(), topic, None);
366366

367367
let handle = tokio::spawn(async move {
368-
plugin.run().await.unwrap();
368+
plugin.run().await;
369369
});
370370

371371
// Wait for the consumer to connect
@@ -415,7 +415,7 @@ async fn test_missing_event_type_nack() {
415415
let plugin = get_test_plugin(mock_server.uri(), topic, None);
416416

417417
let handle = tokio::spawn(async move {
418-
plugin.run().await.unwrap();
418+
plugin.run().await;
419419
});
420420

421421
// Wait for the consumer to connect
@@ -467,7 +467,7 @@ async fn test_consume_svix_503() {
467467
let plugin = get_test_plugin(mock_server.uri(), topic, None);
468468

469469
let handle = tokio::spawn(async move {
470-
plugin.run().await.unwrap();
470+
plugin.run().await;
471471
});
472472
// Wait for the consumer to connect
473473
tokio::time::sleep(CONNECT_WAIT_TIME).await;
@@ -510,7 +510,7 @@ async fn test_consume_svix_offline() {
510510
drop(mock_server);
511511

512512
let handle = tokio::spawn(async move {
513-
plugin.run().await.unwrap();
513+
plugin.run().await;
514514
});
515515
// Wait for the consumer to connect
516516
tokio::time::sleep(CONNECT_WAIT_TIME).await;

bridge/svix-bridge-plugin-queue/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ trait Consumer {
197197
}
198198
}
199199

200-
async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> std::io::Result<()> {
200+
async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> ! {
201201
let mut fails: u64 = 0;
202202
let mut last_fail = Instant::now();
203203
let system_name = consumer.system();

bridge/svix-bridge-plugin-queue/src/sender_input/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,10 +101,12 @@ impl SenderInput for QueueSender {
101101
fn name(&self) -> &str {
102102
&self.name
103103
}
104+
104105
fn set_transformer(&mut self, tx: Option<TransformerTx>) {
105106
self.transformer_tx = tx;
106107
}
107-
async fn run(&self) -> std::io::Result<()> {
108+
109+
async fn run(&self) {
108110
run_inner(self).await
109111
}
110112
}

bridge/svix-bridge-types/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ pub trait SenderInput: Send {
137137
/// For plugins that want to run JS transformations on payloads.
138138
/// Giving them a sender lets them pass messages to the JS executor.
139139
fn set_transformer(&mut self, _tx: Option<TransformerTx>) {}
140-
async fn run(&self) -> std::io::Result<()>;
140+
async fn run(&self);
141141
}
142142

143143
/// Represents something we can hand a webhook payload to.

bridge/svix-bridge/src/main.rs

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -166,17 +166,10 @@ async fn supervise_senders(inputs: Vec<Box<dyn SenderInput>>) -> Result<()> {
166166
set.spawn(async move {
167167
// FIXME: needs much better signaling for termination
168168
loop {
169-
let fut = input.run();
170169
// If this future returns, the consumer terminated unexpectedly.
171-
if let Err(e) = fut.await {
172-
tracing::warn!(
173-
"sender input {} unexpectedly terminated: {}",
174-
input.name(),
175-
e
176-
);
177-
} else {
178-
tracing::warn!("sender input {} unexpectedly terminated", input.name());
179-
}
170+
input.run().await;
171+
172+
tracing::warn!("sender input {} unexpectedly terminated", input.name());
180173
tokio::time::sleep(Duration::from_secs(1)).await;
181174
}
182175
});

0 commit comments

Comments
 (0)