Skip to content

Commit d5218be

Browse files
committed
dev
1 parent 7ccd607 commit d5218be

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

packages/cubejs-ksql-driver/src/KsqlDriver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ export class KsqlDriver extends BaseDriver implements DriverInterface {
131131
if (this.config.kafkaHost) {
132132
this.kafkaClient = new Kafka({
133133
clientId: 'Cube',
134-
brokers: [this.config.kafkaHost],
134+
brokers: this.config.kafkaHost.split(','),
135135
// authenticationTimeout: 10000,
136136
// reauthenticationThreshold: 10000,
137137
ssl: this.config.kafkaUseSsl,

rust/cubestore/cubestore/src/streaming/kafka.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,13 @@ impl StreamingSource for KafkaStreamingSource {
306306
let unique_key_columns = self.unique_key_columns.clone();
307307
let seq_column_index_to_move = self.seq_column_index;
308308
let traffic_sender = TrafficSender::new(self.trace_obj.clone());
309+
let hosts = self
310+
.host
311+
.clone()
312+
.split(",")
313+
.filter(|s| !s.is_empty())
314+
.map(|s| s.to_string())
315+
.collect();
309316
let stream = self
310317
.kafka_client
311318
.create_message_stream(
@@ -321,7 +328,7 @@ impl StreamingSource for KafkaStreamingSource {
321328
})
322329
.unwrap_or(Offset::End),
323330
),
324-
vec![self.host.clone()],
331+
hosts,
325332
&self.user,
326333
&self.password,
327334
self.use_ssl,

0 commit comments

Comments
 (0)