Skip to content

Commit f5a6d5b

Browse files
committed
implement kafka integration
1 parent acb26b9 commit f5a6d5b

File tree

6 files changed

+399
-1
lines changed

6 files changed

+399
-1
lines changed

Cargo.lock

Lines changed: 70 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ num_cpus = "1.15"
6666
once_cell = "1.17.1"
6767
prometheus = { version = "0.13", features = ["process"] }
6868
rand = "0.8.5"
69+
rdkafka = "0.36.2"
6970
regex = "1.7.3"
7071
relative-path = { version = "1.7", features = ["serde"] }
7172
reqwest = { version = "0.11.27", default-features = false, features = [

src/cli.rs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,14 @@ pub struct Cli {
119119
pub trino_auth: Option<String>,
120120
pub trino_schema: Option<String>,
121121
pub trino_catalog: Option<String>,
122+
123+
// Kafka specific env vars
124+
pub kafka_topic: Option<String>,
125+
pub kafka_host: Option<String>,
126+
pub kafka_group: Option<String>,
127+
pub kafka_client_id: Option<String>,
128+
pub kafka_security_protocol: Option<String>,
129+
pub kafka_partitions: Option<String>,
122130
}
123131

124132
impl Cli {
@@ -164,6 +172,14 @@ impl Cli {
164172
pub const TRINO_AUTHORIZATION: &'static str = "p-trino-authorization";
165173
pub const TRINO_SCHEMA: &'static str = "p-trino-schema";
166174

175+
// Kafka specific env vars
176+
pub const KAFKA_TOPIC: &'static str = "kafka-topic";
177+
pub const KAFKA_HOST: &'static str = "kafka-host";
178+
pub const KAFKA_GROUP: &'static str = "kafka-group";
179+
pub const KAFKA_CLIENT_ID: &'static str = "kafka-client-id";
180+
pub const KAFKA_SECURITY_PROTOCOL: &'static str = "kafka-security-protocol";
181+
pub const KAFKA_PARTITIONS: &'static str = "kafka-partitions";
182+
167183
pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf {
168184
self.local_staging_path.join(stream_name)
169185
}
@@ -177,6 +193,48 @@ impl Cli {
177193

178194
pub fn create_cli_command_with_clap(name: &'static str) -> Command {
179195
Command::new(name).next_line_help(false)
196+
.arg(
197+
Arg::new(Self::KAFKA_TOPIC)
198+
.long(Self::KAFKA_TOPIC)
199+
.env("P_KAFKA_TOPIC")
200+
.value_name("STRING")
201+
.help("Kafka topic to subscribe to"),
202+
)
203+
.arg(
204+
Arg::new(Self::KAFKA_HOST)
205+
.long(Self::KAFKA_HOST)
206+
.env("P_KAFKA_HOST")
207+
.value_name("STRING")
208+
.help("Address and port for Kafka server"),
209+
)
210+
.arg(
211+
Arg::new(Self::KAFKA_GROUP)
212+
.long(Self::KAFKA_GROUP)
213+
.env("P_KAFKA_GROUP")
214+
.value_name("STRING")
215+
.help("Kafka group"),
216+
)
217+
.arg(
218+
Arg::new(Self::KAFKA_CLIENT_ID)
219+
.long(Self::KAFKA_CLIENT_ID)
220+
.env("P_KAFKA_CLIENT_ID")
221+
.value_name("STRING")
222+
.help("Kafka client id"),
223+
)
224+
.arg(
225+
Arg::new(Self::KAFKA_SECURITY_PROTOCOL)
226+
.long(Self::KAFKA_SECURITY_PROTOCOL)
227+
.env("P_KAFKA_SECURITY_PROTOCOL")
228+
.value_name("STRING")
229+
.help("Kafka security protocol (ssl)"),
230+
)
231+
.arg(
232+
Arg::new(Self::KAFKA_PARTITIONS)
233+
.long(Self::KAFKA_PARTITIONS)
234+
.env("P_KAFKA_PARTITIONS")
235+
.value_name("STRING")
236+
.help("Kafka partitions"),
237+
)
180238
.arg(
181239
Arg::new(Self::TRINO_ENDPOINT)
182240
.long(Self::TRINO_ENDPOINT)
@@ -520,6 +578,13 @@ impl FromArgMatches for Cli {
520578
self.trino_schema = m.get_one::<String>(Self::TRINO_SCHEMA).cloned();
521579
self.trino_username = m.get_one::<String>(Self::TRINO_USER_NAME).cloned();
522580

581+
self.kafka_topic = m.get_one::<String>(Self::KAFKA_TOPIC).cloned();
582+
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
583+
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
584+
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
585+
self.kafka_security_protocol = m.get_one::<String>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
586+
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
587+
523588
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();
524589
self.query_cache_path = m.get_one::<PathBuf>(Self::QUERY_CACHE).cloned();
525590
self.tls_cert_path = m.get_one::<PathBuf>(Self::TLS_CERT).cloned();

0 commit comments

Comments
 (0)