Skip to content

Commit 52a03d1

Browse files
committed
[add] Basic methods for Pipe
1 parent 2f20a26 commit 52a03d1

File tree

4 files changed

+136
-49
lines changed

4 files changed

+136
-49
lines changed

package.json

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
2-
"name": "waterwheel",
3-
"version": "1.0.0-alpha.1",
2+
"name": "iterable-observer",
3+
"version": "1.0.0-beta.0",
44
"license": "LGPL-3.0",
55
"author": "[email protected]",
66
"description": "Observable Proposal implement based on Async Generator (ES 2018) & TypeScript",
@@ -13,28 +13,28 @@
1313
"generator",
1414
"iterator"
1515
],
16-
"homepage": "https://web-cell.dev/WaterWheel/",
16+
"homepage": "https://web-cell.dev/iterable-observer/",
1717
"repository": {
1818
"type": "git",
19-
"url": "git+https://github.com/EasyWebApp/WaterWheel.git"
19+
"url": "git+https://github.com/EasyWebApp/iterable-observer.git"
2020
},
2121
"bugs": {
22-
"url": "https://github.com/EasyWebApp/WaterWheel/issues"
22+
"url": "https://github.com/EasyWebApp/iterable-observer/issues"
2323
},
2424
"source": "source/index.ts",
2525
"types": "dist/index.d.ts",
26-
"main": "dist/waterwheel.umd.js",
27-
"module": "dist/waterwheel.js",
26+
"main": "dist/iterable-observer.umd.js",
27+
"module": "dist/iterable-observer.js",
2828
"devDependencies": {
2929
"@types/jest": "^24.0.23",
30-
"husky": "^3.0.9",
30+
"husky": "^3.1.0",
3131
"jest": "^24.9.0",
3232
"lint-staged": "^9.4.3",
3333
"microbundle": "^0.11.0",
3434
"open-cli": "^5.0.0",
3535
"prettier": "^1.19.1",
3636
"ts-jest": "^24.2.0",
37-
"typedoc": "^0.15.2",
37+
"typedoc": "^0.15.3",
3838
"typescript": "^3.7.2"
3939
},
4040
"prettier": {
@@ -60,8 +60,8 @@
6060
"scripts": {
6161
"test": "lint-staged && jest",
6262
"debug": "node --inspect node_modules/jest/bin/jest --runInBand",
63-
"pack-docs": "typedoc --name WaterWheel --out docs/ source/",
64-
"build": "microbundle && npm run pack-docs",
63+
"pack-docs": "typedoc --name \"Iterable Observer\" --out docs/ source/",
64+
"build": "rm -rf dist/ && microbundle && npm run pack-docs",
6565
"help": "npm run pack-docs && open-cli docs/index.html",
6666
"prepublishOnly": "npm test && npm run build"
6767
},

source/Observable.ts

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Defer, makeDefer } from './utility';
33
export interface Observer<T = any> {
44
next(value: T): void;
55
error(reason: string | Error): void;
6-
complete(value: T): void;
6+
complete(): void;
77
}
88

99
export type SubscriberFunction = (observer: Observer) => (() => void) | void;
@@ -29,29 +29,68 @@ export class Observable<T = any> {
2929
queue.push(makeDefer());
3030
},
3131
error(reason) {
32-
if (done) return;
33-
34-
queue[queue.length - 1].reject(reason), (done = true);
32+
if (!done)
33+
queue[queue.length - 1].reject(reason), (done = true);
3534

3635
if (canceler) canceler();
3736
},
38-
complete(value) {
39-
if (done) return;
40-
41-
queue[queue.length - 1].resolve(value), (done = true);
37+
complete() {
38+
if (!done) queue[queue.length - 1].resolve(), (done = true);
4239

4340
if (canceler) canceler();
4441
}
4542
};
4643

4744
canceler = this.subscriber(observer);
4845

49-
while (true) {
46+
do {
5047
yield queue[0].promise;
5148

5249
queue.shift();
50+
} while (queue[0] || !done);
51+
}
52+
53+
static of<T = any>(...items: T[]) {
54+
return new this<T>(({ next, complete }) => {
55+
for (const item of items) next(item);
56+
57+
complete();
58+
});
59+
}
60+
61+
subscribe(
62+
onNext: Observer<T>['next'],
63+
onError?: Observer<T>['error'],
64+
onComplete?: Observer<T>['complete']
65+
) {
66+
var stop = false;
67+
68+
(async () => {
69+
try {
70+
for await (const item of this)
71+
if (!stop) onNext(item);
72+
else break;
73+
74+
if (onComplete instanceof Function) onComplete();
75+
} catch (error) {
76+
if (onError instanceof Function) onError(error);
77+
}
78+
})();
79+
80+
return {
81+
unsubscribe() {
82+
stop = true;
83+
},
84+
get closed() {
85+
return stop;
86+
}
87+
};
88+
}
5389

