Skip to content

Commit 6508c74

Browse files
feat(kafka): Add network support for Kafka container
1 parent 0d86bda commit 6508c74

File tree

4 files changed

+128
-5
lines changed

4 files changed

+128
-5
lines changed

src/Testcontainers.Kafka/KafkaBuilder.cs

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
14
namespace Testcontainers.Kafka;
25

36
/// <inheritdoc cref="ContainerBuilder{TBuilderEntity, TContainerEntity, TConfigurationEntity}" />
@@ -9,11 +12,18 @@ public sealed class KafkaBuilder : ContainerBuilder<KafkaBuilder, KafkaContainer
912
public const ushort KafkaPort = 9092;
1013

1114
public const ushort BrokerPort = 9093;
15+
16+
public const ushort ControllerPort = 9094;
1217

1318
public const ushort ZookeeperPort = 2181;
1419

1520
public const string StartupScriptFilePath = "/testcontainers.sh";
1621

22+
private HashSet<string> _listeners = [];
23+
private List<string> _advertisedListeners = [];
24+
25+
private const string ProtocolPrefix = "TC";
26+
1727
/// <summary>
1828
/// Initializes a new instance of the <see cref="KafkaBuilder" /> class.
1929
/// </summary>
@@ -43,6 +53,39 @@ public override KafkaContainer Build()
4353
return new KafkaContainer(DockerResourceConfiguration);
4454
}
4555

56+
/// <summary>
57+
/// Add a listener in the format host:port.
58+
/// Host will be included as a network alias.
59+
/// Use it to register additional connections to the Kafka within the same container network.
60+
///
61+
/// Default listeners: PLAINTEXT://0.0.0.0:9092, BROKER://0.0.0.0:9093, CONTROLLER://0.0.0.0:9094
62+
/// </summary>
63+
/// <param name="kafka"></param>
64+
/// <returns></returns>
65+
/// <exception cref="NotImplementedException"></exception>
66+
public KafkaBuilder WithListener(string kafka)
67+
{
68+
var host = kafka.Split(':')[0];
69+
70+
var protocol = $"{ProtocolPrefix}-{_listeners.Count}";
71+
var listener = $"{protocol}://{kafka}";
72+
var listenerSecurityProtocolMap = $"{protocol}:PLAINTEXT";
73+
74+
_listeners.Add(listener);
75+
_advertisedListeners.Add(listener);
76+
77+
var currentListeners = this.DockerResourceConfiguration.Environments["KAFKA_LISTENERS"];
78+
var currentListenersSecurityProtocolMap = this.DockerResourceConfiguration.Environments["KAFKA_LISTENER_SECURITY_PROTOCOL_MAP"];
79+
80+
return this.Merge(DockerResourceConfiguration, new KafkaConfiguration(_advertisedListeners))
81+
.WithEnvironment(new Dictionary<string, string>()
82+
{
83+
{ "KAFKA_LISTENERS", $"{currentListeners},{string.Join(",", listener)}" },
84+
{ "KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", $"{currentListenersSecurityProtocolMap},{string.Join(",", listenerSecurityProtocolMap)}" }
85+
})
86+
.WithNetworkAliases(host);
87+
}
88+
4689
/// <inheritdoc />
4790
protected override KafkaBuilder Init()
4891
{
@@ -51,8 +94,10 @@ protected override KafkaBuilder Init()
5194
.WithPortBinding(KafkaPort, true)
5295
.WithPortBinding(BrokerPort, true)
5396
.WithPortBinding(ZookeeperPort, true)
54-
.WithEnvironment("KAFKA_LISTENERS", "PLAINTEXT://0.0.0.0:" + KafkaPort + ",BROKER://0.0.0.0:" + BrokerPort)
55-
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT")
97+
.WithEnvironment("KAFKA_LISTENERS", $"PLAINTEXT://0.0.0.0:{KafkaPort},BROKER://0.0.0.0:{BrokerPort},CONTROLLER://0.0.0.0:{ControllerPort}")
98+
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "BROKER:PLAINTEXT,PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
99+
.WithEnvironment("KAFKA_NODE_ID", "1")
100+
.WithEnvironment("KAFKA_CONTROLLER_QUORUM_VOTERS", "1@localhost:" + ControllerPort)
56101
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "BROKER")
57102
.WithEnvironment("KAFKA_BROKER_ID", "1")
58103
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
@@ -79,7 +124,7 @@ protected override KafkaBuilder Init()
79124
startupScript.Append(lf);
80125
startupScript.Append("zookeeper-server-start zookeeper.properties &");
81126
startupScript.Append(lf);
82-
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort);
127+
startupScript.Append("export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://" + container.Hostname + ":" + container.GetMappedPublicPort(KafkaPort) + ",BROKER://" + container.IpAddress + ":" + BrokerPort+ "," + string.Join(",", container.AdvertisedListeners));
83128
startupScript.Append(lf);
84129
startupScript.Append("echo '' > /etc/confluent/docker/ensure");
85130
startupScript.Append(lf);
@@ -105,4 +150,5 @@ protected override KafkaBuilder Merge(KafkaConfiguration oldValue, KafkaConfigur
105150
{
106151
return new KafkaBuilder(new KafkaConfiguration(oldValue, newValue));
107152
}
153+
108154
}

