Skip to content

Commit 8d2daeb

Browse files
committed
Allow topic config properties to be set with the global config [c++]
1 parent 7837ac4 commit 8d2daeb

File tree

4 files changed

+25
-25
lines changed

4 files changed

+25
-25
lines changed

index.d.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ export abstract class Client<Events extends string> extends EventEmitter {
203203
}
204204

205205
export class KafkaConsumer extends Client<KafkaConsumerEvents> {
206-
constructor(conf: ConsumerGlobalConfig, topicConf: ConsumerTopicConfig);
206+
constructor(conf: ConsumerGlobalConfig | ConsumerTopicConfig, topicConf?: ConsumerTopicConfig);
207207

208208
assign(assignments: Assignment[]): this;
209209

@@ -256,7 +256,7 @@ export class KafkaConsumer extends Client<KafkaConsumerEvents> {
256256
}
257257

258258
export class Producer extends Client<KafkaProducerEvents> {
259-
constructor(conf: ProducerGlobalConfig, topicConf?: ProducerTopicConfig);
259+
constructor(conf: ProducerGlobalConfig | ProducerTopicConfig, topicConf?: ProducerTopicConfig);
260260

261261
flush(timeout?: NumberNullUndefined, cb?: (err: LibrdKafkaError) => void): this;
262262

lib/client.js

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ function Client(globalConf, SubClientType, topicConf) {
4545
// the producer and consumer main wrappers
4646

4747
var no_event_cb = globalConf.event_cb === false;
48-
topicConf = topicConf || {};
4948

5049
// delete this because librdkafka will complain since this particular
5150
// key is a real conf value

src/kafka-consumer.cc

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@ KafkaConsumer::KafkaConsumer(Conf* gconfig, Conf* tconfig):
3131
Connection(gconfig, tconfig) {
3232
std::string errstr;
3333

34-
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
34+
if (m_tconfig)
35+
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
3536

3637
m_consume_loop = nullptr;
3738
}
@@ -553,10 +554,6 @@ void KafkaConsumer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
553554
return Nan::ThrowError("Global configuration data must be specified");
554555
}
555556

556-
if (!info[1]->IsObject()) {
557-
return Nan::ThrowError("Topic configuration must be specified");
558-
}
559-
560557
std::string errstr;
561558

562559
Conf* gconfig =
@@ -567,13 +564,16 @@ void KafkaConsumer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
567564
return Nan::ThrowError(errstr.c_str());
568565
}
569566

570-
Conf* tconfig =
571-
Conf::create(RdKafka::Conf::CONF_TOPIC,
567+
// If tconfig isn't set, then just let us pick properties from gconf.
568+
Conf* tconfig = nullptr;
569+
if (info[1]->IsObject()) {
570+
tconfig = Conf::create(RdKafka::Conf::CONF_TOPIC,
572571
(info[1]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr);
573572

574-
if (!tconfig) {
575-
delete gconfig;
576-
return Nan::ThrowError(errstr.c_str());
573+
if (!tconfig) {
574+
delete gconfig;
575+
return Nan::ThrowError(errstr.c_str());
576+
}
577577
}
578578

579579
KafkaConsumer* consumer = new KafkaConsumer(gconfig, tconfig);

src/producer.cc

Lines changed: 13 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,9 @@ Producer::Producer(Conf* gconfig, Conf* tconfig):
3535
m_partitioner_cb() {
3636
std::string errstr;
3737

38-
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
38+
if (m_tconfig)
39+
m_gconfig->set("default_topic_conf", m_tconfig, errstr);
40+
3941
m_gconfig->set("dr_cb", &m_dr_cb, errstr);
4042
}
4143

@@ -110,10 +112,6 @@ void Producer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
110112
return Nan::ThrowError("Global configuration data must be specified");
111113
}
112114

113-
if (!info[1]->IsObject()) {
114-
return Nan::ThrowError("Topic configuration must be specified");
115-
}
116-
117115
std::string errstr;
118116

119117
Conf* gconfig =
@@ -124,14 +122,17 @@ void Producer::New(const Nan::FunctionCallbackInfo<v8::Value>& info) {
124122
return Nan::ThrowError(errstr.c_str());
125123
}
126124

127-
Conf* tconfig =
128-
Conf::create(RdKafka::Conf::CONF_TOPIC,
129-
(info[1]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr);
125+
// If tconfig isn't set, then just let us pick properties from gconf.
126+
Conf* tconfig = nullptr;
127+
if (info[1]->IsObject()) {
128+
tconfig = Conf::create(RdKafka::Conf::CONF_TOPIC,
129+
(info[1]->ToObject(Nan::GetCurrentContext())).ToLocalChecked(), errstr);
130130

131-
if (!tconfig) {
132-
// No longer need this since we aren't instantiating anything
133-
delete gconfig;
134-
return Nan::ThrowError(errstr.c_str());
131+
if (!tconfig) {
132+
// No longer need this since we aren't instantiating anything
133+
delete gconfig;
134+
return Nan::ThrowError(errstr.c_str());
135+
}
135136
}
136137

137138
Producer* producer = new Producer(gconfig, tconfig);

0 commit comments

Comments
 (0)