Commit 8101917

Bridge: support raw inputs for transformations (#975)
Adds a bunch of plumbing to allow the message payloads from consumers to be passed to transformations as strings rather than pre-parsing as JSON in Rust.

One concrete use case for this is if the system publishing those messages to the queue is sending XML or perhaps base64 encoded data.

When configured to specify the `format` of a transformation, you can set it to `string` rather than the default `json`. Then, when the transformation receives that string as an argument, it can do whatever parsing is required in order to return a valid `Object`.

Tests have been added up and down the stack to show that strings are handled correctly, and that the new config data is propagated.

Notable omissions:

- redis relies on JSON values internally and can't easily support "raw" inputs. We can revisit this when we switch to `omniqueue` since I believe it handles this better.
- [x] webhook receivers _can support_ raw inputs, I think, but _don't currently_. I'll try to get this going before the PR merges. Update: got receivers going now.

For both senders and receivers, we now check the configuration to catch cases where redis is paired with a string transformation and call that an Error. There are some tests in place to make sure those errors show up early in the process, rather than waiting for some message to come through.
2 parents 1255177 + 0c6c784 commit 8101917

30 files changed, +1089 -257 lines changed

bridge/README.md

Lines changed: 69 additions & 0 deletions
@@ -27,6 +27,75 @@ publish the payload on to a specified "output."

 Both **senders** and **receivers** are defined in terms of their input, and optional JavaScript transformation, and their output.

+---
+
+Transformations are configured as either a single string of JS source code:
+
+```yaml
+transformation: |
+  function handler(input) {
+    return {
+      app_id: input.key,
+      message: {
+        eventType: input.event_type,
+        payload: input.data
+      }
+    };
+  }
+```
+In this situation, the input value will be pre-parsed as JSON.
+
+You can also configure transformations using the verbose form, allowing control of the `format`:
+```yaml
+transformation:
+  format: "json"
+  src: |
+    function handler(input) {
+      return {
+        app_id: input.key,
+        message: {
+          eventType: input.event_type,
+          payload: input.data
+        }
+      };
+    }
+```
+The `format` field can be set to either `string` (ie. the _raw payload_) or `json` (the default behavior).
+
+For example, using the string `format` you can parse data out of the input using whatever method you like:
+
+```yaml
+transformation:
+  # Let's say the input is an XML message like:
+  # `<msg appId="app_123" eventType="a.b"><payload>{"x": 123}</payload></msg>`
+
+  format: "string"
+  src: |
+    function handler(input) {
+      let parser = new DOMParser();
+      let doc = parser.parseFromString(input, "text/xml");
+      let msg = doc.firstChild;
+      let payload = JSON.parse(msg.getElementsByTagName("payload")[0].textContent)
+
+      return {
+        app_id: msg.attributes.appId,
+        message: {
+          eventType: msg.attributes.eventType,
+          payload,
+        }
+      };
+    }
+```
+Transformations must define a function named `handler` which receives a single argument, the type of which is controlled
+by the configured `format` field.
+
+Note that regardless of the `format`, the return type of `handler` must be an `Object`.
+
+> N.b. at time of writing, `format: string` is currently unsupported for `senders` and `receivers` configured with
+> a `redis` input or output.
+
+---
+
 Currently the supported Sender inputs and Receiver outputs are the following
 messaging systems:

bridge/generic-queue/src/gcp_pubsub.rs

Lines changed: 4 additions & 0 deletions
@@ -186,6 +186,10 @@ impl<T: DeserializeOwned + Send + Serialize + Sync> Delivery<T> for GCPPubSubDel
         serde_json::from_slice(&self.message.message.data).map_err(Into::into)
     }

+    fn raw_payload(&self) -> Result<&str, QueueError> {
+        std::str::from_utf8(&self.message.message.data).map_err(QueueError::generic)
+    }
+
     async fn ack(self) -> Result<(), QueueError> {
         self.message.ack().await.map_err(QueueError::generic)
     }

bridge/generic-queue/src/lib.rs

Lines changed: 3 additions & 2 deletions
@@ -98,14 +98,15 @@ pub trait TaskQueueBackend<T: Send + Sync> {
 pub trait Delivery<T: Send + Sync> {
     /// Returns a freshly deserialized instance of the contained payload.
     fn payload(&self) -> Result<T, QueueError>;
+    fn raw_payload(&self) -> Result<&str, QueueError>;

     /// ACKs this message, which, depending on what backend you are using, may be a NOOP, or it may
-    /// explicity acknowledge the successful processing the message.
+    /// explicitly acknowledge the successful processing the message.
     ///
     /// When ACKed, consumers will not see this exact message again.
     async fn ack(self) -> Result<(), QueueError>;
     /// NACKs this message, which, depending on what backend you are using, may be a NOOP, it may
-    /// explicitly mark a messaege as not acknowledged, or it may reinsert the message back into the
+    /// explicitly mark a message as not acknowledged, or it may reinsert the message back into the
     /// end of the queue.
     ///
     /// When NACKed, consumers of this queue will process the message again at some point.
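
The idea behind the new `raw_payload` method can be sketched in isolation: borrow the stored body as UTF-8 text and leave any parsing to the transformation. This is only an illustration, not code from the crate. `BytesDelivery` is a hypothetical type, and `Utf8Error` stands in for `QueueError` so the sketch needs nothing beyond the standard library.

```rust
use std::str::Utf8Error;

/// Hypothetical stand-in for a backend delivery that stores the message body as bytes.
struct BytesDelivery {
    body: Vec<u8>,
}

impl BytesDelivery {
    /// Borrow the body as UTF-8 text without attempting to parse it as JSON.
    /// The trait method in the diff maps failures into `QueueError`; `Utf8Error`
    /// is an assumption used here to keep the sketch dependency-free.
    fn raw_payload(&self) -> Result<&str, Utf8Error> {
        std::str::from_utf8(&self.body)
    }
}
```

With a type like this, a body such as `<msg/>` would come back unchanged as a `&str`, while a body that is not valid UTF-8 would surface as an `Err` rather than a panic.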

bridge/generic-queue/src/memory_queue.rs

Lines changed: 5 additions & 0 deletions
@@ -55,6 +55,11 @@ impl<T: 'static + Clone + Debug + DeserializeOwned + Send + Serialize + Sync> De
         Ok(self.payload.clone())
     }

+    fn raw_payload(&self) -> Result<&str, QueueError> {
+        // this backend is not used in bridge, and we will migrate to omniqueue anyway...
+        unimplemented!("not supported");
+    }
+
     async fn ack(self) -> Result<(), QueueError> {
         Ok(())
     }

bridge/generic-queue/src/rabbitmq.rs

Lines changed: 3 additions & 0 deletions
@@ -128,6 +128,9 @@ impl<T: DeserializeOwned + Send + Serialize + Sync> Delivery<T> for RabbitMqDeli
         serde_json::from_slice(&self.body).map_err(Into::into)
     }

+    fn raw_payload(&self) -> Result<&str, QueueError> {
+        std::str::from_utf8(&self.body).map_err(QueueError::generic)
+    }
     async fn ack(self) -> Result<(), QueueError> {
         self.acker
             .ack(BasicAckOptions { multiple: false })

bridge/generic-queue/src/redis.rs

Lines changed: 9 additions & 0 deletions
@@ -180,6 +180,15 @@ impl<
     fn payload(&self) -> Result<T, QueueError> {
         T::from_redis_stream_map(&self.body.map).map_err(Into::into)
     }
+    // FIXME: the redis stream queue impl requires values to be encoded a certain way (eg. inside
+    // a "payload" key in a map).
+    // This means producers and consumers need to agree on the encoding.
+    // This also means we can't just accept raw messages published by other systems (as is the
+    // expectation with these raw payloads).
+    // Leaving unsupported until we migrate to `omniqueue`.
+    fn raw_payload(&self) -> Result<&str, QueueError> {
+        unimplemented!("raw payload not supported with redis backend")
+    }

     async fn ack(self) -> Result<(), QueueError> {
         let mut conn = self.redis.get().await.map_err(QueueError::generic)?;

bridge/generic-queue/src/sqs.rs

Lines changed: 4 additions & 0 deletions
@@ -106,6 +106,10 @@ impl<T: DeserializeOwned + Send + Serialize + Sync> Delivery<T> for SqsDelivery<
         serde_json::from_str(&self.body).map_err(Into::into)
     }

+    fn raw_payload(&self) -> Result<&str, QueueError> {
+        Ok(&self.body)
+    }
+
     async fn ack(self) -> Result<(), QueueError> {
         if let Some(receipt_handle) = self.receipt_handle {
             self.ack_client

bridge/svix-bridge-plugin-queue/src/config.rs

Lines changed: 106 additions & 16 deletions
@@ -7,58 +7,78 @@ use crate::{
     GCPPubSubConsumerPlugin, RabbitMqConsumerPlugin, RedisConsumerPlugin, SqsConsumerPlugin,
 };
 use serde::Deserialize;
-use svix_bridge_types::{ReceiverOutput, SenderInput, SenderOutputOpts};
+use svix_bridge_types::{
+    ReceiverOutput, SenderInput, SenderOutputOpts, TransformationConfig, TransformerInputFormat,
+};

 #[derive(Deserialize)]
 pub struct QueueConsumerConfig {
     pub name: String,
     pub input: SenderInputOpts,
     #[serde(default)]
-    pub transformation: Option<String>,
+    pub transformation: Option<TransformationConfig>,
     pub output: SenderOutputOpts,
 }

 impl QueueConsumerConfig {
-    pub fn into_sender_input(self) -> Box<dyn SenderInput> {
+    pub fn into_sender_input(self) -> Result<Box<dyn SenderInput>, &'static str> {
         match self.input {
-            SenderInputOpts::GCPPubSub(input) => Box::new(GCPPubSubConsumerPlugin::new(
-                self.name,
-                input,
-                self.transformation,
-                self.output,
-            )),
-            SenderInputOpts::RabbitMQ(input) => Box::new(RabbitMqConsumerPlugin::new(
+            SenderInputOpts::GCPPubSub(input) => Ok(Box::new(GCPPubSubConsumerPlugin::new(
                 self.name,
                 input,
                 self.transformation,
                 self.output,
-            )),
-            SenderInputOpts::Redis(input) => Box::new(RedisConsumerPlugin::new(
+            ))),
+            SenderInputOpts::RabbitMQ(input) => Ok(Box::new(RabbitMqConsumerPlugin::new(
                 self.name,
                 input,
                 self.transformation,
                 self.output,
-            )),
-            SenderInputOpts::SQS(input) => Box::new(SqsConsumerPlugin::new(
+            ))),
+            SenderInputOpts::Redis(input) => {
+                if let Some(xform) = &self.transformation {
+                    if xform.format() != TransformerInputFormat::Json {
+                        return Err("redis only supports json formatted transformations");
+                    }
+                }
+                Ok(Box::new(RedisConsumerPlugin::new(
+                    self.name,
+                    input,
+                    self.transformation,
+                    self.output,
+                )))
+            }
+            SenderInputOpts::SQS(input) => Ok(Box::new(SqsConsumerPlugin::new(
                 self.name,
                 input,
                 self.transformation,
                 self.output,
-            )),
+            ))),
         }
     }
 }

 pub async fn into_receiver_output(
     name: String,
     opts: ReceiverOutputOpts,
+    // Annoying to have to pass this, but certain backends (redis) only work with certain transformations (json).
+    transformation: &Option<TransformationConfig>,
 ) -> Result<Box<dyn ReceiverOutput>, crate::Error> {
     let forwarder = match opts {
         ReceiverOutputOpts::GCPPubSub(opts) => {
             QueueForwarder::from_gcp_pupsub_cfg(name, opts).await?
         }
         ReceiverOutputOpts::RabbitMQ(opts) => QueueForwarder::from_rabbitmq_cfg(name, opts).await?,
-        ReceiverOutputOpts::Redis(opts) => QueueForwarder::from_redis_cfg(name, opts).await?,
+        ReceiverOutputOpts::Redis(opts) => {
+            if let Some(t) = transformation {
+                if t.format() != TransformerInputFormat::Json {
+                    return Err(crate::Error::Generic(
+                        "redis only supports json formatted transformations".to_string(),
+                    ));
+                }
+            }
+            QueueForwarder::from_redis_cfg(name, opts).await?
+        }
         ReceiverOutputOpts::SQS(opts) => QueueForwarder::from_sqs_cfg(name, opts).await?,
     };
     Ok(Box::new(forwarder))
@@ -84,3 +104,73 @@ pub enum ReceiverOutputOpts {
     Redis(RedisOutputOpts),
     SQS(SqsOutputOpts),
 }
+
+#[cfg(test)]
+mod tests {
+    use super::into_receiver_output;
+    use super::QueueConsumerConfig;
+    use crate::config::{ReceiverOutputOpts, SenderInputOpts};
+    use crate::redis::{RedisInputOpts, RedisOutputOpts};
+    use svix_bridge_types::{
+        SenderOutputOpts, SvixSenderOutputOpts, TransformationConfig, TransformerInputFormat,
+    };
+
+    // FIXME: can't support raw payload access for redis because it requires JSON internally.
+    // Revisit after `omniqueue` adoption.
+    #[test]
+    fn redis_sender_with_string_transformation_is_err() {
+        let cfg = QueueConsumerConfig {
+            name: "redis-with-string-transformation".to_string(),
+            input: SenderInputOpts::Redis(RedisInputOpts {
+                dsn: "".to_string(),
+                max_connections: 0,
+                reinsert_on_nack: false,
+                queue_key: "".to_string(),
+                consumer_group: "".to_string(),
+                consumer_name: "".to_string(),
+            }),
+            transformation: Some(TransformationConfig::Explicit {
+                format: TransformerInputFormat::String,
+                src: String::new(),
+            }),
+            output: SenderOutputOpts::Svix(SvixSenderOutputOpts {
+                token: "".to_string(),
+                options: None,
+            }),
+        };
+
+        assert_eq!(
+            cfg.into_sender_input()
+                .err()
+                .expect("invalid config didn't result in error"),
+            "redis only supports json formatted transformations"
+        )
+    }
+
+    // FIXME: can't support raw payload access for redis because it requires JSON internally.
+    // Revisit after `omniqueue` adoption.
+    #[tokio::test]
+    async fn test_redis_receiver_string_transform_is_err() {
+        let redis_out = ReceiverOutputOpts::Redis(RedisOutputOpts {
+            dsn: "".to_string(),
+            max_connections: 0,
+            queue_key: "".to_string(),
+        });
+
+        // Explicit String fails
+        let res = into_receiver_output(
+            "".to_string(),
+            redis_out,
+            &Some(TransformationConfig::Explicit {
+                src: String::new(),
+                format: TransformerInputFormat::String,
+            }),
+        )
+        .await;
+        assert!(matches!(
+            res.err()
+                .expect("invalid config didn't result in error"),
+            crate::error::Error::Generic(msg) if msg == "redis only supports json formatted transformations"
+        ));
+    }
+}
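
The early configuration check that both the sender and receiver paths perform can be sketched on its own. This is a rough illustration under stated assumptions, not the crate's definitions: the `Explicit` variant, the `format()` accessor, and the two format names come from the diff, while the `Implicit` variant name and the `check_redis_transformation` helper are hypothetical.

```rust
/// The two input formats named in the diff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum TransformerInputFormat {
    Json,
    String,
}

/// `Explicit` appears in the diff's tests. `Implicit` is a hypothetical name for the
/// short form (a bare source string), which is assumed to default to JSON.
enum TransformationConfig {
    Implicit(String),
    Explicit {
        format: TransformerInputFormat,
        src: String,
    },
}

impl TransformationConfig {
    /// Report the input format the transformation expects.
    fn format(&self) -> TransformerInputFormat {
        match self {
            TransformationConfig::Implicit(_) => TransformerInputFormat::Json,
            TransformationConfig::Explicit { format, .. } => *format,
        }
    }
}

/// Hypothetical helper: reject non-JSON transformations for redis while the
/// configuration is being loaded, before any message is consumed.
fn check_redis_transformation(
    transformation: &Option<TransformationConfig>,
) -> Result<(), &'static str> {
    match transformation {
        Some(t) if t.format() != TransformerInputFormat::Json => {
            Err("redis only supports json formatted transformations")
        }
        _ => Ok(()),
    }
}
```

Checking at this point means a misconfigured redis sender or receiver fails at startup, so the `unimplemented!` in the redis `raw_payload` should not be reachable from a validated configuration.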

bridge/svix-bridge-plugin-queue/src/gcp_pubsub/mod.rs

Lines changed: 5 additions & 4 deletions
@@ -6,7 +6,8 @@ use generic_queue::TaskQueueBackend;
 use serde::Deserialize;
 use std::path::PathBuf;
 use svix_bridge_types::{
-    async_trait, svix::api::Svix, JsObject, SenderInput, SenderOutputOpts, TransformerTx,
+    async_trait, svix::api::Svix, JsObject, SenderInput, SenderOutputOpts, TransformationConfig,
+    TransformerTx,
 };

 #[derive(Debug, Default, Deserialize)]
@@ -20,14 +21,14 @@ pub struct GCPPubSubConsumerPlugin {
     input_options: GCPPubSubInputOpts,
     svix_client: Svix,
     transformer_tx: Option<TransformerTx>,
-    transformation: Option<String>,
+    transformation: Option<TransformationConfig>,
 }

 impl GCPPubSubConsumerPlugin {
     pub fn new(
         name: String,
         input: GCPPubSubInputOpts,
-        transformation: Option<String>,
+        transformation: Option<TransformationConfig>,
         output: SenderOutputOpts,
     ) -> Self {
         Self {
@@ -58,7 +59,7 @@ impl Consumer for GCPPubSubConsumerPlugin {
         &self.transformer_tx
     }

-    fn transformation(&self) -> &Option<String> {
+    fn transformation(&self) -> &Option<TransformationConfig> {
         &self.transformation
     }
