Skip to content

Commit b82c0bc

Browse files
Basic implementation of the KafkaClient class (#18)
* Basic implementation of the `KafkaClient` class Motivation: The `KafkaClient` class is the base class for the future implementations of `KafkaProducer` and `KafkaConsumer`. It handles the connection to the Kafka C library. Modifications: * removed the `ErrorPrinter` example code * added a basic implementation for `KafkaConfig` * added a basic implementation for `KafkaError` * deleted boilerplate `SwiftKafka` `struct` * implemented `KafkaClient` * added `swift-log` to project * added DocC documentatio to `KafkaClient` Result: There is a `KafkaClient` class that is configurable using a preliminary implementation of `KafkaConfig`. * Fixed memory issue for KafkaConfig pointer duplicate objects Modifications: * created a private initializer for KafkaConfig that takes an OpaquePointer as its argmument * implemented the createDuplicate() function that creates a duplicate object of the current KafkaConfig object so we can make use of KafkaConfigs deinitializer and avoid memory leaks Result: No memory issues when duplicating KafkaConfigs * Improved KafkaClient class Modifications: * changed access level of KafkaClient to internal as we do not want to subclass it in our public classes KafkaConsumer and KafkaProducer * made var's let's and moved their initialization to initialization * take Logger as an argument rather than creating our own Logger * Added missing DocC comments * Small changes to KafkaClient Modifications: * made the `KafkaClient` class `final` * got rid of leading _ for internal (not in a Swift sense) variables * added missing DocC comments * Made `KafkaConfig` a `struct` with copy-on-write mechanism Motivation: Modifications: * added a `description` property to the preliminary `KafkaError` type * throw a more descriptive error when `KafkaClient` initialization fails * replace `subscript` in `KafkaConfig` with explicit `set` and `get` functions * `KafkaConfig` is now a `struct` that contains an internal class and a * made `KafkaConfig` conform to `Equatable` * wrote tests for `KafkaConfig` * Refactoring Modifications: * changed /** DocC comments to /// comments * made `KafkaConfig` conform to `Hashable` * Refactored KafkaConfig's conformance to Hashable, Equatable
1 parent 79542fd commit b82c0bc

File tree

8 files changed

+263
-63
lines changed

8 files changed

+263
-63
lines changed

Package.swift

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -22,20 +22,17 @@ let package = Package(
2222
name: "SwiftKafka",
2323
targets: ["SwiftKafka"]
2424
),
25-
.executable(
26-
name: "ErrorPrinter",
27-
targets: ["ErrorPrinter"]
28-
),
2925
],
30-
dependencies: [],
26+
dependencies: [
27+
.package(url: "https://github.com/apple/swift-log.git", from: "1.0.0"),
28+
],
3129
targets: [
3230
.target(
3331
name: "SwiftKafka",
34-
dependencies: []
35-
),
36-
.executableTarget(
37-
name: "ErrorPrinter",
38-
dependencies: ["Crdkafka"]
32+
dependencies: [
33+
"Crdkafka",
34+
.product(name: "Logging", package: "swift-log"),
35+
]
3936
),
4037
.systemLibrary(
4138
name: "Crdkafka",

Sources/ErrorPrinter/ErrorPrinter.swift

Lines changed: 0 additions & 40 deletions
This file was deleted.

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Crdkafka
16+
import Logging
17+
18+
/// Base class for ``KafkaProducer`` and ``KafkaConsumer``,
19+
/// which is used to handle the connection to the Kafka ecosystem.
20+
final class KafkaClient {
21+
// Default size for Strings returned from C API
22+
static let stringSize = 1024
23+
24+
/// A logger.
25+
private let logger: Logger
26+
27+
/// A client is either a `.producer` or a `.consumer`
28+
private let clientType: rd_kafka_type_t
29+
/// The configuration object of the client
30+
private let config: KafkaConfig
31+
/// Handle for the C library's Kafka instance
32+
private let kafkaHandle: OpaquePointer
33+
34+
/// Determines if client is a producer or a consumer
35+
enum `Type` {
36+
case producer
37+
case consumer
38+
}
39+
40+
init(type: Type, config: KafkaConfig, logger: Logger) throws {
41+
self.logger = logger
42+
self.clientType = type == .producer ? RD_KAFKA_PRODUCER : RD_KAFKA_CONSUMER
43+
self.config = config
44+
45+
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
46+
defer { errorChars.deallocate() }
47+
48+
guard let handle = rd_kafka_new(
49+
clientType,
50+
config.pointer,
51+
errorChars,
52+
KafkaClient.stringSize
53+
) else {
54+
let errorString = String(cString: errorChars)
55+
throw KafkaError(description: errorString)
56+
}
57+
self.kafkaHandle = handle
58+
}
59+
60+
deinit {
61+
rd_kafka_destroy(kafkaHandle)
62+
}
63+
}

Sources/SwiftKafka/KafkaConfig.swift

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
import Crdkafka
16+
17+
/// Used to configure producers and consumers.
18+
/// `KafkaConfig` is a `struct` that points to a configuration in memory.
19+
/// Once a property of the `KafkaConfig` is changed, a duplicate in-memory config is created using the
20+
/// copy-on-write mechanism.
21+
/// For more information on how to configure Kafka, see
22+
/// [all available configurations](https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md).
23+
public struct KafkaConfig: Hashable, Equatable {
24+
private final class _Internal: Hashable, Equatable {
25+
/// Pointer to the `rd_kafka_conf_t` object managed by `librdkafka`
26+
private(set) var pointer: OpaquePointer
27+
28+
/// Initialize internal `KafkaConfig` object with default configuration
29+
init() {
30+
self.pointer = rd_kafka_conf_new()
31+
}
32+
33+
/// Initialize internal `KafkaConfig` object through a given `rd_kafka_conf_t` pointer
34+
init(pointer: OpaquePointer) {
35+
self.pointer = pointer
36+
}
37+
38+
deinit {
39+
rd_kafka_conf_destroy(pointer)
40+
}
41+
42+
func value(forKey key: String) -> String? {
43+
let value = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
44+
defer { value.deallocate() }
45+
46+
var valueSize = KafkaClient.stringSize
47+
let configResult = rd_kafka_conf_get(
48+
pointer,
49+
key,
50+
value,
51+
&valueSize
52+
)
53+
54+
if configResult == RD_KAFKA_CONF_OK {
55+
return String(cString: value)
56+
}
57+
return nil
58+
}
59+
60+
func set(_ value: String, forKey key: String) throws {
61+
let errorChars = UnsafeMutablePointer<CChar>.allocate(capacity: KafkaClient.stringSize)
62+
defer { errorChars.deallocate() }
63+
64+
let configResult = rd_kafka_conf_set(
65+
pointer,
66+
key,
67+
value,
68+
errorChars,
69+
KafkaClient.stringSize
70+
)
71+
72+
if configResult != RD_KAFKA_CONF_OK {
73+
let errorString = String(cString: errorChars)
74+
throw KafkaError(description: errorString)
75+
}
76+
}
77+
78+
func createDuplicate() -> _Internal {
79+
let duplicatePointer: OpaquePointer = rd_kafka_conf_dup(self.pointer)
80+
return .init(pointer: duplicatePointer)
81+
}
82+
83+
// MARK: Hashable
84+
85+
func hash(into hasher: inout Hasher) {
86+
hasher.combine(self.pointer)
87+
}
88+
89+
// MARK: Equatable
90+
91+
static func == (lhs: _Internal, rhs: _Internal) -> Bool {
92+
return lhs.pointer == rhs.pointer
93+
}
94+
}
95+
96+
private var _internal: _Internal
97+
98+
public init() {
99+
self._internal = .init()
100+
}
101+
102+
var pointer: OpaquePointer {
103+
return self._internal.pointer
104+
}
105+
106+
/// Retrieve value of configuration property for `key`
107+
public func value(forKey key: String) -> String? {
108+
return self._internal.value(forKey: key)
109+
}
110+
111+
/// Set configuration `value` for `key`
112+
public mutating func set(_ value: String, forKey key: String) throws {
113+
// Copy-on-write mechanism
114+
if !isKnownUniquelyReferenced(&(self._internal)) {
115+
self._internal = self._internal.createDuplicate()
116+
}
117+
118+
try self._internal.set(value, forKey: key)
119+
}
120+
}

Sources/SwiftKafka/SwiftKafka.swift renamed to Sources/SwiftKafka/KafkaError.swift

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
//
1313
//===----------------------------------------------------------------------===//
1414

15-
public struct SwiftKafka {
16-
public private(set) var text = "Hello, World!"
17-
18-
public init() {}
15+
public struct KafkaError: Error {
16+
// Preliminary Implementation
17+
public let description: String
1918
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
//===----------------------------------------------------------------------===//
2+
//
3+
// This source file is part of the swift-kafka-gsoc open source project
4+
//
5+
// Copyright (c) 2022 Apple Inc. and the swift-kafka-gsoc project authors
6+
// Licensed under Apache License v2.0
7+
//
8+
// See LICENSE.txt for license information
9+
// See CONTRIBUTORS.txt for the list of swift-kafka-gsoc project authors
10+
//
11+
// SPDX-License-Identifier: Apache-2.0
12+
//
13+
//===----------------------------------------------------------------------===//
14+
15+
@testable import SwiftKafka
16+
import XCTest
17+
18+
final class KafkaConfigTests: XCTestCase {
19+
func testSettingCorrectValueWorks() throws {
20+
var config = KafkaConfig()
21+
22+
try config.set("ssl", forKey: "security.protocol")
23+
24+
XCTAssertEqual("ssl", config.value(forKey: "security.protocol"))
25+
}
26+
27+
func testSettingWrongKeyFails() {
28+
var config = KafkaConfig()
29+
30+
XCTAssertThrowsError(try config.set("ssl", forKey: "not.a.valid.key"))
31+
}
32+
33+
func testSettingWrongValueFails() {
34+
var config = KafkaConfig()
35+
36+
XCTAssertThrowsError(try config.set("not_a_protocol", forKey: "security.protocol"))
37+
}
38+
39+
func testGetterHasNoSideEffects() {
40+
let configA = KafkaConfig()
41+
let configB = configA
42+
43+
_ = configA.value(forKey: "security.protocol")
44+
_ = configB.value(forKey: "security.protocol")
45+
46+
XCTAssertTrue(configA == configB)
47+
}
48+
49+
func testCopyOnWriteWorks() throws {
50+
var configA = KafkaConfig()
51+
let configB = configA
52+
let configC = configA
53+
54+
// Check if all configs have the default value set
55+
[configA, configB, configC].forEach {
56+
XCTAssertEqual("plaintext", $0.value(forKey: "security.protocol"))
57+
}
58+
59+
try configA.set("ssl", forKey: "security.protocol")
60+
61+
XCTAssertEqual("ssl", configA.value(forKey: "security.protocol"))
62+
XCTAssertEqual("plaintext", configB.value(forKey: "security.protocol"))
63+
XCTAssertEqual("plaintext", configC.value(forKey: "security.protocol"))
64+
XCTAssertNotEqual(configA, configB)
65+
XCTAssertNotEqual(configA, configC)
66+
XCTAssertEqual(configB, configC)
67+
}
68+
}

Tests/SwiftKafkaTests/SwiftKafkaTests.swift

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,4 @@
1515
@testable import SwiftKafka
1616
import XCTest
1717

18-
final class SwiftKafkaTests: XCTestCase {
19-
func testExample() throws {
20-
// This is an example of a functional test case.
21-
// Use XCTAssert and related functions to verify your tests produce the correct
22-
// results.
23-
XCTAssertEqual(SwiftKafka().text, "Hello, World!")
24-
}
25-
}
18+
final class SwiftKafkaTests: XCTestCase {}

docker/Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,4 @@ WORKDIR /swift-kafka-gsoc
1010

1111
COPY . /swift-kafka-gsoc
1212

13-
CMD ["swift", "run", "ErrorPrinter"]
13+
CMD ["swift", "test"]

0 commit comments

Comments
 (0)