diff --git a/packages/cubejs-ksql-driver/src/KsqlDriver.ts b/packages/cubejs-ksql-driver/src/KsqlDriver.ts index 651d29c883ec3..98f9161056cbc 100644 --- a/packages/cubejs-ksql-driver/src/KsqlDriver.ts +++ b/packages/cubejs-ksql-driver/src/KsqlDriver.ts @@ -131,7 +131,9 @@ export class KsqlDriver extends BaseDriver implements DriverInterface { if (this.config.kafkaHost) { this.kafkaClient = new Kafka({ clientId: 'Cube', - brokers: [this.config.kafkaHost], + brokers: this.config.kafkaHost + .split(',') + .map(h => h.trim()), // authenticationTimeout: 10000, // reauthenticationThreshold: 10000, ssl: this.config.kafkaUseSsl, diff --git a/rust/cubestore/cubestore/src/streaming/kafka.rs b/rust/cubestore/cubestore/src/streaming/kafka.rs index f3e9b57d39411..9c3c76ee43622 100644 --- a/rust/cubestore/cubestore/src/streaming/kafka.rs +++ b/rust/cubestore/cubestore/src/streaming/kafka.rs @@ -306,6 +306,13 @@ impl StreamingSource for KafkaStreamingSource { let unique_key_columns = self.unique_key_columns.clone(); let seq_column_index_to_move = self.seq_column_index; let traffic_sender = TrafficSender::new(self.trace_obj.clone()); + let hosts = self + .host + .clone() + .split(",") + .filter(|s| !s.is_empty()) + .map(|s| s.trim().to_string()) + .collect(); let stream = self .kafka_client .create_message_stream( @@ -321,7 +328,7 @@ impl StreamingSource for KafkaStreamingSource { }) .unwrap_or(Offset::End), ), - vec![self.host.clone()], + hosts, &self.user, &self.password, self.use_ssl,