Skip to content

Commit 0ba9303

Browse files
committed
[ Refactor ] Observable to EventStream
[ Add ] Unit tests for EventStream based on Mocha & Should
1 parent 0d66b36 commit 0ba9303

File tree

9 files changed

+404
-158
lines changed

9 files changed

+404
-158
lines changed

dist/Observable.js renamed to dist/EventStream.js

Lines changed: 51 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,69 @@
11
var _asyncGenerator = function () { function AwaitValue(value) { this.value = value; } function AsyncGenerator(gen) { var front, back; function send(key, arg) { return new Promise(function (resolve, reject) { var request = { key: key, arg: arg, resolve: resolve, reject: reject, next: null }; if (back) { back = back.next = request; } else { front = back = request; resume(key, arg); } }); } function resume(key, arg) { try { var result = gen[key](arg); var value = result.value; if (value instanceof AwaitValue) { Promise.resolve(value.value).then(function (arg) { resume("next", arg); }, function (arg) { resume("throw", arg); }); } else { settle(result.done ? "return" : "normal", result.value); } } catch (err) { settle("throw", err); } } function settle(type, value) { switch (type) { case "return": front.resolve({ value: value, done: true }); break; case "throw": front.reject(value); break; default: front.resolve({ value: value, done: false }); break; } front = front.next; if (front) { resume(front.key, front.arg); } else { back = null; } } this._invoke = send; if (typeof gen.return !== "function") { this.return = undefined; } } if (typeof Symbol === "function" && Symbol.asyncIterator) { AsyncGenerator.prototype[Symbol.asyncIterator] = function () { return this; }; } AsyncGenerator.prototype.next = function (arg) { return this._invoke("next", arg); }; AsyncGenerator.prototype.throw = function (arg) { return this._invoke("throw", arg); }; AsyncGenerator.prototype.return = function (arg) { return this._invoke("return", arg); }; return { wrap: function (fn) { return function () { return new AsyncGenerator(fn.apply(this, arguments)); }; }, await: function (value) { return new AwaitValue(value); } }; }();
22

3-
const _queue_ = new WeakMap();
3+
const _private_ = new WeakMap();
44

5-
export default class Observable {
5+
/**
6+
* A simplified implement of `Observable()`
7+
*
8+
* @see https://tc39.github.io/proposal-observable/
9+
*/
10+
export default class EventStream {
611
/**
7-
* @param {EmitterWrapper} emitter
12+
* @param {EmitterWrapper} listener
813
*/
9-
constructor(emitter) {
14+
constructor(listener) {
1015

11-
const queue = [];
16+
const that = {};
1217

13-
_queue_.set(this, queue);
18+
_private_.set(this, that);
1419

15-
var next;
20+
that.boot = that.done = false;
1621

17-
const wait = () => queue.push(new Promise(resolve => next = resolve));
22+
that.listener = listener;
1823

19-
wait();
24+
that.canceller = that.resolve = that.reject = null;
2025

21-
this.done = false;
26+
that.next = value => that.resolve(value);
2227

23-
emitter(value => (next(value), wait()), value => (next(value), this.done = true));
28+
that.fail = error => (that.reject(error), that.done = true);
29+
}
30+
31+
listen() {
32+
33+
const that = _private_.get(this);
34+
35+
if (that.boot) return;
36+
37+
try {
38+
that.canceller = that.listener.call(null, that.next, async value => {
39+
40+
that.next(value), that.done = true;
41+
42+
await Promise.resolve();
43+
44+
try {
45+
that.canceller.call(null);
46+
} catch (error) {
47+
that.fail(error);
48+
}
49+
}, that.fail);
50+
} catch (error) {
51+
that.fail(error);
52+
}
53+
54+
that.boot = true;
2455
}
2556

2657
[Symbol.asyncIterator]() {
2758
var _this = this;
2859

2960
return _asyncGenerator.wrap(function* () {
3061

31-
const queue = _queue_.get(_this);
62+
const that = _private_.get(_this);
3263

33-
for (let i = 0; !_this.done; i++) yield yield _asyncGenerator.await(queue[i]);
64+
while (!that.done) yield yield _asyncGenerator.await(new Promise(function (resolve, reject) {
65+
return that.resolve = resolve, that.reject = reject, _this.listen();
66+
}));
3467
})();
3568
}
3669

@@ -60,6 +93,9 @@ export default class Observable {
6093
*
6194
* @typedef {function} EmitterWrapper
6295
*
63-
* @param {function} next
64-
* @param {function} done
96+
* @param {function(value: *): void} next
97+
* @param {function(value: *): void} done
98+
* @param {function(error: Error): void} fail
99+
*
100+
* @return {Function} Remove Event listeners from the emitter
65101
*/

0 commit comments

Comments
 (0)