Skip to content

Commit 5d9d3cb

Browse files
committed
watch debouncing
1 parent 2a5322a commit 5d9d3cb

File tree

5 files changed

+131
-49
lines changed

5 files changed

+131
-49
lines changed

package-lock.json

Lines changed: 4 additions & 13 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/server/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@
4040
"effection": "^4.0.0",
4141
"@effectionx/context-api": "^0.2.1",
4242
"@effectionx/process": "^0.6.2",
43-
"@effectionx/stream-helpers": "^0.4.1",
43+
"@effectionx/stream-helpers": "^0.5.1",
4444
"@effectionx/timebox": "^0.3.1",
4545
"chokidar": "^5.0.0",
4646
"picomatch": "^4.0.3"

packages/server/src/services.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export type ServiceGraph<
3535
[service in keyof S]: ServiceDefinition<keyof S, T>;
3636
};
3737
serviceUpdates?: Stream<ServiceUpdate, unknown> | undefined;
38+
serviceChanges?: Stream<ServiceUpdate, unknown> | undefined;
3839
// map of service name => listening port (when the service exposes one)
3940
servicePorts?: Map<string, number> | undefined;
4041
};
@@ -122,7 +123,12 @@ export function useServiceGraph<
122123
);
123124
}
124125

125-
const watcher = yield* useWatcher(effectiveServices);
126+
const watcher = yield* useWatcher(
127+
effectiveServices,
128+
options?.watchDebounce
129+
? { watchDebounce: options.watchDebounce }
130+
: undefined
131+
);
126132

127133
const status = new Map<
128134
string,
@@ -156,7 +162,7 @@ export function useServiceGraph<
156162

