Skip to content

Commit 3300617

Browse files
Dedicated KafkaProducerMessageID type (#71)
* Dedicated `KafkaProducerMessageID` type > **Warning**: this can potentially be a breaking change as message ids > will lose their chronological order in the public API after this > change. Motivation: We don't want to expose `UInt` for `KafkaProducer`'s message ids. Modifications: * create a new `struct` `KafkaProducerMessageID` that is internally backed by `UInt` (can also be `UUID` or similar in the future * update tests * Review Franz Modifications: * `KafkaProducerMessageID`: add `Comparable` conformance
1 parent 550bde6 commit 3300617

File tree

7 files changed

+60
-15
lines changed

7 files changed

+60
-15
lines changed

Sources/SwiftKafka/KafkaAcknowledgedMessage.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ public struct KafkaAcknowledgedMessage: Hashable {
2020
/// The unique identifier assigned by the ``KafkaProducer`` when the message was send to Kafka.
2121
/// The same identifier is returned by ``KafkaProducer/sendAsync(_:)`` and can be used to correlate
2222
/// a sent message and an acknowledged message.
23-
public var id: UInt
23+
public var id: KafkaProducerMessageID
2424
/// The topic that the message was sent to.
2525
public var topic: String
2626
/// The partition that the message was sent to.
@@ -34,7 +34,7 @@ public struct KafkaAcknowledgedMessage: Hashable {
3434

3535
/// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
3636
/// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages.
37-
init(messagePointer: UnsafePointer<rd_kafka_message_t>, id: UInt) throws {
37+
init(messagePointer: UnsafePointer<rd_kafka_message_t>, id: KafkaProducerMessageID) throws {
3838
self.id = id
3939

4040
let rdKafkaMessage = messagePointer.pointee

Sources/SwiftKafka/KafkaAcknowledgedMessageError.swift

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ import Crdkafka
2020
/// ``KafkaError/code``.
2121
public struct KafkaAcknowledgedMessageError: Error, CustomStringConvertible {
2222
/// Identifier of the message that caused the error.
23-
public var messageID: UInt
23+
public var messageID: KafkaProducerMessageID
2424
/// The underlying ``KafkaError``.
2525
public let error: KafkaError
2626

27-
init(messageID: UInt, error: KafkaError) {
27+
init(messageID: KafkaProducerMessageID, error: KafkaError) {
2828
self.messageID = messageID
2929
self.error = error
3030
}
@@ -34,7 +34,7 @@ public struct KafkaAcknowledgedMessageError: Error, CustomStringConvertible {
3434
}
3535

3636
static func fromRDKafkaError(
37-
messageID: UInt,
37+
messageID: KafkaProducerMessageID,
3838
error: rd_kafka_resp_err_t,
3939
file: String = #fileID,
4040
line: UInt = #line
@@ -50,7 +50,7 @@ public struct KafkaAcknowledgedMessageError: Error, CustomStringConvertible {
5050
}
5151

5252
static func fromMessage(
53-
messageID: UInt,
53+
messageID: KafkaProducerMessageID,
5454
message: String,
5555
file: String = #fileID,
5656
line: UInt = #line

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,11 @@ public actor KafkaProducer {
217217
/// Send messages to the Kafka cluster asynchronously, aka "fire and forget".
218218
/// This function is non-blocking.
219219
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
220-
/// - Returns: Unique message identifier matching the `id` property of the corresponding ``KafkaAcknowledgedMessage``
220+
/// - Returns: Unique ``KafkaProducerMessageID``matching the ``KafkaAcknowledgedMessage/id`` property
221+
/// of the corresponding ``KafkaAcknowledgedMessage``.
221222
/// - Throws: A ``KafkaError`` if sending the message failed.
222223
@discardableResult
223-
public func sendAsync(_ message: KafkaProducerMessage) throws -> UInt {
224+
public func sendAsync(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
224225
switch self.state {
225226
case .started:
226227
return try self._sendAsync(message)
@@ -229,7 +230,7 @@ public actor KafkaProducer {
229230
}
230231
}
231232

232-
private func _sendAsync(_ message: KafkaProducerMessage) throws -> UInt {
233+
private func _sendAsync(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
233234
let topicHandle = try self.createTopicHandleIfNeeded(topic: message.topic)
234235

235236
let keyBytes: [UInt8]?
@@ -261,7 +262,7 @@ public actor KafkaProducer {
261262
throw KafkaError.rdKafkaError(wrapping: rd_kafka_last_error())
262263
}
263264

264-
return self.messageIDCounter
265+
return KafkaProducerMessageID(rawValue: self.messageIDCounter)
265266
}
266267

267268
/// Check `topicHandles` for a handle matching the topic name and create a new handle if needed.
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
/// ID of message produced by the ``KafkaProducer``.
16+
/// The ``KafkaProducerMessageID`` is relates incoming ``KafkaAcknowledgedMessage``'s
17+
/// with their corresponding ``KafkaProducer/send(_:)`` invocation.
18+
public struct KafkaProducerMessageID {
19+
internal var rawValue: UInt
20+
21+
internal init(rawValue: UInt) {
22+
self.rawValue = rawValue
23+
}
24+
}
25+
26+
// MARK: - KafkaProducerMessageID + CustomStringConvertible
27+
28+
extension KafkaProducerMessageID: CustomStringConvertible {
29+
public var description: String {
30+
return String(self.rawValue)
31+
}
32+
}
33+
34+
// MARK: - KafkaProducerMessageID + Hashable
35+
36+
extension KafkaProducerMessageID: Hashable {}
37+
38+
// MARK: - KafkaProducerMessageID + Comparable
39+
40+
extension KafkaProducerMessageID: Comparable {
41+
public static func < (lhs: KafkaProducerMessageID, rhs: KafkaProducerMessageID) -> Bool {
42+
lhs.rawValue < rhs.rawValue
43+
}
44+
}

Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ struct RDKafkaConfig {
115115
return nil
116116
}
117117

118-
let messageID = UInt(bitPattern: messagePointer.pointee._private)
118+
let messageID = KafkaProducerMessageID(rawValue: UInt(bitPattern: messagePointer.pointee._private))
119119

120120
let messageResult: KafkaAcknowledgementResult
121121
do {

Tests/IntegrationTests/SwiftKafkaTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -270,7 +270,7 @@ final class SwiftKafkaTests: XCTestCase {
270270
acknowledgements: KafkaMessageAcknowledgements,
271271
messages: [KafkaProducerMessage]
272272
) async throws {
273-
var messageIDs = Set<UInt>()
273+
var messageIDs = Set<KafkaProducerMessageID>()
274274

275275
for message in messages {
276276
messageIDs.insert(try await producer.sendAsync(message))
@@ -292,7 +292,7 @@ final class SwiftKafkaTests: XCTestCase {
292292
}
293293

294294
XCTAssertEqual(messages.count, acknowledgedMessages.count)
295-
XCTAssertEqual(acknowledgedMessages.map(\.id).sorted(), messageIDs.sorted())
295+
XCTAssertEqual(Set(acknowledgedMessages.map(\.id)), messageIDs)
296296

297297
for message in messages {
298298
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message.topic }))

Tests/SwiftKafkaTests/KafkaProducerTests.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ final class KafkaProducerTests: XCTestCase {
150150
value: "Hello, London!"
151151
)
152152

153-
var messageIDs = Set<UInt>()
153+
var messageIDs = Set<KafkaProducerMessageID>()
154154

155155
messageIDs.insert(try await producer.sendAsync(message1))
156156
messageIDs.insert(try await producer.sendAsync(message2))
@@ -171,7 +171,7 @@ final class KafkaProducerTests: XCTestCase {
171171
}
172172

173173
XCTAssertEqual(2, acknowledgedMessages.count)
174-
XCTAssertEqual(acknowledgedMessages.map(\.id).sorted(), messageIDs.sorted())
174+
XCTAssertEqual(Set(acknowledgedMessages.map(\.id)), messageIDs)
175175
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message1.topic }))
176176
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message2.topic }))
177177
XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == message1.key }))

0 commit comments

Comments
 (0)