1717import org .apache .kafka .clients .producer .ProducerRecord ;
1818import org .apache .kafka .common .serialization .StringDeserializer ;
1919import org .apache .kafka .common .serialization .StringSerializer ;
20- import org .awaitility .Awaitility ;
2120import org .junit .Rule ;
2221import org .junit .Test ;
2322import org .testcontainers .containers .Network ;
24- import org .testcontainers .utility .DockerImageName ;
2523import org .testcontainers .utility .MountableFile ;
2624
2725import java .time .Duration ;
3230
3331import static org .assertj .core .api .Assertions .assertThat ;
3432import static org .assertj .core .api .Assertions .tuple ;
33+ import static org .awaitility .Awaitility .await ;
3534import static org .awaitility .Awaitility .waitAtMost ;
3635
3736public class AzureEventHubsEmulatorContainerTest {
3837
3938 @ Rule
4039 // network {
4140 public Network network = Network .newNetwork ();
42-
4341 // }
4442
4543 @ Rule
4644 // azuriteContainer {
4745 public AzuriteContainer azuriteContainer = new AzuriteContainer ("mcr.microsoft.com/azure-storage/azurite:3.33.0" )
48- .withNetwork (network );
49-
46+ .withNetwork (network );
5047 // }
5148
5249 @ Rule
5350 // emulatorContainer {
5451 public AzureEventHubsEmulatorContainer emulator = new AzureEventHubsEmulatorContainer (
55- DockerImageName .parse ("mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1" ),
56- azuriteContainer
52+ "mcr.microsoft.com/azure-messaging/eventhubs-emulator:2.0.1"
5753 )
58- .acceptLicense ()
59- .enableKafka () //optional
60- .withNetwork (network )
61- .withConfig (MountableFile .forClasspathResource ("/eventhubs_config.json" ));
62-
54+ .acceptLicense ()
55+ .enableKafka () //optional
56+ .withNetwork (network )
57+ .withConfig (MountableFile .forClasspathResource ("/eventhubs_config.json" ))
58+ . withAzuriteContainer ( azuriteContainer );
6359 // }
6460
6561 @ Test
6662 public void testWithEventHubsClient () {
6763 try (
68- // createProducerAndConsumer {
69- EventHubProducerClient producer = new EventHubClientBuilder ()
70- .connectionString (emulator .getConnectionString ())
71- .fullyQualifiedNamespace ("emulatorNs1" )
72- .eventHubName ("eh1" )
73- .buildProducerClient ();
74- EventHubConsumerClient consumer = new EventHubClientBuilder ()
75- .connectionString (emulator .getConnectionString ())
76- .fullyQualifiedNamespace ("emulatorNs1" )
77- .eventHubName ("eh1" )
78- .consumerGroup ("cg1" )
79- .buildConsumerClient ()
80- // }
64+ // createProducerAndConsumer {
65+ EventHubProducerClient producer = new EventHubClientBuilder ()
66+ .connectionString (emulator .getConnectionString ())
67+ .fullyQualifiedNamespace ("emulatorNs1" )
68+ .eventHubName ("eh1" )
69+ .buildProducerClient ();
70+ EventHubConsumerClient consumer = new EventHubClientBuilder ()
71+ .connectionString (emulator .getConnectionString ())
72+ .fullyQualifiedNamespace ("emulatorNs1" )
73+ .eventHubName ("eh1" )
74+ .consumerGroup ("cg1" )
75+ .buildConsumerClient ()
76+ // }
8177 ) {
8278 producer .send (Collections .singletonList (new EventData ("test" )));
8379
8480 waitAtMost (Duration .ofSeconds (30 ))
85- .pollDelay (Duration .ofSeconds (5 ))
86- .untilAsserted (() -> {
87- IterableStream <PartitionEvent > events = consumer .receiveFromPartition (
88- "0" ,
89- 1 ,
90- EventPosition .earliest (),
91- Duration .ofSeconds (2 )
92- );
93- Optional <PartitionEvent > event = events .stream ().findFirst ();
94- assertThat (event ).isPresent ();
95- assertThat (event .get ().getData ().getBodyAsString ()).isEqualTo ("test" );
96- });
81+ .pollDelay (Duration .ofSeconds (5 ))
82+ .untilAsserted (() -> {
83+ IterableStream <PartitionEvent > events = consumer .receiveFromPartition (
84+ "0" ,
85+ 1 ,
86+ EventPosition .earliest (),
87+ Duration .ofSeconds (2 )
88+ );
89+ Optional <PartitionEvent > event = events .stream ().findFirst ();
90+ assertThat (event ).isPresent ();
91+ assertThat (event .get ().getData ().getBodyAsString ()).isEqualTo ("test" );
92+ });
9793 }
9894 }
9995
10096 @ Test
10197 public void testWithKafkaClient () throws Exception {
10298 // kafkaProperties {
10399 ImmutableMap <String , String > commonProperties = ImmutableMap
104- .<String , String >builder ()
105- .put ("bootstrap.servers" , emulator .getBootstrapServers ())
106- .put ("sasl.mechanism" , "PLAIN" )
107- .put ("security.protocol" , "SASL_PLAINTEXT" )
108- .put (
109- "sasl.jaas.config" ,
110- String .format (
111- "org.apache.kafka.common.security.plain.PlainLoginModule required username=\" $ConnectionString\" password=\" %s\" ;" ,
112- emulator .getConnectionString ()
100+ .<String , String >builder ()
101+ .put ("bootstrap.servers" , emulator .getBootstrapServers ())
102+ .put ("sasl.mechanism" , "PLAIN" )
103+ .put ("security.protocol" , "SASL_PLAINTEXT" )
104+ .put (
105+ "sasl.jaas.config" ,
106+ String .format (
107+ "org.apache.kafka.common.security.plain.PlainLoginModule required username=\" $ConnectionString\" password=\" %s\" ;" ,
108+ emulator .getConnectionString ()
109+ )
113110 )
114- )
115- .build ();
111+ .build ();
116112 // }
117113
118114 Properties producerProperties = new Properties ();
@@ -124,33 +120,32 @@ public void testWithKafkaClient() throws Exception {
124120 consumerProperties .setProperty (ConsumerConfig .AUTO_OFFSET_RESET_CONFIG , "earliest" );
125121 consumerProperties .putAll (commonProperties );
126122 try (
127- KafkaProducer <String , String > producer = new KafkaProducer <>(
128- producerProperties ,
129- new StringSerializer (),
130- new StringSerializer ()
131- );
132- KafkaConsumer <String , String > consumer = new KafkaConsumer <>(
133- consumerProperties ,
134- new StringDeserializer (),
135- new StringDeserializer ()
136- );
123+ KafkaProducer <String , String > producer = new KafkaProducer <>(
124+ producerProperties ,
125+ new StringSerializer (),
126+ new StringSerializer ()
127+ );
128+ KafkaConsumer <String , String > consumer = new KafkaConsumer <>(
129+ consumerProperties ,
130+ new StringDeserializer (),
131+ new StringDeserializer ()
132+ );
137133 ) {
138134 String topicName = "eh1" ;
139135 consumer .subscribe (Collections .singletonList (topicName ));
140136
141137 producer .send (new ProducerRecord <>(topicName , "testcontainers" , "rulezzz" )).get ();
142138
143- Awaitility
144- .await ()
145- .atMost (Duration .ofSeconds (10 ))
146- .untilAsserted (() -> {
147- ConsumerRecords <String , String > records = consumer .poll (Duration .ofMillis (100 ));
148-
149- assertThat (records )
150- .hasSize (1 )
151- .extracting (ConsumerRecord ::topic , ConsumerRecord ::key , ConsumerRecord ::value )
152- .containsExactly (tuple (topicName , "testcontainers" , "rulezzz" ));
153- });
139+ await ()
140+ .atMost (Duration .ofSeconds (10 ))
141+ .untilAsserted (() -> {
142+ ConsumerRecords <String , String > records = consumer .poll (Duration .ofMillis (100 ));
143+
144+ assertThat (records )
145+ .hasSize (1 )
146+ .extracting (ConsumerRecord ::topic , ConsumerRecord ::key , ConsumerRecord ::value )
147+ .containsExactly (tuple (topicName , "testcontainers" , "rulezzz" ));
148+ });
154149
155150 consumer .unsubscribe ();
156151 }
0 commit comments