Skip to content

Commit 28b7276

Browse files
authored
add close with value (#343)
## Why ```ts const res = await someAsyncCall(); if (!res.ok) { resWritable.write(envRes); resWritable.close(); return; } ``` we see this pattern a lot in server-side streams and is verbose and prone to accidentally writing to a closed writable (e.g. if proc was cancelled mid someAsyncCall) ## What changed ```ts const res = await someAsyncCall(); if (!res.ok) { return resWritable.close(envRes); } ``` ## Versioning - [ ] Breaking protocol change - [ ] Breaking ts/js API change <!-- Kind reminder to add tests and updated documentation if needed -->
1 parent 7d9a4d3 commit 28b7276

File tree

5 files changed

+29
-8
lines changed

5 files changed

+29
-8
lines changed

README.md

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
# River
22

3-
⚠️ Not production ready, while Replit is using parts of River in production, we are still going through rapid breaking changes. First production ready version will be `1.x.x` ⚠️
4-
53
River allows multiple clients to connect to and make remote procedure calls to a remote server as if they were local procedures.
64

75
## Long-lived streaming remote procedure calls

__tests__/streams.test.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -354,6 +354,23 @@ describe('Writable unit', () => {
354354
expect(closeCb).toHaveBeenCalledOnce();
355355
});
356356

357+
it('should support closing with value', () => {
358+
const closeCb = vi.fn();
359+
const writeCb = vi.fn();
360+
const writable = new WritableImpl<number>({
361+
writeCb,
362+
closeCb,
363+
});
364+
365+
expect(writable.isWritable()).toBeTruthy();
366+
367+
writable.close(42);
368+
expect(writable.isWritable()).toBeFalsy();
369+
expect(closeCb).toHaveBeenCalledOnce();
370+
expect(writeCb).toHaveBeenCalledOnce();
371+
expect(writeCb).toHaveBeenCalledWith(42);
372+
});
373+
357374
it('should allow calling close multiple times', () => {
358375
const closeCb = vi.fn();
359376
const writable = new WritableImpl<number>({

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"name": "@replit/river",
33
"description": "It's like tRPC but... with JSON Schema Support, duplex streaming and support for service multiplexing. Transport agnostic!",
4-
"version": "0.209.6",
4+
"version": "0.209.7",
55
"type": "module",
66
"exports": {
77
".": {

router/streams.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,10 +93,12 @@ export interface Writable<T> {
9393
/**
9494
* {@link close} signals the closure of the {@link Writeable}, informing the {@link Readable} end that
9595
* all data has been transmitted and we've cleanly closed.
96+
* Optionally a final value can be passed to {@link close}, which will be the last value
97+
* to write before it closes.
9698
*
9799
* Calling {@link close} multiple times is a no-op.
98100
*/
99-
close(): undefined;
101+
close(value?: T): undefined;
100102
/**
101103
* {@link isWritable} returns true if it's safe to call {@link write}, which
102104
* means that the {@link Writable} hasn't been closed due to {@link close} being called
@@ -362,7 +364,7 @@ export class WritableImpl<T> implements Writable<T> {
362364
/**
363365
* Passed via constructor to pass on calls to {@link close}
364366
*/
365-
private closeCb: () => void;
367+
private closeCb: (value?: T) => void;
366368
/**
367369
* Whether {@link close} was called, and {@link Writable} is not writable anymore.
368370
*/
@@ -385,11 +387,15 @@ export class WritableImpl<T> implements Writable<T> {
385387
return !this.closed;
386388
}
387389

388-
public close(): undefined {
390+
public close(value?: T): undefined {
389391
if (this.closed) {
390392
return;
391393
}
392394

395+
if (value !== undefined) {
396+
this.writeCb(value);
397+
}
398+
393399
this.closed = true;
394400
this.writeCb = () => undefined;
395401
this.closeCb();

0 commit comments

Comments
 (0)