Skip to content

Commit 02b2d6d

Browse files
committed
Add unix domain socket based async channel test
1 parent 6e17bc9 commit 02b2d6d

File tree

1 file changed

+169
-0
lines changed

1 file changed

+169
-0
lines changed
Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the SwiftNIO open source project
4+
//
5+
// Copyright (c) 2025 Apple Inc. and the SwiftNIO project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
import NIOPosix
15+
import Testing
16+
17+
@testable import NIOCore
18+
19+
@Suite
20+
private enum AsynChannelUnixDomainSocketTests {
21+
/// This is a end-to-end async channel based test.
22+
///
23+
/// The server side listens on a UNIX domain socket, and the client connects to this socket.
24+
///
25+
/// The server and client exchange simple, line based messages.
26+
@available(macOS 10.15, iOS 17, tvOS 13, watchOS 6, *)
27+
@Test()
28+
static func runServer() async throws {
29+
try await confirmation("Client did receive message") { clientDidReceive in
30+
try await confirmation("Server did receive message") { serverDidReceive in
31+
try await check(
32+
clientDidReceive: clientDidReceive,
33+
serverDidReceive: serverDidReceive
34+
)
35+
}
36+
}
37+
}
38+
}
39+
40+
@available(iOS 17.0, *)
41+
private func check(
42+
clientDidReceive: Confirmation,
43+
serverDidReceive: Confirmation
44+
) async throws {
45+
// This uses a hard-coded path.
46+
//
47+
// The path of a UNIX domain socket has a relatively low limit on its total
48+
// length, and we thus can not put this inside some (potentially) deeply
49+
// nested directory hierarchy.
50+
let path = "/tmp/9ac7750dc22a066066871aadf481e31a"
51+
let serverChannel = try await makeServerChannel(path: path)
52+
53+
try await withThrowingDiscardingTaskGroup { group in
54+
try await serverChannel.executeThenClose { inbound in
55+
group.addTask {
56+
// Create a client connection to the server:
57+
let clientChannel = try await makeClientChannel(path: path)
58+
print("Executing client channel")
59+
try await clientChannel.executeThenClose { inbound, outbound in
60+
print("C: Sending hello")
61+
try await outbound.write("Hello")
62+
63+
var inboundIterator = inbound.makeAsyncIterator()
64+
guard let messageA = try await inboundIterator.next() else { return }
65+
print("C: Did receive '\(messageA)'")
66+
clientDidReceive.confirm()
67+
#expect(messageA == "Hello")
68+
69+
try await outbound.write("QUIT")
70+
}
71+
}
72+
73+
for try await connectionChannel in inbound {
74+
group.addTask {
75+
print("Handling new connection")
76+
await handleConnection(
77+
channel: connectionChannel,
78+
serverDidReceive: serverDidReceive
79+
)
80+
print("Done handling connection")
81+
}
82+
break
83+
}
84+
}
85+
}
86+
}
87+
88+
private func makeServerChannel(
89+
path: String
90+
) async throws -> NIOAsyncChannel<NIOAsyncChannel<String, String>, Never> {
91+
try await ServerBootstrap(
92+
group: NIOSingletons.posixEventLoopGroup
93+
).bind(
94+
unixDomainSocketPath: path,
95+
cleanupExistingSocketFile: true,
96+
serverBackPressureStrategy: nil
97+
) { childChannel in
98+
childChannel.eventLoop.makeCompletedFuture {
99+
try childChannel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder()))
100+
try childChannel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder()))
101+
return try NIOAsyncChannel<String, String>(
102+
wrappingChannelSynchronously: childChannel
103+
)
104+
}
105+
}
106+
}
107+
108+
private func makeClientChannel(
109+
path: String
110+
) async throws -> NIOAsyncChannel<String, String> {
111+
try await ClientBootstrap(group: NIOSingletons.posixEventLoopGroup)
112+
.connect(unixDomainSocketPath: path)
113+
.flatMap { channel in
114+
channel.eventLoop.makeCompletedFuture {
115+
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(NewlineDelimiterCoder()))
116+
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(NewlineDelimiterCoder()))
117+
return try NIOAsyncChannel<String, String>(wrappingChannelSynchronously: channel)
118+
}
119+
}
120+
.get()
121+
}
122+
123+
private func handleConnection(
124+
channel: NIOAsyncChannel<String, String>,
125+
serverDidReceive: Confirmation
126+
) async {
127+
do {
128+
print("S: New channel")
129+
try await channel.executeThenClose { inbound, outbound in
130+
for try await message in inbound {
131+
print("S: Did receive '\(message)'")
132+
guard message != "QUIT" else { return }
133+
serverDidReceive.confirm()
134+
try await outbound.write(message)
135+
}
136+
print("S: Bye")
137+
}
138+
} catch {
139+
print("Error: \(error)")
140+
}
141+
}
142+
143+
/// A simple newline based encoder and decoder.
144+
private final class NewlineDelimiterCoder: ByteToMessageDecoder, MessageToByteEncoder {
145+
typealias InboundIn = ByteBuffer
146+
typealias InboundOut = String
147+
148+
private let newLine = UInt8(ascii: "\n")
149+
150+
init() {}
151+
152+
func decode(context: ChannelHandlerContext, buffer: inout ByteBuffer) throws -> DecodingState {
153+
let readableBytes = buffer.readableBytesView
154+
155+
if let firstLine = readableBytes.firstIndex(of: self.newLine).map({ readableBytes[..<$0] }) {
156+
buffer.moveReaderIndex(forwardBy: firstLine.count + 1)
157+
// Fire a read without a newline
158+
context.fireChannelRead(Self.wrapInboundOut(String(buffer: ByteBuffer(firstLine))))
159+
return .continue
160+
} else {
161+
return .needMoreData
162+
}
163+
}
164+
165+
func encode(data: String, out: inout ByteBuffer) throws {
166+
out.writeString(data)
167+
out.writeInteger(self.newLine)
168+
}
169+
}

0 commit comments

Comments
 (0)