157163
yield* spawn(function* () {
158164
// restart propagation to dependents is handled by useWatcher
159-
for (let restartService of yield* each(watcher.serviceUpdates)) {
165+
for (let restartService of yield* each(watcher.serviceChanges)) {
160166
bumpService(restartService.service);
161167
yield* each.next();
162168
}
@@ -219,6 +225,7 @@ export function useServiceGraph<
219225
yield* provide({
220226
services: services as S,
221227
serviceUpdates: watcher?.serviceUpdates,
228+
serviceChanges: watcher?.serviceChanges,
222229
servicePorts,
223230
});
224231
} finally {

packages/server/src/watch.ts

Lines changed: 30 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -5,44 +5,25 @@ import {
55
createSignal,
66
each,
77
type Operation,
8-
race,
98
resource,
10-
sleep,
119
spawn,
1210
type Stream,
1311
until,
1412
} from "effection";
1513
import picomatch, { type Matcher } from "picomatch";
16-
17-
export function debounce<T, R>(
18-
ms: number
19-
): (stream: Stream<T, R>) => Stream<T, R> {
20-
return (stream) => ({
21-
*[Symbol.iterator]() {
22-
let subscription = yield* stream;
23-
return {
24-
*next() {
25-
let next = yield* subscription.next();
26-
while (true) {
27-
let result = yield* race([sleep(ms), subscription.next()]);
28-
if (!result) {
29-
return next;
30-
} else {
31-
next = result;
32-
}
33-
}
34-
},
35-
};
36-
},
37-
});
38-
}
14+
import { filter } from "@effectionx/stream-helpers";
3915

4016
export type ServiceUpdate = { service: string; path: string };
4117

4218
export function useWatcher(
43-
services?: Record<string, { dependsOn?: { restart?: readonly string[] } }>
19+
services?: Record<
20+
string,
21+
{ dependsOn?: { restart?: readonly string[] }; watchDebounce?: number }
22+
>,
23+
options?: { watchDebounce?: number }
4424
): Operation<{
4525
serviceUpdates: Stream<{ service: string; path: string }, void>;
26+
serviceChanges: Stream<{ service: string; path: string }, void>;
4627
add: (service: string, paths: string[]) => void;
4728
}> {
4829
return resource(function* (provide) {
@@ -119,8 +100,30 @@ export function useWatcher(
119100
}
120101
});
121102

103+
const debounceMs =
104+
options?.watchDebounce !== undefined ? options.watchDebounce : 250;
105+
const serviceTimers = {} as Record<string, number>;
106+
const debouncedServiceChanges = filter<ServiceUpdate>(function* (
107+
updateStream
108+
) {
109+
const now = performance.now();
110+
if (
111+
serviceTimers[updateStream.service] &&
112+
now - serviceTimers[updateStream.service] < debounceMs
113+
) {
114+
return false;
115+
} else {
116+
serviceTimers[updateStream.service] = now;
117+
return true;
118+
}
119+
});
120+
122121
try {
123-
yield* provide({ serviceUpdates, add });
122+
yield* provide({
123+
serviceUpdates,
124+
add,
125+
serviceChanges: debouncedServiceChanges(serviceUpdates),
126+
});
124127
} finally {
125128
yield* until(watcher.close());
126129
}

packages/server/test/watch.test.ts

Lines changed: 87 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { it } from "node:test";
22
import assert from "node:assert";
3-
import { run, suspend, sleep, until, spawn, resource } from "effection";
3+
import { run, suspend, sleep, until, spawn, resource, ensure } from "effection";
44
import * as fs from "node:fs/promises";
55
import path from "node:path";
66
import os from "node:os";
@@ -41,12 +41,12 @@ it("restarts services on watched file change and restarts dependents", async ()
4141

4242
try {
4343
const services = yield* op();
44-
// subscribe to the immediate serviceUpdates stream and wait for the first update
45-
if (!services.serviceUpdates)
46-
throw new Error("serviceUpdates not available");
47-
const subscription = yield* services.serviceUpdates;
44+
// subscribe to the immediate raw serviceChanges stream and wait for the first update
45+
if (!services.serviceChanges)
46+
throw new Error("serviceChanges not available");
47+
const subscription = yield* services.serviceChanges;
4848

49-
// wait for the first update (will occur after the test touches the file)
49+
// wait for the first raw update (will occur after the test touches the file)
5050
const first = yield* subscription.next();
5151
updates.push(String((first.value as { service: string }).service));
5252
} catch (e) {
@@ -210,3 +210,84 @@ it("restarts transitive dependents when watched service changes", async () => {
210210
assert(startCounts.b >= 2, "b should have been restarted as dependent");
211211
assert(startCounts.c >= 2, "c should have been restarted as dependent of b");
212212
});
213+
214+
it("debounces rapid changes per service", async () => {
215+
const prefix = path.join(os.tmpdir(), "sim-watch-debounce-");
216+
const dir = await fs.mkdtemp(prefix);
217+
const trigger = path.join(dir, "trigger.txt");
218+
await fs.writeFile(trigger, "initial");
219+
220+
const updates: string[] = [];
221+
let rawCount = 0;
222+
223+
await run(function* () {
224+
yield* spawn(function* () {
225+
const op = useServiceGraph(
226+
{
227+
a: {
228+
watch: [dir],
229+
operation: resource<void>(function* (provide) {
230+
yield* provide();
231+
}),
232+
},
233+
},
234+
{ watch: true, watchDebounce: 150 }
235+
);
236+
237+
try {
238+
const services = yield* op();
239+
if (!services.serviceUpdates || !services.serviceChanges)
240+
throw new Error("service streams not available");
241+
const debSub = yield* services.serviceUpdates;
242+
const rawSub = yield* services.serviceChanges;
243+
244+
// collect debounced updates
245+
yield* spawn(function* () {
246+
while (true) {
247+
const n = yield* debSub.next();
248+
if (n.done) break;
249+
updates.push((n.value as { service: string }).service);
250+
}
251+
});
252+
253+
// count raw updates (should reflect every write)
254+
yield* spawn(function* () {
255+
while (true) {
256+
const n = yield* rawSub.next();
257+
if (n.done) break;
258+
if ((n.value as { service: string }).service === "a") rawCount++;
259+
}
260+
});
261+
} catch (e) {
262+
throw e;
263+
}
264+
265+
yield* suspend();
266+
});
267+
268+
// ensure watcher attached
269+
yield* sleep(0);
270+
271+
// write multiple times rapidly
272+
yield* until(fs.writeFile(trigger, "changed-1"));
273+
yield* sleep(10);
274+
yield* until(fs.writeFile(trigger, "changed-2"));
275+
yield* sleep(10);
276+
yield* until(fs.writeFile(trigger, "changed-3"));
277+
278+
yield* ensure(() => until(fs.rm(dir, { recursive: true, force: true })));
279+
// wait longer than debounce window
280+
yield* sleep(300);
281+
});
282+
283+
// we expect the rapid writes to coalesce: there should be at least one
284+
// raw and at least one debounced update, and debounced updates should be
285+
// fewer than the number of writes (3)
286+
assert(rawCount >= 1, `expected at least 1 raw update, got ${rawCount}`);
287+
assert(updates.length >= 1, "expected at least one debounced update");
288+
const aCount = updates.filter((u) => u === "a").length;
289+
assert(
290+
aCount < 3,
291+
`expected debounced updates to be fewer than writes (3), got ${aCount}`
292+
);
293+
});

0 commit comments

Comments
 (0)