Skip to content
This repository was archived by the owner on May 14, 2024. It is now read-only.

Commit 5204bb7

Browse files
committed
Implement queueing events until a listener appears
This resolves an issue arising from using both a callback and an EventEmitter together in the Client.search() API. Since the emitter would only be available through the callback, some events could be emitted before the callback is triggered, resulting in missed events. This change incorporates a test case originally by László Szűcs (@ifroz). For GH-602
1 parent a67303e commit 5204bb7

File tree

4 files changed

+175
-1
lines changed

4 files changed

+175
-1
lines changed

lib/client/client.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ var errors = require('../errors')
2525
var filters = require('../filters')
2626
var messages = require('../messages')
2727
var url = require('../url')
28+
var CorkedEmitter = require('../corked_emitter')
2829

2930
/// --- Globals
3031

@@ -632,7 +633,7 @@ Client.prototype.search = function search (base,
632633
pager.on('search', sendRequest)
633634
pager.begin()
634635
} else {
635-
sendRequest(controls, new EventEmitter(), callback)
636+
sendRequest(controls, new CorkedEmitter(), callback)
636637
}
637638
}
638639

lib/corked_emitter.js

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
'use strict'
2+
3+
var EventEmitter = require('events').EventEmitter
4+
5+
/**
6+
* A CorkedEmitter is a variant of an EventEmitter where events emitted
7+
* wait for the appearance of the first listener of any kind. That is,
8+
* a CorkedEmitter will store all .emit()s it receives, to be replayed
9+
* later when an .on() is applied.
10+
* It is meant for situations where the consumers of the emitter are
11+
* unable to register listeners right away, and cannot afford to miss
12+
* any events emitted from the start.
13+
* Note that, whenever the first emitter (for any event) appears,
14+
* the emitter becomes uncorked and works as usual for ALL events, and
15+
* will not cache anything anymore. This is necessary to avoid
16+
* re-ordering emits - either everything is being buffered, or nothing.
17+
*/
18+
function CorkedEmitter () {
19+
var self = this
20+
EventEmitter.call(self)
21+
/**
22+
* An array of arguments objects (array-likes) to emit on open.
23+
*/
24+
self._outstandingEmits = []
25+
/**
26+
* Whether the normal flow of emits is restored yet.
27+
*/
28+
self._opened = false
29+
// When the first listener appears, we enqueue an opening.
30+
// It is not done immediately, so that other listeners can be
31+
// registered in the same critical section.
32+
self.once('newListener', function () {
33+
setImmediate(function releaseStoredEvents () {
34+
self._opened = true
35+
self._outstandingEmits.forEach(function (args) {
36+
self.emit.apply(self, args)
37+
})
38+
})
39+
})
40+
}
41+
CorkedEmitter.prototype = Object.create(EventEmitter.prototype)
42+
CorkedEmitter.prototype.emit = function emit (eventName) {
43+
if (this._opened || eventName === 'newListener') {
44+
EventEmitter.prototype.emit.apply(this, arguments)
45+
} else {
46+
this._outstandingEmits.push(arguments)
47+
}
48+
}
49+
50+
module.exports = CorkedEmitter

test/client.test.js

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,25 @@ tap.test('search basic', function (t) {
703703
})
704704
})
705705

