Skip to content

Commit 0c8a876

Browse files
committed
Add Kafka consumer/producer contract tests.
1 parent 0b77afa commit 0c8a876

File tree

9 files changed

+83
-105
lines changed

9 files changed

+83
-105
lines changed

appsignals-tests/contract-tests/src/test/java/software/amazon/opentelemetry/appsignals/test/base/RuntimeMetricsContractTestBase.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,22 @@ protected void assertRuntimeMetrics() {
4848
metrics.forEach(
4949
metric -> {
5050
var dataPoints = metric.getMetric().getGauge().getDataPointsList();
51-
// logger.info("checking {}: {}", metric.getMetric().getName(), dataPoints.size());
5251
assertGreaterThanOrEqual(dataPoints, getThreshold(metric.getMetric().getName()));
5352
});
5453
}
5554

5655
protected Set<String> getExpectedMetrics() {
5756
return ImmutableSet.<String>builder()
58-
// .addAll(JMXMetricsConstants.JVM_METRICS_SET)
57+
.addAll(JMXMetricsConstants.JVM_METRICS_SET)
5958
.addAll(AppSignalsConstants.SLO_METRICS_SET)
6059
.build();
6160
}
6261

6362
protected long getThreshold(String metricName) {
6463
long threshold = 0;
6564
switch (metricName) {
66-
// If maximum memory size is undefined, then value is -1
67-
// https://docs.oracle.com/en/java/javase/17/docs/api/java.management/java/lang/management/MemoryUsage.html#getMax()
65+
// If maximum memory size is undefined, then value is -1
66+
// https://docs.oracle.com/en/java/javase/17/docs/api/java.management/java/lang/management/MemoryUsage.html#getMax()
6867
case JMXMetricsConstants.JVM_HEAP_MAX:
6968
case JMXMetricsConstants.JVM_NON_HEAP_MAX:
7069
case JMXMetricsConstants.JVM_POOL_MAX:
@@ -79,10 +78,6 @@ private void assertGreaterThanOrEqual(List<NumberDataPoint> dps, long threshold)
7978
}
8079

8180
private void assertDataPoints(List<NumberDataPoint> dps, Consumer<Long> consumer) {
82-
dps.forEach(
83-
datapoint -> {
84-
// logger.info("datapoint value: {}", datapoint.getAsInt());
85-
consumer.accept(datapoint.getAsInt());
86-
});
81+
dps.forEach(datapoint -> consumer.accept(datapoint.getAsInt()));
8782
}
8883
}

appsignals-tests/contract-tests/src/test/java/software/amazon/opentelemetry/appsignals/test/misc/RuntimeJvmMetricsTest.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,10 @@
1515

1616
package software.amazon.opentelemetry.appsignals.test.misc;
1717

18-
import com.google.common.collect.ImmutableSet;
1918
import org.junit.jupiter.api.Test;
2019
import org.junit.jupiter.api.TestInstance;
2120
import org.testcontainers.junit.jupiter.Testcontainers;
2221
import software.amazon.opentelemetry.appsignals.test.base.RuntimeMetricsContractTestBase;
23-
import software.amazon.opentelemetry.appsignals.test.utils.JMXMetricsConstants;
24-
25-
import java.util.Map;
26-
import java.util.Set;
2722

