
Commit aab823f

Adopt swift-service-lifecycle (#81)
* Adopt `swift-service-lifecycle`

  Modifications:
  * add `swift-service-lifecycle` (aka `ServiceLifecycle`) dependency to `Package.swift`
  * add `Sendable` conformance to the following types:
    * `KafkaProducerConfiguration`
    * `KafkaConsumerConfiguration`
    * `KafkaTopicConfiguration`
    * `KafkaPartition`
    * `KafkaClient`
    * `KafkaConsumerMessages`
    * `KafkaConsumerMessage`
    * `RDKafkaConfig.CapturedClosures`
    * `RDKafkaTopicHandles`
  * add `@unchecked Sendable` conformance to the following types:
    * `KafkaProducer`
    * `KafkaConsumer`
  * add `Service` conformance to the following types:
    * `KafkaProducer`
    * `KafkaConsumer`
  * `SwiftKafkaTests`: use `ServiceGroup`s
  * `KafkaProducerTests`:
    * use `ServiceGroup`s
    * remove `testFlushQueuedProducerMessages` as it relied on the now-`private` implementation detail `triggerGracefulShutdown`
    * remove `testProducerNotUsableAfterShutdown` as it relied on the now-`private` implementation detail `triggerGracefulShutdown`
    * `testNoMemoryLeakAfterShutdown`: remove the obsolete wait for a timeout, as flushing in `KafkaProducer` is now non-blocking
  * `KafkaConsumer`: remove `triggerGracefulShutdown` on `deinit` -> this is now `ServiceLifecycle`'s responsibility
  * refactor `RDKafkaConfig` to make `RDKafkaConfig.CapturedClosures` sendable

* Remove `ShutdownOnTerminate` for `AsyncSequence`s

  Motivation: Exiting the `run()` loop early is an error in `swift-service-lifecycle`. Therefore we **don't** want to invoke `KafkaProducer/KafkaConsumer.triggerGracefulShutdown()` when our `NIOAsyncSequence`s have stopped being consumed.

  Modifications:
  * remove the `Kafka[Producer|Consumer]ShutdownOnTerminate` behaviour
  * adjust tests

* Review Franz

  Modifications:
  * remove the `*Service` suffix from `consumerService` and `producerService` in tests and README
  * remove an old comment in `KafkaConsumer`
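In practice, the new `Service` conformance means each client exposes a single long-running `run()` method that a `ServiceGroup` owns, schedules, and shuts down. A schematic sketch of that protocol shape (the type below is illustrative, not the actual SwiftKafka implementation):

```swift
import ServiceLifecycle

// Schematic only: Service's sole requirement is a long-running run() method.
struct ExamplePollingService: Service {
    func run() async throws {
        while !Task.isCancelled {
            // poll, process, repeat …
            try await Task.sleep(nanoseconds: 100_000_000)
        }
    }
}
```

The `ServiceGroup` calls `run()` on every registered service and coordinates cancellation, which is why this commit can make `triggerGracefulShutdown()` a private implementation detail.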
1 parent 5ed295f commit aab823f

15 files changed (+299, -284 lines)

Package.swift

Lines changed: 2 additions & 0 deletions
@@ -42,6 +42,7 @@ let package = Package(
     ],
     dependencies: [
         .package(url: "https://github.com/apple/swift-nio.git", from: "2.43.1"),
+        .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
         .package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
         // The zstd Swift package produces warnings that we cannot resolve:
         // https://github.com/facebook/zstd/issues/3328
@@ -73,6 +74,7 @@ let package = Package(
             dependencies: [
                 "Crdkafka",
                 .product(name: "NIOCore", package: "swift-nio"),
+                .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
                 .product(name: "Logging", package: "swift-log"),
             ]
         ),
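With the dependency in place, a downstream package adopting this release would typically pull in both products as well; a minimal manifest sketch (the repository URL, product names, and versions here are illustrative assumptions, not taken from this commit):

```swift
// swift-tools-version:5.7
// Hypothetical downstream manifest: depending on SwiftKafka now goes hand in
// hand with depending on swift-service-lifecycle for running the clients.
import PackageDescription

let package = Package(
    name: "MyKafkaService",
    dependencies: [
        .package(url: "https://github.com/swift-server/swift-kafka-gsoc.git", branch: "main"),
        .package(url: "https://github.com/swift-server/swift-service-lifecycle.git", from: "2.0.0-alpha.1"),
    ],
    targets: [
        .executableTarget(
            name: "MyKafkaService",
            dependencies: [
                .product(name: "SwiftKafka", package: "swift-kafka-gsoc"),
                .product(name: "ServiceLifecycle", package: "swift-service-lifecycle"),
            ]
        )
    ]
)
```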

README.md

Lines changed: 32 additions & 11 deletions
@@ -23,6 +23,10 @@ Finally, add `import SwiftKafka` to your source code.
 
 ## Usage
 
+`SwiftKafka` should be used within a [`Swift Service Lifecycle`](https://github.com/swift-server/swift-service-lifecycle)
+[`ServiceGroup`](https://swiftpackageindex.com/swift-server/swift-service-lifecycle/main/documentation/servicelifecycle/servicegroup) for proper startup and shutdown handling.
+Both the `KafkaProducer` and the `KafkaConsumer` implement the [`Service`](https://swiftpackageindex.com/swift-server/swift-service-lifecycle/main/documentation/servicelifecycle/service) protocol.
+
 ### Producer API
 
 The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
@@ -32,14 +36,19 @@ let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])
 
 let (producer, acknowledgements) = try KafkaProducer.makeProducerWithAcknowledgements(
     config: config,
-    logger: .kafkaTest // Your logger here
+    logger: logger
 )
 
 await withThrowingTaskGroup(of: Void.self) { group in
 
     // Run Task
     group.addTask {
-        try await producer.run()
+        let serviceGroup = ServiceGroup(
+            services: [producer],
+            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
+            logger: logger
+        )
+        try await serviceGroup.run()
     }
 
     // Task receiving acknowledgements
@@ -54,9 +63,6 @@ await withThrowingTaskGroup(of: Void.self) { group in
         for await acknowledgement in acknowledgements {
            // Check if acknowledgement belongs to the sent message
        }
-
-        // Required
-        producer.triggerGracefulShutdown()
     }
 }
 ```
@@ -76,14 +82,19 @@ let config = KafkaConsumerConfiguration(
 
 let consumer = try KafkaConsumer(
     config: config,
-    logger: .kafkaTest // Your logger here
+    logger: logger
 )
 
 await withThrowingTaskGroup(of: Void.self) { group in
 
     // Run Task
     group.addTask {
-        try await consumer.run()
+        let serviceGroup = ServiceGroup(
+            services: [consumer],
+            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
+            logger: logger
+        )
+        try await serviceGroup.run()
     }
 
     // Task receiving messages
@@ -107,14 +118,19 @@ let config = KafkaConsumerConfiguration(
 
 let consumer = try KafkaConsumer(
     config: config,
-    logger: .kafkaTest // Your logger here
+    logger: logger
 )
 
 await withThrowingTaskGroup(of: Void.self) { group in
 
     // Run Task
     group.addTask {
-        try await consumer.run()
+        let serviceGroup = ServiceGroup(
+            services: [consumer],
+            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
+            logger: logger
+        )
+        try await serviceGroup.run()
     }
 
     // Task receiving messages
@@ -139,14 +155,19 @@ let config = KafkaConsumerConfiguration(
 
 let consumer = try KafkaConsumer(
     config: config,
-    logger: .kafkaTest // Your logger here
+    logger: logger
 )
 
 await withThrowingTaskGroup(of: Void.self) { group in
 
     // Run Task
     group.addTask {
-        try await consumer.run()
+        let serviceGroup = ServiceGroup(
+            services: [consumer],
+            configuration: ServiceGroupConfiguration(gracefulShutdownSignals: []),
+            logger: logger
+        )
+        try await serviceGroup.run()
     }
 
     // Task receiving messages
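The README snippets above pass an empty `gracefulShutdownSignals` array because the surrounding task group controls the clients' lifetime. In a standalone service one would typically let the group react to POSIX signals instead; a hedged sketch against the 2.0.0-alpha API (the `.sigterm` spelling is assumed, and `producer`/`consumer` are the values created in the snippets above):

```swift
import Logging
import ServiceLifecycle

// Sketch: shut both Kafka clients down gracefully when the process receives SIGTERM.
let logger = Logger(label: "kafka-service")
let serviceGroup = ServiceGroup(
    services: [producer, consumer],
    configuration: ServiceGroupConfiguration(gracefulShutdownSignals: [.sigterm]),
    logger: logger
)
try await serviceGroup.run()
```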

Sources/SwiftKafka/Configuration/KafkaConsumerConfiguration.swift

Lines changed: 11 additions & 3 deletions
@@ -15,7 +15,7 @@
 import Crdkafka
 import struct Foundation.UUID
 
-public struct KafkaConsumerConfiguration: Hashable {
+public struct KafkaConsumerConfiguration {
     // MARK: - SwiftKafka-specific Config properties
 
     /// The time between two consecutive polls.
@@ -468,12 +468,20 @@ public struct KafkaConsumerConfiguration: Hashable {
     }
 }
 
+// MARK: - KafkaConsumerConfiguration + Hashable
+
+extension KafkaConsumerConfiguration: Hashable {}
+
+// MARK: - KafkaConsumerConfiguration + Sendable
+
+extension KafkaConsumerConfiguration: Sendable {}
+
 // MARK: - KafkaSharedConfiguration + Consumer Additions
 
 extension KafkaSharedConfiguration {
     /// A struct representing the different Kafka message consumption strategies.
-    public struct ConsumptionStrategy: Hashable {
-        enum _ConsumptionStrategy: Hashable {
+    public struct ConsumptionStrategy: Sendable, Hashable {
+        enum _ConsumptionStrategy: Sendable, Hashable {
             case partition(topic: String, partition: KafkaPartition, offset: Int)
             case group(groupID: String, topics: [String])
         }

Sources/SwiftKafka/Configuration/KafkaProducerConfiguration.swift

Lines changed: 9 additions & 1 deletion
@@ -12,7 +12,7 @@
 //
 //===----------------------------------------------------------------------===//
 
-public struct KafkaProducerConfiguration: Hashable {
+public struct KafkaProducerConfiguration {
     // MARK: - SwiftKafka-specific Config properties
 
     /// The time between two consecutive polls.
@@ -427,3 +427,11 @@ public struct KafkaProducerConfiguration: Hashable {
         return KafkaSharedConfiguration.SASLMechanism(description: value)
     }
 }
+
+// MARK: - KafkaProducerConfiguration + Hashable
+
+extension KafkaProducerConfiguration: Hashable {}
+
+// MARK: - KafkaProducerConfiguration + Sendable
+
+extension KafkaProducerConfiguration: Sendable {}
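Declaring the `Hashable` and `Sendable` conformances in empty extensions keeps the retroactive adoption out of the long type body. Once the configuration is `Sendable`, it can be captured by concurrently executing code without warnings; a minimal sketch using the initializer shown in the README:

```swift
// Sketch: a Sendable configuration value may safely cross task boundaries.
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])

Task.detached {
    // Capturing `config` here is allowed precisely because
    // KafkaProducerConfiguration is Sendable.
    _ = config
}
```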

Sources/SwiftKafka/Configuration/KafkaTopicConfiguration.swift

Lines changed: 9 additions & 1 deletion
@@ -13,7 +13,7 @@
 //===----------------------------------------------------------------------===//
 
 /// Used to configure new topics created by the ``KafkaProducer``.
-public struct KafkaTopicConfiguration: Hashable {
+public struct KafkaTopicConfiguration {
     var dictionary: [String: String] = [:]
 
     /// This field indicates the number of acknowledgements the leader broker must receive from ISR brokers before responding to the request: 0=Broker does not send any response/ack to client, -1 or all=Broker will block until message is committed by all in sync replicas (ISRs). If there are less than min.insync.replicas (broker configuration) in the ISR set the produce request will fail.
@@ -126,3 +126,11 @@ extension KafkaSharedConfiguration {
         public static let inherit = CompressionCodec(description: "inherit")
     }
 }
+
+// MARK: - KafkaTopicConfiguration + Hashable
+
+extension KafkaTopicConfiguration: Hashable {}
+
+// MARK: - KafkaTopicConfiguration + Sendable
+
+extension KafkaTopicConfiguration: Sendable {}

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 1 addition & 1 deletion
@@ -17,7 +17,7 @@ import Logging
 
 /// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
 /// which is used to handle the connection to the Kafka ecosystem.
-final class KafkaClient {
+final class KafkaClient: Sendable {
     // Default size for Strings returned from C API
     static let stringSize = 1024
 
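A `final class` only qualifies for an unconditional `Sendable` conformance when all of its stored properties are immutable and themselves `Sendable`, which is what this one-line change relies on. A minimal sketch of the pattern (the type and properties below are illustrative, not the actual `KafkaClient`):

```swift
// Illustrative only: a final class can be Sendable when every stored
// property is a `let` constant of a Sendable type.
final class ExampleClient: Sendable {
    let clientName: String
    let pollIntervalMilliseconds: Int

    init(clientName: String, pollIntervalMilliseconds: Int) {
        self.clientName = clientName
        self.pollIntervalMilliseconds = pollIntervalMilliseconds
    }
}
```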

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 17 additions & 50 deletions
@@ -16,52 +16,15 @@ import Crdkafka
 import Logging
 import NIOConcurrencyHelpers
 import NIOCore
-
-// MARK: - KafkaConsumerShutDownOnTerminate
-
-/// `NIOAsyncSequenceProducerDelegate` that terminates the shuts the consumer down when
-/// `didTerminate()` is invoked.
-internal struct KafkaConsumerShutdownOnTerminate: @unchecked Sendable { // We can do that because our stored propery is protected by a lock
-    let stateMachine: NIOLockedValueBox<KafkaConsumer.StateMachine>
-}
-
-extension KafkaConsumerShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
-    func produceMore() {
-        // No back pressure
-        return
-    }
-
-    func didTerminate() {
-        // Duplicate of _triggerGracefulShutdown
-        let action = self.stateMachine.withLockedValue { $0.finish() }
-        switch action {
-        case .triggerGracefulShutdownAndFinishSource(let client, let source):
-            source.finish()
-
-            do {
-                try client.consumerClose()
-            } catch {
-                self.stateMachine.withLockedValue {
-                    if let error = error as? KafkaError {
-                        $0.logger.error("Closing KafkaConsumer failed: \(error.description)")
-                    } else {
-                        $0.logger.error("Caught unknown error: \(error)")
-                    }
-                }
-            }
-        case .none:
-            return
-        }
-    }
-}
+import ServiceLifecycle
 
 // MARK: - KafkaConsumerMessages
 
 /// `AsyncSequence` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
-public struct KafkaConsumerMessages: AsyncSequence {
+public struct KafkaConsumerMessages: Sendable, AsyncSequence {
     public typealias Element = KafkaConsumerMessage
     typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
-    typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, KafkaConsumerShutdownOnTerminate>
+    typealias WrappedSequence = NIOAsyncSequenceProducer<Element, BackPressureStrategy, NoDelegate>
     let wrappedSequence: WrappedSequence
 
     /// `AsynceIteratorProtocol` implementation for handling messages received from the Kafka cluster (``KafkaConsumerMessage``).
@@ -81,14 +44,14 @@ public struct KafkaConsumerMessages: AsyncSequence {
 // MARK: - KafkaConsumer
 
 /// Receive messages from the Kafka cluster.
-public final class KafkaConsumer {
+public final class KafkaConsumer: Sendable, Service {
     typealias Producer = NIOAsyncSequenceProducer<
         KafkaConsumerMessage,
         NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
-        KafkaConsumerShutdownOnTerminate
+        NoDelegate
     >
     /// The configuration object of the consumer client.
-    private var config: KafkaConsumerConfiguration
+    private let config: KafkaConsumerConfiguration
     /// A logger.
     private let logger: Logger
     /// State of the `KafkaConsumer`.
@@ -117,7 +80,7 @@
         let sourceAndSequence = NIOAsyncSequenceProducer.makeSequence(
             elementType: KafkaConsumerMessage.self,
             backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
-            delegate: KafkaConsumerShutdownOnTerminate(stateMachine: self.stateMachine)
+            delegate: NoDelegate()
        )
 
         self.messages = KafkaConsumerMessages(
@@ -143,10 +106,6 @@
         }
     }
 
-    deinit {
-        self.triggerGracefulShutdown()
-    }
-
     /// Subscribe to the given list of `topics`.
     /// The partition assignment happens automatically using `KafkaConsumer`'s consumer group.
     /// - Parameter topics: An array of topic names to subscribe to.
@@ -189,6 +148,14 @@
     ///
     /// - Returns: An awaitable task representing the execution of the poll loop.
     public func run() async throws {
+        try await withGracefulShutdownHandler {
+            try await self._run()
+        } onGracefulShutdown: {
+            self.triggerGracefulShutdown()
+        }
+    }
+
+    private func _run() async throws {
         while !Task.isCancelled {
             let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
             switch nextAction {
@@ -275,12 +242,12 @@
 
 extension KafkaConsumer {
     /// State machine representing the state of the ``KafkaConsumer``.
-    struct StateMachine {
+    struct StateMachine: Sendable {
         /// A logger.
         let logger: Logger
 
         /// The state of the ``StateMachine``.
-        enum State {
+        enum State: @unchecked Sendable { // TODO: remove @unchecked when https://github.com/apple/swift-nio/pull/2459 is available
            /// The state machine has been initialized with init() but is not yet Initialized
            /// using `func initialize()` (required).
            case uninitialized
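The `run()` refactor above follows ServiceLifecycle's cooperative-shutdown pattern: the poll loop keeps running until either the task is cancelled or the `onGracefulShutdown` handler flips the consumer's state. A standalone sketch of the same split (the type and its lock-protected flag are illustrative stand-ins for the consumer's state machine):

```swift
import NIOConcurrencyHelpers
import ServiceLifecycle

// Illustrative only: mirrors the run()/_run()/onGracefulShutdown split above,
// with a lock-protected Bool standing in for KafkaConsumer's state machine.
final class ExampleLoopService: Service, Sendable {
    private let isShuttingDown = NIOLockedValueBox(false)

    func run() async throws {
        try await withGracefulShutdownHandler {
            try await self._run()
        } onGracefulShutdown: {
            self.triggerGracefulShutdown()
        }
    }

    private func _run() async throws {
        while !Task.isCancelled, !self.isShuttingDown.withLockedValue({ $0 }) {
            // poll / perform one unit of work per iteration
            try await Task.sleep(nanoseconds: 50_000_000)
        }
    }

    private func triggerGracefulShutdown() {
        self.isShuttingDown.withLockedValue { $0 = true }
    }
}
```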

Sources/SwiftKafka/KafkaConsumerMessage.swift

Lines changed: 9 additions & 1 deletion
@@ -16,7 +16,7 @@ import Crdkafka
 import NIOCore
 
 /// A message received from the Kafka cluster.
-public struct KafkaConsumerMessage: Hashable {
+public struct KafkaConsumerMessage {
     /// The topic that the message was received from.
     public var topic: String
     /// The partition that the message was received from.
@@ -72,3 +72,11 @@ public struct KafkaConsumerMessage: Hashable {
         self.offset = Int(rdKafkaMessage.offset)
     }
 }
+
+// MARK: - KafkaConsumerMessage + Hashable
+
+extension KafkaConsumerMessage: Hashable {}
+
+// MARK: - KafkaConsumerMessage + Sendable
+
+extension KafkaConsumerMessage: Sendable {}

Sources/SwiftKafka/KafkaPartition.swift

Lines changed: 9 additions & 1 deletion
@@ -15,7 +15,7 @@
 import Crdkafka
 
 /// Type for representing the number of a Kafka Partition.
-public struct KafkaPartition: RawRepresentable, Hashable {
+public struct KafkaPartition: RawRepresentable {
     public var rawValue: Int32
 
     public init(rawValue: Int32) {
@@ -24,3 +24,11 @@ public struct KafkaPartition: RawRepresentable, Hashable {
 
     public static let unassigned = KafkaPartition(rawValue: RD_KAFKA_PARTITION_UA)
 }
+
+// MARK: KafkaPartition + Hashable
+
+extension KafkaPartition: Hashable {}
+
+// MARK: KafkaPartition + Sendable
+
+extension KafkaPartition: Sendable {}
