Skip to content

Commit 20faefb

Browse files
defer source.finish (#157)
1 parent 51c5f72 commit 20faefb

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

Sources/Kafka/KafkaProducer.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,10 @@ public final class KafkaProducer: Service, Sendable {
238238
0...Int(Int32.max) ~= self.configuration.flushTimeoutMilliseconds,
239239
"Flush timeout outside of valid range \(0...Int32.max)"
240240
)
241+
defer { // we should finish source indefinetely of exception in client.flush()
242+
source?.finish()
243+
}
241244
try await client.flush(timeoutMilliseconds: Int32(self.configuration.flushTimeoutMilliseconds))
242-
source?.finish()
243245
return
244246
case .terminatePollLoop:
245247
return

0 commit comments

Comments
 (0)