Skip to content

Commit 3c1f585

Browse files
committed
Add Azure Event Hubs Emulator container to Azure module
- Fix Typo in the container's name - Require AzuriteContainer as dependency - Add Kafka Event Hubs test - Add Kafka option to Azure documentation Signed-off-by: Esta Nagy <[email protected]>
1 parent ce13e06 commit 3c1f585

File tree

6 files changed

+224
-103
lines changed

6 files changed

+224
-103
lines changed

docs/modules/azure.md

Lines changed: 25 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ This module is INCUBATING. While it is ready for use and operational in the curr
55

66
Testcontainers module for the Microsoft Azure's [SDK](https://github.com/Azure/azure-sdk-for-java).
77

8-
Currently, the module supports `Azurite`, `Azure Eventhubs` and `CosmosDB` emulators. In order to use them, you should use the following classes:
8+
Currently, the module supports `Azurite`, `Azure Event Hubs` and `CosmosDB` emulators. In order to use them, you should use the following classes:
99

1010
Class | Container Image
1111
-|-
1212
AzuriteContainer | [mcr.microsoft.com/azure-storage/azurite](https://github.com/microsoft/containerregistry)
13-
AzureEventhubsEmulatorContainer | [mcr.microsoft.com/azure-messaging/eventhubs-emulator](https://github.com/microsoft/containerregistry)
13+
AzureEventHubsEmulatorContainer | [mcr.microsoft.com/azure-messaging/eventhubs-emulator](https://github.com/microsoft/containerregistry)
1414
CosmosDBEmulatorContainer | [mcr.microsoft.com/cosmosdb/linux/azure-cosmos-emulator](https://github.com/microsoft/containerregistry)
1515

1616
## Usage example
@@ -73,22 +73,40 @@ Build Azure Table client:
7373
!!! note
7474
We can use custom credentials the same way as defined in the Blob section.
7575

76-
### Azure Eventhubs Emulator
76+
### Azure Event Hubs Emulator
7777

7878
<!--codeinclude-->
79-
[Configuring the Azure Eventhubs Emulator container](../../modules/azure/src/test/resources/eventhubs_config.json)
79+
[Configuring the Azure Event Hubs Emulator container](../../modules/azure/src/test/resources/eventhubs_config.json)
8080
<!--/codeinclude-->
8181

82-
Start Azure Eventhubs Emulator during a test:
82+
Start Azure Event Hubs Emulator during a test:
8383

8484
<!--codeinclude-->
85-
[Starting a Azure Eventhubs Emulator container](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventhubsEmulatorContainerTest.java) inside_block:emulatorContainer
85+
[Setting uo a network](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:network
8686
<!--/codeinclude-->
8787

88+
<!--codeinclude-->
89+
[Starting an Azurite container as dependency](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:azuriteContainer
90+
<!--/codeinclude-->
91+
92+
<!--codeinclude-->
93+
[Starting a Azure Event Hubs Emulator container](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:emulatorContainer
94+
<!--/codeinclude-->
95+
96+
#### Using Azure Event Hubs clients
97+
98+
Configure the consumer and the producer clients:
99+
100+
<!--codeinclude-->
101+
[Configuring the clients](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:createProducerAndConsumer
102+
<!--/codeinclude-->
103+
104+
#### Using Kafka clients
105+
88106
Configure the consumer and the producer clients:
89107

90108
<!--codeinclude-->
91-
[Configuring the clients](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventhubsEmulatorContainerTest.java) inside_block:createProducerAndConsumer
109+
[Obtaining the Kafka connection properties](../../modules/azure/src/test/java/org/testcontainers/azure/AzureEventHubsEmulatorContainerTest.java) inside_block:kafkaProperties
92110
<!--/codeinclude-->
93111

94112
### CosmosDB

modules/azure/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ dependencies {
1111
testImplementation 'com.azure:azure-storage-queue:12.24.0'
1212
testImplementation 'com.azure:azure-data-tables:12.5.0'
1313
testImplementation 'com.azure:azure-messaging-eventhubs:5.19.2'
14+
testImplementation 'org.apache.kafka:kafka-clients:3.8.0'
1415
}

modules/azure/src/main/java/org/testcontainers/azure/AzureEventhubsEmulatorContainer.java renamed to modules/azure/src/main/java/org/testcontainers/azure/AzureEventHubsEmulatorContainer.java

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
* <li>Kafka: 9092</li>
1818
* </ul>
1919
*/
20-
public class AzureEventhubsEmulatorContainer extends GenericContainer<AzureEventhubsEmulatorContainer> {
20+
public class AzureEventHubsEmulatorContainer extends GenericContainer<AzureEventHubsEmulatorContainer> {
2121

2222
private static final int DEFAULT_AMQP_PORT = 5672;
2323

@@ -26,6 +26,8 @@ public class AzureEventhubsEmulatorContainer extends GenericContainer<AzureEvent
2626
private static final String CONNECTION_STRING_FORMAT =
2727
"Endpoint=sb://%s:%d;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=SAS_KEY_VALUE;UseDevelopmentEmulator=true;";
2828

29+
private static final String BOOTSTRAP_SERVERS_FORMAT = "%s:%d";
30+
2931
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse(
3032
"mcr.microsoft.com/azure-messaging/eventhubs-emulator"
3133
);
@@ -34,34 +36,21 @@ public class AzureEventhubsEmulatorContainer extends GenericContainer<AzureEvent
3436

3537
private Transferable config;
3638

39+
private boolean useKafka;
40+
3741
/**
3842
* @param dockerImageName specified docker image name to run
3943
*/
40-
public AzureEventhubsEmulatorContainer(final DockerImageName dockerImageName) {
44+
public AzureEventHubsEmulatorContainer(
45+
final DockerImageName dockerImageName,
46+
final AzuriteContainer azuriteContainer
47+
) {
4148
super(dockerImageName);
4249
dockerImageName.assertCompatibleWith(DEFAULT_IMAGE_NAME);
43-
50+
this.azuriteContainer = azuriteContainer;
51+
dependsOn(azuriteContainer);
4452
waitingFor(Wait.forLogMessage(".*Emulator Service is Successfully Up!.*", 1));
45-
withExposedPorts(DEFAULT_AMQP_PORT, DEFAULT_KAFKA_PORT);
46-
}
47-
48-
@Override
49-
public void start() {
50-
if (this.azuriteContainer == null) {
51-
this.azuriteContainer =
52-
new AzuriteContainer(AzuriteContainer.DEFAULT_IMAGE_NAME.withTag("3.33.0")).withNetwork(getNetwork());
53-
}
54-
this.azuriteContainer.start();
55-
56-
super.start();
57-
}
58-
59-
@Override
60-
public void stop() {
61-
super.stop();
62-
if (this.azuriteContainer != null) {
63-
this.azuriteContainer.stop();
64-
}
53+
withExposedPorts(DEFAULT_AMQP_PORT);
6554
}
6655

6756
/**
@@ -70,7 +59,7 @@ public void stop() {
7059
* @param config The file containing the broker configuration
7160
* @return this
7261
*/
73-
public AzureEventhubsEmulatorContainer withConfig(final Transferable config) {
62+
public AzureEventHubsEmulatorContainer withConfig(final Transferable config) {
7463
this.config = config;
7564
return this;
7665
}
@@ -80,10 +69,20 @@ public AzureEventhubsEmulatorContainer withConfig(final Transferable config) {
8069
*
8170
* @return this
8271
*/
83-
public AzureEventhubsEmulatorContainer acceptLicense() {
72+
public AzureEventHubsEmulatorContainer acceptLicense() {
8473
return withEnv("ACCEPT_EULA", "Y");
8574
}
8675

76+
/**
77+
* Enables Kafka support.
78+
*
79+
* @return this
80+
*/
81+
public AzureEventHubsEmulatorContainer enableKafka() {
82+
this.useKafka = true;
83+
return this;
84+
}
85+
8786
@Override
8887
protected void configure() {
8988
dependsOn(azuriteContainer);
@@ -99,6 +98,10 @@ protected void configure() {
9998
logger().info("Using path for configuration file: '{}'", this.config);
10099
withCopyToContainer(this.config, "/Eventhubs_Emulator/ConfigFiles/Config.json");
101100
}
101+
if (this.useKafka) {
102+
//Kafka must use the default port or the advertised port won't match
103+
this.addFixedExposedPort(DEFAULT_KAFKA_PORT, DEFAULT_KAFKA_PORT);
104+
}
102105
}
103106

104107
/**
@@ -109,4 +112,13 @@ protected void configure() {
109112
public String getConnectionString() {
110113
return String.format(CONNECTION_STRING_FORMAT, getHost(), getMappedPort(DEFAULT_AMQP_PORT));
111114
}
115+
116+
/**
117+
* Returns the kafka bootstrap servers
118+
*
119+
* @return bootstrap servers
120+
*/
121+
public String getBootstrapServers() {
122+
return String.format(BOOTSTRAP_SERVERS_FORMAT, getHost(), getMappedPort(DEFAULT_KAFKA_PORT));
123+
}
112124
}

modules/azure/src/main/java/org/testcontainers/azure/AzuriteContainer.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public class AzuriteContainer extends GenericContainer<AzuriteContainer> {
4040
private static final String WELL_KNOWN_ACCOUNT_KEY =
4141
"Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw==";
4242

43-
static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("mcr.microsoft.com/azure-storage/azurite");
43+
private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse(
44+
"mcr.microsoft.com/azure-storage/azurite"
45+
);
4446

4547
private MountableFile cert = null;
4648

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package org.testcontainers.azure;
2+
3+
import com.azure.core.util.IterableStream;
4+
import com.azure.messaging.eventhubs.EventData;
5+
import com.azure.messaging.eventhubs.EventHubClientBuilder;
6+
import com.azure.messaging.eventhubs.EventHubConsumerClient;
7+
import com.azure.messaging.eventhubs.EventHubProducerClient;
8+
import com.azure.messaging.eventhubs.models.EventPosition;
9+
import com.azure.messaging.eventhubs.models.PartitionEvent;
10+
import com.google.common.collect.ImmutableMap;
11+
import org.apache.kafka.clients.consumer.ConsumerConfig;
12+
import org.apache.kafka.clients.consumer.ConsumerRecord;
13+
import org.apache.kafka.clients.consumer.ConsumerRecords;
14+
import org.apache.kafka.clients.consumer.KafkaConsumer;
15+
import org.apache.kafka.clients.producer.KafkaProducer;
16+
import org.apache.kafka.clients.producer.ProducerConfig;
17+
import org.apache.kafka.clients.producer.ProducerRecord;
18+
import org.apache.kafka.common.serialization.StringDeserializer;
19+
import org.apache.kafka.common.serialization.StringSerializer;
20+
import org.awaitility.Awaitility;
21+
import org.junit.Rule;
22+
import org.junit.Test;
23+
import org.testcontainers.containers.Network;
24+
import org.testcontainers.utility.DockerImageName;
25+
import org.testcontainers.utility.MountableFile;
26+
27+
import java.time.Duration;
28+
import java.util.Collections;
29+
import java.util.Optional;
30+
import java.util.Properties;
31+
import java.util.UUID;
32+
33+
import static org.assertj.core.api.Assertions.assertThat;
34+
import static org.assertj.core.api.Assertions.tuple;
35+
import static org.awaitility.Awaitility.waitAtMost;
36+
37+
public class AzureEventHubsEmulatorContainerTest {
38+
39+
@Rule
40+
// network {
41+
public Network network = Network.newNetwork();
42+
43+
// }
44+
45+
@Rule
46+
// azuriteContainer {
47+
public AzuriteContainer azuriteContainer = new AzuriteContainer("mcr.microsoft.com/azure-storage/azurite:3.33.0")
48+
.withNetwork(network);
49+
50+
// }
51+
52+
@Rule
53+
// emulatorContainer {
54+
public AzureEventHubsEmulatorContainer emulator = new AzureEventHubsEmulatorContainer(
55+
DockerImageName.parse("mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1"),
56+
azuriteContainer
57+
)
58+
.acceptLicense()
59+
.enableKafka() //optional
60+
.withNetwork(network)
61+
.withConfig(MountableFile.forClasspathResource("/eventhubs_config.json"));
62+
63+
// }
64+
65+
@Test
66+
public void testWithEventhubsClient() {
67+
try (
68+
// createProducerAndConsumer {
69+
EventHubProducerClient producer = new EventHubClientBuilder()
70+
.connectionString(emulator.getConnectionString())
71+
.fullyQualifiedNamespace("emulatorNs1")
72+
.eventHubName("eh1")
73+
.buildProducerClient();
74+
EventHubConsumerClient consumer = new EventHubClientBuilder()
75+
.connectionString(emulator.getConnectionString())
76+
.fullyQualifiedNamespace("emulatorNs1")
77+
.eventHubName("eh1")
78+
.consumerGroup("cg1")
79+
.buildConsumerClient()
80+
// }
81+
) {
82+
producer.send(Collections.singletonList(new EventData("test")));
83+
84+
waitAtMost(Duration.ofSeconds(30))
85+
.pollDelay(Duration.ofSeconds(5))
86+
.untilAsserted(() -> {
87+
IterableStream<PartitionEvent> events = consumer.receiveFromPartition(
88+
"0",
89+
1,
90+
EventPosition.earliest(),
91+
Duration.ofSeconds(2)
92+
);
93+
Optional<PartitionEvent> event = events.stream().findFirst();
94+
assertThat(event).isPresent();
95+
assertThat(event.get().getData().getBodyAsString()).isEqualTo("test");
96+
});
97+
}
98+
}
99+
100+
@Test
101+
public void testWithKafkaClient() throws Exception {
102+
// kafkaProperties {
103+
ImmutableMap<String, String> commonProperties = ImmutableMap
104+
.<String, String>builder()
105+
.put("bootstrap.servers", emulator.getBootstrapServers())
106+
.put("sasl.mechanism", "PLAIN")
107+
.put("security.protocol", "SASL_PLAINTEXT")
108+
.put(
109+
"sasl.jaas.config",
110+
String.format(
111+
"org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"%s\";",
112+
emulator.getConnectionString()
113+
)
114+
)
115+
.build();
116+
// }
117+
118+
Properties producerProperties = new Properties();
119+
producerProperties.put(ProducerConfig.CLIENT_ID_CONFIG, UUID.randomUUID().toString());
120+
producerProperties.putAll(commonProperties);
121+
122+
Properties consumerProperties = new Properties();
123+
consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "tc-" + UUID.randomUUID());
124+
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
125+
consumerProperties.putAll(commonProperties);
126+
try (
127+
KafkaProducer<String, String> producer = new KafkaProducer<>(
128+
producerProperties,
129+
new StringSerializer(),
130+
new StringSerializer()
131+
);
132+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(
133+
consumerProperties,
134+
new StringDeserializer(),
135+
new StringDeserializer()
136+
);
137+
) {
138+
String topicName = "eh1";
139+
consumer.subscribe(Collections.singletonList(topicName));
140+
141+
producer.send(new ProducerRecord<>(topicName, "testcontainers", "rulezzz")).get();
142+
143+
Awaitility
144+
.await()
145+
.atMost(Duration.ofSeconds(10))
146+
.untilAsserted(() -> {
147+
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
148+
149+
assertThat(records)
150+
.hasSize(1)
151+
.extracting(ConsumerRecord::topic, ConsumerRecord::key, ConsumerRecord::value)
152+
.containsExactly(tuple(topicName, "testcontainers", "rulezzz"));
153+
});
154+
155+
consumer.unsubscribe();
156+
}
157+
}
158+
}

0 commit comments

Comments
 (0)