Skip to content

Commit 276b02d

Browse files
committed
Added initial kafka support.
1 parent d4638b6 commit 276b02d

File tree

22 files changed

+589
-33
lines changed

22 files changed

+589
-33
lines changed

Cargo.lock

Lines changed: 99 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ chrono = { version = "^0.4.38", features = ["serde"] }
5151
fern = { version = "^0.7.0", features = ["colored"] }
5252
reqwest = { version = "^0.12.9", features = ["json"] }
5353
derive_more = { version = "^1.0.0", features = ["display", "from", "from_str"], default-features = false }
54+
rdkafka = "0.37.0"
5455

5556
[target.'cfg(not(target_env = "msvc"))'.dependencies.tikv-jemallocator]
5657
version = "0.6"

docker-compose.yaml

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,4 +42,28 @@ services:
4242
MINIO_DEFAULT_BUCKETS: "rustus"
4343
MINIO_ROOT_USER: "rustus-test"
4444
MINIO_ROOT_PASSWORD: "rustus-test"
45-
45+
kafka:
46+
image: bitnami/kafka:3.9-debian-12
47+
healthcheck:
48+
test:
49+
- CMD
50+
- kafka-topics.sh
51+
- --list
52+
- --bootstrap-server
53+
- localhost:9092
54+
interval: 1s
55+
timeout: 3s
56+
retries: 30
57+
ports:
58+
- 9094:9094
59+
environment:
60+
KAFKA_CFG_NODE_ID: "0"
61+
KAFKA_KRAFT_CLUSTER_ID: "0"
62+
KAFKA_CFG_PROCESS_ROLES: "controller,broker"
63+
KAFKA_CFG_LISTENERS: "PLAINTEXT://:9092,CONTROLLER://:9093,EXTERNAL://:9094"
64+
KAFKA_CFG_ADVERTISED_LISTENERS: "PLAINTEXT://kafka:9092,EXTERNAL://localhost:9094"
65+
KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,EXTERNAL:PLAINTEXT,PLAINTEXT:PLAINTEXT"
66+
KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: "0@kafka:9093"
67+
KAFKA_CFG_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
68+
KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: "true"
69+
KAFKA_CFG_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"

src/config.rs

Lines changed: 89 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use clap::Parser;
55
use crate::{
66
data_storage::AvailableDataStorages,
77
info_storage::AvailableInfoStorages,
8-
notifiers::{Format, Hook},
8+
notifiers::{impls::kafka_notifier::ExtraKafkaOptions, Format, Hook},
99
protocol::extensions::Extensions,
1010
};
1111

@@ -284,6 +284,91 @@ pub struct AMQPHooksOptions {
284284
pub auto_delete: bool,
285285
}
286286

