Skip to content

Commit 8b21c04

Browse files
committed
add js transforms to bridge (#917)
Embeds a deno runtime in the bridge binary to allow ad hoc reshaping of messages read from queues before they are sent to Svix. ## Motivation When consuming JSON from messaging systems to generate new webhooks, it's common to need to transform the payload before making the Create Message request to Svix. ## Solution Embedding a JS runtime into the bridge process allows us to run user-defined functions on the payloads as they move from input to output. With this diff, plugin instances provided by `svix-webhook-bridge-plugin-queue-consumer` can now evaluate js included in the plugin config: ```yaml plugins: - type: "rabbitmqconsumer" input: uri: "amqp://guest:guest@localhost:5672/%2f" queue_name: "local" requeue_on_nack: false transformation: | function handler(input) { return { app_id: input.key, message: { eventType: input.event_type, payload: input.data } }; } output: token: "***************" ``` The `transformation` key can now be set on any of these consumers. The JS fragment should include a default export of a function that accepts an object and returns an object. These functions can be used to reshape the payload as necessary.
1 parent 667cc95 commit 8b21c04

File tree

20 files changed

+1130
-373
lines changed

20 files changed

+1130
-373
lines changed

webhook-bridge/Cargo.lock

Lines changed: 220 additions & 32 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

webhook-bridge/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
11
[workspace]
2+
# Earlier versions of deno fail to compile in a workspace because of wgpu-hal
3+
# The "fix" is to enable resolver 2 at the workspace. Crates with edition 2021
4+
# use this by default, but workspaces are set independently for some reason.
5+
# <https://stackoverflow.com/questions/73967574/dependency-fails-to-compile-in-workspace-but-not-in-package>
6+
resolver = "2"
7+
28
members = [
39
"generic-queue",
410
"svix-webhook-bridge-types",

webhook-bridge/README.md

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,39 @@ Currently this supports the following messaging systems:
3333
- Redis
3434
- SQS
3535

36-
Generally instances of this plugin are configured in terms of inputs and outputs, where the input configuration varies
37-
by the messaging system.
36+
Generally instances of this plugin are configured in terms of inputs, _optional transformations_, and outputs, where
37+
the input configuration varies by the messaging system.
3838

3939
The output options control how the Svix client is built and configured.
4040
The sole required field is `token`.
4141

42+
The optional _transformation_ can be set to a JavaScript fragment which can be used to reshape the messages as they flow through.
43+
44+
```yaml
45+
46+
plugins:
47+
- type: ...
48+
input:
49+
# ... snip ...
50+
51+
# Reshape the messages we get from the queue before they get sent to Svix
52+
transformation: |
53+
function handler(input) {
54+
return {
55+
app_id: input.key,
56+
message: {
57+
eventType: input.event_type,
58+
payload: input.data
59+
}
60+
};
61+
}
62+
63+
output:
64+
# ... snip ...
65+
```
66+
67+
Transformations should define a function called `handler` that accepts an object and returns an object.
68+
4269
Messages received by these consumers must follow an expected format:
4370

4471
```

webhook-bridge/generic-queue/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ serde = { version = "1", features = ["derive", "rc"] }
2424
serde_json = "1"
2525
thiserror = "1"
2626
tokio = { version = "1", features = ["full"] }
27-
tokio-util = { version = "0.7.8", optional = true }
27+
tokio-util = { version = "0.7", optional = true }
2828
futures-util = { version = "0.3.28", optional = true }
2929
tracing = "0.1"
3030

webhook-bridge/svix-webhook-bridge-plugin-queue-consumer/src/config.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,24 +6,28 @@ use svix::api::SvixOptions as _SvixOptions;
66
#[derive(Debug, Default, Deserialize)]
77
pub struct RabbitMqConsumerConfig {
88
pub input: RabbitMqInputOpts,
9+
pub transformation: Option<String>,
910
pub output: OutputOpts,
1011
}
1112

1213
#[derive(Debug, Default, Deserialize)]
1314
pub struct RedisConsumerConfig {
1415
pub input: RedisInputOpts,
16+
pub transformation: Option<String>,
1517
pub output: OutputOpts,
1618
}
1719

1820
#[derive(Debug, Default, Deserialize)]
1921
pub struct SqsConsumerConfig {
2022
pub input: SqsInputOpts,
23+
pub transformation: Option<String>,
2124
pub output: OutputOpts,
2225
}
2326

2427
#[derive(Debug, Default, Deserialize)]
2528
pub struct GCPPubSubConsumerConfig {
2629
pub input: GCPPubSubInputOpts,
30+
pub transformation: Option<String>,
2731
pub output: OutputOpts,
2832
}
2933

webhook-bridge/svix-webhook-bridge-plugin-queue-consumer/src/error.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub enum Error {
55
Json(serde_json::Error),
66
Queue(QueueError),
77
Svix(svix::error::Error),
8+
Generic(String),
89
}
910

1011
impl From<svix::error::Error> for Error {
@@ -25,13 +26,20 @@ impl From<QueueError> for Error {
2526
}
2627
}
2728

29+
impl From<String> for Error {
30+
fn from(value: String) -> Self {
31+
Self::Generic(value)
32+
}
33+
}
34+
2835
impl From<Error> for std::io::Error {
2936
fn from(value: Error) -> Self {
3037
match value {
3138
Error::Payload(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
3239
Error::Json(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
3340
Error::Queue(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
3441
Error::Svix(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
42+
Error::Generic(e) => std::io::Error::new(std::io::ErrorKind::Other, e),
3543
}
3644
}
3745
}
Lines changed: 40 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,37 @@
11
use crate::config::{GCPPubSubConsumerConfig, GCPPubSubInputOpts};
22
use crate::error::Error;
3-
use crate::PLUGIN_NAME;
43
use crate::PLUGIN_VERS;
54
use crate::{create_svix_message, CreateMessageRequest};
5+
use crate::{run_inner, Consumer, PLUGIN_NAME};
66
use generic_queue::gcp_pubsub::{
77
GCPPubSubConfig, GCPPubSubDelivery, GCPPubSubQueueBackend, GCPPubSubQueueConsumer,
88
};
99
use generic_queue::{Delivery, TaskQueueBackend, TaskQueueReceive};
10-
use std::time::{Duration, Instant};
10+
use std::time::Duration;
1111
use svix::api::Svix;
12-
use svix_webhook_bridge_types::{async_trait, Plugin};
12+
use svix_webhook_bridge_types::{async_trait, JsObject, Plugin, TransformerTx};
1313
use tracing::instrument;
1414

1515
pub struct GCPPubSubConsumerPlugin {
1616
input_options: GCPPubSubInputOpts,
1717
svix_client: Svix,
18+
transformer_tx: Option<TransformerTx>,
19+
transformation: Option<String>,
1820
}
1921

2022
impl GCPPubSubConsumerPlugin {
21-
pub fn new(GCPPubSubConsumerConfig { input, output }: GCPPubSubConsumerConfig) -> Self {
23+
pub fn new(
24+
GCPPubSubConsumerConfig {
25+
input,
26+
transformation,
27+
output,
28+
}: GCPPubSubConsumerConfig,
29+
) -> Self {
2230
Self {
2331
input_options: input,
2432
svix_client: Svix::new(output.token, output.svix_options.map(Into::into)),
33+
transformer_tx: None,
34+
transformation,
2535
}
2636
}
2737

@@ -49,10 +59,10 @@ impl GCPPubSubConsumerPlugin {
4959
}
5060

5161
/// Parses the delivery as JSON and feeds it into [`create_svix_message`].
52-
/// Will nack the delivery if either the JSON parse step, or the request to svix fails.
62+
/// Will nack the delivery if either the JSON parse, transformation, or the request to svix fails.
5363
#[instrument(skip_all, fields(messaging.operation = "process"))]
54-
async fn process(&self, delivery: GCPPubSubDelivery<serde_json::Value>) -> std::io::Result<()> {
55-
let payload = match Delivery::<serde_json::Value>::payload(&delivery) {
64+
async fn process(&self, delivery: GCPPubSubDelivery<JsObject>) -> std::io::Result<()> {
65+
let payload = match Delivery::<JsObject>::payload(&delivery) {
5666
Ok(p) => p,
5767
Err(e) => {
5868
tracing::warn!("{e}");
@@ -61,6 +71,19 @@ impl GCPPubSubConsumerPlugin {
6171
}
6272
};
6373

74+
let payload = if let Some(script) = &self.transformation {
75+
match self.transform(script.clone(), payload).await {
76+
Err(e) => {
77+
tracing::error!("nack: {e}");
78+
delivery.nack().await.map_err(Error::from)?;
79+
return Ok(());
80+
}
81+
Ok(x) => x,
82+
}
83+
} else {
84+
payload
85+
};
86+
6487
match create_svix_message(&self.svix_client, payload).await {
6588
Ok(_) => {
6689
tracing::trace!("ack");
@@ -73,7 +96,13 @@ impl GCPPubSubConsumerPlugin {
7396
}
7497
Ok(())
7598
}
99+
}
76100

101+
#[async_trait]
102+
impl Consumer for GCPPubSubConsumerPlugin {
103+
fn transformer_tx(&self) -> Option<&TransformerTx> {
104+
self.transformer_tx.as_ref()
105+
}
77106
async fn consume(&self) -> std::io::Result<()> {
78107
let mut consumer =
79108
<GCPPubSubQueueBackend as TaskQueueBackend<CreateMessageRequest>>::consuming_half(
@@ -86,7 +115,6 @@ impl GCPPubSubConsumerPlugin {
86115
)
87116
.await
88117
.map_err(Error::from)?;
89-
90118
tracing::debug!(
91119
"gcp pubsub consuming: {}",
92120
&self.input_options.subscription_id
@@ -100,43 +128,17 @@ impl GCPPubSubConsumerPlugin {
100128

101129
impl TryInto<Box<dyn Plugin>> for GCPPubSubConsumerConfig {
102130
type Error = &'static str;
103-
104131
fn try_into(self) -> Result<Box<dyn Plugin>, Self::Error> {
105132
Ok(Box::new(GCPPubSubConsumerPlugin::new(self)))
106133
}
107134
}
108135

109136
#[async_trait]
110137
impl Plugin for GCPPubSubConsumerPlugin {
138+
fn set_transformer(&mut self, tx: Option<TransformerTx>) {
139+
self.transformer_tx = tx;
140+
}
111141
async fn run(&self) -> std::io::Result<()> {
112-
let mut fails: u64 = 0;
113-
let mut last_fail = Instant::now();
114-
115-
tracing::info!(
116-
"gcp pubsub starting: {}",
117-
&self.input_options.subscription_id
118-
);
119-
120-
loop {
121-
if let Err(e) = self.consume().await {
122-
tracing::error!("{e}");
123-
}
124-
125-
tracing::error!(
126-
"gcp pubsub disconnected: {}",
127-
&self.input_options.subscription_id
128-
);
129-
130-
if last_fail.elapsed() > Duration::from_secs(10) {
131-
// reset the fail count if we didn't have a hiccup in the past short while.
132-
tracing::trace!("been a while since last fail, resetting count");
133-
fails = 0;
134-
} else {
135-
fails += 1;
136-
}
137-
138-
last_fail = Instant::now();
139-
tokio::time::sleep(Duration::from_millis((300 * fails).min(3000))).await;
140-
}
142+
run_inner(self, "gcp subsub", &self.input_options.subscription_id).await
141143
}
142144
}

0 commit comments

Comments
 (0)