Skip to content

Commit 0d58fbc

Browse files
committed
bridge: reduce duplicate consumer code as much as possible (#926)
Adds wrapper types around the backend-specific types we get from generic-queue. ## Motivation Adding new backends to the consumer bridge plugin today is a bit of a chore, encouraging parts of the ack/nack and consumer loop flow to be duplicated. The generics exposed in the signatures of items from `generic-queue` tend to make code reuse tricky. ## Solution This is mostly focused on moving the decision-making bits of the flow up to the trait's default implementation for methods like `receive` and `process` since these were essentially duplicate from backend to backend. New wrapper types are used to hide/erase the distinct backend typing from the consumer flow. Additionally, the code related to each backend has been relegated to a dedicated module (which should help when adding new backends since you can copy the module entirely as a starting point). More refactoring could be done, perhaps adding `enum_dispatch` to reduce runtime cost and improve the layout of the code generally, but even without this seems like a good first step.
1 parent 8b21c04 commit 0d58fbc

File tree

10 files changed

+495
-469
lines changed

10 files changed

+495
-469
lines changed
Lines changed: 32 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,11 @@
11
use crate::config::{GCPPubSubConsumerConfig, GCPPubSubInputOpts};
22
use crate::error::Error;
3-
use crate::PLUGIN_VERS;
4-
use crate::{create_svix_message, CreateMessageRequest};
5-
use crate::{run_inner, Consumer, PLUGIN_NAME};
6-
use generic_queue::gcp_pubsub::{
7-
GCPPubSubConfig, GCPPubSubDelivery, GCPPubSubQueueBackend, GCPPubSubQueueConsumer,
8-
};
9-
use generic_queue::{Delivery, TaskQueueBackend, TaskQueueReceive};
10-
use std::time::Duration;
3+
use crate::ConsumerWrapper;
4+
use crate::{run_inner, Consumer};
5+
use generic_queue::gcp_pubsub::{GCPPubSubConfig, GCPPubSubQueueBackend};
6+
use generic_queue::TaskQueueBackend;
117
use svix::api::Svix;
128
use svix_webhook_bridge_types::{async_trait, JsObject, Plugin, TransformerTx};
13-
use tracing::instrument;
149

1510
pub struct GCPPubSubConsumerPlugin {
1611
input_options: GCPPubSubInputOpts,
@@ -34,95 +29,42 @@ impl GCPPubSubConsumerPlugin {
3429
transformation,
3530
}
3631
}
32+
}
3733

38-
/// Pulls N messages off the queue and feeds them to [`Self::process`].
39-
#[instrument(skip_all,
40-
fields(
41-
otel.kind = "CONSUMER",
42-
messaging.system = "gcp-pubsub",
43-
messaging.operation = "receive",
44-
messaging.source = &self.input_options.subscription_id,
45-
svixagent_plugin.name = PLUGIN_NAME,
46-
svixagent_plugin.vers = PLUGIN_VERS,
47-
)
48-
)]
49-
async fn receive(&self, consumer: &mut GCPPubSubQueueConsumer) -> std::io::Result<()> {
50-
let deliveries = consumer
51-
.receive_all(1, Duration::from_millis(10))
52-
.await
53-
.map_err(Error::from)?;
54-
tracing::trace!("received: {}", deliveries.len());
55-
for delivery in deliveries {
56-
self.process(delivery).await?;
57-
}
58-
Ok(())
34+
#[async_trait]
35+
impl Consumer for GCPPubSubConsumerPlugin {
36+
fn source(&self) -> &str {
37+
&self.input_options.subscription_id
5938
}
6039

61-
/// Parses the delivery as JSON and feeds it into [`create_svix_message`].
62-
/// Will nack the delivery if either the JSON parse, transformation, or the request to svix fails.
63-
#[instrument(skip_all, fields(messaging.operation = "process"))]
64-
async fn process(&self, delivery: GCPPubSubDelivery<JsObject>) -> std::io::Result<()> {
65-
let payload = match Delivery::<JsObject>::payload(&delivery) {
66-
Ok(p) => p,
67-
Err(e) => {
68-
tracing::warn!("{e}");
69-
delivery.nack().await.map_err(Error::from)?;
70-
return Ok(());
71-
}
72-
};
40+
fn system(&self) -> &str {
41+
"gcp-pubsub"
42+
}
7343

74-
let payload = if let Some(script) = &self.transformation {
75-
match self.transform(script.clone(), payload).await {
76-
Err(e) => {
77-
tracing::error!("nack: {e}");
78-
delivery.nack().await.map_err(Error::from)?;
79-
return Ok(());
80-
}
81-
Ok(x) => x,
82-
}
83-
} else {
84-
payload
85-
};
44+
fn transformer_tx(&self) -> &Option<TransformerTx> {
45+
&self.transformer_tx
46+
}
8647

87-
match create_svix_message(&self.svix_client, payload).await {
88-
Ok(_) => {
89-
tracing::trace!("ack");
90-
delivery.ack().await.map_err(Error::from)?
91-
}
92-
Err(e) => {
93-
tracing::error!("nack: {e}");
94-
delivery.nack().await.map_err(Error::from)?
95-
}
96-
}
97-
Ok(())
48+
fn transformation(&self) -> &Option<String> {
49+
&self.transformation
9850
}
99-
}
10051

101-
#[async_trait]
102-
impl Consumer for GCPPubSubConsumerPlugin {
103-
fn transformer_tx(&self) -> Option<&TransformerTx> {
104-
self.transformer_tx.as_ref()
52+
fn svix_client(&self) -> &Svix {
53+
&self.svix_client
10554
}
106-
async fn consume(&self) -> std::io::Result<()> {
107-
let mut consumer =
108-
<GCPPubSubQueueBackend as TaskQueueBackend<CreateMessageRequest>>::consuming_half(
109-
GCPPubSubConfig {
110-
subscription_id: self.input_options.subscription_id.clone(),
111-
credentials_file: self.input_options.credentials_file.clone(),
112-
// Topics are for producers so we don't care
113-
topic: String::new(),
114-
},
115-
)
116-
.await
117-
.map_err(Error::from)?;
118-
tracing::debug!(
119-
"gcp pubsub consuming: {}",
120-
&self.input_options.subscription_id
121-
);
12255

123-
loop {
124-
self.receive(&mut consumer).await?;
125-
}
56+
async fn consumer(&self) -> std::io::Result<ConsumerWrapper> {
57+
let consumer = <GCPPubSubQueueBackend as TaskQueueBackend<JsObject>>::consuming_half(
58+
GCPPubSubConfig {
59+
subscription_id: self.input_options.subscription_id.clone(),
60+
credentials_file: self.input_options.credentials_file.clone(),
61+
// Topics are for producers so we don't care
62+
topic: String::new(),
63+
},
64+
)
65+
.await
66+
.map_err(Error::from)?;
67+
Ok(ConsumerWrapper::GCPPubSub(consumer))
12668
}
12769
}
12870

@@ -139,6 +81,6 @@ impl Plugin for GCPPubSubConsumerPlugin {
13981
self.transformer_tx = tx;
14082
}
14183
async fn run(&self) -> std::io::Result<()> {
142-
run_inner(self, "gcp subsub", &self.input_options.subscription_id).await
84+
run_inner(self).await
14385
}
14486
}

0 commit comments

Comments
 (0)