287+
#[derive(Parser, Debug, Clone)]
288+
pub struct KafkaHookOptions {
289+
/// Kafka urls.
290+
/// List of brokers to connect to in the format `host:port`.
291+
/// If you have multiple brokers, separate them with commas.
292+
/// Corresponds to `bootstrap.servers` in Kafka configuration.
293+
#[arg(
294+
name = "hooks-kafka-urls",
295+
long,
296+
env = "RUSTUS_HOOKS_KAFKA_URLS",
297+
use_value_delimiter = true
298+
)]
299+
pub urls: String,
300+
/// Kafka producer client.id.
301+
#[arg(
302+
name = "hooks-kafka-client-id",
303+
long,
304+
env = "RUSTUS_HOOKS_KAFKA_CLIENT_ID"
305+
)]
306+
pub client_id: Option<String>,
307+
/// Kafka topic. If specified, all events will be sent to this topic.
308+
#[arg(name = "hooks-kafka-topic", long, env = "RUSTUS_HOOKS_KAFKA_TOPIC")]
309+
pub topic: Option<String>,
310+
/// Kafka topic prefix. In case if specifeid, prefix will be added to all topics
311+
/// and all events will be sent to different topics.
312+
#[arg(name = "hooks-kafka-prefix", long, env = "RUSTUS_HOOKS_KAFKA_PREFIX")]
313+
pub prefix: Option<String>,
314+
/// Kafka required acks.
315+
/// This parameter is used to configure how many replicas
316+
/// must acknowledge the message.
317+
///
318+
/// Corresponds to `request.required.acks` in Kafka configuration.
319+
/// Possible values are:
320+
/// * -1 - all replicas must acknowledge the message;
321+
/// * 0 - no replicas must acknowledge the message;
322+
/// * ...1000 - number of replicas that must acknowledge the message.
323+
#[arg(
324+
name = "hooks-kafka-required-acks",
325+
long,
326+
env = "RUSTUS_HOOKS_KAFKA_REQUIRED_ACKS"
327+
)]
328+
pub required_acks: Option<String>,
329+
330+
/// Compression codec.
331+
/// This parameter is used to compress messages before sending them to Kafka.
332+
/// Possible values are:
333+
/// * none - no compression;
334+
/// * gzip - gzip compression;
335+
/// * snappy - snappy compression.
336+
/// * lz4 - lz4 compression.
337+
/// * zstd - zstd compression.
338+
/// Corresponds to `compression.codec` in Kafka configuration.
339+
#[arg(
340+
name = "hooks-kafka-compression",
341+
long,
342+
env = "RUSTUS_HOOKS_KAFKA_COMPRESSION"
343+
)]
344+
pub compression: Option<String>,
345+
346+
/// Kafka idle timeout in seconds.
347+
/// After this amount of time in seconds, the connection will be dropped.
348+
/// Corresponds to `connections.max.idle.ms` in Kafka configuration.
349+
#[arg(
350+
name = "hooks-kafka-idle-timeout",
351+
long,
352+
env = "RUSTUS_HOOKS_KAFKA_IDLE_TIMEOUT"
353+
)]
354+
pub idle_timeout: Option<u64>,
355+
356+
/// Kafka send timeout in seconds.
357+
/// After this amount of time in seconds, the message will be dropped.
358+
#[arg(
359+
name = "hooks-kafka-send-timeout",
360+
long,
361+
env = "RUSTUS_HOOKS_KAFKA_SEND_TIMEOUT"
362+
)]
363+
pub send_timeout: Option<u64>,
364+
365+
/// Extra options for Kafka.
366+
/// This parameter is used to pass additional options to Kafka.
367+
/// All options must be in the format `key=value`, separated by semicolon.
368+
/// Example: `key1=value1;key2=value2`.
369+
pub extra_kafka_opts: Option<ExtraKafkaOptions>,
370+
}
371+
287372
#[derive(Parser, Debug, Clone)]
288373
#[allow(clippy::struct_excessive_bools)]
289374
pub struct NotificationsOptions {
@@ -336,6 +421,9 @@ pub struct NotificationsOptions {
336421

337422
#[command(flatten)]
338423
pub amqp_hook_opts: AMQPHooksOptions,
424+
425+
#[command(flatten)]
426+
pub kafka_hook_opts: KafkaHookOptions,
339427
}
340428

341429
#[derive(Debug, Parser, Clone)]

src/data_storage/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use std::{fs::File, io::Read, path::PathBuf};
22

33
use base::DataStorage;
4+
use std::str::FromStr;
5+
use strum::IntoEnumIterator;
46

57
use crate::{config::RustusConf, file_info::FileInfo, from_str};
68

src/errors.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ pub enum RustusError {
4545
HookError(String),
4646
#[error("Unable to configure logging: {0}")]
4747
LogConfigError(#[from] log::SetLoggerError),
48+
#[error("Kafka extra options error: {0}")]
49+
KafkaExtraOptionsError(String),
4850
#[error("AMQP error: {0}")]
4951
AMQPError(#[from] lapin::Error),
5052
#[error("AMQP pooling error: {0}")]
@@ -73,6 +75,8 @@ pub enum RustusError {
7375
ParseIntError(#[from] std::num::ParseIntError),
7476
#[error("Can't convert int: {0}")]
7577
TryFromIntError(#[from] std::num::TryFromIntError),
78+
#[error("Kafka error: {0}")]
79+
KafkaError(#[from] rdkafka::error::KafkaError),
7680
}
7781

7882
/// This conversion allows us to use `RustusError` in the `main` function.

src/info_storage/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ pub mod base;
22
pub mod impls;
33

44
use derive_more::{Display, From};
5+
use std::str::FromStr;
6+
use strum::IntoEnumIterator;
57

68
use crate::{errors::RustusResult, from_str, RustusConf};
79

src/notifiers/base.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::errors::RustusResult;
1+
use crate::{errors::RustusResult, file_info::FileInfo};
22
use actix_web::http::header::HeaderMap;
33

44
use crate::notifiers::hooks::Hook;
@@ -9,6 +9,7 @@ pub trait Notifier {
99
&self,
1010
message: String,
1111
hook: Hook,
12+
file_info: &FileInfo,
1213
headers_map: &HeaderMap,
1314
) -> RustusResult<()>;
1415
}

src/notifiers/hooks.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@ use crate::from_str;
22
use derive_more::{Display, From};
33
use strum::EnumIter;
44

5+
use std::str::FromStr;
6+
use strum::IntoEnumIterator;
7+
58
/// Hooks for notifications.
69
#[derive(Copy, Clone, Debug, Display, From, EnumIter, Eq, PartialEq)]
710
pub enum Hook {

0 commit comments

Comments
 (0)