2823
/**
2924
* Tests in this class validate that the SDK will emit JVM metrics when Application Signals runtime
@@ -46,19 +41,4 @@ protected String getApplicationImageName() {
4641
protected String getApplicationWaitPattern() {
4742
return ".*Started Application.*";
4843
}
49-
50-
@Override
51-
protected Set<String> getExpectedMetrics() {
52-
return ImmutableSet.<String>builder()
53-
.addAll(JMXMetricsConstants.JVM_METRICS_SET)
54-
.addAll(super.getExpectedMetrics())
55-
.build();
56-
}
57-
58-
@Override
59-
protected Map<String, String> getApplicationExtraEnvironmentVariables() {
60-
return Map.of(
61-
"OTEL_JMX_ENABLED", "true",
62-
"OTEL_JMX_TARGET_SYSTEM", "jvm");
63-
}
6444
}

appsignals-tests/contract-tests/src/test/java/software/amazon/opentelemetry/appsignals/test/misc/RuntimeKafkaConsumerMetricsTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ public class RuntimeKafkaConsumerMetricsTest extends RuntimeMetricsContractTestB
4343
private KafkaContainer kafka;
4444

4545
@Test
46-
void testKafkaMetrics() {
46+
void testKafkaConsumerMetrics() {
4747
doTestRuntimeMetrics();
4848
}
4949

appsignals-tests/contract-tests/src/test/java/software/amazon/opentelemetry/appsignals/test/misc/RuntimeKafkaMetricsTest.java

Lines changed: 42 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -16,96 +16,87 @@
1616
package software.amazon.opentelemetry.appsignals.test.misc;
1717

1818
import com.google.common.collect.ImmutableSet;
19-
import org.junit.jupiter.api.AfterAll;
20-
import org.junit.jupiter.api.BeforeAll;
21-
import org.junit.jupiter.api.Test;
22-
import org.junit.jupiter.api.TestInstance;
23-
import org.slf4j.Logger;
24-
import org.slf4j.LoggerFactory;
19+
import java.io.IOException;
20+
import java.util.Map;
21+
import java.util.Set;
22+
import org.junit.jupiter.api.*;
23+
import org.testcontainers.containers.GenericContainer;
2524
import org.testcontainers.containers.KafkaContainer;
2625
import org.testcontainers.containers.output.Slf4jLogConsumer;
27-
import org.testcontainers.containers.wait.strategy.Wait;
2826
import org.testcontainers.images.PullPolicy;
2927
import org.testcontainers.junit.jupiter.Testcontainers;
30-
import org.testcontainers.lifecycle.Startable;
3128
import org.testcontainers.utility.DockerImageName;
3229
import org.testcontainers.utility.MountableFile;
3330
import software.amazon.opentelemetry.appsignals.test.base.RuntimeMetricsContractTestBase;
3431
import software.amazon.opentelemetry.appsignals.test.utils.JMXMetricsConstants;
3532

36-
import java.io.IOException;
37-
import java.util.List;
38-
import java.util.Map;
39-
import java.util.Set;
40-
4133
@Testcontainers(disabledWithoutDocker = true)
4234
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
4335
public class RuntimeKafkaMetricsTest extends RuntimeMetricsContractTestBase {
44-
private final Logger kafkaLogger =
45-
LoggerFactory.getLogger("broker " + getApplicationOtelServiceName());
46-
47-
private KafkaContainer kafka;
48-
4936
@Test
5037
void testKafkaMetrics() {
51-
doTestRuntimeMetrics();
38+
assertRuntimeMetrics();
5239
}
5340

5441
@Override
55-
protected List<Startable> getApplicationDependsOnContainers() {
56-
kafka =
57-
new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
58-
.withImagePullPolicy(PullPolicy.alwaysPull())
59-
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
60-
.withNetworkAliases("kafkaBroker")
61-
.withNetwork(network)
62-
.withLogConsumer(new Slf4jLogConsumer(kafkaLogger))
63-
.withCopyFileToContainer(
64-
MountableFile.forHostPath(AGENT_PATH), "/opentelemetry-javaagent-all.jar")
65-
.withEnv("KAFKA_OPTS", "-javaagent:/opentelemetry-javaagent-all.jar")
66-
.withEnv("OTEL_METRIC_EXPORT_INTERVAL", "100") // 100 ms
67-
.withEnv("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "true")
68-
.withEnv("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", isRuntimeEnabled())
69-
.withEnv("OTEL_METRICS_EXPORTER", "none")
70-
.withEnv(
71-
"OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT",
72-
"http://" + COLLECTOR_HOSTNAME + ":" + COLLECTOR_PORT)
73-
.withEnv("OTEL_JMX_ENABLED", "true")
74-
.withEnv("OTEL_JMX_TARGET_SYSTEM", "kafka")
75-
.waitingFor(Wait.forLogMessage(".* Kafka Server started .*", 1))
76-
.withKraft();
77-
return List.of(kafka);
42+
protected GenericContainer<?> getApplicationContainer() {
43+
return new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"))
44+
.withImagePullPolicy(PullPolicy.alwaysPull())
45+
.withEnv("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
46+
.withNetworkAliases("kafkaBroker")
47+
.withNetwork(network)
48+
.withLogConsumer(new Slf4jLogConsumer(applicationLogger))
49+
.withCopyFileToContainer(
50+
MountableFile.forHostPath(AGENT_PATH), "/opentelemetry-javaagent-all.jar")
51+
.withEnv("KAFKA_OPTS", "-javaagent:/opentelemetry-javaagent-all.jar")
52+
.withEnv("OTEL_METRIC_EXPORT_INTERVAL", "100") // 100 ms
53+
.withEnv("OTEL_AWS_APPLICATION_SIGNALS_ENABLED", "true")
54+
.withEnv("OTEL_AWS_APPLICATION_SIGNALS_RUNTIME_ENABLED", isRuntimeEnabled())
55+
.withEnv("OTEL_METRICS_EXPORTER", "none")
56+
.withEnv(
57+
"OTEL_AWS_APPLICATION_SIGNALS_EXPORTER_ENDPOINT",
58+
"http://" + COLLECTOR_HOSTNAME + ":" + COLLECTOR_PORT)
59+
.withEnv(getApplicationExtraEnvironmentVariables())
60+
.waitingFor(getApplicationWaitCondition())
61+
.withKraft();
7862
}
7963

8064
@BeforeAll
8165
public void setup() throws IOException, InterruptedException {
82-
kafka.start();
83-
kafka.execInContainer(
66+
application.start();
67+
application.execInContainer(
8468
"/bin/sh",
8569
"-c",
8670
"/usr/bin/kafka-topics --bootstrap-server=localhost:9092 --create --topic kafka_topic --partitions 3 --replication-factor 1");
71+
mockCollectorClient = getMockCollectorClient();
8772
}
8873

89-
@AfterAll
90-
public void tearDown() {
91-
kafka.stop();
92-
}
74+
@BeforeEach
75+
@Override
76+
protected void setupClients() {}
9377

9478
@Override
9579
protected String getApplicationImageName() {
96-
return "aws-appsignals-tests-kafka-kafka-consumers";
80+
return "aws-appsignals-tests-kafka";
9781
}
9882

9983
@Override
10084
protected String getApplicationWaitPattern() {
101-
return ".*Routes ready.*";
85+
return ".* Kafka Server started .*";
10286
}
10387

10488
@Override
10589
protected Set<String> getExpectedMetrics() {
10690
return ImmutableSet.<String>builder()
10791
.addAll(JMXMetricsConstants.KAFKA_METRICS_SET)
108-
.addAll(super.getExpectedMetrics())
92+
.addAll(JMXMetricsConstants.JVM_METRICS_SET)
10993
.build();
11094
}
95+
96+
@Override
97+
protected Map<String, String> getApplicationExtraEnvironmentVariables() {
98+
return Map.of(
99+
"OTEL_JMX_ENABLED", "true",
100+
"OTEL_JMX_TARGET_SYSTEM", "kafka");
101+
}
111102
}

appsignals-tests/contract-tests/src/test/java/software/amazon/opentelemetry/appsignals/test/misc/RuntimeKafkaProducerMetricsTest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515

1616
package software.amazon.opentelemetry.appsignals.test.misc;
1717

18+
import static org.assertj.core.api.Assertions.assertThat;
19+
1820
import com.google.common.collect.ImmutableSet;
1921
import java.io.IOException;
2022
import java.util.List;
@@ -43,8 +45,13 @@ public class RuntimeKafkaProducerMetricsTest extends RuntimeMetricsContractTestB
4345
private KafkaContainer kafka;
4446

4547
@Test
46-
void testKafkaMetrics() {
47-
doTestRuntimeMetrics();
48+
void testKafkaProducerMetrics() {
49+
for (int i = 0; i < 50; i++) {
50+
var response = appClient.get("/success").aggregate().join();
51+
52+
assertThat(response.status().isSuccess()).isTrue();
53+
}
54+
assertRuntimeMetrics();
4855
}
4956

5057
@Override
@@ -96,7 +103,6 @@ protected Set<String> getExpectedMetrics() {
96103
protected Map<String, String> getApplicationExtraEnvironmentVariables() {
97104
return Map.of(
98105
"OTEL_JMX_ENABLED", "true",
99-
"OTEL_JMX_TARGET_SYSTEM", "kafka-producer",
100-
"OTEL_JMX_DISCOVERY_DELAY", "10");
106+
"OTEL_JMX_TARGET_SYSTEM", "kafka-producer");
101107
}
102108
}

appsignals-tests/contract-tests/src/test/java/software/amazon/opentelemetry/appsignals/test/utils/JMXMetricsConstants.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,9 @@ public class JMXMetricsConstants {
120120
KAFKA_ISR_OPERATION_COUNT,
121121
KAFKA_MAX_LAG,
122122
KAFKA_CONTROLLER_ACTIVE_COUNT,
123-
// KAFKA_LEADER_ELECTION_RATE,
124-
// KAFKA_UNCLEAN_ELECTION_RATE,
123+
// TODO: Add test case for leader election.
124+
// KAFKA_LEADER_ELECTION_RATE,
125+
// KAFKA_UNCLEAN_ELECTION_RATE,
125126
KAFKA_REQUEST_QUEUE,
126127
KAFKA_LOGS_FLUSH_TIME_COUNT,
127128
KAFKA_LOGS_FLUSH_TIME_MEDIAN,

appsignals-tests/contract-tests/src/test/java/software/amazon/opentelemetry/appsignals/test/utils/MockCollectorClient.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,6 @@ private List<ResourceScopeMetric> fetchMetrics(Set<String> presentMetrics, boole
160160
.flatMap(x -> x.getMetricsList().stream())
161161
.map(x -> x.getName())
162162
.collect(Collectors.toSet());
163-
logger.info("received metrics: {}", receivedMetrics);
164-
logger.info(
165-
"expected {} vs actual {}", presentMetrics.size(), receivedMetrics.size());
166163

167164
return (isRuntime
168165
? !exported.isEmpty() && receivedMetrics.size() == presentMetrics.size()

appsignals-tests/images/kafka/kafka-consumers/src/main/java/App.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import static spark.Spark.port;
2020

2121
import java.time.Duration;
22-
import java.util.Arrays;
22+
import java.util.List;
2323
import java.util.Properties;
2424
import java.util.UUID;
2525
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -39,7 +39,6 @@ public class App {
3939
public static final Logger log = LoggerFactory.getLogger(App.class);
4040

4141
public static void main(String[] args) {
42-
4342
String bootstrapServers = "kafkaBroker:9092";
4443
String topic = "kafka_topic";
4544

@@ -74,6 +73,16 @@ public static void main(String[] args) {
7473
consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
7574
consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
7675

76+
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
77+
consumer.subscribe(List.of(topic));
78+
Runtime.getRuntime()
79+
.addShutdownHook(
80+
new Thread(
81+
() -> {
82+
log.info("Shutting down Kafka consumer...");
83+
consumer.close();
84+
}));
85+
7786
// spark server
7887
port(Integer.parseInt("8080"));
7988
ipAddress("0.0.0.0");
@@ -83,8 +92,6 @@ public static void main(String[] args) {
8392
"/success",
8493
(req, res) -> {
8594
// create consumer
86-
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProperties);
87-
consumer.subscribe(Arrays.asList(topic));
8895
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
8996

9097
String consumedRecord = null;
@@ -93,12 +100,11 @@ public static void main(String[] args) {
93100
consumedRecord = record.value();
94101
}
95102
}
96-
consumer.close();
97103
if (consumedRecord != null && consumedRecord.equals("success")) {
98104
res.status(HttpStatus.OK_200);
99105
res.body("success");
100106
} else {
101-
log.info("consumer is unable to consumer right message");
107+
log.info("consumer is unable to consume right message");
102108
res.status(HttpStatus.INTERNAL_SERVER_ERROR_500);
103109
}
104110
return res.body();

appsignals-tests/images/kafka/kafka-producers/src/main/java/com/amazon/sampleapp/App.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,21 +47,27 @@ public static void main(String[] args) {
4747
properties.setProperty(
4848
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
4949
properties.setProperty(ProducerConfig.MAX_BLOCK_MS_CONFIG, "10000");
50+
51+
// create the producer
52+
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
53+
Runtime.getRuntime()
54+
.addShutdownHook(
55+
new Thread(
56+
() -> {
57+
log.info("Shutting down Kafka producer...");
58+
producer.close();
59+
}));
60+
5061
// rest endpoints
5162
get(
5263
"/success",
5364
(req, res) -> {
54-
// create the producer
55-
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
56-
5765
// create a record
5866
ProducerRecord record = new ProducerRecord<>("kafka_topic", "success");
5967
// send data - asynchronous
6068
producer.send(record);
6169
// flush data - synchronous
6270
producer.flush();
63-
// close producer
64-
producer.close();
6571

6672
res.status(HttpStatus.OK_200);
6773
res.body("success");
@@ -70,8 +76,6 @@ public static void main(String[] args) {
7076
get(
7177
"/fault",
7278
(req, res) -> {
73-
// create the producer
74-
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
7579
// create a record & send data to a topic that does not exist- asynchronous
7680
ProducerRecord producerRecord = new ProducerRecord<>("fault_do_not_exist", "fault");
7781
producer.send(
@@ -91,8 +95,6 @@ public void onCompletion(RecordMetadata recordMetadata, Exception e) {
9195
});
9296
// flush data - synchronous
9397
producer.flush();
94-
// close producer
95-
producer.close();
9698
res.body("fault");
9799
return res.body();
98100
});

0 commit comments

Comments
 (0)