Skip to content

Commit 4ccbda1

Browse files
authored
Merge pull request #109 from thefrontside/tm/streamp-helpers-subject
feat(stream-helpers): Added behavior subject
2 parents 5515419 + e23daed commit 4ccbda1

File tree

4 files changed

+207
-1
lines changed

4 files changed

+207
-1
lines changed

stream-helpers/README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,53 @@ function* exampleWithCloseValue() {
192192
}
193193
```
194194

195+
### Subject (like RxJS BehaviorSubject)
196+
197+
The `createSubject` helper converts any stream into a multicast stream that
198+
replays the latest value to new subscribers. It's analogous to
199+
[RxJS BehaviorSubject](https://www.learnrxjs.io/learn-rxjs/subjects/behaviorsubject).
200+
201+
```typescript
202+
import { createSubject } from "@effectionx/stream-helpers";
203+
import { createChannel, spawn } from "effection";
204+
205+
function* example() {
206+
const subject = createSubject<number>();
207+
const channel = createChannel<number, void>();
208+
const downstream = subject(channel);
209+
210+
// First subscriber
211+
const sub1 = yield* downstream;
212+
213+
yield* channel.send(1);
214+
yield* channel.send(2);
215+
216+
console.log(yield* sub1.next()); // { done: false, value: 1 }
217+
console.log(yield* sub1.next()); // { done: false, value: 2 }
218+
219+
// Late subscriber gets the latest value immediately
220+
const sub2 = yield* downstream;
221+
console.log(yield* sub2.next()); // { done: false, value: 2 }
222+
}
223+
```
224+
225+
Use it with a pipe operator to convert any stream into a behavior subject:
226+
227+
```typescript
228+
import { createSubject, map } from "@effectionx/stream-helpers";
229+
import { pipe } from "remeda";
230+
231+
const subject = createSubject<string>();
232+
233+
const stream = pipe(
234+
source,
235+
map(function* (x) {
236+
return x.toString();
237+
}),
238+
subject,
239+
);
240+
```
241+
195242
### Passthrough Tracker
196243

197244
Passthrough Tracker stream helper provides a way to know if all items that

stream-helpers/deno.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@effectionx/stream-helpers",
3-
"version": "0.4.1",
3+
"version": "0.5.0",
44
"imports": {
55
"effection": "npm:effection@^3",
66
"immutable": "npm:immutable@^5",

stream-helpers/subject.test.ts

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import {
2+
createChannel,
3+
type Operation,
4+
type Stream,
5+
type Subscription,
6+
} from "effection";
7+
import { beforeEach, describe, it } from "@effectionx/bdd";
8+
import { expect } from "@std/expect";
9+
10+
import { createSubject } from "./subject.ts";
11+
12+
function* next<T, TClose>(
13+
subscription: Subscription<T, TClose>,
14+
): Operation<T | TClose> {
15+
const item = yield* subscription.next();
16+
if (item.done) {
17+
return item.value;
18+
} else {
19+
return item.value;
20+
}
21+
}
22+
23+
describe("subject", () => {
24+
let subject = createSubject<number>();
25+
let upstream = createChannel<number, string>();
26+
let downstream: Stream<number, string>;
27+
28+
beforeEach(function* () {
29+
subject = createSubject();
30+
31+
upstream = createChannel();
32+
33+
downstream = subject(upstream);
34+
});
35+
36+
it("allows multiple subscribers", function* () {
37+
const subscriber1 = yield* downstream;
38+
const subscriber2 = yield* downstream;
39+
40+
yield* upstream.send(1);
41+
yield* upstream.send(2);
42+
43+
// 1 multicast to both
44+
expect(yield* next(subscriber1)).toEqual(1);
45+
expect(yield* next(subscriber2)).toEqual(1);
46+
47+
// 2 multicast to both
48+
expect(yield* next(subscriber1)).toEqual(2);
49+
expect(yield* next(subscriber2)).toEqual(2);
50+
});
51+
52+
it("each later subscribers get latest value", function* () {
53+
const subscriber1 = yield* downstream;
54+
yield* upstream.send(1);
55+
expect(yield* next(subscriber1)).toEqual(1);
56+
57+
yield* upstream.send(2);
58+
expect(yield* next(subscriber1)).toEqual(2);
59+
60+
const subscriber2 = yield* downstream;
61+
expect(yield* next(subscriber2)).toEqual(2);
62+
});
63+
64+
it("sends closing value to all subscribers", function* () {
65+
const subscriber1 = yield* downstream;
66+
const subscriber2 = yield* downstream;
67+
68+
yield* upstream.send(1);
69+
yield* upstream.close("bye");
70+
71+
// 1 multicast to both
72+
expect(yield* next(subscriber1)).toEqual(1);
73+
expect(yield* next(subscriber2)).toEqual(1);
74+
75+
// 2 multicast to both
76+
expect(yield* next(subscriber1)).toEqual("bye");
77+
expect(yield* next(subscriber2)).toEqual("bye");
78+
});
79+
80+
it("subscriber after close receives last value and close value", function* () {
81+
const subscriber1 = yield* downstream;
82+
83+
yield* upstream.send(1);
84+
yield* upstream.close("bye");
85+
86+
// First subscriber gets value and close
87+
expect(yield* next(subscriber1)).toEqual(1);
88+
expect(yield* next(subscriber1)).toEqual("bye");
89+
90+
// Late subscriber after close should get last value and close value
91+
const subscriber2 = yield* downstream;
92+
expect(yield* next(subscriber2)).toEqual("bye");
93+
});
94+
});

stream-helpers/subject.ts

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
import type { Stream, Subscription } from "effection";
2+
3+
/**
4+
* Converts any stream into a multicast stream that produces latest value
5+
* to new subscribers. It's designed to be analagous in function to [RxJS
6+
* BehaviorSubject](https://www.learnrxjs.io/learn-rxjs/subjects/behaviorsubject).
7+
*
8+
* @returns A function that takes a stream and returns a multicast stream
9+
*
10+
* @example
11+
* ```ts
12+
* const subject = createSubject<number>();
13+
* const downstream = subject(upstream);
14+
*
15+
* const sub1 = yield* downstream; // subscribes to upstream
16+
* yield* upstream.send(1);
17+
* yield* sub1.next(); // { done: false, value: 1 }
18+
*
19+
* const sub2 = yield* downstream; // late subscriber
20+
* yield* sub2.next(); // { done: false, value: 1 } - gets latest value
21+
* ```
22+
*
23+
* Use it with a pipe operator to convert any stream into a behavior subject.
24+
*
25+
* @example
26+
* ```
27+
* let source = createChannel<string, void>();
28+
* let subject = createSubject<string>();
29+
*
30+
* let pipeline = pipe([
31+
* top,
32+
* transform1,
33+
* transform2,
34+
* subject,
35+
* ]);
36+
* ```
37+
*/
38+
export function createSubject<T>(): <TClose>(
39+
stream: Stream<T, TClose>,
40+
) => Stream<T, TClose> {
41+
let current: IteratorResult<T> | undefined = undefined;
42+
43+
return <TClose>(stream: Stream<T, TClose>) => ({
44+
*[Symbol.iterator]() {
45+
let upstream = yield* stream;
46+
47+
let iterator: Subscription<T, TClose> = current
48+
? {
49+
*next() {
50+
iterator = upstream;
51+
return current!;
52+
},
53+
}
54+
: {
55+
*next() {
56+
return current = yield* upstream.next();
57+
},
58+
};
59+
60+
return {
61+
next: () => iterator.next(),
62+
};
63+
},
64+
});
65+
}

0 commit comments

Comments
 (0)