
Commit caa77e3

Merge branch 'main' into issue-165-nonblocking-poll-task-executor
2 parents: 5565624 + ebfa094

7 files changed: 98 additions, 91 deletions

Package.swift

Lines changed: 6 additions & 1 deletion
@@ -24,6 +24,12 @@ let rdkafkaExclude = [
     "./librdkafka/src/rdkafka_sasl_win32.c",
     "./librdkafka/src/rdwin32.h",
     "./librdkafka/src/win32_config.h",
+    // Remove dependency on cURL. Disabling `ENABLE_CURL` and `WITH_CURL` does
+    // not appear to prevent processing of the below files, so we have to exclude
+    // them explicitly.
+    "./librdkafka/src/rdkafka_sasl_oauthbearer.c",
+    "./librdkafka/src/rdkafka_sasl_oauthbearer_oidc.c",
+    "./librdkafka/src/rdhttp.c",
 ]
 
 let package = Package(
@@ -72,7 +78,6 @@ let package = Package(
                 .define("_GNU_SOURCE", to: "1"), // Fix build error for Swift 5.9 onwards
             ],
             linkerSettings: [
-                .linkedLibrary("curl"),
                 .linkedLibrary("sasl2"),
                 .linkedLibrary("z"), // zlib
             ]
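The explicit exclusion is needed because SwiftPM compiles every C source under a target's path unless it appears in `exclude`; preprocessor toggles such as `ENABLE_CURL` only affect files that are already being compiled. A minimal sketch of that mechanism (illustrative manifest only; the package name is an assumption, not this repository's actual manifest):

// swift-tools-version:5.9
// Hypothetical Package.swift fragment demonstrating `exclude` on a C target.
import PackageDescription

let package = Package(
    name: "ExampleCWrapper", // assumed name for illustration
    targets: [
        .target(
            name: "Crdkafka",
            // Files listed here are never handed to the C compiler, so no
            // `#define` in config.h can pull them back into the build.
            exclude: [
                "./librdkafka/src/rdkafka_sasl_oauthbearer.c",
                "./librdkafka/src/rdkafka_sasl_oauthbearer_oidc.c",
                "./librdkafka/src/rdhttp.c",
            ]
        )
    ]
)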

Sources/Crdkafka/custom/config/config.h

Lines changed: 4 additions & 5 deletions
@@ -23,7 +23,7 @@
 #define ENABLE_ZSTD 1
 #define ENABLE_SSL 1
 #define ENABLE_GSSAPI 1
-#define ENABLE_CURL 1
+#define ENABLE_CURL 0
 #define ENABLE_DEVEL 0
 #define ENABLE_VALGRIND 0
 #define ENABLE_REFCNT_DEBUG 0
@@ -35,7 +35,6 @@
 #define ENABLE_ZSTD 1
 #define ENABLE_SSL 1
 #define ENABLE_GSSAPI 1
-#define ENABLE_CURL 1
 #define ENABLE_LZ4_EXT 1
 #define WITH_STATIC_LINKING 1
 #define MKL_APP_NAME "librdkafka"
@@ -136,7 +135,7 @@
 // libzstd
 #define WITH_ZSTD 1
 // libcurl
-#define WITH_CURL 1
+#define WITH_CURL 0
 // WITH_HDRHISTOGRAM
 #define WITH_HDRHISTOGRAM 1
 // WITH_SNAPPY
@@ -146,9 +145,9 @@
 // WITH_SASL_SCRAM
 #define WITH_SASL_SCRAM 1
 // WITH_SASL_OAUTHBEARER
-#define WITH_SASL_OAUTHBEARER 1
+#define WITH_SASL_OAUTHBEARER 0
 // WITH_OAUTHBEARER_OIDC
-#define WITH_OAUTHBEARER_OIDC 1
+#define WITH_OAUTHBEARER_OIDC 0
 // regex
 #define HAVE_REGEX 1
 // strndup

Sources/Kafka/Configuration/KafkaConfiguration+Security.swift

Lines changed: 5 additions & 4 deletions
@@ -309,7 +309,7 @@ extension KafkaConfiguration {
         }
     }
 
-    public struct OAuthBearerMethod: Sendable, Hashable {
+    struct OAuthBearerMethod: Sendable, Hashable {
         internal enum _OAuthBearerMethod: Sendable, Hashable {
             case `default`(
                 configuration: String?
@@ -337,7 +337,7 @@ extension KafkaConfiguration {
        /// For example: `principalClaimName=azp principal=admin scopeClaimName=roles scope=role1,role2 lifeSeconds=600`.
        /// In addition, SASL extensions can be communicated to the broker via `extension_NAME=value`.
        /// For example: `principal=admin extension_traceId=123`
-        public static func `default`(configuration: String? = nil) -> OAuthBearerMethod {
+        static func `default`(configuration: String? = nil) -> OAuthBearerMethod {
            OAuthBearerMethod(_internal: .default(configuration: configuration))
        }
 
@@ -359,7 +359,7 @@ extension KafkaConfiguration {
        ///     - scope: The client uses this to specify the scope of the access request to the broker.
        ///     - extensions: Allow additional information to be provided to the broker.
        ///       Comma-separated list of key=value pairs. E.g., "supportFeatureX=true,organizationId=sales-emea".
-        public static func oidc(
+        static func oidc(
            configuration: String? = nil,
            clientID: String,
            clientSecret: String,
@@ -419,7 +419,8 @@ extension KafkaConfiguration {
        }
 
        /// Use the OAUTHBEARER mechanism.
-        public static func oAuthBearer(method: OAuthBearerMethod) -> SASLMechanism {
+        // This is currently disabled since it requires a curl dependency otherwise.
+        static func oAuthBearer(method: OAuthBearerMethod) -> SASLMechanism {
            SASLMechanism(
                _internal: .oAuthBearer(method: method)
            )
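For reference, a hypothetical call site for this API, using the configuration-string format documented above. After this change it compiles only inside the Kafka module, since the factory methods are no longer `public`; the nesting of both types inside `KafkaConfiguration` is inferred from the diff context:

// Hypothetical usage sketch — no longer reachable from outside the module.
let method = KafkaConfiguration.OAuthBearerMethod.default(
    configuration: "principal=admin extension_traceId=123" // format from the doc comment above
)
let mechanism = KafkaConfiguration.SASLMechanism.oAuthBearer(method: method)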

Sources/Kafka/KafkaAcknowledgedMessage.swift

Lines changed: 3 additions & 1 deletion
@@ -27,6 +27,8 @@ public struct KafkaAcknowledgedMessage {
     public var value: ByteBuffer
     /// The offset of the message in its partition.
     public var offset: KafkaOffset
+    /// The headers of the message.
+    public var headers: [KafkaHeader]
 
     /// Initialize ``KafkaAcknowledgedMessage`` from `rd_kafka_message_t` pointer.
     /// - Throws: A ``KafkaAcknowledgedMessageError`` for failed acknowledgements or malformed messages.
@@ -53,7 +55,7 @@ public struct KafkaAcknowledgedMessage {
         self.topic = topic
 
         self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))
-
+        self.headers = try RDKafkaClient.getHeaders(for: messagePointer)
         if let keyPointer = rdKafkaMessage.key {
             let keyBufferPointer = UnsafeRawBufferPointer(
                 start: keyPointer,

Sources/Kafka/KafkaConsumerMessage.swift

Lines changed: 1 addition & 80 deletions
@@ -66,7 +66,7 @@ public struct KafkaConsumerMessage {
 
         self.partition = KafkaPartition(rawValue: Int(rdKafkaMessage.partition))
 
-        self.headers = try Self.getHeaders(for: messagePointer)
+        self.headers = try RDKafkaClient.getHeaders(for: messagePointer)
 
         if let keyPointer = rdKafkaMessage.key {
             let keyBufferPointer = UnsafeRawBufferPointer(
@@ -91,82 +91,3 @@ extension KafkaConsumerMessage: Hashable {}
 // MARK: - KafkaConsumerMessage + Sendable
 
 extension KafkaConsumerMessage: Sendable {}
-
-// MARK: - Helpers
-
-extension KafkaConsumerMessage {
-    /// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer.
-    ///
-    /// - Parameters:
-    ///     - for: Pointer to the `rd_kafka_message_t` object to extract the headers from.
-    private static func getHeaders(
-        for messagePointer: UnsafePointer<rd_kafka_message_t>
-    ) throws -> [KafkaHeader] {
-        var result: [KafkaHeader] = []
-        var headers: OpaquePointer?
-
-        var readStatus = rd_kafka_message_headers(messagePointer, &headers)
-
-        if readStatus == RD_KAFKA_RESP_ERR__NOENT {
-            // No Header Entries
-            return result
-        }
-
-        guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
-            throw KafkaError.rdKafkaError(wrapping: readStatus)
-        }
-
-        guard let headers else {
-            return result
-        }
-
-        let headerCount = rd_kafka_header_cnt(headers)
-        result.reserveCapacity(headerCount)
-
-        var headerIndex = 0
-
-        while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount {
-            var headerKeyPointer: UnsafePointer<CChar>?
-            var headerValuePointer: UnsafeRawPointer?
-            var headerValueSize = 0
-
-            readStatus = rd_kafka_header_get_all(
-                headers,
-                headerIndex,
-                &headerKeyPointer,
-                &headerValuePointer,
-                &headerValueSize
-            )
-
-            if readStatus == RD_KAFKA_RESP_ERR__NOENT {
-                // No Header Entries
-                return result
-            }
-
-            guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
-                throw KafkaError.rdKafkaError(wrapping: readStatus)
-            }
-
-            guard let headerKeyPointer else {
-                fatalError("Found null pointer when reading KafkaConsumerMessage header key")
-            }
-            let headerKey = String(cString: headerKeyPointer)
-
-            var headerValue: ByteBuffer?
-            if let headerValuePointer, headerValueSize > 0 {
-                let headerValueBufferPointer = UnsafeRawBufferPointer(
-                    start: headerValuePointer,
-                    count: headerValueSize
-                )
-                headerValue = ByteBuffer(bytes: headerValueBufferPointer)
-            }
-
-            let newHeader = KafkaHeader(key: headerKey, value: headerValue)
-            result.append(newHeader)
-
-            headerIndex += 1
-        }
-
-        return result
-    }
-}

Sources/Kafka/RDKafka/RDKafkaClient.swift

Lines changed: 76 additions & 0 deletions
@@ -15,6 +15,7 @@
 import Crdkafka
 import Dispatch
 import Logging
+import NIOCore
 
 import class Foundation.JSONDecoder
 
@@ -616,4 +617,79 @@ public final class RDKafkaClient: Sendable {
     func withKafkaHandlePointer<T>(_ body: (OpaquePointer) throws -> T) rethrows -> T {
         try body(self.kafkaHandle.pointer)
     }
+
+    /// Extract ``KafkaHeader``s from a `rd_kafka_message_t` pointer.
+    ///
+    /// - Parameters:
+    ///     - for: Pointer to the `rd_kafka_message_t` object to extract the headers from.
+    internal static func getHeaders(
+        for messagePointer: UnsafePointer<rd_kafka_message_t>
+    ) throws -> [KafkaHeader] {
+        var result: [KafkaHeader] = []
+        var headers: OpaquePointer?
+
+        var readStatus = rd_kafka_message_headers(messagePointer, &headers)
+
+        if readStatus == RD_KAFKA_RESP_ERR__NOENT {
+            // No Header Entries
+            return result
+        }
+
+        guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
+            throw KafkaError.rdKafkaError(wrapping: readStatus)
+        }
+
+        guard let headers else {
+            return result
+        }
+
+        let headerCount = rd_kafka_header_cnt(headers)
+        result.reserveCapacity(headerCount)
+
+        var headerIndex = 0
+
+        while readStatus != RD_KAFKA_RESP_ERR__NOENT && headerIndex < headerCount {
+            var headerKeyPointer: UnsafePointer<CChar>?
+            var headerValuePointer: UnsafeRawPointer?
+            var headerValueSize = 0
+
+            readStatus = rd_kafka_header_get_all(
+                headers,
+                headerIndex,
+                &headerKeyPointer,
+                &headerValuePointer,
+                &headerValueSize
+            )
+
+            if readStatus == RD_KAFKA_RESP_ERR__NOENT {
+                // No Header Entries
+                return result
+            }
+
+            guard readStatus == RD_KAFKA_RESP_ERR_NO_ERROR else {
+                throw KafkaError.rdKafkaError(wrapping: readStatus)
+            }
+
+            guard let headerKeyPointer else {
+                fatalError("Found null pointer when reading KafkaConsumerMessage header key")
+            }
+            let headerKey = String(cString: headerKeyPointer)
+
+            var headerValue: ByteBuffer?
+            if let headerValuePointer, headerValueSize > 0 {
+                let headerValueBufferPointer = UnsafeRawBufferPointer(
+                    start: headerValuePointer,
+                    count: headerValueSize
+                )
+                headerValue = ByteBuffer(bytes: headerValueBufferPointer)
+            }
+
+            let newHeader = KafkaHeader(key: headerKey, value: headerValue)
+            result.append(newHeader)
+
+            headerIndex += 1
+        }
+
+        return result
+    }
 }
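A short sketch of consuming the headers this helper extracts, assuming the `KafkaHeader(key:value:)` shape used above, where the value is an optional `NIOCore.ByteBuffer`:

import NIOCore

// Render extracted headers for logging; String(buffer:) copies the
// readable bytes of a ByteBuffer into a String.
func describe(_ headers: [KafkaHeader]) -> String {
    headers.map { header in
        let value = header.value.map { String(buffer: $0) } ?? "<no value>"
        return "\(header.key)=\(value)"
    }.joined(separator: ", ")
}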

Tests/KafkaTests/KafkaProducerTests.swift

Lines changed: 3 additions & 0 deletions
@@ -82,8 +82,10 @@ final class KafkaProducerTests: XCTestCase {
         }
 
         let expectedTopic = "test-topic"
+        let headers = [KafkaHeader(key: "some", value: ByteBuffer.init(string: "test"))]
         let message = KafkaProducerMessage(
             topic: expectedTopic,
+            headers: headers,
             key: "key",
             value: "Hello, World!"
         )
@@ -118,6 +120,7 @@ final class KafkaProducerTests: XCTestCase {
         XCTAssertEqual(expectedTopic, receivedMessage.topic)
         XCTAssertEqual(ByteBuffer(string: message.key!), receivedMessage.key)
         XCTAssertEqual(ByteBuffer(string: message.value), receivedMessage.value)
+        XCTAssertEqual(headers, receivedMessage.headers)
 
         // Shutdown the serviceGroup
         await serviceGroup.triggerGracefulShutdown()
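Outside the test harness, building a message with headers follows the same pattern. A minimal sketch (topic, key, and header names are placeholders; the nil-valued header relies on `KafkaHeader.value` being optional, as the extraction code above implies):

import NIOCore

let headers = [
    KafkaHeader(key: "trace-id", value: ByteBuffer(string: "abc-123")),
    KafkaHeader(key: "flag", value: nil), // headers may carry no value
]
let message = KafkaProducerMessage(
    topic: "test-topic",
    headers: headers,
    key: "key",
    value: "Hello, World!"
)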