54-
if (done) break;
55-
}
90+
static from<T = any>(observable: Observable<T>) {
91+
return new this<T>(
92+
({ next, error, complete }) =>
93+
observable.subscribe(next, error, complete).unsubscribe
94+
);
5695
}
5796
}

source/utility.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
export interface Defer<T = any> {
22
promise: Promise<T>;
3-
resolve: (data: T) => void;
3+
resolve: (data?: T) => void;
44
reject: (error: Error | string) => void;
55
}
66

test/index.spec.ts

Lines changed: 74 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,46 +1,94 @@
11
import { Observable, Observer } from '../source';
22

3-
describe('Observable', () => {
4-
var observable: Observable, timer: any;
3+
function createExample() {
4+
var timer: any,
5+
count = 0;
56

67
const cleaner = jest.fn(() => clearInterval(timer));
78

8-
it('should not call Subscriber Function while constructing', () => {
9-
var count = 0;
9+
const subscriber = jest.fn(({ next, complete }: Observer) => {
10+
timer = setInterval(() => (++count < 6 ? next(count) : complete()), 0);
1011

11-
const subscriber = jest.fn(({ next, complete }: Observer) => {
12-
timer = setInterval(
13-
() => (++count < 5 ? next(count) : complete(count)),
14-
0
15-
);
12+
return cleaner;
13+
});
14+
15+
return { subscriber, cleaner };
16+
}
17+
18+
describe('Observable', () => {
19+
describe('Single', () => {
20+
const { subscriber, cleaner } = createExample();
21+
var observable: Observable;
1622

17-
return cleaner;
23+
it('should not call Subscriber Function while constructing', () => {
24+
observable = new Observable<number>(subscriber);
25+
26+
expect(subscriber).toBeCalledTimes(0);
1827
});
1928

20-
observable = new Observable<number>(subscriber);
29+
it('should iterate Limited items asynchronously', async () => {
30+
const list = [];
2131

22-
expect(subscriber).toBeCalledTimes(0);
23-
});
32+
for await (const item of observable) list.push(item);
2433

25-
it('should iterate Limited items asynchronously', async () => {
26-
const list = [];
34+
expect(list).toEqual(expect.arrayContaining([1, 2, 3, 4, 5]));
2735

28-
for await (const item of observable) list.push(item);
36+
expect(cleaner).toBeCalledTimes(1);
37+
});
2938

30-
expect(list).toEqual(expect.arrayContaining([1, 2, 3, 4, 5]));
39+
it('should throw Error after error() called', () => {
40+
const observable = new Observable(({ error }) => {
41+
error(new Error('test'));
42+
});
3143

32-
expect(cleaner).toBeCalledTimes(1);
44+
expect(
45+
(async () => {
46+
for await (const item of observable);
47+
})()
48+
).rejects.toStrictEqual(new Error('test'));
49+
});
3350
});
3451

35-
it('should throw Error after error() called', () => {
36-
const observable = new Observable(({ error }) => {
37-
error(new Error('test'));
52+
describe('Pipe', () => {
53+
it('should construct an Observable from Static data', async () => {
54+
const observable = Observable.of<number>(1, 2, 3),
55+
list = [];
56+
57+
for await (const item of observable) list.push(item);
58+
59+
expect(list).toEqual(expect.arrayContaining([1, 2, 3]));
3860
});
3961

40-
expect(
41-
(async () => {
42-
for await (const item of observable);
43-
})()
44-
).rejects.toStrictEqual(new Error('test'));
62+
it('should invoke handlers after subscribing', async () => {
63+
const { subscriber, cleaner } = createExample();
64+
65+
const observable = new Observable<number>(subscriber),
66+
onNext = jest.fn();
67+
68+
await new Promise(resolve =>
69+
observable.subscribe(onNext, null, resolve)
70+
);
71+
72+
expect(onNext).toHaveBeenNthCalledWith(1, 1);
73+
expect(onNext).toHaveBeenNthCalledWith(2, 2);
74+
expect(onNext).toHaveBeenNthCalledWith(3, 3);
75+
expect(onNext).toHaveBeenNthCalledWith(4, 4);
76+
expect(onNext).toHaveBeenNthCalledWith(5, 5);
77+
78+
expect(cleaner).toBeCalledTimes(1);
79+
});
80+
81+
it('should construct an Observable from an exist Observable', async () => {
82+
const { subscriber } = createExample();
83+
84+
const old = new Observable<number>(subscriber);
85+
86+
const observable = Observable.from<number>(old),
87+
list = [];
88+
89+
for await (const item of observable) list.push(item);
90+
91+
expect(list).toEqual(expect.arrayContaining([1, 2, 3, 4, 5]));
92+
});
4593
});
4694
});

0 commit comments

Comments
 (0)