Skip to content

Commit c295efd

Browse files
authored
Set tolerance to zero when using Task.sleep (#2225)
`Task.sleep` will by default try and coalesce multiple timers into one, mostly for client-specific reasons such as performance, power consumption, etc. However, this is undesirable on servers, as it can increase latency, memory usage, and (in the case of gRPC) may result in timeouts not firing when they should. We can avoid this by setting the sleep `tolerance` to zero.
1 parent e0ba0ed commit c295efd

File tree

8 files changed

+24
-19
lines changed

8 files changed

+24
-19
lines changed

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+HedgingExecutor.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ extension ClientRPCExecutor.HedgingExecutor {
8383
if let deadline = self.deadline {
8484
group.addTask {
8585
let result = await Result {
86-
try await Task.sleep(until: deadline, clock: .continuous)
86+
try await Task.sleep(until: deadline, tolerance: .zero, clock: .continuous)
8787
}
8888
return .timedOut(result)
8989
}
@@ -533,7 +533,7 @@ extension ClientRPCExecutor.HedgingExecutor {
533533
self._isPushback = pushback
534534
self._handle = group.addCancellableTask {
535535
do {
536-
try await Task.sleep(for: delay, clock: .continuous)
536+
try await Task.sleep(for: delay, tolerance: .zero, clock: .continuous)
537537
return .scheduledAttemptFired(.ran)
538538
} catch {
539539
return .scheduledAttemptFired(.cancelled)

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+OneShotExecutor.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ func withDeadline<Result: Sendable>(
137137
return await withTaskGroup(of: _DeadlineChildTaskResult<Result>.self) { group in
138138
group.addTask {
139139
do {
140-
try await Task.sleep(until: deadline)
140+
try await Task.sleep(until: deadline, tolerance: .zero)
141141
return .deadlinePassed
142142
} catch {
143143
return .timeoutCancelled

Sources/GRPCCore/Call/Client/Internal/ClientRPCExecutor+RetryExecutor.swift

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ extension ClientRPCExecutor.RetryExecutor {
9292
if let deadline = self.deadline {
9393
group.addTask {
9494
let result = await Result {
95-
try await Task.sleep(until: deadline, clock: .continuous)
95+
try await Task.sleep(until: deadline, tolerance: .zero, clock: .continuous)
9696
}
9797
return .timedOut(result)
9898
}
@@ -155,11 +155,16 @@ extension ClientRPCExecutor.RetryExecutor {
155155
// If the delay is overridden with server pushback then reset the iterator for the
156156
// next retry.
157157
delayIterator = delaySequence.makeIterator()
158-
try? await Task.sleep(until: .now.advanced(by: delayOverride), clock: .continuous)
158+
try? await Task.sleep(
159+
until: .now.advanced(by: delayOverride),
160+
tolerance: .zero,
161+
clock: .continuous
162+
)
159163
} else {
160164
// The delay iterator never terminates.
161165
try? await Task.sleep(
162166
until: .now.advanced(by: delayIterator.next()!),
167+
tolerance: .zero,
163168
clock: .continuous
164169
)
165170
}

Sources/GRPCCore/Call/Server/Internal/ServerRPCExecutor.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ struct ServerRPCExecutor {
123123
await withTaskGroup(of: Void.self) { group in
124124
group.addTask {
125125
do {
126-
try await Task.sleep(for: timeout, clock: .continuous)
126+
try await Task.sleep(for: timeout, tolerance: .zero, clock: .continuous)
127127
context.cancellation.cancel()
128128
} catch {
129129
() // Only cancel the RPC if the timeout completes.

Tests/GRPCCoreTests/Call/Client/Internal/ClientRPCExecutorTestSupport/ClientRPCExecutorTestHarness+ServerBehavior.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ extension ClientRPCExecutorTestHarness.ServerStreamHandler {
112112

113113
static func sleepFor(duration: Duration, then handler: Self) -> Self {
114114
return Self { stream in
115-
try await Task.sleep(until: .now.advanced(by: duration), clock: .continuous)
115+
try await Task.sleep(until: .now.advanced(by: duration), tolerance: .zero, clock: .continuous)
116116
try await handler.handle(stream: stream)
117117
}
118118
}

Tests/GRPCCoreTests/GRPCClientTests.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ final class GRPCClientTests: XCTestCase {
4040
transport: inProcess.client,
4141
interceptorPipeline: interceptorPipeline
4242
) { client in
43-
try await Task.sleep(for: .milliseconds(100))
43+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
4444
try await body(client, server)
4545
}
4646
}
@@ -341,7 +341,7 @@ final class GRPCClientTests: XCTestCase {
341341
let task = Task {
342342
try await client.clientStreaming(
343343
request: StreamingClientRequest { writer in
344-
try await Task.sleep(for: .seconds(5))
344+
try await Task.sleep(for: .seconds(5), tolerance: .zero)
345345
},
346346
descriptor: BinaryEcho.Methods.collect,
347347
serializer: IdentitySerializer(),
@@ -382,7 +382,7 @@ final class GRPCClientTests: XCTestCase {
382382
// Run the client.
383383
let task = Task { try await client.runConnections() }
384384
// Make sure the client is run for the first time here.
385-
try await Task.sleep(for: .milliseconds(10))
385+
try await Task.sleep(for: .milliseconds(10), tolerance: .zero)
386386

387387
// Client is already running, should throw an error.
388388
await XCTAssertThrowsErrorAsync(ofType: RuntimeError.self) {
@@ -545,7 +545,7 @@ struct ClientTests {
545545
}
546546

547547
// Make sure both server and client are running
548-
try await Task.sleep(for: .milliseconds(100))
548+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
549549
try await body(client, server)
550550
client.beginGracefulShutdown()
551551
server.beginGracefulShutdown()

Tests/GRPCCoreTests/Internal/Result+CatchingTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import XCTest
2121
final class ResultCatchingTests: XCTestCase {
2222
func testResultCatching() async {
2323
let result = await Result {
24-
try? await Task.sleep(nanoseconds: 1)
24+
try? await Task.sleep(for: .nanoseconds(1), tolerance: .zero)
2525
throw RPCError(code: .unknown, message: "foo")
2626
}
2727

Tests/GRPCInProcessTransportTests/InProcessClientTransportTests.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ final class InProcessClientTransportTests: XCTestCase {
6262
try await client.connect()
6363
}
6464
group.addTask {
65-
try await Task.sleep(for: .milliseconds(100))
65+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
6666
}
6767

6868
try await group.next()
@@ -97,7 +97,7 @@ final class InProcessClientTransportTests: XCTestCase {
9797
try await client.connect()
9898
}
9999
group.addTask {
100-
try await Task.sleep(for: .milliseconds(100))
100+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
101101
}
102102

103103
try await group.next()
@@ -121,7 +121,7 @@ final class InProcessClientTransportTests: XCTestCase {
121121
group.addTask {
122122
// Add a sleep to make sure connection happens after `withStream` has been called,
123123
// to test pending streams are handled correctly.
124-
try await Task.sleep(for: .milliseconds(100))
124+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
125125
try await client.connect()
126126
}
127127

@@ -171,7 +171,7 @@ final class InProcessClientTransportTests: XCTestCase {
171171
}
172172

173173
group.addTask {
174-
try await Task.sleep(for: .milliseconds(100))
174+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
175175
client.beginGracefulShutdown()
176176
}
177177

@@ -252,18 +252,18 @@ final class InProcessClientTransportTests: XCTestCase {
252252

253253
group.addTask {
254254
try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
255-
try await Task.sleep(for: .milliseconds(100))
255+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
256256
}
257257
}
258258

259259
group.addTask {
260260
try await client.withStream(descriptor: .testTest, options: .defaults) { stream, _ in
261-
try await Task.sleep(for: .milliseconds(100))
261+
try await Task.sleep(for: .milliseconds(100), tolerance: .zero)
262262
}
263263
}
264264

265265
group.addTask {
266-
try await Task.sleep(for: .milliseconds(50))
266+
try await Task.sleep(for: .milliseconds(50), tolerance: .zero)
267267
client.beginGracefulShutdown()
268268
}
269269

0 commit comments

Comments
 (0)