Skip to content

Commit 65a7078

Browse files
feat(Kafka): Add KRaft support (#1353)
1 parent d2e1609 commit 65a7078

File tree

10 files changed

+482
-48
lines changed

10 files changed

+482
-48
lines changed
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
namespace Testcontainers.Kafka;
2+
3+
/// <inheritdoc cref="KafkaVendorConfiguration" />
4+
internal sealed class ApacheConfiguration : KafkaVendorConfiguration
5+
{
6+
static ApacheConfiguration()
7+
{
8+
}
9+
10+
/// <summary>
11+
/// Initializes a new instance of the <see cref="ApacheConfiguration" /> class.
12+
/// </summary>
13+
private ApacheConfiguration()
14+
{
15+
}
16+
17+
/// <summary>
18+
/// Gets the singleton instance of the Apache vendor configuration.
19+
/// </summary>
20+
public static KafkaVendorConfiguration Instance { get; }
21+
= new ApacheConfiguration();
22+
23+
/// <inheritdoc />
24+
public override KafkaVendor Vendor
25+
=> KafkaVendor.ApacheSoftwareFoundation;
26+
27+
/// <inheritdoc />
28+
public override ConsensusProtocol ConsensusProtocol
29+
=> ConsensusProtocol.KRaft;
30+
31+
/// <inheritdoc />
32+
public override bool IsImageFromVendor(IImage image)
33+
{
34+
return image.Repository.Contains("apache") || image.Repository.Contains("bitnami");
35+
}
36+
37+
/// <inheritdoc />
38+
public override void Validate(KafkaConfiguration resourceConfiguration)
39+
{
40+
const string message = "Local ZooKeeper is not supported for Apache Kafka images. Configure an external ZooKeeper.";
41+
42+
var isZooKeeperConsensus = resourceConfiguration.ConsensusProtocol == ConsensusProtocol.ZooKeeper;
43+
44+
var hasLocalZooKeeper = isZooKeeperConsensus && resourceConfiguration.Environments.TryGetValue("KAFKA_ZOOKEEPER_CONNECT", out var connectionString) && connectionString.StartsWith("localhost");
45+
46+
_ = Guard.Argument(resourceConfiguration, nameof(IContainerConfiguration.Image))
47+
.ThrowIf(_ => hasLocalZooKeeper, argument => new ArgumentException(message, argument.Name));
48+
}
49+
50+
/// <inheritdoc />
51+
public override string CreateStartupScript(KafkaConfiguration resourceConfiguration, KafkaContainer container)
52+
{
53+
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
54+
55+
var startupScript = new StringWriter();
56+
startupScript.NewLine = "\n";
57+
startupScript.WriteLine("#!/bin/bash");
58+
startupScript.WriteLine("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaBuilder.KafkaPort) + ",BROKER://" + container.IpAddress + ":" + KafkaBuilder.BrokerPort + "," + additionalAdvertisedListeners);
59+
startupScript.WriteLine("exec /etc/kafka/docker/run");
60+
return startupScript.ToString();
61+
}
62+
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
namespace Testcontainers.Kafka;
2+
3+
/// <inheritdoc cref="KafkaVendorConfiguration" />
4+
internal sealed class ConfluentConfiguration : KafkaVendorConfiguration
5+
{
6+
static ConfluentConfiguration()
7+
{
8+
}
9+
10+
/// <summary>
11+
/// Initializes a new instance of the <see cref="ConfluentConfiguration" /> class.
12+
/// </summary>
13+
private ConfluentConfiguration()
14+
{
15+
}
16+
17+
/// <summary>
18+
/// Gets the singleton instance of the Confluent vendor configuration.
19+
/// </summary>
20+
public static KafkaVendorConfiguration Instance { get; }
21+
= new ConfluentConfiguration();
22+
23+
/// <inheritdoc />
24+
public override KafkaVendor Vendor
25+
=> KafkaVendor.Confluent;
26+
27+
/// <inheritdoc />
28+
public override ConsensusProtocol ConsensusProtocol
29+
=> ConsensusProtocol.ZooKeeper;
30+
31+
/// <inheritdoc />
32+
public override bool IsImageFromVendor(IImage image)
33+
{
34+
return image.Repository.Contains("confluentinc");
35+
}
36+
37+
/// <inheritdoc />
38+
public override void Validate(KafkaConfiguration resourceConfiguration)
39+
{
40+
const string message = "KRaft is not supported for Confluent Platform images with versions earlier than 7.0.0.";
41+
42+
Predicate<KafkaConfiguration> isUnsupportedImage = value => value.ConsensusProtocol == ConsensusProtocol.KRaft
43+
&& IsImageFromVendor(value.Image) && value.Image.MatchVersion(v => v.Major < 7);
44+
45+
_ = Guard.Argument(resourceConfiguration, nameof(IContainerConfiguration.Image))
46+
.ThrowIf(argument => isUnsupportedImage(argument.Value), argument => new ArgumentException(message, argument.Name));
47+
}
48+
49+
/// <inheritdoc />
50+
public override string CreateStartupScript(KafkaConfiguration resourceConfiguration, KafkaContainer container)
51+
{
52+
var additionalAdvertisedListeners = string.Join(",", container.AdvertisedListeners ?? Array.Empty<string>());
53+
54+
var isZooKeeperConsensus = resourceConfiguration.ConsensusProtocol == ConsensusProtocol.ZooKeeper;
55+
56+
var hasLocalZooKeeper = isZooKeeperConsensus && resourceConfiguration.Environments.TryGetValue("KAFKA_ZOOKEEPER_CONNECT", out var connectionString) && connectionString.StartsWith("localhost");
57+
58+
var startupScript = new StringWriter();
59+
startupScript.NewLine = "\n";
60+
startupScript.WriteLine("#!/bin/bash");
61+
62+
if (isZooKeeperConsensus && hasLocalZooKeeper)
63+
{
64+
startupScript.WriteLine("echo '' > /etc/confluent/docker/ensure");
65+
startupScript.WriteLine("echo 'clientPort=" + KafkaBuilder.ZooKeeperPort + "' > zookeeper.properties");
66+
startupScript.WriteLine("echo 'dataDir=/var/lib/zookeeper/data' >> zookeeper.properties");
67+
startupScript.WriteLine("echo 'dataLogDir=/var/lib/zookeeper/log' >> zookeeper.properties");
68+
startupScript.WriteLine("zookeeper-server-start zookeeper.properties &");
69+
}
70+
71+
startupScript.WriteLine("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaBuilder.KafkaPort) + ",BROKER://" + container.IpAddress + ":" + KafkaBuilder.BrokerPort + "," + additionalAdvertisedListeners);
72+
startupScript.WriteLine("exec /etc/confluent/docker/run");
73+
return startupScript.ToString();
74+
}
75+
}

0 commit comments

Comments
 (0)