Skip to content

Commit 197cb5e

Browse files
committed
refactored: used serde
1 parent 05b9965 commit 197cb5e

File tree

2 files changed

+10
-6
lines changed

2 files changed

+10
-6
lines changed

src/cli.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::path::PathBuf;
2222
use url::Url;
2323

2424
use crate::{
25+
kafka::SslProtocol,
2526
oidc::{self, OpenidConfig},
2627
option::{validation, Compression, Mode},
2728
};
@@ -125,7 +126,7 @@ pub struct Cli {
125126
pub kafka_host: Option<String>,
126127
pub kafka_group: Option<String>,
127128
pub kafka_client_id: Option<String>,
128-
pub kafka_security_protocol: Option<String>,
129+
pub kafka_security_protocol: Option<SslProtocol>,
129130
pub kafka_partitions: Option<String>,
130131
}
131132

@@ -582,7 +583,9 @@ impl FromArgMatches for Cli {
582583
self.kafka_host = m.get_one::<String>(Self::KAFKA_HOST).cloned();
583584
self.kafka_group = m.get_one::<String>(Self::KAFKA_GROUP).cloned();
584585
self.kafka_client_id = m.get_one::<String>(Self::KAFKA_CLIENT_ID).cloned();
585-
self.kafka_security_protocol = m.get_one::<String>(Self::KAFKA_SECURITY_PROTOCOL).cloned();
586+
self.kafka_security_protocol = m
587+
.get_one::<SslProtocol>(Self::KAFKA_SECURITY_PROTOCOL)
588+
.cloned();
586589
self.kafka_partitions = m.get_one::<String>(Self::KAFKA_PARTITIONS).cloned();
587590

588591
self.local_cache_path = m.get_one::<PathBuf>(Self::CACHE).cloned();

src/kafka.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use rdkafka::error::{KafkaError as NativeKafkaError, RDKafkaError};
88
use rdkafka::message::BorrowedMessage;
99
use rdkafka::util::Timeout;
1010
use rdkafka::{Message, TopicPartitionList};
11+
use serde::{Deserialize, Serialize};
1112
use std::fmt::Display;
1213
use std::num::ParseIntError;
1314
use std::sync::Arc;
@@ -26,7 +27,8 @@ use crate::{
2627
storage::StreamType,
2728
};
2829

29-
enum SslProtocol {
30+
#[derive(Debug, Deserialize, Serialize, Clone, Copy)]
31+
pub enum SslProtocol {
3032
Plaintext,
3133
Ssl,
3234
SaslPlaintext,
@@ -150,9 +152,8 @@ fn setup_consumer() -> Result<(StreamConsumer, String), KafkaError> {
150152
// conf.set("api.version.request", val.to_string());
151153
// }
152154

153-
if let Some(val) = CONFIG.parseable.kafka_security_protocol.as_ref() {
154-
let mapped: SslProtocol = val.parse()?;
155-
conf.set("security.protocol", mapped.to_string());
155+
if let Some(ssl_protocol) = CONFIG.parseable.kafka_security_protocol.as_ref() {
156+
conf.set("security.protocol", serde_json::to_string(&ssl_protocol)?);
156157
}
157158

158159
let consumer: StreamConsumer = conf.create()?;

0 commit comments

Comments
 (0)