Skip to content

Commit c2e008d

Browse files
ayushi2103“Ayushi
andauthored
ETCD Watch Function (#7)
* Create watch message in etcd.proto and generated files * Watch client WIP * Added watch function and unit test * re-add header * Update watch to have WatchAsyncSequence * Create WatchEvent type for Element typealias * Updated example * Update test and WatchAsyncSequence * add documentation * Updated PR comments and added documentation * Add license info * Update kv to keyValue * Update tests and documentation * Update test --------- Co-authored-by: “Ayushi <“[email protected]”>
1 parent 727878a commit c2e008d

File tree

9 files changed

+1364
-17
lines changed

9 files changed

+1364
-17
lines changed

Sources/ETCD/ETCDClient.swift

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ public final class EtcdClient: @unchecked Sendable {
2323
private var group: EventLoopGroup
2424
private var connection: ClientConnection
2525
private var client: Etcdserverpb_KVNIOClient
26+
private var watchClient: Etcdserverpb_WatchAsyncClient
2627

2728
/// Initialize a new ETCD Connection.
2829
///
@@ -37,6 +38,7 @@ public final class EtcdClient: @unchecked Sendable {
3738
self.connection = ClientConnection.insecure(group: self.group)
3839
.connect(host: host, port: port)
3940
self.client = Etcdserverpb_KVNIOClient(channel: self.connection)
41+
self.watchClient = Etcdserverpb_WatchAsyncClient(channel: self.connection)
4042
}
4143

4244
/// Sets a value for a specified key in the ETCD server.
@@ -120,5 +122,24 @@ public final class EtcdClient: @unchecked Sendable {
120122
public func put(_ key: String, value: String) async throws {
121123
try await put(key.utf8, value: value.utf8)
122124
}
123-
125+
126+
/// Watches a specified key in the ETCD server.
127+
///
128+
/// - Parameters:
129+
/// - key: The key for which the value is watched. Parameter is of type Sequence<UInt8>.
130+
/// - operation: The operation to be run on the WatchAsyncSequence for the key.
131+
public func watch<Result>(_ key: some Sequence<UInt8>, _ operation: (WatchAsyncSequence) async throws -> Result) async throws -> Result {
132+
let request = [Etcdserverpb_WatchRequest.with { $0.createRequest.key = Data(key) }]
133+
let watchAsyncSequence = WatchAsyncSequence(grpcAsyncSequence: watchClient.watch(request))
134+
return try await operation(watchAsyncSequence)
135+
}
136+
137+
/// Watches a specified key in the ETCD server.
138+
///
139+
/// - Parameters:
140+
/// - key: The key for which the value is watched. Parameter is of type String.
141+
/// - operation: The operation to be run on the WatchAsyncSequence for the key.
142+
public func watch<Result>(_ key: String, _ operation: (WatchAsyncSequence) async throws -> Result) async throws -> Result {
143+
try await self.watch(key.utf8, operation)
144+
}
124145
}

Sources/ETCD/KeyValue.swift

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-etcd-client-gsoc open source project
4+
//
5+
// Copyright (c) 2024 Apple Inc. and the swift-etcd-client-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-etcd-client-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Foundation
16+
17+
/// Struct representing a key-value pair in etcd server.
18+
public struct KeyValue {
19+
public var key: Data
20+
public var createRevision: Int
21+
public var modRevision: Int
22+
public var version: Int
23+
public var value: Data
24+
public var lease: Int
25+
26+
init(protoKeyValue: Etcdserverpb_KeyValue) {
27+
self.key = protoKeyValue.key
28+
self.createRevision = Int(protoKeyValue.createRevision)
29+
self.modRevision = Int(protoKeyValue.modRevision)
30+
self.version = Int(protoKeyValue.version)
31+
self.value = protoKeyValue.value
32+
self.lease = Int(protoKeyValue.lease)
33+
}
34+
35+
/// Initialize a new KeyValue.
36+
///
37+
/// - Parameters:
38+
/// - key: key in bytes. An empty key is not allowed.
39+
/// - create_revision: revision of the last creation on the key.
40+
/// - mod_revision: revision of the last modification on the key.
41+
/// - version: version is the version of the key. A deletion resets the version to zero and any modification of the key increases its version.
42+
/// - value: value in bytes.
43+
/// - lease: the ID of the lease attached to the key. If lease is 0, then no lease is attached to the key.
44+
public init(key: Data, createRevision: Int, modRevision: Int, version: Int, value: Data, lease: Int) {
45+
self.key = key
46+
self.createRevision = createRevision
47+
self.modRevision = modRevision
48+
self.version = version
49+
self.value = value
50+
self.lease = lease
51+
}
52+
}

Sources/ETCD/WatchAsyncSequence.swift

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-etcd-client-gsoc open source project
4+
//
5+
// Copyright (c) 2024 Apple Inc. and the swift-etcd-client-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-etcd-client-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import GRPC
16+
17+
public struct WatchAsyncSequence: AsyncSequence {
18+
public typealias Element = [WatchEvent]
19+
20+
private let grpcAsyncSequence: GRPCAsyncResponseStream<Etcdserverpb_WatchResponse>
21+
22+
init(grpcAsyncSequence: GRPCAsyncResponseStream<Etcdserverpb_WatchResponse>) {
23+
self.grpcAsyncSequence = grpcAsyncSequence
24+
}
25+
26+
public func makeAsyncIterator() -> AsyncIterator {
27+
.init(grpcIterator: self.grpcAsyncSequence.makeAsyncIterator())
28+
}
29+
30+
public struct AsyncIterator: AsyncIteratorProtocol {
31+
private var grpcIterator: GRPCAsyncResponseStream<Etcdserverpb_WatchResponse>.AsyncIterator?
32+
33+
fileprivate init(grpcIterator: GRPCAsyncResponseStream<Etcdserverpb_WatchResponse>.AsyncIterator) {
34+
self.grpcIterator = grpcIterator
35+
}
36+
37+
public mutating func next() async throws -> Element? {
38+
while let response = try await self.grpcIterator?.next() {
39+
if response.created {
40+
// We receive this after setting up the watch and need to wait for the next
41+
// response that contains an event
42+
precondition(response.events.isEmpty, "Expected no watch events on created response")
43+
continue
44+
}
45+
46+
if response.canceled {
47+
// We got cancelled and have to return nil now
48+
self.grpcIterator = nil
49+
return nil
50+
}
51+
52+
let events = response.events.map { WatchEvent(protoEvent: $0) }
53+
return events
54+
}
55+
return nil;
56+
}
57+
}
58+
}

Sources/ETCD/WatchEvent.swift

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-etcd-client-gsoc open source project
4+
//
5+
// Copyright (c) 2024 Apple Inc. and the swift-etcd-client-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-etcd-client-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Foundation
16+
17+
public struct WatchEvent {
18+
public var keyValue: KeyValue
19+
public var previousKeyValue: KeyValue?
20+
21+
init(protoEvent: Etcdserverpb_Event) {
22+
self.keyValue = KeyValue(protoKeyValue: protoEvent.kv)
23+
if let protoPrevKV = protoEvent.hasPrevKv ? protoEvent.prevKv : nil {
24+
self.previousKeyValue = KeyValue(protoKeyValue: protoPrevKV)
25+
} else {
26+
self.previousKeyValue = nil
27+
}
28+
}
29+
30+
/// Struct representing a watch event in etcd.
31+
///
32+
/// - Parameters:
33+
/// - keyValue: keyValue representing a KeyValue to watch.
34+
/// - previousKeyValue: previousKeyValue representing a KeyValue that was previously received.
35+
public init(keyValue: KeyValue, previousKeyValue: KeyValue?) {
36+
self.keyValue = keyValue
37+
self.previousKeyValue = previousKeyValue
38+
}
39+
}

0 commit comments

Comments
 (0)