@@ -32,30 +32,30 @@ public final class RDKafkaClient: Sendable {
32
32
}
33
33
34
34
/// Handle for the C library's Kafka instance.
35
- private let kafkaHandle : OpaquePointer
35
+ private let kafkaHandle : SendableOpaquePointer
36
36
/// A logger.
37
37
private let logger : Logger
38
38
39
39
/// `librdkafka`'s `rd_kafka_queue_t` that events are received on.
40
- private let queue : OpaquePointer
40
+ private let queueHandle : SendableOpaquePointer
41
41
42
42
// Use factory method to initialize
43
43
private init (
44
44
type: ClientType ,
45
- kafkaHandle: OpaquePointer ,
45
+ kafkaHandle: SendableOpaquePointer ,
46
46
logger: Logger
47
47
) {
48
48
self . kafkaHandle = kafkaHandle
49
49
self . logger = logger
50
- self . queue = rd_kafka_queue_get_main ( self . kafkaHandle)
50
+ self . queueHandle = . init ( rd_kafka_queue_get_main ( self . kafkaHandle. pointer ) )
51
51
52
- rd_kafka_set_log_queue ( self . kafkaHandle, self . queue )
52
+ rd_kafka_set_log_queue ( self . kafkaHandle. pointer , self . queueHandle . pointer )
53
53
}
54
54
55
55
deinit {
56
56
// Loose reference to librdkafka's event queue
57
- rd_kafka_queue_destroy ( self . queue )
58
- rd_kafka_destroy ( kafkaHandle)
57
+ rd_kafka_queue_destroy ( self . queueHandle . pointer )
58
+ rd_kafka_destroy ( kafkaHandle. pointer )
59
59
}
60
60
61
61
/// Factory method creating a new instance of a ``RDKafkaClient``.
@@ -90,7 +90,8 @@ public final class RDKafkaClient: Sendable {
90
90
throw KafkaError . client ( reason: errorString)
91
91
}
92
92
93
- return RDKafkaClient ( type: type, kafkaHandle: handle, logger: logger)
93
+ let kafkaHandle = SendableOpaquePointer ( handle)
94
+ return RDKafkaClient ( type: type, kafkaHandle: kafkaHandle, logger: logger)
94
95
}
95
96
96
97
/// Produce a message to the Kafka cluster.
@@ -215,7 +216,7 @@ public final class RDKafkaClient: Sendable {
215
216
assert ( arguments. count == size)
216
217
217
218
return rd_kafka_produceva (
218
- self . kafkaHandle,
219
+ self . kafkaHandle. pointer ,
219
220
arguments,
220
221
arguments. count
221
222
)
@@ -307,7 +308,7 @@ public final class RDKafkaClient: Sendable {
307
308
events. reserveCapacity ( maxEvents)
308
309
309
310
for _ in 0 ..< maxEvents {
310
- let event = rd_kafka_queue_poll ( self . queue , 0 )
311
+ let event = rd_kafka_queue_poll ( self . queueHandle . pointer , 0 )
311
312
defer { rd_kafka_event_destroy ( event) }
312
313
313
314
let rdEventType = rd_kafka_event_type ( event)
@@ -446,7 +447,7 @@ public final class RDKafkaClient: Sendable {
446
447
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
447
448
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
448
449
func consumerPoll( ) throws -> KafkaConsumerMessage ? {
449
- guard let messagePointer = rd_kafka_consumer_poll ( self . kafkaHandle, 0 ) else {
450
+ guard let messagePointer = rd_kafka_consumer_poll ( self . kafkaHandle. pointer , 0 ) else {
450
451
// No error, there might be no more messages
451
452
return nil
452
453
}
@@ -469,7 +470,7 @@ public final class RDKafkaClient: Sendable {
469
470
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
470
471
func subscribe( topicPartitionList: RDKafkaTopicPartitionList ) throws {
471
472
try topicPartitionList. withListPointer { pointer in
472
- let result = rd_kafka_subscribe ( self . kafkaHandle, pointer)
473
+ let result = rd_kafka_subscribe ( self . kafkaHandle. pointer , pointer)
473
474
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
474
475
throw KafkaError . rdKafkaError ( wrapping: result)
475
476
}
@@ -480,7 +481,7 @@ public final class RDKafkaClient: Sendable {
480
481
/// - Parameter topicPartitionList: Pointer to a list of topics + partition pairs.
481
482
func assign( topicPartitionList: RDKafkaTopicPartitionList ) throws {
482
483
try topicPartitionList. withListPointer { pointer in
483
- let result = rd_kafka_assign ( self . kafkaHandle, pointer)
484
+ let result = rd_kafka_assign ( self . kafkaHandle. pointer , pointer)
484
485
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
485
486
throw KafkaError . rdKafkaError ( wrapping: result)
486
487
}
@@ -517,7 +518,7 @@ public final class RDKafkaClient: Sendable {
517
518
518
519
let error = changesList. withListPointer { listPointer in
519
520
rd_kafka_commit (
520
- self . kafkaHandle,
521
+ self . kafkaHandle. pointer ,
521
522
listPointer,
522
523
1 // async = true
523
524
)
@@ -560,9 +561,9 @@ public final class RDKafkaClient: Sendable {
560
561
561
562
changesList. withListPointer { listPointer in
562
563
rd_kafka_commit_queue (
563
- self . kafkaHandle,
564
+ self . kafkaHandle. pointer ,
564
565
listPointer,
565
- self . queue ,
566
+ self . queueHandle . pointer ,
566
567
nil ,
567
568
opaquePointer
568
569
)
@@ -581,7 +582,7 @@ public final class RDKafkaClient: Sendable {
581
582
let queue = DispatchQueue ( label: " com.swift-server.swift-kafka.flush " )
582
583
try await withCheckedThrowingContinuation { ( continuation: CheckedContinuation < Void , Error > ) in
583
584
queue. async {
584
- let error = rd_kafka_flush ( self . kafkaHandle, timeoutMilliseconds)
585
+ let error = rd_kafka_flush ( self . kafkaHandle. pointer , timeoutMilliseconds)
585
586
if error != RD_KAFKA_RESP_ERR_NO_ERROR {
586
587
continuation. resume ( throwing: KafkaError . rdKafkaError ( wrapping: error) )
587
588
} else {
@@ -596,7 +597,7 @@ public final class RDKafkaClient: Sendable {
596
597
///
597
598
/// Make sure to run poll loop until ``RDKafkaClient/consumerIsClosed`` returns `true`.
598
599
func consumerClose( ) throws {
599
- let result = rd_kafka_consumer_close_queue ( self . kafkaHandle, self . queue )
600
+ let result = rd_kafka_consumer_close_queue ( self . kafkaHandle. pointer , self . queueHandle . pointer )
600
601
let kafkaError = rd_kafka_error_code ( result)
601
602
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
602
603
throw KafkaError . rdKafkaError ( wrapping: kafkaError)
@@ -605,14 +606,14 @@ public final class RDKafkaClient: Sendable {
605
606
606
607
/// Returns `true` if the underlying `librdkafka` consumer is closed.
607
608
var isConsumerClosed : Bool {
608
- rd_kafka_consumer_closed ( self . kafkaHandle) == 1
609
+ rd_kafka_consumer_closed ( self . kafkaHandle. pointer ) == 1
609
610
}
610
611
611
612
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
612
613
/// - Warning: Do not escape the pointer from the closure for later use.
613
614
/// - Parameter body: The closure will use the Kafka handle pointer.
614
615
@discardableResult
615
616
func withKafkaHandlePointer< T> ( _ body: ( OpaquePointer ) throws -> T ) rethrows -> T {
616
- try body ( self . kafkaHandle)
617
+ try body ( self . kafkaHandle. pointer )
617
618
}
618
619
}
0 commit comments