Skip to content

Commit ee7b7c6

Browse files
Consumer: assign / subscribe on run() (#133)
Motivation: We should start assignments / subscriptions when `KafkaConsumer.run()` is invoked and not on initialization. Modifications: * `KafkaConsumer`: move `assign` / `subscribe` from `init` to `func run()`
1 parent c85a410 commit ee7b7c6

File tree

1 file changed

+7
-7
lines changed

1 file changed

+7
-7
lines changed

Sources/Kafka/KafkaConsumer.swift

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -181,13 +181,6 @@ public final class KafkaConsumer: Sendable, Service {
181181

182182
// Forward main queue events to the consumer queue.
183183
try client.pollSetConsumer()
184-
185-
switch configuration.consumptionStrategy._internal {
186-
case .partition(topic: let topic, partition: let partition, offset: let offset):
187-
try self.assign(topic: topic, partition: partition, offset: offset)
188-
case .group(groupID: _, topics: let topics):
189-
try self.subscribe(topics: topics)
190-
}
191184
}
192185

193186
/// Initialize a new ``KafkaConsumer``.
@@ -331,6 +324,13 @@ public final class KafkaConsumer: Sendable, Service {
331324
}
332325

333326
private func _run() async throws {
327+
switch self.configuration.consumptionStrategy._internal {
328+
case .partition(topic: let topic, partition: let partition, offset: let offset):
329+
try self.assign(topic: topic, partition: partition, offset: offset)
330+
case .group(groupID: _, topics: let topics):
331+
try self.subscribe(topics: topics)
332+
}
333+
334334
while !Task.isCancelled {
335335
let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() }
336336
switch nextAction {

0 commit comments

Comments
 (0)