Skip to content

Commit 458c8d5

Browse files
authored
Add an optional timeout to the periodic scheduler (#539)
This can cause extractor shutdown to hang forever if there's an issue with shutting down a task, so we should allow setting a timeout. Should we set a default? It would technically be a breaking change, but hanging forever seems unlikely to be desirable behavior.
1 parent 881f71f commit 458c8d5

File tree

3 files changed

+24
-3
lines changed

3 files changed

+24
-3
lines changed

Cognite.Common/PeriodicScheduler.cs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,7 +254,7 @@ public async Task WaitForTermination(string name)
254254
/// Same as calling ExitAndWaitForTermination on all tasks.
255255
/// </summary>
256256
/// <returns>Task which completes once all tasks are done</returns>
257-
public async Task ExitAllAndWait()
257+
public async Task ExitAllAndWait(int timeoutMs = 0)
258258
{
259259
var tasks = new List<PeriodicTask>(_tasks.Count);
260260
lock (_taskListMutex)
@@ -268,7 +268,19 @@ public async Task ExitAllAndWait()
268268
}
269269
try
270270
{
271-
await Task.WhenAll(tasks.Select(tsk => tsk.Task)).ConfigureAwait(false);
271+
var timeout = timeoutMs > 0 ? TimeSpan.FromMilliseconds(timeoutMs) : Timeout.InfiniteTimeSpan;
272+
var tasksToWaitFor = tasks.Select(tsk => tsk.Task).ToList();
273+
// Do not pass the cancellation token, since we are likely cancelled, and this method bypasses that.
274+
var waitTask = CommonUtils.WaitAsync(_newTaskEvent, timeout, CancellationToken.None);
275+
var res = await Task.WhenAny(Task.WhenAll(tasksToWaitFor), waitTask).ConfigureAwait(false);
276+
if (res == waitTask)
277+
{
278+
var notFinishedTasks = tasks.Where(t => !t.Task.IsCompleted).Select(t => t.Name).ToList();
279+
280+
throw new TimeoutException(
281+
$"Timed out waiting for task scheduler to finish. The following tasks failed to finish within {timeout}: {string.Join(", ", notFinishedTasks)}"
282+
);
283+
}
272284
}
273285
catch (TaskCanceledException) { }
274286
catch (AggregateException aex)

ExtractorUtils.Test/unit/SchedulerTest.cs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,5 +205,14 @@ public async Task TestBlockingResourceIncCapacity()
205205

206206
Assert.Equal(1, await task2);
207207
}
208+
209+
[Fact]
210+
public async Task TestSchedulerTimeout()
211+
{
212+
using var source = new CancellationTokenSource();
213+
using var scheduler = new PeriodicScheduler(source.Token);
214+
scheduler.ScheduleTask("forever", (_t) => Task.Delay(Timeout.Infinite, source.Token));
215+
await Assert.ThrowsAsync<TimeoutException>(() => scheduler.ExitAllAndWait(200));
216+
}
208217
}
209218
}

version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
1.32.0
1+
1.33.0

0 commit comments

Comments
 (0)