706+
tap.test('GH-602 search basic with delayed event listener binding', function (t) {
707+
t.context.client.search('cn=test, ' + SUFFIX, '(objectclass=*)', function (err, res) {
708+
t.error(err)
709+
setTimeout(() => {
710+
let gotEntry = 0
711+
res.on('searchEntry', function (entry) {
712+
gotEntry++
713+
})
714+
res.on('error', function (err) {
715+
t.fail(err)
716+
})
717+
res.on('end', function (res) {
718+
t.equal(gotEntry, 2)
719+
t.end()
720+
})
721+
}, 100)
722+
})
723+
})
724+
706725
tap.test('search sizeLimit', function (t) {
707726
t.test('over limit', function (t2) {
708727
t.context.client.search('cn=sizelimit', {}, function (err, res) {

test/corked_emitter.test.js

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
'use strict'
2+
3+
const { test } = require('tap')
4+
const CorkedEmitter = require('../lib/corked_emitter')
5+
6+
function gatherEventSequence (expectedNumber) {
7+
const gatheredEvents = []
8+
let callback
9+
const finished = new Promise(function (resolve) {
10+
callback = function (...args) {
11+
gatheredEvents.push(...args)
12+
if (gatheredEvents.length >= expectedNumber) {
13+
// Prevent result mutation after our promise is resolved:
14+
resolve(gatheredEvents.slice())
15+
}
16+
}
17+
})
18+
return {
19+
finished,
20+
callback
21+
}
22+
}
23+
24+
test('normal emit flow', function (t) {
25+
const emitter = new CorkedEmitter()
26+
const expectedSequence = [
27+
['searchEntry', { data: 'a' }],
28+
['searchEntry', { data: 'b' }],
29+
['end']
30+
]
31+
const gatherer = gatherEventSequence(3)
32+
emitter.on('searchEntry', function (...args) {
33+
gatherer.callback(['searchEntry', ...args])
34+
})
35+
emitter.on('end', function (...args) {
36+
gatherer.callback(['end', ...args])
37+
})
38+
emitter.emit('searchEntry', { data: 'a' })
39+
emitter.emit('searchEntry', { data: 'b' })
40+
emitter.emit('end')
41+
gatherer.finished.then(function (gatheredEvents) {
42+
expectedSequence.forEach(function (expectedEvent, i) {
43+
t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i]))
44+
})
45+
t.end()
46+
})
47+
})
48+
49+
test('reversed listener registration', function (t) {
50+
const emitter = new CorkedEmitter()
51+
const expectedSequence = [
52+
['searchEntry', { data: 'a' }],
53+
['searchEntry', { data: 'b' }],
54+
['end']
55+
]
56+
const gatherer = gatherEventSequence(3)
57+
// This time, we swap the event listener registrations.
58+
// The order of emits should remain unchanged.
59+
emitter.on('end', function (...args) {
60+
gatherer.callback(['end', ...args])
61+
})
62+
emitter.on('searchEntry', function (...args) {
63+
gatherer.callback(['searchEntry', ...args])
64+
})
65+
emitter.emit('searchEntry', { data: 'a' })
66+
emitter.emit('searchEntry', { data: 'b' })
67+
emitter.emit('end')
68+
gatherer.finished.then(function (gatheredEvents) {
69+
expectedSequence.forEach(function (expectedEvent, i) {
70+
t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i]))
71+
})
72+
t.end()
73+
})
74+
})
75+
76+
test('delayed listener registration', function (t) {
77+
const emitter = new CorkedEmitter()
78+
const expectedSequence = [
79+
['searchEntry', { data: 'a' }],
80+
['searchEntry', { data: 'b' }],
81+
['end']
82+
]
83+
const gatherer = gatherEventSequence(3)
84+
emitter.emit('searchEntry', { data: 'a' })
85+
emitter.emit('searchEntry', { data: 'b' })
86+
emitter.emit('end')
87+
// The listeners only appear after a brief delay - this simulates
88+
// the situation described in https://github.com/ldapjs/node-ldapjs/issues/602
89+
// and in https://github.com/ifroz/node-ldapjs/commit/5239f6c68827f2c25b4589089c199d15bb882412
90+
setTimeout(function () {
91+
emitter.on('end', function (...args) {
92+
gatherer.callback(['end', ...args])
93+
})
94+
emitter.on('searchEntry', function (...args) {
95+
gatherer.callback(['searchEntry', ...args])
96+
})
97+
}, 50)
98+
gatherer.finished.then(function (gatheredEvents) {
99+
expectedSequence.forEach(function (expectedEvent, i) {
100+
t.equal(JSON.stringify(expectedEvent), JSON.stringify(gatheredEvents[i]))
101+
})
102+
t.end()
103+
})
104+
})

0 commit comments

Comments
 (0)