Skip to content

Commit 31d4b46

Browse files
authored
fix(ksql-driver): Kafka, list of brokers (#9009)
1 parent 7ccd607 commit 31d4b46

File tree

2 files changed

+11
-2
lines changed

2 files changed

+11
-2
lines changed

packages/cubejs-ksql-driver/src/KsqlDriver.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,9 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
131131
if (this.config.kafkaHost) {
132132
this.kafkaClient = new Kafka({
133133
clientId: 'Cube',
134-
brokers: [this.config.kafkaHost],
134+
brokers: this.config.kafkaHost
135+
.split(',')
136+
.map(h => h.trim()),
135137
// authenticationTimeout: 10000,
136138
// reauthenticationThreshold: 10000,
137139
ssl: this.config.kafkaUseSsl,

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,13 @@ impl StreamingSource for KafkaStreamingSource {
306306
let unique_key_columns = self.unique_key_columns.clone();
307307
let seq_column_index_to_move = self.seq_column_index;
308308
let traffic_sender = TrafficSender::new(self.trace_obj.clone());
309+
let hosts = self
310+
.host
311+
.clone()
312+
.split(",")
313+
.filter(|s| !s.is_empty())
314+
.map(|s| s.trim().to_string())
315+
.collect();
309316
let stream = self
310317
.kafka_client
311318
.create_message_stream(
@@ -321,7 +328,7 @@ impl StreamingSource for KafkaStreamingSource {
321328
})
322329
.unwrap_or(Offset::End),
323330
),
324-
vec![self.host.clone()],
331+
hosts,
325332
&self.user,
326333
&self.password,
327334
self.use_ssl,

0 commit comments

Comments
 (0)