Skip to content

Commit 487c8c7

Browse files
authored
Bridge: receivers now publish to queues with payload field (#982)
This diff is a little more broad than just "put the payload in a field." It also tries to formalize the requirements of each output case, both Senders and Receivers. For Senders, we need a certain shape useful for building a call to Create Message. For Receivers, outputs expect to get a JSON object with a `payload` field on it, representing the data to publish to the queue or what have you. These boundary types are now both in the `-types` crate, and used to ensure both as-is and transformed data conforms where needed. Existing tests caught the changes on the receiver side (which is the main thing for this diff). Additionally, for Senders, I camelCased the `appId` field to make the create message JSON cased more consistently. This was a remark that came up during review of an up-coming blog post.
2 parents 8101917 + 8c334e7 commit 487c8c7

File tree

13 files changed

+169
-127
lines changed

13 files changed

+169
-127
lines changed

bridge/README.md

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ Transformations are configured as either a single string of JS source code:
3535
transformation: |
3636
function handler(input) {
3737
return {
38-
app_id: input.key,
38+
appId: input.key,
3939
message: {
4040
eventType: input.event_type,
4141
payload: input.data
@@ -52,7 +52,7 @@ transformation:
5252
src: |
5353
function handler(input) {
5454
return {
55-
app_id: input.key,
55+
appId: input.key,
5656
message: {
5757
eventType: input.event_type,
5858
payload: input.data
@@ -78,7 +78,7 @@ transformation:
7878
let payload = JSON.parse(msg.getElementsByTagName("payload")[0].textContent)
7979
8080
return {
81-
app_id: msg.attributes.appId,
81+
appId: msg.attributes.appId,
8282
message: {
8383
eventType: msg.attributes.eventType,
8484
payload,
@@ -91,7 +91,7 @@ by the configured `format` field.
9191

9292
Note that regardless of the `format`, the return type of `handler` must be an `Object`.
9393

94-
> N.b. at time of writing, `format: string` is currently unsupported for `senders` and `receivers` configured with
94+
> N.b. at time of writing, `format: string` is unsupported for `senders` and `receivers` configured with
9595
> a `redis` input or output.
9696

9797
---
@@ -137,25 +137,30 @@ Each sender and receiver can optionally specify a `transformation`.
137137
Transformations should define a function called `handler` that accepts an object and returns an object.
138138
139139
Senders should produce JSON following an expected shape:
140-
```
140+
```json
141141
{
142-
// This indicates which Svix application to send the message to
143-
"app_id": "app_XYZ",
144-
145-
// The `message` field has the same requirements as the standard `MessageIn`
146-
// used for Create Message API requests
142+
"appId": "app_XYZ",
147143
"message": {
148144
"eventType": "my.event",
149-
"payload": {"abc": 123}
145+
"payload": {"code": 123, "message": "something happened..."}
150146
}
151147
}
152148
```
153149

154-
> The comments in the above JSON are for illustrative purposes only ;)
155-
> That's not valid JSON! Sorry!
156-
157150
For detail on the `message` field, see: <https://api.svix.com/docs#tag/Message/operation/v1.message.create>
158151

152+
Receivers can accept arbitrary body data but their outputs require a JSON object with a `payload` field representing the
153+
message to publish.
154+
155+
```json
156+
{
157+
"payload": {"msg": "my cool message, published by svix-bridge!"}
158+
}
159+
```
160+
161+
By configuring a transformation, you should be able to consume a variety of `POST` bodies and
162+
produce a valid output.
163+
159164
See the example configs for how to configure each input and output in more detail:
160165
- [senders](./svix-bridge.example.senders.yaml)
161166
- [receivers](./svix-bridge.example.receivers.yaml)

bridge/svix-bridge-plugin-queue/src/lib.rs

Lines changed: 12 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,10 @@ use generic_queue::rabbitmq::{RabbitMqConsumer, RabbitMqDelivery};
33
use generic_queue::redis::{RedisStreamConsumer, RedisStreamDelivery, RedisStreamJsonSerde};
44
use generic_queue::sqs::{SqsDelivery, SqsQueueConsumer};
55
use generic_queue::{Delivery, QueueError, TaskQueueReceive};
6-
use serde::{Deserialize, Serialize};
76
use std::time::{Duration, Instant};
87
use svix_bridge_types::{
9-
async_trait,
10-
svix::api::{MessageIn, PostOptions as PostOptions_, Svix},
11-
JsObject, TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
12-
TransformerOutput, TransformerTx,
8+
async_trait, svix::api::Svix, CreateMessageRequest, JsObject, TransformationConfig,
9+
TransformerInput, TransformerInputFormat, TransformerJob, TransformerOutput, TransformerTx,
1310
};
1411
use tracing::instrument;
1512

@@ -244,7 +241,7 @@ trait Consumer {
244241
delivery.nack().await.map_err(Error::from)?;
245242
return Ok(());
246243
}
247-
Ok(x) => x,
244+
Ok(x) => serde_json::from_value(serde_json::Value::Object(x))?,
248245
}
249246
} else {
250247
// Parse as JSON when not using a transformation because Create Message requires JSON.
@@ -302,40 +299,18 @@ async fn run_inner(consumer: &(impl Consumer + Send + Sync)) -> std::io::Result<
302299
}
303300
}
304301

305-
#[derive(Clone, Default, Deserialize, Serialize)]
306-
pub struct PostOptions {
307-
idempotency_key: Option<String>,
308-
}
309-
310-
impl From<PostOptions> for PostOptions_ {
311-
fn from(value: PostOptions) -> Self {
312-
PostOptions_ {
313-
idempotency_key: value.idempotency_key,
314-
}
315-
}
316-
}
317-
318-
#[derive(Clone, Deserialize, Serialize)]
319-
pub struct CreateMessageRequest {
320-
pub app_id: String,
321-
pub message: MessageIn,
322-
#[serde(skip_serializing_if = "Option::is_none")]
323-
pub post_options: Option<PostOptions>,
324-
}
325-
326-
async fn create_svix_message(svix: &Svix, value: JsObject) -> std::io::Result<()> {
327-
let CreateMessageRequest {
302+
#[instrument(skip_all, level="error", fields(
303+
app_id = app_id,
304+
event_type = message.event_type
305+
))]
306+
async fn create_svix_message(
307+
svix: &Svix,
308+
CreateMessageRequest {
328309
app_id,
329310
message,
330311
post_options,
331-
}: CreateMessageRequest = serde_json::from_value(value.into())?;
332-
let span = tracing::error_span!(
333-
"create_svix_message",
334-
app_id = app_id,
335-
event_type = message.event_type
336-
);
337-
let _enter = span.enter();
338-
312+
}: CreateMessageRequest,
313+
) -> std::io::Result<()> {
339314
svix.message()
340315
.create(app_id, message, post_options.map(Into::into))
341316
.await

bridge/svix-bridge-plugin-queue/src/receiver_output/mod.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use generic_queue::redis::{RedisConfig, RedisQueueBackend};
99
use generic_queue::sqs::{SqsConfig, SqsQueueBackend};
1010
use generic_queue::{TaskQueueBackend, TaskQueueSend};
1111
use std::sync::Arc;
12-
use svix_bridge_types::{async_trait, JsObject, ReceiverOutput};
12+
use svix_bridge_types::{async_trait, ForwardRequest, ReceiverOutput};
1313

1414
type Result<T> = std::result::Result<T, Error>;
1515

@@ -120,11 +120,9 @@ impl ReceiverOutput for QueueForwarder {
120120
fn name(&self) -> &str {
121121
&self.name
122122
}
123-
async fn handle(&self, payload: JsObject) -> std::io::Result<()> {
123+
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()> {
124124
self.sender
125-
// FIXME(#5762): the payload to publish should be coming from a specific field in the
126-
// Object, not the whole thing.
127-
.send(serde_json::Value::Object(payload))
125+
.send(request.payload)
128126
.await
129127
.map_err(crate::Error::from)?;
130128
Ok(())

bridge/svix-bridge-plugin-queue/tests/gcp_pubsub_consumer.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,11 @@ use google_cloud_pubsub::topic::Topic;
1010
use std::time::Duration;
1111

1212
use serde_json::json;
13-
use svix_bridge_plugin_queue::{
14-
config::GCPPubSubInputOpts, CreateMessageRequest, GCPPubSubConsumerPlugin,
15-
};
13+
use svix_bridge_plugin_queue::{config::GCPPubSubInputOpts, GCPPubSubConsumerPlugin};
1614
use svix_bridge_types::{
17-
svix::api::MessageIn, SenderInput, SenderOutputOpts, SvixOptions, SvixSenderOutputOpts,
18-
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
19-
TransformerOutput,
15+
svix::api::MessageIn, CreateMessageRequest, SenderInput, SenderOutputOpts, SvixOptions,
16+
SvixSenderOutputOpts, TransformationConfig, TransformerInput, TransformerInputFormat,
17+
TransformerJob, TransformerOutput,
2018
};
2119
use wiremock::matchers::{body_partial_json, method};
2220
use wiremock::{Mock, MockServer, ResponseTemplate};
@@ -286,7 +284,7 @@ async fn test_consume_transformed_string_ok() {
286284
};
287285
// Build a create-message-compatible object, using the string input as a field in the payload.
288286
let out = json!({
289-
"app_id": "app_1234",
287+
"appId": "app_1234",
290288
"message": {
291289
"eventType": "testing.things",
292290
"payload": {
@@ -396,7 +394,7 @@ async fn test_missing_event_type_nack() {
396394
publish(
397395
&topic,
398396
&serde_json::to_string(&json!({
399-
"app_id": "app_1234",
397+
"appId": "app_1234",
400398
"message": {
401399
// No event type
402400
"payload": {

bridge/svix-bridge-plugin-queue/tests/rabbitmq_consumer.rs

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,11 @@ use generic_queue::rabbitmq::FieldTable;
66
use lapin::{options::QueueDeclareOptions, Channel, Connection, ConnectionProperties, Queue};
77
use serde_json::json;
88
use std::time::Duration;
9-
use svix_bridge_plugin_queue::{
10-
config::RabbitMqInputOpts, CreateMessageRequest, RabbitMqConsumerPlugin,
11-
};
9+
use svix_bridge_plugin_queue::{config::RabbitMqInputOpts, RabbitMqConsumerPlugin};
1210
use svix_bridge_types::{
13-
svix::api::MessageIn, SenderInput, SenderOutputOpts, SvixOptions, SvixSenderOutputOpts,
14-
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
15-
TransformerOutput,
11+
svix::api::MessageIn, CreateMessageRequest, SenderInput, SenderOutputOpts, SvixOptions,
12+
SvixSenderOutputOpts, TransformationConfig, TransformerInput, TransformerInputFormat,
13+
TransformerJob, TransformerOutput,
1614
};
1715
use wiremock::matchers::{body_partial_json, method};
1816
use wiremock::{Mock, MockServer, ResponseTemplate};
@@ -270,7 +268,7 @@ async fn test_consume_transformed_string_ok() {
270268
};
271269
// Build a create-message-compatible object, using the string input as a field in the payload.
272270
let out = json!({
273-
"app_id": "app_1234",
271+
"appId": "app_1234",
274272
"message": {
275273
"eventType": "testing.things",
276274
"payload": {
@@ -390,7 +388,7 @@ async fn test_missing_event_type_nack() {
390388
&channel,
391389
queue_name,
392390
&serde_json::to_vec(&json!({
393-
"app_id": "app_1234",
391+
"appId": "app_1234",
394392
"message": {
395393
// No event type
396394
"payload": {

bridge/svix-bridge-plugin-queue/tests/redis_stream_consumer.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use std::time::Duration;
55

66
use redis::{AsyncCommands, Client};
77
use serde_json::json;
8-
use svix_bridge_plugin_queue::{config::RedisInputOpts, CreateMessageRequest, RedisConsumerPlugin};
8+
use svix_bridge_plugin_queue::{config::RedisInputOpts, RedisConsumerPlugin};
99
use svix_bridge_types::{
10-
svix::api::MessageIn, SenderInput, SenderOutputOpts, SvixOptions, SvixSenderOutputOpts,
11-
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
12-
TransformerOutput,
10+
svix::api::MessageIn, CreateMessageRequest, SenderInput, SenderOutputOpts, SvixOptions,
11+
SvixSenderOutputOpts, TransformationConfig, TransformerInput, TransformerInputFormat,
12+
TransformerJob, TransformerOutput,
1313
};
1414
use wiremock::matchers::{body_partial_json, method};
1515
use wiremock::{Mock, MockServer, ResponseTemplate};
@@ -243,7 +243,7 @@ async fn test_consume_transformed_string_ok() {
243243
};
244244
// Build a create-message-compatible object, using the string input as a field in the payload.
245245
let out = json!({
246-
"app_id": "app_1234",
246+
"appId": "app_1234",
247247
"message": {
248248
"eventType": "testing.things",
249249
"payload": {
@@ -353,7 +353,7 @@ async fn test_missing_event_type_nack() {
353353
&client,
354354
&key,
355355
&serde_json::to_string(&json!({
356-
"app_id": "app_1234",
356+
"appId": "app_1234",
357357
"message": {
358358
// No event type
359359
"payload": {

bridge/svix-bridge-plugin-queue/tests/sqs_consumer.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@ use std::time::Duration;
77

88
use aws_sdk_sqs::Client;
99
use serde_json::json;
10-
use svix_bridge_plugin_queue::{config::SqsInputOpts, CreateMessageRequest, SqsConsumerPlugin};
10+
use svix_bridge_plugin_queue::{config::SqsInputOpts, SqsConsumerPlugin};
1111
use svix_bridge_types::svix::api::MessageIn;
1212
use svix_bridge_types::{
13-
SenderInput, SenderOutputOpts, SvixOptions, SvixSenderOutputOpts, TransformationConfig,
14-
TransformerInput, TransformerInputFormat, TransformerJob, TransformerOutput,
13+
CreateMessageRequest, SenderInput, SenderOutputOpts, SvixOptions, SvixSenderOutputOpts,
14+
TransformationConfig, TransformerInput, TransformerInputFormat, TransformerJob,
15+
TransformerOutput,
1516
};
1617
use wiremock::matchers::{body_partial_json, method};
1718
use wiremock::{Mock, MockServer, ResponseTemplate};
@@ -261,7 +262,7 @@ async fn test_consume_transformed_string_ok() {
261262
};
262263
// Build a create-message-compatible object, using the string input as a field in the payload.
263264
let out = json!({
264-
"app_id": "app_1234",
265+
"appId": "app_1234",
265266
"message": {
266267
"eventType": "testing.things",
267268
"payload": {
@@ -385,7 +386,7 @@ async fn test_missing_event_type_nack() {
385386
&client,
386387
&queue_url,
387388
&serde_json::to_string(&json!({
388-
"app_id": "app_1234",
389+
"appId": "app_1234",
389390
"message": {
390391
// No event type
391392
"payload": {

bridge/svix-bridge-types/src/lib.rs

Lines changed: 42 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
pub use async_trait::async_trait;
22
use serde::{Deserialize, Serialize};
33
pub use svix;
4-
use svix::api::SvixOptions as _SvixOptions;
4+
use svix::api::{MessageIn, PostOptions as PostOptions_, SvixOptions as _SvixOptions};
55
use tokio::sync::mpsc;
66
use tokio::sync::oneshot;
77

@@ -106,11 +106,8 @@ pub struct TransformerJob {
106106

107107
pub enum TransformerOutput {
108108
/// A successfully transformed payload.
109-
// XXX: not sure if there's a cheaper way to deserialize the output while requiring an Object.
110-
// FIXME(#5762): We can define a fixed type here as the expected output for a transformation.
111-
// It think it'll always be json, but the `payload` field can be any json-encoded type.
112-
// That type, whatever it is, will be required as output regardless of if a transformation is
113-
// used or not, probably, replacing `JsObject` as the interchange value.
109+
// Both senders and receivers require a map type (Object) but have different requirements which
110+
// are best validated after the fact. For now, we validate only that we get a map type back.
114111
Object(JsObject),
115112
/// For cases where the JS script executes successfully but produces an unexpected output.
116113
Invalid,
@@ -149,7 +146,7 @@ pub trait SenderInput: Send {
149146
#[async_trait]
150147
pub trait ReceiverOutput: Send + Sync {
151148
fn name(&self) -> &str;
152-
async fn handle(&self, payload: JsObject) -> std::io::Result<()>;
149+
async fn handle(&self, request: ForwardRequest) -> std::io::Result<()>;
153150
}
154151

155152
#[derive(Deserialize, Debug, Clone, Default)]
@@ -214,3 +211,41 @@ pub struct SvixSenderOutputOpts {
214211
#[serde(default)]
215212
pub options: Option<SvixOptions>,
216213
}
214+
215+
#[derive(Clone, Default, Deserialize, Serialize)]
216+
pub struct PostOptions {
217+
idempotency_key: Option<String>,
218+
}
219+
220+
impl From<PostOptions> for PostOptions_ {
221+
fn from(value: PostOptions) -> Self {
222+
PostOptions_ {
223+
idempotency_key: value.idempotency_key,
224+
}
225+
}
226+
}
227+
228+
/// Senders convert messages into Create Message API calls so the JSON pulled out of message queues
229+
/// or produced by transformations need to conform to this shape.
230+
#[derive(Clone, Deserialize, Serialize)]
231+
#[serde(rename_all = "camelCase")]
232+
pub struct CreateMessageRequest {
233+
pub app_id: String,
234+
pub message: MessageIn,
235+
#[serde(skip_serializing_if = "Option::is_none")]
236+
pub post_options: Option<PostOptions>,
237+
}
238+
239+
/// Receivers convert HTTP bodies into messages forwarded to (currently only) message queues, etc.
240+
/// The `payload` field represents the message body given to the producer, and other fields may be
241+
/// added in the future allowing transformations to dynamically customize the producer behavior.
242+
#[derive(Clone, Deserialize, Serialize)]
243+
pub struct ForwardRequest {
244+
/// This is the payload that will be fed into a Receiver Output
245+
// XXX: right now I think any arbitrary json value can work, but individual outputs may have
246+
// more strict requirements.
247+
// The fact this is represented as a field on a json object demands at least that the value can
248+
// be represented in json.
249+
// FIXME: can we leverage RawValue here?
250+
pub payload: serde_json::Value,
251+
}

0 commit comments

Comments
 (0)