Skip to content

Commit 3b85354

Browse files
committed
feat: allow providing a buffer for pubsub subscribe call
1 parent fbe0beb commit 3b85354

File tree

2 files changed

+107
-27
lines changed

2 files changed

+107
-27
lines changed

.changeset/angry-clocks-sniff.md

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
---
2+
'@graphql-yoga/subscription': minor
3+
---
4+
5+
Support providing a `RepeaterBuffer` to the `PubSub.subscribe` method, by using the new object based call signature.
6+
7+
```ts
8+
import { createPubSub } from 'graphql-yoga'
9+
import { SlidingBuffer } from '@repeaterjs/repeater'
10+
11+
const pubSub = createPubSub()
12+
13+
pubSub.subscribe({
14+
topic: "userChanged",
15+
id: "1",
16+
buffer: new SlidingBuffer(1_000)
17+
})
18+
```
19+
20+
Learn more about buffers on the [Repeater.js website](https://repeater.js.org/docs/safety#3-buffering-and-dropping-values).

packages/subscription/src/create-pub-sub.ts

Lines changed: 87 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { TypedEventTarget } from '@graphql-yoga/typed-event-target';
2-
import { Repeater } from '@repeaterjs/repeater';
2+
import { Repeater, type RepeaterBuffer } from '@repeaterjs/repeater';
33
import { CustomEvent } from '@whatwg-node/events';
44

55
type PubSubPublishArgsByKey = {
@@ -45,9 +45,22 @@ export type PubSub<TPubSubPublishArgsByKey extends PubSubPublishArgsByKey> = {
4545
* Subscribe to a topic.
4646
*/
4747
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
48-
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
49-
? [TKey]
50-
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
48+
...args:
49+
| (TPubSubPublishArgsByKey[TKey][1] extends undefined
50+
? [key: TKey]
51+
: [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]])
52+
| [
53+
args: {
54+
topic: string;
55+
buffer?: RepeaterBuffer | undefined;
56+
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
57+
? {
58+
id?: void;
59+
}
60+
: {
61+
id: TPubSubPublishArgsByKey[TKey][0];
62+
}),
63+
]
5164
): Repeater<
5265
TPubSubPublishArgsByKey[TKey][1] extends undefined
5366
? MapToNull<TPubSubPublishArgsByKey[TKey][0]>
@@ -64,6 +77,75 @@ export const createPubSub = <TPubSubPublishArgsByKey extends PubSubPublishArgsBy
6477
const target =
6578
config?.eventTarget ?? (new EventTarget() as PubSubEventTarget<TPubSubPublishArgsByKey>);
6679

80+
function subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
81+
...args: TPubSubPublishArgsByKey[TKey][1] extends undefined
82+
? [routingKey: TKey]
83+
: [routingKey: TKey, id: TPubSubPublishArgsByKey[TKey][0]]
84+
): Repeater<
85+
TPubSubPublishArgsByKey[TKey][1] extends undefined
86+
? TPubSubPublishArgsByKey[TKey][0]
87+
: TPubSubPublishArgsByKey[TKey][1]
88+
>;
89+
function subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
90+
args: {
91+
topic: string;
92+
buffer?: RepeaterBuffer | undefined;
93+
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
94+
? {
95+
id?: void;
96+
}
97+
: {
98+
id: TPubSubPublishArgsByKey[TKey][0];
99+
}),
100+
): Repeater<
101+
TPubSubPublishArgsByKey[TKey][1] extends undefined
102+
? TPubSubPublishArgsByKey[TKey][0]
103+
: TPubSubPublishArgsByKey[TKey][1]
104+
>;
105+
function subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
106+
...args:
107+
| (TPubSubPublishArgsByKey[TKey][1] extends undefined
108+
? [key: TKey]
109+
: [key: TKey, id: TPubSubPublishArgsByKey[TKey][0]])
110+
| [
111+
args: {
112+
topic: string;
113+
buffer?: RepeaterBuffer | undefined;
114+
} & (TPubSubPublishArgsByKey[TKey][1] extends undefined
115+
? {
116+
id?: void;
117+
}
118+
: {
119+
id: TPubSubPublishArgsByKey[TKey][0];
120+
}),
121+
]
122+
): Repeater<
123+
TPubSubPublishArgsByKey[TKey][1] extends undefined
124+
? TPubSubPublishArgsByKey[TKey][0]
125+
: TPubSubPublishArgsByKey[TKey][1]
126+
> {
127+
let topic: string;
128+
let buffer: RepeaterBuffer | undefined;
129+
if (typeof args[0] === 'string') {
130+
topic = args[1] === undefined ? args[0] : `${args[0]}:${args[1]}`;
131+
} else {
132+
topic = args[0].id === undefined ? args[0].topic : `${args[0].topic}:${args[0].id}`;
133+
buffer = args[0].buffer;
134+
}
135+
136+
return new Repeater(function subscriptionRepeater(next, stop) {
137+
stop.then(function subscriptionRepeaterStopHandler() {
138+
target.removeEventListener(topic, pubsubEventListener);
139+
});
140+
141+
target.addEventListener(topic, pubsubEventListener);
142+
143+
function pubsubEventListener(event: PubSubEvent<TPubSubPublishArgsByKey, TKey>) {
144+
next(event.detail);
145+
}
146+
}, buffer);
147+
}
148+
67149
return {
68150
publish<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
69151
routingKey: TKey,
@@ -77,28 +159,6 @@ export const createPubSub = <TPubSubPublishArgsByKey extends PubSubPublishArgsBy
77159
});
78160
target.dispatchEvent(event);
79161
},
80-
subscribe<TKey extends Extract<keyof TPubSubPublishArgsByKey, string>>(
81-
...[routingKey, id]: TPubSubPublishArgsByKey[TKey][1] extends undefined
82-
? [TKey]
83-
: [TKey, TPubSubPublishArgsByKey[TKey][0]]
84-
): Repeater<
85-
TPubSubPublishArgsByKey[TKey][1] extends undefined
86-
? TPubSubPublishArgsByKey[TKey][0]
87-
: TPubSubPublishArgsByKey[TKey][1]
88-
> {
89-
const topic = id === undefined ? routingKey : `${routingKey}:${id as number}`;
90-
91-
return new Repeater(function subscriptionRepeater(next, stop) {
92-
stop.then(function subscriptionRepeaterStopHandler() {
93-
target.removeEventListener(topic, pubsubEventListener);
94-
});
95-
96-
target.addEventListener(topic, pubsubEventListener);
97-
98-
function pubsubEventListener(event: PubSubEvent<TPubSubPublishArgsByKey, TKey>) {
99-
next(event.detail);
100-
}
101-
});
102-
},
162+
subscribe,
103163
};
104164
};

0 commit comments

Comments
 (0)