Skip to content
Open
2 changes: 1 addition & 1 deletion extensions/notification/NotificationQueuePopulator.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NotificationQueuePopulator extends QueuePopulatorExtension {
this.bnConfigManager.setConfig(bucketName, bnConfig);
return undefined;
}
// bucket was deleter or notification conf has been removed, so remove zk node
// bucket was deleted or notification conf has been removed, so remove zk node
this.bnConfigManager.removeConfig(bucketName || bucket);
return undefined;
}
Expand Down
81 changes: 62 additions & 19 deletions extensions/notification/configManager/ZookeeperConfigManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,27 @@ class ZookeeperConfigManager extends BaseConfigManager {
this._setupEventListeners();
}

// https://github.com/alexguan/node-zookeeper-client/blob/master/lib/WatcherManager.js#L13-L39
// watchers should be reapplied only when their event is triggered, otherwise they will be duplicated
// and the lib will keep adding listeners
// (even if the functions are the same, they are js object so they don't match)
// warn early to catch leak more easily instead of waiting event emitter limit at 10 listeners.
_warnZkWatchersLeak(type, path) {
const watcherManager = this._zkClient?.client?.connectionManager?.watcherManager;

const watchers = watcherManager?.[`${type}Watchers`]?.[path];
if (!watchers) {
return;
}

const listeners = watchers.listenerCount('notification');
if (listeners > 0) {
process.emitWarning(`${type}Watchers[${path}] has already ${listeners} listeners`, {
code: 'ZkWatchersLeak',
});
}
}

_errorListener(error, listener) {
this.log.error('ZookeeperConfigManager.emitter.error', {
listener,
Expand All @@ -72,28 +93,29 @@ class ZookeeperConfigManager extends BaseConfigManager {
});
}

_getConfigListener(updatedBucket = '') {
_getConfigListener(bucket) {
this.log.debug('ZookeeperConfigManager.emitter.getConfig', {
event: 'getConfig',
bucket,
});
this._updateLocalStore([bucket]);
}

_listConfigsListener() {
this.log.debug('ZookeeperConfigManager.emitter.listConfigs', {
event: 'listConfigs',
});
this._listBucketsWithConfig((err, buckets) => {
if (err) {
this._emitter.emit('error', err, 'getConfigListener');
this._emitter.emit('error', err, 'listConfigsListener');
return undefined;
}
this.log.debug('bucket config to be updated in map', {
bucket: updatedBucket,
});
const newBuckets = this._getNewBucketNodes(buckets);
this.log.debug('new bucket configs to be added to map', {
buckets: newBuckets,
});
const bucketsToMap = updatedBucket ? [updatedBucket, ...newBuckets] : newBuckets;
this.log.debug('bucket configs to be added/updated to map', {
buckets: bucketsToMap,
});
if (bucketsToMap.length > 0) {
this._updateLocalStore(bucketsToMap);
if (newBuckets.length > 0) {
this._updateLocalStore(newBuckets);
}
return undefined;
});
Expand All @@ -118,6 +140,7 @@ class ZookeeperConfigManager extends BaseConfigManager {
.on('setConfig',
(bucket, config) => this._setConfigListener(bucket, config))
.on('getConfig', bucket => this._getConfigListener(bucket))
.on('listConfigs', () => this._listConfigsListener())
.on('removeConfig', bucket => this._removeConfigListener(bucket));
}

Expand All @@ -132,10 +155,10 @@ class ZookeeperConfigManager extends BaseConfigManager {
return `/${constants.zkConfigParentNode}/${bucket}`;
}

_getConfigDataFromBuffer(data) {
_getConfigDataFromBuffer(data, bucket) {
const { error, result } = safeJsonParse(data);
if (error) {
this.log.error('invalid config', { error, config: data });
this.log.error('invalid config', { error, config: data, bucket });
return undefined;
}
return result;
Expand All @@ -150,6 +173,7 @@ class ZookeeperConfigManager extends BaseConfigManager {
bucket,
zkPath,
});
this._warnZkWatchersLeak('data', zkPath);
return this._zkClient.getData(zkPath, event => {
this.log.debug('zookeeper getData watcher triggered', {
zkPath,
Expand Down Expand Up @@ -213,11 +237,20 @@ class ZookeeperConfigManager extends BaseConfigManager {
(exists, next) => {
if (!exists) {
return this._createBucketNotifConfigNode(bucket,
err => next(err));
err => next(err, exists));
}
return next(null, exists);
},
(exists, next) => this._zkClient.setData(zkPath, Buffer.from(data), -1, err => next(err, exists)),
(exists, next) => {
if (!exists) {
// if znode is created, run getData to set a watcher on the bucket config
// in case another node becomes leader on the raft and modifies the config
// while the current process keeps running
return this._updateLocalStore([bucket], next);
}
return next();
},
next => this._zkClient.setData(zkPath, Buffer.from(data), -1, next),
], err => {
if (err) {
this.log.error('error saving config', { method, zkPath, data });
Expand Down Expand Up @@ -319,14 +352,15 @@ class ZookeeperConfigManager extends BaseConfigManager {
const method
= 'ZookeeperConfigManager._listBucketsWithConfig';
const zkPath = `/${constants.zkConfigParentNode}`;
this._warnZkWatchersLeak('child', zkPath);
this._zkClient.getChildren(zkPath, event => {
this.log.debug('zookeeper getChildren watcher triggered', {
zkPath,
method,
event,
});
if (event.type === zookeeper.Event.NODE_CHILDREN_CHANGED) {
this._emitter.emit('getConfig');
this._emitter.emit('listConfigs');
}
}, (error, buckets) => {
if (error) {
Expand All @@ -339,6 +373,11 @@ class ZookeeperConfigManager extends BaseConfigManager {
});
this._callbackHandler(cb, error);
}
this.log.debug('list of buckets', {
zkPath,
method,
buckets,
});
this._callbackHandler(cb, null, buckets);
});
}
Expand All @@ -349,7 +388,7 @@ class ZookeeperConfigManager extends BaseConfigManager {
if (err) {
return next(err);
}
const configObject = this._getConfigDataFromBuffer(data);
const configObject = this._getConfigDataFromBuffer(data, bucket);
if (configObject) {
this._configs.set(bucket, configObject);
}
Expand Down Expand Up @@ -402,16 +441,20 @@ class ZookeeperConfigManager extends BaseConfigManager {
* Remove bucket notification configuration
*
* @param {String} bucket - bucket
* @param {boolean} [emitToZk = true] - whether to emit the event to zookeeper
* @return {boolean} - true if removed
*/
removeConfig(bucket) {
removeConfig(bucket, emitToZk = true) {
try {
this.log.debug('remove config', {
method: 'ZookeeperConfigManager.removeConfig',
bucket,
emitToZk,
});
this._configs.delete(bucket);
this._emitter.emit('removeConfig', bucket);
if (emitToZk) {
this._emitter.emit('removeConfig', bucket);
}
return true;
} catch (err) {
const errMsg
Expand Down
2 changes: 1 addition & 1 deletion lib/BackbeatConsumer.js
Original file line number Diff line number Diff line change
Expand Up @@ -505,7 +505,7 @@ class BackbeatConsumer extends EventEmitter {
const { topic, partition, offset, key, timestamp } = entry;
this._log.debug('finished processing of consumed entry', {
method: 'BackbeatConsumer.subscribe',
entry: { topic, partition, offset, key, timestamp },
entry: { topic, partition, offset, key: key?.toString?.() || key, timestamp },
groupId: this._groupId,
});
if (err) {
Expand Down
Loading