@@ -81,17 +81,8 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
81
81
return NULL ;
82
82
}
83
83
} else {
84
- v8::Local<v8::Function> cb = value.As <v8::Function>();
85
- rdconf->ConfigureCallback (string_key, cb, true , errstr);
86
- if (!errstr.empty ()) {
87
- delete rdconf;
88
- return NULL ;
89
- }
90
- rdconf->ConfigureCallback (string_key, cb, false , errstr);
91
- if (!errstr.empty ()) {
92
- delete rdconf;
93
- return NULL ;
94
- }
84
+ // Do nothing - Connection::NodeConfigureCallbacks will handle this for each
85
+ // of the three client types.
95
86
}
96
87
}
97
88
@@ -100,56 +91,76 @@ Conf * Conf::create(RdKafka::Conf::ConfType type, v8::Local<v8::Object> object,
100
91
101
92
void Conf::ConfigureCallback (const std::string &string_key, const v8::Local<v8::Function> &cb, bool add, std::string &errstr) {
102
93
if (string_key.compare (" rebalance_cb" ) == 0 ) {
94
+ NodeKafka::Callbacks::Rebalance *rebalance = rebalance_cb ();
103
95
if (add) {
104
- if (this ->m_rebalance_cb == NULL ) {
105
- this ->m_rebalance_cb = new NodeKafka::Callbacks::Rebalance ();
96
+ if (rebalance == NULL ) {
97
+ rebalance = new NodeKafka::Callbacks::Rebalance ();
98
+ this ->set (string_key, rebalance, errstr);
106
99
}
107
- this -> m_rebalance_cb ->dispatcher .AddCallback (cb);
108
- this ->set (string_key, this -> m_rebalance_cb , errstr);
100
+ rebalance ->dispatcher .AddCallback (cb);
101
+ this ->set (string_key, rebalance , errstr);
109
102
} else {
110
- if (this ->m_rebalance_cb != NULL ) {
111
- this ->m_rebalance_cb ->dispatcher .RemoveCallback (cb);
103
+ if (rebalance == NULL ) {
104
+ rebalance->dispatcher .RemoveCallback (cb);
105
+ this ->set (string_key, rebalance, errstr);
112
106
}
113
107
}
114
108
} else if (string_key.compare (" offset_commit_cb" ) == 0 ) {
109
+ NodeKafka::Callbacks::OffsetCommit *offset_commit = offset_commit_cb ();
115
110
if (add) {
116
- if (this ->m_offset_commit_cb == NULL ) {
117
- this ->m_offset_commit_cb = new NodeKafka::Callbacks::OffsetCommit ();
111
+ if (offset_commit == NULL ) {
112
+ offset_commit = new NodeKafka::Callbacks::OffsetCommit ();
113
+ this ->set (string_key, offset_commit, errstr);
118
114
}
119
- this ->m_offset_commit_cb ->dispatcher .AddCallback (cb);
120
- this ->set (string_key, this ->m_offset_commit_cb , errstr);
115
+ offset_commit->dispatcher .AddCallback (cb);
121
116
} else {
122
- if (this -> m_offset_commit_cb != NULL ) {
123
- this -> m_offset_commit_cb ->dispatcher .RemoveCallback (cb);
117
+ if (offset_commit != NULL ) {
118
+ offset_commit ->dispatcher .RemoveCallback (cb);
124
119
}
125
120
}
126
121
}
127
122
}
128
123
129
124
void Conf::listen () {
130
- if (m_rebalance_cb) {
131
- m_rebalance_cb->dispatcher .Activate ();
125
+ NodeKafka::Callbacks::Rebalance *rebalance = rebalance_cb ();
126
+ if (rebalance) {
127
+ rebalance->dispatcher .Activate ();
132
128
}
133
129
134
- if (m_offset_commit_cb) {
135
- m_offset_commit_cb->dispatcher .Activate ();
130
+ NodeKafka::Callbacks::OffsetCommit *offset_commit = offset_commit_cb ();
131
+ if (offset_commit) {
132
+ offset_commit->dispatcher .Activate ();
136
133
}
137
134
}
138
135
139
136
void Conf::stop () {
140
- if (m_rebalance_cb) {
141
- m_rebalance_cb->dispatcher .Deactivate ();
137
+ NodeKafka::Callbacks::Rebalance *rebalance = rebalance_cb ();
138
+ if (rebalance) {
139
+ rebalance->dispatcher .Deactivate ();
140
+ }
141
+
142
+ NodeKafka::Callbacks::OffsetCommit *offset_commit = offset_commit_cb ();
143
+ if (offset_commit) {
144
+ offset_commit->dispatcher .Deactivate ();
142
145
}
146
+ }
147
+
148
+ Conf::~Conf () {}
143
149
144
- if (m_offset_commit_cb) {
145
- m_offset_commit_cb->dispatcher .Deactivate ();
150
+ NodeKafka::Callbacks::Rebalance* Conf::rebalance_cb () const {
151
+ RdKafka::RebalanceCb *cb = NULL ;
152
+ if (this ->get (cb) != RdKafka::Conf::CONF_OK) {
153
+ return NULL ;
146
154
}
155
+ return static_cast <NodeKafka::Callbacks::Rebalance*>(cb);
147
156
}
148
157
149
- Conf::~Conf () {
150
- if (m_rebalance_cb) {
151
- delete m_rebalance_cb;
158
+ NodeKafka::Callbacks::OffsetCommit* Conf::offset_commit_cb () const {
159
+ RdKafka::OffsetCommitCb *cb = NULL ;
160
+ if (this ->get (cb) != RdKafka::Conf::CONF_OK) {
161
+ return NULL ;
152
162
}
163
+ return static_cast <NodeKafka::Callbacks::OffsetCommit*>(cb);
153
164
}
154
165
155
166
} // namespace NodeKafka
0 commit comments