Skip to content

Commit 9ae820f

Browse files
Refactor
1 parent 8a16e45 commit 9ae820f

File tree

3 files changed

+66
-59
lines changed

3 files changed

+66
-59
lines changed

package-lock.json

Lines changed: 55 additions & 39 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/modules/kafka/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
"kafkajs": "^2.2.4"
3333
},
3434
"dependencies": {
35+
"compare-versions": "^6.1.1",
3536
"testcontainers": "^11.1.0"
3637
}
3738
}

packages/modules/kafka/src/kafka-container.ts

Lines changed: 10 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1+
import { satisfies } from "compare-versions";
12
import {
23
AbstractStartedContainer,
34
BoundPorts,
45
Content,
56
GenericContainer,
67
getContainerRuntimeClient,
7-
ImageName,
88
InspectResult,
99
RandomUuid,
1010
StartedTestContainer,
@@ -66,8 +66,7 @@ export class KafkaContainer extends GenericContainer {
6666
constructor(image: string) {
6767
super(image);
6868

69-
const parsedImage = ImageName.fromString(image);
70-
if (parsedImage.image === "confluentinc/cp-kafka" && parsedImage.tag.startsWith("8.")) {
69+
if (satisfies(this.imageName.tag, ">=8.0.0")) {
7170
this.withKraft();
7271
}
7372

@@ -149,7 +148,11 @@ export class KafkaContainer extends GenericContainer {
149148
}
150149

151150
public override async start(): Promise<StartedKafkaContainer> {
152-
if (this.mode === KafkaMode.KRAFT && this.saslSslConfig && this.isLessThanCP(7, 5)) {
151+
if (
152+
this.mode === KafkaMode.KRAFT &&
153+
this.saslSslConfig &&
154+
satisfies(this.imageName.tag, `<${MIN_KRAFT_SASL_VERSION}`)
155+
) {
153156
throw new Error(
154157
`Provided Confluent Platform's version ${this.imageName.tag} is not supported in Kraft mode with sasl (must be ${MIN_KRAFT_SASL_VERSION} or above)`
155158
);
@@ -166,15 +169,15 @@ export class KafkaContainer extends GenericContainer {
166169
// exporting KAFKA_ADVERTISED_LISTENERS with the container hostname
167170
command += `export KAFKA_ADVERTISED_LISTENERS=${advertisedListeners}\n`;
168171

169-
if (this.mode !== KafkaMode.KRAFT || this.isLessThanCP(7, 4)) {
172+
if (this.mode !== KafkaMode.KRAFT || satisfies(this.imageName.tag, "<7.4.0")) {
170173
// Optimization: skip the checks
171174
command += "echo '' > /etc/confluent/docker/ensure \n";
172175
}
173176
if (this.mode === KafkaMode.KRAFT) {
174177
if (this.saslSslConfig) {
175178
command += this.commandKraftCreateUser(this.saslSslConfig);
176179
}
177-
if (this.isLessThanCP(7, 4)) {
180+
if (satisfies(this.imageName.tag, "<7.4.0")) {
178181
command += this.commandKraft();
179182
}
180183
} else if (this.mode === KafkaMode.EMBEDDED_ZOOKEEPER) {
@@ -268,26 +271,13 @@ export class KafkaContainer extends GenericContainer {
268271
}
269272

270273
private verifyMinKraftVersion() {
271-
if (this.isLessThanCP(7)) {
274+
if (satisfies(this.imageName.tag, `<${MIN_KRAFT_VERSION}`)) {
272275
throw new Error(
273276
`Provided Confluent Platform's version ${this.imageName.tag} is not supported in Kraft mode (must be ${MIN_KRAFT_VERSION} or above)`
274277
);
275278
}
276279
}
277280

278-
private isLessThanCP(max: number, min = 0, patch = 0): boolean {
279-
if (this.imageName.tag === "latest") {
280-
return false;
281-
}
282-
const parts = this.imageName.tag.split(".");
283-
return !(
284-
parts.length > 2 &&
285-
(Number(parts[0]) > max ||
286-
(Number(parts[0]) === max &&
287-
(Number(parts[1]) > min || (Number(parts[1]) === min && Number(parts[2]) >= patch))))
288-
);
289-
}
290-
291281
private commandKraftCreateUser(saslOptions: SaslSslListenerOptions): string {
292282
return (
293283
"echo 'kafka-storage format --ignore-formatted " +

0 commit comments

Comments
 (0)