Skip to content

Commit ae5982c

Browse files
Implementation of the KafkaConsumer (#24)
* First implementation of KafkaConsumer Modifications: * created a new .testTarget "IntegrationTests" with a dedicated directory * made KafkaConsumerMessage and KafkaAcknowledgedMessage use KafkaPartition as type for the partition property * implemented first consumer that enables users to subscribe to topics and poll the consumer * built a KafkaSequenceConsumer on top of KafkaConsumer that enables users to consume messages in an AsyncSequence * enforced that the integration-test-topic is created when in the Kafka docker container Result: Users are able to consume messages from a Kafka cluster * Improved Public Interface for Consumer Modifications: * created Tests for KafkaConsumer * updated Docker compose Kafka server to create different topics for different tests to avoid overlaps * introduced two initializers for KafkaConsumer allowing to subscribe to a topic as a group or assign the KafkaConsumer to a particular topic+partition pair * added ability to manually commit offsets to Kafka * Implemented Backpressure into KafkaConsumer Modifications: * made KafkaConsumer an actor * implemented backpressure for KafkaConsumer * DocC Comments for KafkaConsumer Modifications: * made KafkaConsumer.close() method async instead of blocking * Improve Concurrency Model for KafkaConsumer Modifications: * KafkaConsumer blocking operations are now executed on a serial `DispatchQueue` * remove `deinit` as `close` is a blocking function and shall be invoked by the user anyway * Changes Franz Modifications: * move from Swift 5.6 to Swift 5.7 * add offset property to assignment consumer initializer * remove default config object in consumer initializer * added preconditions in KafkaConsumer that check if functions are called on the serial queue * make ConsumerMessagesAsyncSequenceDelegate a struct containing closures * close KafkaConsumer on NIOAsyncSequenceProducerDelegate.didTerminate instead of a public close function * add test utility functions to create and delete topics * Docker: build librdkafka from source instead of installing from apt because apt only provides librdkafka version 1.2.1 whereas we use 1.9.2 currently * repalce KafkaConsumer.State with simple boolean "closed" and assertions * refactor KafkaConsumerMessage * test IntegrationTests with multiple messages for each test
1 parent dc4c623 commit ae5982c

12 files changed

+985
-23
lines changed

Package.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// swift-tools-version: 5.6
1+
// swift-tools-version: 5.7
22
//===----------------------------------------------------------------------===//
33
//
44
// This source file is part of the swift-kafka-gsoc open source project
@@ -54,5 +54,9 @@ let package = Package(
5454
name: "SwiftKafkaTests",
5555
dependencies: ["SwiftKafka"]
5656
),
57+
.testTarget(
58+
name: "IntegrationTests",
59+
dependencies: ["SwiftKafka"]
60+
),
5761
]
5862
)

Sources/SwiftKafka/KafkaAcknowledgedMessage.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,15 @@ public struct KafkaAcknowledgedMessage: Hashable {
2424
/// The topic that the message was sent to.
2525
public var topic: String
2626
/// The partition that the message was sent to.
27-
public var partition: Int32
27+
public var partition: KafkaPartition
2828
/// The key of the message.
2929
public var key: ByteBuffer?
3030
/// The body of the message.
3131
public var value: ByteBuffer
3232
/// The offset of the message in its partition.
3333
public var offset: Int64
3434

35-
/// Initialize `KafkaAckedMessage` from `rd_kafka_message_t` pointer.
35+
/// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
3636
init(messagePointer: UnsafePointer<rd_kafka_message_t>, id: UInt) throws {
3737
self.id = id
3838

@@ -57,7 +57,7 @@ public struct KafkaAcknowledgedMessage: Hashable {
5757
}
5858
self.topic = topic
5959

60-
self.partition = rdKafkaMessage.partition
60+
self.partition = KafkaPartition(rawValue: rdKafkaMessage.partition)
6161

6262
if let keyPointer = rdKafkaMessage.key {
6363
let keyBufferPointer = UnsafeRawBufferPointer(

0 commit comments

Comments
 (0)