Skip to content

Commit e519908

Browse files
Rename KafkaProducer.sendAsync -> .send (#70)
1 parent 3300617 commit e519908

File tree

5 files changed

+18
-17
lines changed

5 files changed

+18
-17
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ SwiftKafka is a Swift Package in development that provides a convenient way to c
66

77
### Producer API
88

9-
The `sendAsync(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
9+
The `send(_:)` method of `KafkaProducer` returns a message-id that can later be used to identify the corresponding acknowledgement. Acknowledgements are received through the `acknowledgements` [`AsyncSequence`](https://developer.apple.com/documentation/swift/asyncsequence). Each acknowledgement indicates that producing a message was successful or returns an error.
1010

1111
```swift
1212
let config = KafkaProducerConfiguration(bootstrapServers: ["localhost:9092"])
@@ -25,7 +25,7 @@ await withThrowingTaskGroup(of: Void.self) { group in
2525

2626
// Task receiving acknowledgements
2727
group.addTask {
28-
let messageID = try await producer.sendAsync(
28+
let messageID = try await producer.send(
2929
KafkaProducerMessage(
3030
topic: "topic-name",
3131
value: "Hello, World!"

Sources/SwiftKafka/KafkaAcknowledgedMessage.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ import NIOCore
1818
/// A message produced by the client and acknowledged by the Kafka cluster.
1919
public struct KafkaAcknowledgedMessage: Hashable {
2020
/// The unique identifier assigned by the ``KafkaProducer`` when the message was send to Kafka.
21-
/// The same identifier is returned by ``KafkaProducer/sendAsync(_:)`` and can be used to correlate
21+
/// The same identifier is returned by ``KafkaProducer/send(_:)`` and can be used to correlate
2222
/// a sent message and an acknowledged message.
2323
public var id: KafkaProducerMessageID
2424
/// The topic that the message was sent to.

Sources/SwiftKafka/KafkaProducer.swift

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -214,23 +214,24 @@ public actor KafkaProducer {
214214
}
215215
}
216216

217-
/// Send messages to the Kafka cluster asynchronously, aka "fire and forget".
218-
/// This function is non-blocking.
217+
/// Send messages to the Kafka cluster asynchronously. This method is non-blocking.
218+
/// Message send results shall be handled through the ``KafkaMessageAcknowledgements`` `AsyncSequence`.
219+
///
219220
/// - Parameter message: The ``KafkaProducerMessage`` that is sent to the KafkaCluster.
220221
/// - Returns: Unique ``KafkaProducerMessageID``matching the ``KafkaAcknowledgedMessage/id`` property
221222
/// of the corresponding ``KafkaAcknowledgedMessage``.
222223
/// - Throws: A ``KafkaError`` if sending the message failed.
223224
@discardableResult
224-
public func sendAsync(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
225+
public func send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
225226
switch self.state {
226227
case .started:
227-
return try self._sendAsync(message)
228+
return try self._send(message)
228229
case .shuttingDown, .shutDown:
229230
throw KafkaError.connectionClosed(reason: "Tried to produce a message with a closed producer")
230231
}
231232
}
232233

233-
private func _sendAsync(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
234+
private func _send(_ message: KafkaProducerMessage) throws -> KafkaProducerMessageID {
234235
let topicHandle = try self.createTopicHandleIfNeeded(topic: message.topic)
235236

236237
let keyBytes: [UInt8]?

Tests/IntegrationTests/SwiftKafkaTests.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ final class SwiftKafkaTests: XCTestCase {
273273
var messageIDs = Set<KafkaProducerMessageID>()
274274

275275
for message in messages {
276-
messageIDs.insert(try await producer.sendAsync(message))
276+
messageIDs.insert(try await producer.send(message))
277277
}
278278

279279
var acknowledgedMessages = Set<KafkaAcknowledgedMessage>()

Tests/SwiftKafkaTests/KafkaProducerTests.swift

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ final class KafkaProducerTests: XCTestCase {
5151
self.config = nil
5252
}
5353

54-
func testSendAsync() async throws {
54+
func testSend() async throws {
5555
let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
5656

5757
await withThrowingTaskGroup(of: Void.self) { group in
@@ -70,7 +70,7 @@ final class KafkaProducerTests: XCTestCase {
7070
value: "Hello, World!"
7171
)
7272

73-
let messageID = try await producer.sendAsync(message)
73+
let messageID = try await producer.send(message)
7474

7575
for await messageResult in acks {
7676
guard case .success(let acknowledgedMessage) = messageResult else {
@@ -90,7 +90,7 @@ final class KafkaProducerTests: XCTestCase {
9090
}
9191
}
9292

93-
func testSendAsyncEmptyMessage() async throws {
93+
func testSendEmptyMessage() async throws {
9494
let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
9595

9696
await withThrowingTaskGroup(of: Void.self) { group in
@@ -108,7 +108,7 @@ final class KafkaProducerTests: XCTestCase {
108108
value: ByteBuffer()
109109
)
110110

111-
let messageID = try await producer.sendAsync(message)
111+
let messageID = try await producer.send(message)
112112

113113
for await messageResult in acks {
114114
guard case .success(let acknowledgedMessage) = messageResult else {
@@ -128,7 +128,7 @@ final class KafkaProducerTests: XCTestCase {
128128
}
129129
}
130130

131-
func testSendAsyncTwoTopics() async throws {
131+
func testSendTwoTopics() async throws {
132132
let (producer, acks) = try await KafkaProducer.makeProducerWithAcknowledgements(config: self.config, logger: .kafkaTest)
133133
await withThrowingTaskGroup(of: Void.self) { group in
134134

@@ -152,8 +152,8 @@ final class KafkaProducerTests: XCTestCase {
152152

153153
var messageIDs = Set<KafkaProducerMessageID>()
154154

155-
messageIDs.insert(try await producer.sendAsync(message1))
156-
messageIDs.insert(try await producer.sendAsync(message2))
155+
messageIDs.insert(try await producer.send(message1))
156+
messageIDs.insert(try await producer.send(message2))
157157

158158
var acknowledgedMessages = Set<KafkaAcknowledgedMessage>()
159159

@@ -203,7 +203,7 @@ final class KafkaProducerTests: XCTestCase {
203203
)
204204

205205
do {
206-
try await producer.sendAsync(message)
206+
try await producer.send(message)
207207
XCTFail("Method should have thrown error")
208208
} catch {}
209209

0 commit comments

Comments
 (0)