Skip to content

Commit 7eed3b4

Browse files
authored
feat: allow interrupting repeating task (#3186)
Adds a `.run` method to a repeating task that lets us interrupt the timeout to run immediately (debounced) before rescheduling the task to run again in the future.
1 parent c142feb commit 7eed3b4

File tree

2 files changed

+108
-17
lines changed

2 files changed

+108
-17
lines changed

packages/utils/src/repeating-task.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { anySignal } from 'any-signal'
22
import { setMaxListeners } from 'main-event'
3+
import { debounce } from './debounce.ts'
34
import type { AbortOptions } from '@libp2p/interface'
45

56
export interface RepeatingTask {
@@ -25,6 +26,13 @@ export interface RepeatingTask {
2526
*/
2627
setTimeout(ms: number): void
2728

29+
/**
30+
* Schedule the task to be run immediately - if the task is not running it
31+
* will run after a short delay in order to debounce multiple `.run()`
32+
* invocations.
33+
*/
34+
run(): void
35+
2836
/**
2937
* Start the task running
3038
*/
@@ -49,11 +57,20 @@ export interface RepeatingTaskOptions {
4957
* @default false
5058
*/
5159
runImmediately?: boolean
60+
61+
/**
62+
* When `.run()` is called to run the task outside of the current interval,
63+
* debounce repeated calls to `.run()` by this amount.
64+
*
65+
* @default 100
66+
*/
67+
debounce?: number
5268
}
5369

5470
export function repeatingTask (fn: (options?: AbortOptions) => void | Promise<void>, interval: number, options?: RepeatingTaskOptions): RepeatingTask {
5571
let timeout: ReturnType<typeof setTimeout>
5672
let shutdownController: AbortController
73+
let running = false
5774

5875
function runTask (): void {
5976
const opts: AbortOptions = {
@@ -67,11 +84,15 @@ export function repeatingTask (fn: (options?: AbortOptions) => void | Promise<vo
6784
opts.signal = signal
6885
}
6986

87+
running = true
88+
7089
Promise.resolve().then(async () => {
7190
await fn(opts)
7291
})
7392
.catch(() => {})
7493
.finally(() => {
94+
running = false
95+
7596
if (shutdownController.signal.aborted) {
7697
// task has been cancelled, bail
7798
return
@@ -82,6 +103,8 @@ export function repeatingTask (fn: (options?: AbortOptions) => void | Promise<vo
82103
})
83104
}
84105

106+
const runTaskDebounced = debounce(runTask, options?.debounce ?? 100)
107+
85108
let started = false
86109

87110
return {
@@ -103,6 +126,14 @@ export function repeatingTask (fn: (options?: AbortOptions) => void | Promise<vo
103126
options ??= {}
104127
options.timeout = ms
105128
},
129+
run: (): void => {
130+
if (running) {
131+
return
132+
}
133+
134+
clearTimeout(timeout)
135+
runTaskDebounced()
136+
},
106137
start: (): void => {
107138
if (started) {
108139
return

packages/utils/test/repeating-task.spec.ts

Lines changed: 77 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,19 @@ import { expect } from 'aegir/chai'
22
import delay from 'delay'
33
import pDefer from 'p-defer'
44
import { repeatingTask } from '../src/repeating-task.js'
5+
import type { RepeatingTask } from '../src/repeating-task.js'
56

67
describe('repeating-task', () => {
8+
let task: RepeatingTask
9+
10+
afterEach(() => {
11+
task?.stop()
12+
})
13+
714
it('should repeat a task', async () => {
815
let count = 0
916

10-
const task = repeatingTask(() => {
17+
task = repeatingTask(() => {
1118
count++
1219
}, 100)
1320
task.start()
@@ -22,7 +29,7 @@ describe('repeating-task', () => {
2229
it('should run a task immediately', async () => {
2330
let count = 0
2431

25-
const task = repeatingTask(() => {
32+
task = repeatingTask(() => {
2633
count++
2734
}, 60000, {
2835
runImmediately: true
@@ -31,15 +38,13 @@ describe('repeating-task', () => {
3138

3239
await delay(10)
3340

34-
task.stop()
35-
3641
expect(count).to.equal(1)
3742
})
3843

3944
it('should time out a task', async () => {
4045
const deferred = pDefer()
4146

42-
const task = repeatingTask((opts) => {
47+
task = repeatingTask((opts) => {
4348
opts?.signal?.addEventListener('abort', () => {
4449
deferred.resolve()
4550
})
@@ -49,29 +54,26 @@ describe('repeating-task', () => {
4954
task.start()
5055

5156
await deferred.promise
52-
task.stop()
5357
})
5458

5559
it('should repeat a task that throws', async () => {
5660
let count = 0
5761

58-
const task = repeatingTask(() => {
62+
task = repeatingTask(() => {
5963
count++
6064
throw new Error('Urk!')
6165
}, 100)
6266
task.start()
6367

6468
await delay(1000)
6569

66-
task.stop()
67-
6870
expect(count).to.be.greaterThan(1)
6971
})
7072

7173
it('should update the interval of a task', async () => {
7274
let count = 0
7375

74-
const task = repeatingTask(() => {
76+
task = repeatingTask(() => {
7577
count++
7678

7779
if (count === 1) {
@@ -82,15 +84,13 @@ describe('repeating-task', () => {
8284

8385
await delay(1000)
8486

85-
task.stop()
86-
8787
expect(count).to.equal(1)
8888
})
8989

9090
it('should update the timeout of a task', async () => {
9191
let count = 0
9292

93-
const task = repeatingTask(async (options) => {
93+
task = repeatingTask(async (options) => {
9494
// simulate a delay
9595
await delay(100)
9696

@@ -109,15 +109,13 @@ describe('repeating-task', () => {
109109

110110
await delay(1000)
111111

112-
task.stop()
113-
114112
expect(count).to.equal(1)
115113
})
116114

117115
it('should not reschedule the task if the interval is updated to the same value', async () => {
118116
let count = 0
119117

120-
const task = repeatingTask(() => {
118+
task = repeatingTask(() => {
121119
count++
122120
}, 1_000, {
123121
runImmediately: true
@@ -134,7 +132,69 @@ describe('repeating-task', () => {
134132

135133
await delay(100)
136134

137-
task.stop()
135+
expect(count).to.equal(2)
136+
})
137+
138+
it('should allow interrupting the timeout to run the task immediately', async () => {
139+
let count = 0
140+
141+
task = repeatingTask(() => {
142+
count++
143+
}, 1_000)
144+
task.start()
145+
146+
// run immediately
147+
task.run()
148+
149+
// less than the repeat interval
150+
await delay(200)
151+
152+
expect(count).to.equal(1)
153+
})
154+
155+
it('should debounce interrupting the timeout to run the task immediately', async () => {
156+
let count = 0
157+
158+
task = repeatingTask(() => {
159+
count++
160+
}, 1_000, {
161+
debounce: 10
162+
})
163+
task.start()
164+
165+
// run immediately
166+
task.run()
167+
task.run()
168+
task.run()
169+
task.run()
170+
task.run()
171+
172+
// less than the repeat interval
173+
await delay(50)
174+
175+
expect(count).to.equal(1)
176+
})
177+
178+
it('should schedule re-running the task after interrupting the timeout', async () => {
179+
let count = 0
180+
181+
task = repeatingTask(() => {
182+
count++
183+
}, 100, {
184+
debounce: 10
185+
})
186+
task.start()
187+
188+
// run immediately
189+
task.run()
190+
191+
// less than the repeat interval
192+
await delay(50)
193+
194+
expect(count).to.equal(1)
195+
196+
// wait longer than the repeat interval
197+
await delay(150)
138198

139199
expect(count).to.equal(2)
140200
})

0 commit comments

Comments
 (0)