src/Testcontainers.Kafka/KafkaConfiguration.cs

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,26 @@
1+
using System.Collections;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
15
namespace Testcontainers.Kafka;
26

37
/// <inheritdoc cref="ContainerConfiguration" />
48
[PublicAPI]
59
public sealed class KafkaConfiguration : ContainerConfiguration
610
{
11+
public IEnumerable<string> AdvertisedListeners { get; }
12+
713
/// <summary>
814
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
915
/// </summary>
10-
public KafkaConfiguration()
16+
public KafkaConfiguration(IEnumerable<string> advertisedListeners = null)
1117
{
18+
if (advertisedListeners != null)
19+
{
20+
this.AdvertisedListeners = [..advertisedListeners];
21+
}
1222
}
13-
23+
1424
/// <summary>
1525
/// Initializes a new instance of the <see cref="KafkaConfiguration" /> class.
1626
/// </summary>
@@ -49,5 +59,6 @@ public KafkaConfiguration(KafkaConfiguration resourceConfiguration)
4959
public KafkaConfiguration(KafkaConfiguration oldValue, KafkaConfiguration newValue)
5060
: base(oldValue, newValue)
5161
{
62+
this.AdvertisedListeners = BuildConfiguration.Combine<IEnumerable<string>>(oldValue.AdvertisedListeners, newValue.AdvertisedListeners);
5263
}
5364
}

src/Testcontainers.Kafka/KafkaContainer.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,21 @@
1+
using System.Collections.Generic;
2+
13
namespace Testcontainers.Kafka;
24

35
/// <inheritdoc cref="DockerContainer" />
46
[PublicAPI]
57
public sealed class KafkaContainer : DockerContainer
68
{
9+
private KafkaConfiguration _configuration;
10+
internal IEnumerable<string> AdvertisedListeners => this._configuration.AdvertisedListeners;
711
/// <summary>
812
/// Initializes a new instance of the <see cref="KafkaContainer" /> class.
913
/// </summary>
1014
/// <param name="configuration">The container configuration.</param>
1115
public KafkaContainer(KafkaConfiguration configuration)
1216
: base(configuration)
1317
{
18+
this._configuration = configuration;
1419
}
1520

1621
/// <summary>
@@ -21,4 +26,5 @@ public string GetBootstrapAddress()
2126
{
2227
return new UriBuilder("PLAINTEXT", Hostname, GetMappedPublicPort(KafkaBuilder.KafkaPort)).ToString();
2328
}
29+
2430
}
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
using System.Collections.Generic;
2+
using System.Text;
3+
using DotNet.Testcontainers.Builders;
4+
using DotNet.Testcontainers.Containers;
5+
using DotNet.Testcontainers.Networks;
6+
7+
namespace Testcontainers.Kafka;
8+
9+
public sealed class KafkaContainerNetworkTest : IAsyncLifetime
10+
{
11+
private INetwork _network;
12+
private KafkaContainer _kafkaContainer;
13+
14+
private IContainer _kCatContainer;
15+
public async Task InitializeAsync()
16+
{
17+
_network = new NetworkBuilder().Build();
18+
_kafkaContainer = new KafkaBuilder()
19+
.WithImage("confluentinc/cp-kafka")
20+
.WithNetwork(_network)
21+
.WithListener("kafka:19092")
22+
.Build();
23+
24+
_kCatContainer = new ContainerBuilder()
25+
.WithImage("confluentinc/cp-kcat")
26+
.WithNetwork(_network)
27+
.WithCommand("-c", "tail -f /dev/null")
28+
.WithEntrypoint("sh")
29+
.WithResourceMapping(Encoding.Default.GetBytes("Message produced by kcat"), "/data/msgs.txt")
30+
.Build();
31+
32+
await _kCatContainer.StartAsync();
33+
await _kafkaContainer.StartAsync();
34+
}
35+
36+
public Task DisposeAsync()
37+
{
38+
return _kafkaContainer.DisposeAsync().AsTask();
39+
}
40+
41+
[Fact]
42+
public async Task TestUsageWithListener()
43+
{
44+
// kcat producer
45+
await _kCatContainer.ExecAsync(new List<string>()
46+
{
47+
"kcat", "-b", "kafka:19092", "-t", "msgs", "-P", "-l", "/data/msgs.txt"
48+
});
49+
50+
51+
// kcat consumer
52+
var kCatResult = await _kCatContainer.ExecAsync(new List<string>()
53+
{
54+
"kcat", "-b", "kafka:19092", "-C", "-t", "msgs", "-c", "1"
55+
});
56+
57+
Assert.Contains("Message produced by kcat", kCatResult.Stdout);
58+
}
59+
60+
}

0 commit comments

Comments
 (0)