Skip to content

Commit 0e97917

Browse files
authored
Implement basic Memcached Connection Actor with set and get operations (#16)
* init connection actor * init AsyncChannel connection actor * dispatch request using AsyncStream * remove debug print * doc comments for makeStream * iterator to handle request * passing connection actor test * clean up * added ConnectionState * closes #15 * implemented MemcachedValue * Unit Test for MemcachedValue * soundness cleanup * public not needed on MemcachedConnectionError * only create connection with calling run * soundess * queue request init * soundness closes #15 and #7 * removed MemcachedValue * defer the group
1 parent ed88f85 commit 0e97917

File tree

4 files changed

+258
-1
lines changed

4 files changed

+258
-1
lines changed

Package.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ let package = Package(
3030
),
3131
],
3232
dependencies: [
33-
.package(url: "https://github.com/apple/swift-nio.git", from: "2.43.1"),
33+
.package(url: "https://github.com/apple/swift-nio.git", from: "2.56.0"),
3434
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
3535
],
3636
targets: [
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-memcache-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-memcache-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-memcache-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
/// `AsyncStream` extension to facilitate creating an `AsyncStream` along with its corresponding `Continuation`.
16+
///
17+
/// This extension creates an `AsyncStream` and returns it along with its corresponding `Continuation`.
18+
/// A common usage pattern involves yielding requests and a `CheckedContinuation` via `withCheckedThrowingContinuation`
19+
/// to the `AsyncStream`'s `Continuation`.
20+
///
21+
/// - Parameters:
22+
/// - elementType: The type of element that the stream handles. By default, this is the `Element` type that the `AsyncStream` is initialized with.
23+
/// - limit: The buffering limit that the stream should use. By default, this is `.unbounded`.
24+
///
25+
/// - Returns: A tuple containing the created `AsyncStream` and its corresponding `Continuation`.
26+
extension AsyncStream {
27+
internal static func makeStream(
28+
of elementType: Element.Type = Element.self,
29+
bufferingPolicy limit: Continuation.BufferingPolicy = .unbounded
30+
) -> (stream: AsyncStream<Element>, continuation: AsyncStream<Element>.Continuation) {
31+
var continuation: AsyncStream<Element>.Continuation!
32+
let stream = AsyncStream<Element>(bufferingPolicy: limit) { continuation = $0 }
33+
return (stream: stream, continuation: continuation!)
34+
}
35+
}
Lines changed: 198 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,198 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-memcache-gsoc open source project
4+
//
5+
// Copyright (c) 2023 Apple Inc. and the swift-memcache-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-memcache-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
@_spi(AsyncChannel)
15+
16+
import NIOCore
17+
import NIOPosix
18+
19+
/// An actor to create a connection to a Memcache server.
20+
///
21+
/// This actor can be used to send commands to the server.
22+
public actor MemcachedConnection {
23+
private typealias StreamElement = (MemcachedRequest, CheckedContinuation<MemcachedResponse, Error>)
24+
private let host: String
25+
private let port: Int
26+
27+
/// Enum representing the current state of the MemcachedConnection.
28+
///
29+
/// The State is either initial, running or finished, depending on whether the connection
30+
/// to the server is active or has been closed. When running, it contains the properties
31+
/// for the buffer allocator, request stream, and the stream's continuation.
32+
private enum State {
33+
case initial(
34+
/// The channel's event loop group.
35+
eventLoopGroup: EventLoopGroup,
36+
/// The allocator used to create new buffers.
37+
bufferAllocator: ByteBufferAllocator,
38+
/// The stream of requests to be sent to the server.
39+
requestStream: AsyncStream<StreamElement>,
40+
/// The continuation for the request stream.
41+
requestContinuation: AsyncStream<StreamElement>.Continuation
42+
)
43+
case running(
44+
/// The allocator used to create new buffers.
45+
bufferAllocator: ByteBufferAllocator,
46+
/// The underlying channel to communicate with the server.
47+
channel: NIOAsyncChannel<MemcachedResponse, MemcachedRequest>,
48+
/// The stream of requests to be sent to the server.
49+
requestStream: AsyncStream<StreamElement>,
50+
/// The continuation for the request stream.
51+
requestContinuation: AsyncStream<StreamElement>.Continuation
52+
)
53+
case finished
54+
}
55+
56+
/// Enum representing the possible errors that can be encountered in `MemcachedConnection`.
57+
enum MemcachedConnectionError: Error {
58+
/// Indicates that the connection has shut down.
59+
case connectionShutdown
60+
}
61+
62+
private var state: State
63+
64+
/// Initialize a new MemcachedConnection.
65+
///
66+
/// - Parameters:
67+
/// - host: The host address of the Memcache server.
68+
/// - port: The port number of the Memcache server.
69+
/// - eventLoopGroup: The event loop group to use for this connection.
70+
public init(host: String, port: Int, eventLoopGroup: EventLoopGroup) {
71+
self.host = host
72+
self.port = port
73+
let (stream, continuation) = AsyncStream<StreamElement>.makeStream()
74+
let bufferAllocator = ByteBufferAllocator()
75+
self.state = .initial(
76+
eventLoopGroup: eventLoopGroup,
77+
bufferAllocator: bufferAllocator,
78+
requestStream: stream,
79+
requestContinuation: continuation
80+
)
81+
}
82+
83+
/// Runs the Memcache connection.
84+
///
85+
/// This method connects to the Memcache server and starts handling requests. It only returns when the connection
86+
/// to the server is finished or the task that called this method is cancelled.
87+
public func run() async throws {
88+
guard case .initial(let eventLoopGroup, let bufferAllocator, let stream, let continuation) = state else {
89+
throw MemcachedConnectionError.connectionShutdown
90+
}
91+
92+
let channel = try await ClientBootstrap(group: eventLoopGroup)
93+
.connect(host: self.host, port: self.port)
94+
.flatMap { channel in
95+
return channel.eventLoop.makeCompletedFuture {
96+
try channel.pipeline.syncOperations.addHandler(MessageToByteHandler(MemcachedRequestEncoder()))
97+
try channel.pipeline.syncOperations.addHandler(ByteToMessageHandler(MemcachedResponseDecoder()))
98+
return try NIOAsyncChannel<MemcachedResponse, MemcachedRequest>(synchronouslyWrapping: channel)
99+
}
100+
}.get()
101+
102+
self.state = .running(
103+
bufferAllocator: bufferAllocator,
104+
channel: channel,
105+
requestStream: stream,
106+
requestContinuation: continuation
107+
)
108+
109+
var iterator = channel.inboundStream.makeAsyncIterator()
110+
switch self.state {
111+
case .running(_, let channel, let requestStream, let requestContinuation):
112+
for await (request, continuation) in requestStream {
113+
do {
114+
try await channel.outboundWriter.write(request)
115+
let responseBuffer = try await iterator.next()
116+
117+
if let response = responseBuffer {
118+
continuation.resume(returning: response)
119+
}
120+
} catch {
121+
switch self.state {
122+
case .running:
123+
self.state = .finished
124+
requestContinuation.finish()
125+
continuation.resume(throwing: error)
126+
case .initial, .finished:
127+
break
128+
}
129+
}
130+
}
131+
132+
case .finished, .initial:
133+
break
134+
}
135+
}
136+
137+
/// Fetch the value for a key from the Memcache server.
138+
///
139+
/// - Parameter key: The key to fetch the value for.
140+
/// - Returns: A `ByteBuffer` containing the fetched value, or `nil` if no value was found.
141+
public func get(_ key: String) async throws -> ByteBuffer? {
142+
switch self.state {
143+
case .initial(_, _, _, let requestContinuation),
144+
.running(_, _, _, let requestContinuation):
145+
146+
var flags = MemcachedFlags()
147+
flags.shouldReturnValue = true
148+
let command = MemcachedRequest.GetCommand(key: key, flags: flags)
149+
let request = MemcachedRequest.get(command)
150+
151+
return try await withCheckedThrowingContinuation { continuation in
152+
switch requestContinuation.yield((request, continuation)) {
153+
case .enqueued:
154+
break
155+
case .dropped, .terminated:
156+
continuation.resume(throwing: MemcachedConnectionError.connectionShutdown)
157+
default:
158+
break
159+
}
160+
}.value
161+
162+
case .finished:
163+
throw MemcachedConnectionError.connectionShutdown
164+
}
165+
}
166+
167+
/// Set the value for a key on the Memcache server.
168+
///
169+
/// - Parameters:
170+
/// - key: The key to set the value for.
171+
/// - value: The value to set for the key.
172+
/// - Returns: A `ByteBuffer` containing the server's response to the set request.
173+
public func set(_ key: String, value: String) async throws -> ByteBuffer? {
174+
switch self.state {
175+
case .initial(_, let bufferAllocator, _, let requestContinuation),
176+
.running(let bufferAllocator, _, _, let requestContinuation):
177+
178+
var buffer = bufferAllocator.buffer(capacity: 0)
179+
buffer.writeString(value)
180+
let command = MemcachedRequest.SetCommand(key: key, value: buffer)
181+
let request = MemcachedRequest.set(command)
182+
183+
return try await withCheckedThrowingContinuation { continuation in
184+
switch requestContinuation.yield((request, continuation)) {
185+
case .enqueued:
186+
break
187+
case .dropped, .terminated:
188+
continuation.resume(throwing: MemcachedConnectionError.connectionShutdown)
189+
default:
190+
break
191+
}
192+
}.value
193+
194+
case .finished:
195+
throw MemcachedConnectionError.connectionShutdown
196+
}
197+
}
198+
}

Tests/SwiftMemcacheTests/IntegrationTest/MemcachedIntegrationTests.swift

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,4 +84,28 @@ final class MemcachedIntegrationTest: XCTestCase {
8484
XCTFail("Failed to connect to Memcached server: \(error)")
8585
}
8686
}
87+
88+
func testMemcachedConnectionActor() async throws {
89+
let group = MultiThreadedEventLoopGroup(numberOfThreads: 1)
90+
defer {
91+
XCTAssertNoThrow(try! group.syncShutdownGracefully())
92+
}
93+
let connectionActor = MemcachedConnection(host: "memcached", port: 11211, eventLoopGroup: group)
94+
95+
try await withThrowingTaskGroup(of: Void.self) { group in
96+
group.addTask { try await connectionActor.run() }
97+
98+
let setValue = "foo"
99+
var setBuffer = ByteBufferAllocator().buffer(capacity: setValue.count)
100+
setBuffer.writeString(setValue)
101+
let _ = try await connectionActor.set("bar", value: setValue)
102+
103+
// Get value for key
104+
let getValue = try await connectionActor.get("bar")
105+
let getValueString = getValue?.getString(at: getValue!.readerIndex, length: getValue!.readableBytes)
106+
XCTAssertEqual(getValueString, setValue, "Received value should be the same as sent")
107+
108+
group.cancelAll()
109+
}
110+
}
87111
}

0 commit comments

Comments
 (0)