Skip to content

Commit cad03db

Browse files
authored
Reconnect when making streams and the LB is idle (#41)
Motivation: When a LB becomes idle and a stream is created it's enqueued but doesn't cause the LB to start reconnecting again. Modifications: - Trigger a reconnect when enqueuing a waiter and the LB is idle Result: - Resolves #40
1 parent 6c0bc45 commit cad03db

File tree

2 files changed

+65
-5
lines changed

2 files changed

+65
-5
lines changed

Sources/GRPCNIOTransportCore/Client/Connection/GRPCChannel.swift

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -337,10 +337,15 @@ extension GRPCChannel {
337337
return
338338
}
339339

340-
let enqueued = self.state.withLock { state in
340+
let (enqueued, loadBalancer) = self.state.withLock { state in
341341
state.enqueue(continuation: continuation, waitForReady: waitForReady, id: id)
342342
}
343343

344+
if let loadBalancer = loadBalancer {
345+
// Attempting to pick a subchannel will trigger a connect event if the subchannel is idle.
346+
_ = loadBalancer.pickSubchannel()
347+
}
348+
344349
// Not enqueued because the channel is shutdown or shutting down.
345350
if !enqueued {
346351
let error = RPCError(code: .unavailable, message: "channel is shutdown")
@@ -896,20 +901,21 @@ extension GRPCChannel.StateMachine {
896901
continuation: CheckedContinuation<LoadBalancer, any Error>,
897902
waitForReady: Bool,
898903
id: QueueEntryID
899-
) -> Bool {
904+
) -> (enqueued: Bool, loadBalancer: LoadBalancer?) {
900905
switch self.state {
901906
case .notRunning(var state):
902907
self.state = ._modifying
903908
state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
904909
self.state = .notRunning(state)
905-
return true
910+
return (true, nil)
906911
case .running(var state):
907912
self.state = ._modifying
908913
state.queue.append(continuation: continuation, waitForReady: waitForReady, id: id)
909914
self.state = .running(state)
910-
return true
915+
// If idle then return the current load balancer so that it can be told to start connecting.
916+
return (true, state.connectivityState == .idle ? state.current : nil)
911917
case .stopping, .stopped:
912-
return false
918+
return (false, nil)
913919
case ._modifying:
914920
fatalError("Invalid state")
915921
}

Tests/GRPCNIOTransportCoreTests/Client/Connection/GRPCChannelTests.swift

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -795,6 +795,60 @@ final class GRPCChannelTests: XCTestCase {
795795
group.cancelAll()
796796
}
797797
}
798+
799+
func testMakeStreamAfterIdleTimeout() async throws {
800+
let server = TestServer(eventLoopGroup: .singletonMultiThreadedEventLoopGroup)
801+
let address = try await server.bind()
802+
803+
// Configure a low idle time.
804+
let channel = GRPCChannel(
805+
resolver: .static(endpoints: [Endpoint(address)]),
806+
connector: .posix(maxIdleTime: .milliseconds(50)),
807+
config: .defaults,
808+
defaultServiceConfig: ServiceConfig()
809+
)
810+
811+
try await withThrowingDiscardingTaskGroup { group in
812+
group.addTask {
813+
// Just respond with 'ok'.
814+
try await server.run { inbound, outbound in
815+
let status = Status(code: .ok, message: "")
816+
try await outbound.write(.status(status, [:]))
817+
outbound.finish()
818+
}
819+
}
820+
821+
group.addTask {
822+
await channel.connect()
823+
}
824+
825+
func doAnRPC() async throws {
826+
try await channel.withStream(descriptor: .echoGet, options: .defaults) { stream in
827+
try await stream.outbound.write(.metadata([:]))
828+
await stream.outbound.finish()
829+
830+
let response = try await stream.inbound.reduce(into: []) { $0.append($1) }
831+
switch response.first {
832+
case .status(let status, _):
833+
XCTAssertEqual(status.code, .ok)
834+
default:
835+
XCTFail("Expected status")
836+
}
837+
}
838+
}
839+
840+
// Do an RPC.
841+
try await doAnRPC()
842+
843+
// Wait for the idle time to pass.
844+
try await Task.sleep(for: .milliseconds(100))
845+
846+
// Do another RPC.
847+
try await doAnRPC()
848+
849+
group.cancelAll()
850+
}
851+
}
798852
}
799853

800854
extension GRPCChannel.Config {

0 commit comments

Comments
 (0)