Skip to content

Commit 598593e

Browse files
authored
bridge: Add Kafka as an input (#1333)
… that is, support converting Kafka messages into Svix API calls. Part of svix/monorepo-private#8508.
2 parents e45f14c + 762cb4a commit 598593e

File tree

14 files changed

+1069
-7
lines changed

14 files changed

+1069
-7
lines changed

bridge/Cargo.lock

Lines changed: 81 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

bridge/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ members = [
99
"svix-bridge-types",
1010
"svix-bridge",
1111
"svix-bridge-plugin-queue",
12+
"svix-bridge-plugin-kafka",
1213
]
1314

1415
[profile.dev.package]
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
[package]
2+
name = "svix-bridge-plugin-kafka"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
rdkafka = { version = "0.36.0", features = ["cmake-build", "ssl", "tracing"] }
8+
serde = { version = "1.0", features = ["derive"] }
9+
serde_json = "1.0.117"
10+
svix-bridge-types = { path = "../svix-bridge-types" }
11+
thiserror = "1.0.61"
12+
tokio = { version = "1.28.1", features = ["time"] }
13+
tracing = "0.1.40"
14+
15+
[dev-dependencies]
16+
ctor = "0.2.8"
17+
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
18+
wiremock = "0.5.18"
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
Copyright (c) 2024 Svix Inc.
2+
3+
Permission is hereby granted, free of charge, to any person obtaining
4+
a copy of this software and associated documentation files (the
5+
"Software"), to deal in the Software without restriction, including
6+
without limitation the rights to use, copy, modify, merge, publish,
7+
distribute, sublicense, and/or sell copies of the Software, and to
8+
permit persons to whom the Software is furnished to do so, subject to
9+
the following conditions:
10+
11+
The above copyright notice and this permission notice shall be
12+
included in all copies or substantial portions of the Software.
13+
14+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
15+
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
16+
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
17+
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
18+
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
19+
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
20+
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
use rdkafka::{consumer::StreamConsumer, error::KafkaResult, ClientConfig};
2+
use serde::Deserialize;
3+
use svix_bridge_types::{SenderInput, SenderOutputOpts, TransformationConfig};
4+
5+
use crate::{input::KafkaConsumer, Result};
6+
7+
#[derive(Clone, Deserialize)]
8+
pub struct KafkaInputOpts {
9+
/// Comma-separated list of addresses.
10+
///
11+
/// Example: `localhost:9094`
12+
#[serde(rename = "kafka_bootstrap_brokers")]
13+
pub bootstrap_brokers: String,
14+
15+
/// The consumer group ID, used to track the stream offset between restarts
16+
/// (due to host maintenance, upgrades, crashes, etc.).
17+
#[serde(rename = "kafka_group_id")]
18+
pub group_id: String,
19+
20+
/// The topic to listen to.
21+
#[serde(rename = "kafka_topic")]
22+
pub topic: String,
23+
24+
/// The value for 'security.protocol' in the kafka config.
25+
#[serde(flatten)]
26+
pub security_protocol: KafkaSecurityProtocol,
27+
28+
/// The 'debug' config value for rdkafka - enables more verbose logging
29+
/// for the selected 'contexts'
30+
#[serde(rename = "kafka_debug_contexts")]
31+
pub debug_contexts: Option<String>,
32+
}
33+
34+
impl KafkaInputOpts {
35+
pub(crate) fn create_consumer(self) -> KafkaResult<StreamConsumer> {
36+
let mut config = ClientConfig::new();
37+
config
38+
.set("group.id", self.group_id)
39+
.set("bootstrap.servers", self.bootstrap_brokers)
40+
// messages are committed manually after webhook delivery was successful.
41+
.set("enable.auto.commit", "false");
42+
43+
match self.security_protocol {
44+
KafkaSecurityProtocol::Plaintext => {
45+
config.set("security.protocol", "plaintext");
46+
}
47+
KafkaSecurityProtocol::Ssl => {
48+
config.set("security.protocol", "ssl");
49+
}
50+
KafkaSecurityProtocol::SaslSsl {
51+
sasl_username,
52+
sasl_password,
53+
} => {
54+
config
55+
.set("security.protocol", "sasl_ssl")
56+
.set("sasl.mechanisms", "SCRAM-SHA-512")
57+
.set("sasl.username", sasl_username)
58+
.set("sasl.password", sasl_password);
59+
}
60+
}
61+
62+
if let Some(debug_contexts) = self.debug_contexts {
63+
if !debug_contexts.is_empty() {
64+
config.set("debug", debug_contexts);
65+
}
66+
}
67+
68+
config.create()
69+
}
70+
}
71+
72+
#[derive(Clone, Debug, Deserialize)]
73+
#[serde(tag = "kafka_security_protocol", rename_all = "snake_case")]
74+
pub enum KafkaSecurityProtocol {
75+
Plaintext,
76+
Ssl,
77+
SaslSsl {
78+
#[serde(rename = "kafka_sasl_username")]
79+
sasl_username: String,
80+
#[serde(rename = "kafka_sasl_password")]
81+
sasl_password: String,
82+
},
83+
}
84+
85+
pub fn into_sender_input(
86+
name: String,
87+
opts: KafkaInputOpts,
88+
transformation: Option<TransformationConfig>,
89+
output: SenderOutputOpts,
90+
) -> Result<Box<dyn SenderInput>> {
91+
Ok(Box::new(KafkaConsumer::new(
92+
name,
93+
opts,
94+
transformation,
95+
output,
96+
)?))
97+
}
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
use std::str;
2+
3+
use rdkafka::error::KafkaError;
4+
use svix_bridge_types::svix::error::Error as SvixClientError;
5+
6+
#[derive(Debug, thiserror::Error)]
7+
pub enum Error {
8+
#[error("kafka error")]
9+
Kafka(#[from] KafkaError),
10+
11+
#[error("svix client error")]
12+
SvixClient(#[from] SvixClientError),
13+
14+
#[error("JSON deserialization failed")]
15+
Deserialization(#[source] serde_json::Error),
16+
17+
#[error("non-UTF8 payload")]
18+
NonUtf8Payload(#[source] str::Utf8Error),
19+
20+
#[error("kafka message is missing payload")]
21+
MissingPayload,
22+
23+
#[error("transformation error: {error}")]
24+
Transformation { error: String },
25+
}
26+
27+
impl Error {
28+
pub(crate) fn transformation(error: impl Into<String>) -> Self {
29+
Self::Transformation {
30+
error: error.into(),
31+
}
32+
}
33+
}
34+
35+
pub type Result<T, E = Error> = std::result::Result<T, E>;

0 commit comments

Comments
 (0)