Skip to content

Commit 7a3b53b

Browse files
committed
integrate bridge js transformations with webhook-receiver plugin (#933)
Updates the webhook-receiver plugin to accept a `TransformerTx` so it can leverage the JS Runtime managed by bridge. This is not the best diff, sadly. The original webhook-ingester made some design decisions that made this a little awkward. Specifically, the request would accept a forwarder on its `forward` method which took responsibility for payload handling. When transformations are optionally happening, the payload either needs to transform as a part of the `forward` method, or (my preference) the payload handling needs to be moved up to the handler level. I went for the latter - The "forward" method now hangs off the forwarder and receives a payload. This means the handler can attempt the transformation before forwarding, which keeps the go/no-go logic all within the handler body. It also avoids having to somehow pass app state down into the forwarder so it could perform the transformation itself. Unfortunately, the go/no-go logic in the handler is also sort of a mess. In order to tame the additional mess contributed by the transformation, that chunk of the work has been extracted into a helper function. --- Ideally, we'd have some actual unit tests around the HTTP handler code, and the bits invoked by it, but since we don't yet have anything setup it'd be a bigger lift than I care to tackle right now. Instead, manual testing was done, noted in the comments below.
1 parent fabe3d9 commit 7a3b53b

File tree

4 files changed

+112
-34
lines changed

4 files changed

+112
-34
lines changed

webhook-bridge/svix-webhook-bridge-plugin-webhook-receiver/src/config.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ pub struct IntegrationConfig {
1313
pub name: IntegrationId,
1414
pub verification: VerificationScheme,
1515
pub destination: ForwardDestination,
16+
#[serde(default)]
17+
pub transformation: Option<String>,
1618
}
1719

1820
/// The [`VerificationScheme`] is an enum with variant for every method for verifying a webhook's

webhook-bridge/svix-webhook-bridge-plugin-webhook-receiver/src/forwarding.rs

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use crate::config::{GCPPubSubOutputOpts, RabbitMqOutputOpts, RedisOutputOpts, SqsOutputOpts};
2-
use crate::types::{SerializablePayload, SerializableRequest, Validated};
32
use anyhow::Result;
43
use axum::async_trait;
54
use enum_dispatch::enum_dispatch;
@@ -9,11 +8,12 @@ use generic_queue::redis::{RedisConfig, RedisQueueBackend};
98
use generic_queue::sqs::{SqsConfig, SqsQueueBackend};
109
use generic_queue::{TaskQueueBackend, TaskQueueSend};
1110
use std::sync::Arc;
11+
use svix_webhook_bridge_types::JsObject;
1212

1313
#[async_trait]
1414
#[enum_dispatch]
1515
pub trait ForwardingMethod {
16-
async fn forward(&self, req: SerializableRequest<Validated>) -> Result<http::StatusCode>;
16+
async fn forward(&self, payload: JsObject) -> Result<http::StatusCode>;
1717
}
1818

1919
#[derive(Clone)]
@@ -101,13 +101,8 @@ impl std::fmt::Debug for GenericQueueForwarder {
101101
}
102102
#[async_trait]
103103
impl ForwardingMethod for GenericQueueForwarder {
104-
async fn forward(&self, req: SerializableRequest<Validated>) -> Result<http::StatusCode> {
105-
let payload = match req.payload() {
106-
SerializablePayload::Standard(data) => serde_json::from_slice(data)?,
107-
SerializablePayload::StringSerializable(s) => serde_json::from_str(s)?,
108-
};
109-
110-
self.sender.send(payload).await?;
104+
async fn forward(&self, payload: JsObject) -> Result<http::StatusCode> {
105+
self.sender.send(payload.into()).await?;
111106
Ok(http::StatusCode::OK)
112107
}
113108
}

webhook-bridge/svix-webhook-bridge-plugin-webhook-receiver/src/lib.rs

Lines changed: 82 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,12 @@ use axum::{
44
routing::post,
55
Router,
66
};
7+
use forwarding::ForwardingMethod;
78
use serde::Deserialize;
89
use std::net::SocketAddr;
9-
use svix_webhook_bridge_types::{async_trait, Plugin};
10+
use svix_webhook_bridge_types::{
11+
async_trait, JsObject, JsReturn, Plugin, TransformerJob, TransformerTx,
12+
};
1013
use tracing::instrument;
1114
use types::{IntegrationId, IntegrationState, InternalState, SerializableRequest, Unvalidated};
1215

@@ -24,14 +27,18 @@ pub struct WebhookReceiverPluginConfig {
2427
pub routes: Vec<IntegrationConfig>,
2528
}
2629

27-
#[derive(Clone, Debug, Deserialize, PartialEq)]
30+
#[derive(Clone, Debug)]
2831
pub struct WebhookReceiverPlugin {
2932
cfg: WebhookReceiverPluginConfig,
33+
transformer_tx: Option<TransformerTx>,
3034
}
3135

3236
impl WebhookReceiverPlugin {
3337
pub fn new(cfg: WebhookReceiverPluginConfig) -> Self {
34-
Self { cfg }
38+
Self {
39+
cfg,
40+
transformer_tx: None,
41+
}
3542
}
3643
}
3744

@@ -45,11 +52,16 @@ impl TryInto<Box<dyn Plugin>> for WebhookReceiverPluginConfig {
4552

4653
#[async_trait]
4754
impl Plugin for WebhookReceiverPlugin {
55+
fn set_transformer(&mut self, tx: Option<TransformerTx>) {
56+
self.transformer_tx = tx;
57+
}
58+
4859
async fn run(&self) -> std::io::Result<()> {
4960
let addr = &self.cfg.listen_addr;
50-
let state = InternalState::from_routes(self.cfg.routes.as_slice())
51-
.await
52-
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
61+
let state =
62+
InternalState::from_routes(self.cfg.routes.as_slice(), self.transformer_tx.clone())
63+
.await
64+
.map_err(|e| std::io::Error::new(std::io::ErrorKind::Other, e))?;
5365

5466
let router = Router::new()
5567
.route(
@@ -81,18 +93,47 @@ impl Plugin for WebhookReceiverPlugin {
8193
)]
8294
async fn route(
8395
Path(integration_id): Path<IntegrationId>,
84-
State(InternalState(id_map)): State<InternalState>,
96+
State(InternalState {
97+
routes,
98+
transformer_tx,
99+
}): State<InternalState>,
85100
req: SerializableRequest<Unvalidated>,
86101
) -> http::StatusCode {
87102
if let Some(IntegrationState {
88103
verifier,
89104
forwarder,
90-
}) = id_map.get(&integration_id)
105+
transformation,
106+
}) = routes.get(&integration_id)
91107
{
92108
match req.validate(verifier).await {
93109
Ok(req) => {
110+
let payload = match req.payload().as_js_object() {
111+
Ok(payload) => match transformation.clone() {
112+
Some(script) => {
113+
match transform(payload, script, transformer_tx.clone()).await {
114+
Ok(transformed_payload) => transformed_payload,
115+
Err(c) => return c,
116+
}
117+
}
118+
// Keep the original payload as-is if there's no transformation specified.
119+
None => payload,
120+
},
121+
Err(e) => {
122+
tracing::error!("failed to parse payload as json object: {}", e);
123+
return http::StatusCode::BAD_REQUEST;
124+
}
125+
};
126+
94127
tracing::debug!("forwarding request");
95-
req.forward(forwarder).await
128+
// `forward` method was ambiguous. It looks like there are many traits that offer
129+
// this method.
130+
match ForwardingMethod::forward(forwarder, payload).await {
131+
Ok(c) => c,
132+
Err(e) => {
133+
tracing::error!("Error forwarding request: {}", e);
134+
http::StatusCode::INTERNAL_SERVER_ERROR
135+
}
136+
}
96137
}
97138
Err(code) => {
98139
tracing::warn!("validation failed: {code}");
@@ -104,3 +145,35 @@ async fn route(
104145
http::StatusCode::NOT_FOUND
105146
}
106147
}
148+
149+
/// Attempts to run the payload through a js transformation.
150+
async fn transform(
151+
payload: JsObject,
152+
script: String,
153+
tx: Option<TransformerTx>,
154+
) -> Result<JsObject, http::StatusCode> {
155+
let tx = tx.ok_or_else(|| {
156+
tracing::error!("transformations are not available");
157+
http::StatusCode::INTERNAL_SERVER_ERROR
158+
})?;
159+
160+
let (job, callback) = TransformerJob::new(script.clone(), payload);
161+
if let Err(e) = tx.send(job) {
162+
tracing::error!("transformations are not available: {}", e);
163+
return Err(http::StatusCode::INTERNAL_SERVER_ERROR);
164+
}
165+
166+
match callback.await {
167+
// This is the only "good" outcome giving a RHS value for the assignment.
168+
// All other match arms should bail with a non-2xx status.
169+
Ok(Ok(JsReturn::Object(obj))) => Ok(obj),
170+
Ok(Ok(JsReturn::Invalid)) => {
171+
tracing::error!("transformation produced invalid payload");
172+
Err(http::StatusCode::INTERNAL_SERVER_ERROR)
173+
}
174+
_ => {
175+
tracing::error!("transformation failed");
176+
Err(http::StatusCode::INTERNAL_SERVER_ERROR)
177+
}
178+
}
179+
}

webhook-bridge/svix-webhook-bridge-plugin-webhook-receiver/src/types.rs

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::config::IntegrationConfig;
22
use crate::forwarding::GenericQueueForwarder;
33
use crate::{
44
config::{ForwardDestination, VerificationScheme},
5-
forwarding::{Forwarder, ForwardingMethod},
5+
forwarding::Forwarder,
66
verification::{NoVerifier, SvixVerifier, VerificationMethod, Verifier},
77
};
88
use anyhow::Result;
@@ -15,13 +15,21 @@ use axum::{
1515
use http::{HeaderMap, HeaderValue, Request};
1616
use serde::{Deserialize, Serialize};
1717
use std::{collections::HashMap, marker::PhantomData, sync::Arc};
18+
use svix_webhook_bridge_types::{JsObject, TransformerTx};
1819

1920
#[derive(Clone, Debug)]
2021
/// The [`InternalState`] is passed to the Axum route and is used to map the "IntegrationId" in the
2122
/// URL to the configured [`Verifier`] and [`Forwarder`] variants.
22-
pub struct InternalState(pub HashMap<IntegrationId, IntegrationState>);
23+
pub struct InternalState {
24+
pub routes: HashMap<IntegrationId, IntegrationState>,
25+
pub transformer_tx: Option<TransformerTx>,
26+
}
27+
2328
impl InternalState {
24-
pub async fn from_routes(routes: &[IntegrationConfig]) -> Result<Self> {
29+
pub async fn from_routes(
30+
routes: &[IntegrationConfig],
31+
transformer_tx: Option<TransformerTx>,
32+
) -> Result<Self> {
2533
let mut state_map = HashMap::new();
2634

2735
for cfg in routes {
@@ -57,11 +65,15 @@ impl InternalState {
5765
IntegrationState {
5866
verifier,
5967
forwarder,
68+
transformation: cfg.transformation.clone(),
6069
},
6170
);
6271
}
6372

64-
Ok(InternalState(state_map))
73+
Ok(InternalState {
74+
routes: state_map,
75+
transformer_tx,
76+
})
6577
}
6678
}
6779

@@ -99,6 +111,7 @@ impl AsRef<str> for IntegrationId {
99111
pub struct IntegrationState {
100112
pub verifier: Verifier,
101113
pub forwarder: Forwarder,
114+
pub transformation: Option<String>,
102115
}
103116

104117
/// Any arbitrary HTTP request which is not a webhook dispatched by Svix may also have arbitrary
@@ -282,18 +295,6 @@ impl SerializableRequest<Unvalidated> {
282295
}
283296
}
284297

285-
impl SerializableRequest<Validated> {
286-
pub async fn forward<F: ForwardingMethod>(self, f: &F) -> http::StatusCode {
287-
match f.forward(self).await {
288-
Ok(c) => c,
289-
Err(e) => {
290-
tracing::error!("Error forwarding request: {}", e);
291-
http::StatusCode::INTERNAL_SERVER_ERROR
292-
}
293-
}
294-
}
295-
}
296-
297298
#[derive(Clone, Debug)]
298299
pub enum SerializableHeaderMap {
299300
Standard(HeaderMap),
@@ -385,4 +386,11 @@ impl SerializablePayload {
385386
Self::StringSerializable(s) => Ok(Self::StringSerializable(s)),
386387
}
387388
}
389+
390+
pub fn as_js_object(&self) -> Result<JsObject> {
391+
Ok(match self {
392+
Self::Standard(v) => serde_json::from_slice(v)?,
393+
Self::StringSerializable(s) => serde_json::from_str(s)?,
394+
})
395+
}
388396
}

0 commit comments

Comments
 (0)