Skip to content

Commit a646b37

Browse files
Auxiliary types for sending Messages to Kafka (#20)
* Auxiliary types for sending Messages to Kafka Motivation: To be able to send messages to Kafka we need auxiliary types such as * `KafkaProducerMessage` * `KafkaTopicConfig` Modifications: * implemented `KafkaProducerMessage` * created implementation for `KafkaTopicConfig` that matches `KafkaConfig`s copy-on-write mechanism and has tests * created barebones of `KafkaProducer` class * Refactoring
1 parent b82c0bc commit a646b37

File tree

5 files changed

+312
-0
lines changed

5 files changed

+312
-0
lines changed
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Crdkafka
16+
17+
/// Type for representing the number of a Kafka Partition.
18+
public struct KafkaPartition: RawRepresentable {
19+
public let rawValue: Int32
20+
21+
public init(rawValue: Int32) {
22+
self.rawValue = rawValue
23+
}
24+
25+
public static let unassigned = KafkaPartition(rawValue: RD_KAFKA_PARTITION_UA)
26+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Crdkafka
16+
import Logging
17+
18+
/// Send messages to the Kafka cluster.
19+
/// - Note: When messages get published to a non-existent topic, a new topic is created using the ``KafkaTopicConfig``
20+
/// configuration object (only works if server has `auto.create.topics.enable` property set).
21+
public struct KafkaProducer {
22+
private let client: KafkaClient
23+
private let topicConfig: KafkaTopicConfig
24+
25+
// Preliminary implementation
26+
public init(
27+
config: KafkaConfig = KafkaConfig(),
28+
topicConfig: KafkaTopicConfig = KafkaTopicConfig(),
29+
logger: Logger
30+
) throws {
31+
self.client = try KafkaClient(type: .producer, config: config, logger: logger)
32+
self.topicConfig = topicConfig
33+
}
34+
}
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Crdkafka
16+
import Foundation
17+
18+
/// Message that is sent by the `KafkaProducer`
19+
public struct KafkaProducerMessage {
20+
public var topic: String
21+
public var partition: KafkaPartition
22+
public var key: ContiguousBytes?
23+
public var value: ContiguousBytes
24+
25+
/// Create a new `KafkaProducerMessage` with any keys and values pair that conform to the `ContiguousBytes` protocol
26+
/// - Parameter topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
27+
/// - Parameter partition: The topic partition the message will be sent to. If not set explicitly, the partiotion will be assigned automatically.
28+
/// - Parameter key: Used to guarantee that messages with the same key will be sent to the same partittion so that their order is preserved.
29+
/// - Parameter value: The message body.
30+
public init(
31+
topic: String,
32+
partition: KafkaPartition? = nil,
33+
key: ContiguousBytes? = nil,
34+
value: ContiguousBytes
35+
) {
36+
self.topic = topic
37+
self.key = key
38+
self.value = value
39+
40+
if let partition = partition {
41+
self.partition = partition
42+
} else {
43+
self.partition = .unassigned
44+
}
45+
}
46+
47+
/// Create a new `KafkaProducerMessage` with a `String` key and value
48+
/// - Parameter topic: The topic the message will be sent to. Topics may be created by the `KafkaProducer` if non-existent.
49+
/// - Parameter partition: The topic partition the message will be sent to. If not set explicitly, the partiotion will be assigned automatically.
50+
/// - Parameter key: Used to guarantee that messages with the same key will be sent to the same partittion so that their order is preserved.
51+
/// - Parameter value: The message body.
52+
public init(
53+
topic: String,
54+
partition: KafkaPartition? = nil,
55+
key: String? = nil,
56+
value: String
57+
) {
58+
self.init(
59+
topic: topic,
60+
partition: partition,
61+
key: key?.data(using: .utf8),
62+
value: Data(value.utf8)
63+
)
64+
}
65+
}
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Crdkafka
16+
17+
/// `KafkaTopicConfig` is a `struct` that points to a topic configuration in memory.
18+
/// Once a property of the `KafkaTopicConfig` is changed, a duplicate in-memory config is created using the
19+
/// copy-on-write mechanism.
20+
/// For more information on how to configure Kafka topics, see
21+
/// [all available configurations](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md#topic-configuration-properties).
22+
public struct KafkaTopicConfig: Hashable, Equatable {
23+
private final class _Internal: Hashable, Equatable {
24+
/// Pointer to the `rd_kafka_topic_conf_t` object managed by `librdkafka`
25+
private(set) var pointer: OpaquePointer
26+
27+
/// Initialize internal `KafkaTopicConfig` object with default configuration
28+
init() {
29+
self.pointer = rd_kafka_topic_conf_new()
30+
}
31+
32+
/// Initialize internal `KafkaTopicConfig` object through a given `rd_kafka_topic_conf_t` pointer
33+
init(pointer: OpaquePointer) {
34+
self.pointer = pointer
35+
}
36+
37+
deinit {
38+
rd_kafka_topic_conf_destroy(pointer)
39+
}
40+
41+
func value(forKey key: String) -> String? {
42+
let value = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
43+
defer { value.deallocate() }
44+
45+
var valueSize = KafkaClient.stringSize
46+
let configResult = rd_kafka_topic_conf_get(
47+
pointer,
48+
key,
49+
value,
50+
&valueSize
51+
)
52+
53+
if configResult == RD_KAFKA_CONF_OK {
54+
return String(cString: value)
55+
}
56+
return nil
57+
}
58+
59+
func set(_ value: String, forKey key: String) throws {
60+
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
61+
defer { errorChars.deallocate() }
62+
63+
let configResult = rd_kafka_topic_conf_set(
64+
pointer,
65+
key,
66+
value,
67+
errorChars,
68+
KafkaClient.stringSize
69+
)
70+
71+
if configResult != RD_KAFKA_CONF_OK {
72+
let errorString = String(cString: errorChars)
73+
throw KafkaError(description: errorString)
74+
}
75+
}
76+
77+
func createDuplicate() -> _Internal {
78+
let duplicatePointer: OpaquePointer = rd_kafka_topic_conf_dup(self.pointer)
79+
return .init(pointer: duplicatePointer)
80+
}
81+
82+
// MARK: Hashable
83+
84+
func hash(into hasher: inout Hasher) {
85+
hasher.combine(self.pointer)
86+
}
87+
88+
// MARK: Equatable
89+
90+
static func == (lhs: _Internal, rhs: _Internal) -> Bool {
91+
return lhs.pointer == rhs.pointer
92+
}
93+
}
94+
95+
private var _internal: _Internal
96+
97+
public init() {
98+
self._internal = .init()
99+
}
100+
101+
var pointer: OpaquePointer {
102+
return self._internal.pointer
103+
}
104+
105+
/// Retrieve value of topic configuration property for `key`
106+
public func value(forKey key: String) -> String? {
107+
return self._internal.value(forKey: key)
108+
}
109+
110+
/// Set topic configuration `value` for `key`
111+
public mutating func set(_ value: String, forKey key: String) throws {
112+
// Copy-on-write mechanism
113+
if !isKnownUniquelyReferenced(&(self._internal)) {
114+
self._internal = self._internal.createDuplicate()
115+
}
116+
117+
try self._internal.set(value, forKey: key)
118+
}
119+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import SwiftKafka
16+
import XCTest
17+
18+
final class KafkaTopicConfigTests: XCTestCase {
19+
func testSettingCorrectValueWorks() throws {
20+
var config = KafkaTopicConfig()
21+
22+
try config.set("gzip", forKey: "compression.type")
23+
24+
XCTAssertEqual("gzip", config.value(forKey: "compression.type"))
25+
}
26+
27+
func testSettingWrongKeyFails() {
28+
var config = KafkaTopicConfig()
29+
30+
XCTAssertThrowsError(try config.set("gzip", forKey: "not.a.valid.key"))
31+
}
32+
33+
func testSettingWrongValueFails() {
34+
var config = KafkaTopicConfig()
35+
36+
XCTAssertThrowsError(try config.set("not_a_compression_type", forKey: "compression.type"))
37+
}
38+
39+
func testGetterHasNoSideEffects() {
40+
let configA = KafkaTopicConfig()
41+
let configB = configA
42+
43+
_ = configA.value(forKey: "compression.type")
44+
_ = configB.value(forKey: "compression.type")
45+
46+
XCTAssertTrue(configA == configB)
47+
}
48+
49+
func testCopyOnWriteWorks() throws {
50+
var configA = KafkaTopicConfig()
51+
let configB = configA
52+
let configC = configA
53+
54+
// Check if all configs have the default value set
55+
[configA, configB, configC].forEach {
56+
XCTAssertEqual("inherit", $0.value(forKey: "compression.type"))
57+
}
58+
59+
try configA.set("gzip", forKey: "compression.type")
60+
61+
XCTAssertEqual("gzip", configA.value(forKey: "compression.type"))
62+
XCTAssertEqual("inherit", configB.value(forKey: "compression.type"))
63+
XCTAssertEqual("inherit", configC.value(forKey: "compression.type"))
64+
XCTAssertNotEqual(configA, configB)
65+
XCTAssertNotEqual(configA, configC)
66+
XCTAssertEqual(configB, configC)
67+
}
68+
}

0 commit comments

Comments
 (0)