Skip to content

Commit 544534f

Browse files
author
Liudmila Molkova
authored
Messaging stress tests improvements (Azure#37058)
* Stress tests improvements
1 parent 67fd146 commit 544534f

18 files changed

+663
-321
lines changed

.vscode/cspell.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,6 +181,7 @@
181181
"sdk/remoterendering/TestResources/**",
182182
"sdk/purview/azure-analytics-purview-catalog/**",
183183
"sdk/servicebus/build/**",
184+
"sdk/servicebus/azure-messaging-servicebus-stress/templates/**",
184185
"sdk/servicebus/azure-messaging-servicebus-stress/workbooks/**",
185186
"sdk/servicebus/azure-messaging-servicebus-stress/Dockerfile",
186187
"sdk/search/azure-search-documents/**",

sdk/servicebus/azure-messaging-servicebus-stress/Dockerfile

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@ FROM maven:3.8.6-openjdk-11 as builder
33
RUN mkdir /stress-sb
44
WORKDIR /stress-sb
55

6-
ADD ./sdk/tools /stress-eh/sdk/tools
7-
ADD ./sdk/parents /stress-eh/sdk/parents
6+
ADD ./sdk/tools /stress-sb/sdk/tools
7+
ADD ./sdk/parents /stress-sb/sdk/parents
88
ADD ./sdk/core /stress-sb/sdk/core
99
ADD ./sdk/servicebus /stress-sb/sdk/servicebus
10-
ADD ./eng /stress-eh/eng
10+
ADD ./eng /stress-sb/eng
1111

1212
RUN --mount=type=cache,target=/root/.m2 \
13-
mvn -f /stress-eh/eng/code-quality-reports/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
14-
mvn -f /stress-eh/sdk/tools/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
13+
mvn -f /stress-sb/eng/code-quality-reports/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
14+
mvn -f /stress-sb/sdk/tools/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
1515
mvn -f /stress-sb/sdk/core/azure-core/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
1616
mvn -f /stress-sb/sdk/core/azure-core-test/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
1717
mvn -f /stress-sb/sdk/core/azure-core-amqp/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
18+
mvn -f /stress-sb/sdk/core/azure-core-metrics-opentelemetry/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
1819
mvn -f /stress-sb/sdk/core/azure-core-http-netty/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
1920
mvn -f /stress-sb/sdk/servicebus/azure-messaging-servicebus/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests && \
2021
mvn -f /stress-sb/sdk/servicebus/azure-messaging-servicebus-stress/pom.xml clean install -Dcheckstyle.skip -Dgpg.skip -Dmaven.javadoc.skip -Drevapi.skip -Dspotbugs.skip -Djacoco.skip -DskipTests
@@ -24,8 +25,9 @@ FROM mcr.microsoft.com/openjdk/jdk:11-mariner
2425
WORKDIR /app
2526
COPY --from=builder /stress-sb/sdk/servicebus/azure-messaging-servicebus-stress/target .
2627

27-
ARG AGENT_URL=https://github.com/microsoft/ApplicationInsights-Java/releases/download/3.4.13/applicationinsights-agent-3.4.13.jar
28+
ARG AGENT_URL=https://github.com/microsoft/ApplicationInsights-Java/releases/download/3.4.15/applicationinsights-agent-3.4.15.jar
2829
ADD ${AGENT_URL} ./BOOT-INF/classes/
29-
ENTRYPOINT ["java", "-javaagent:BOOT-INF/classes/applicationinsights-agent-3.4.13.jar", \
30+
31+
ENTRYPOINT ["java", "-javaagent:BOOT-INF/classes/applicationinsights-agent-3.4.15.jar", \
3032
"-jar", "/app/azure-messaging-servicebus-stress/target/azure-messaging-servicebus-stress-1.0.0-beta.1.jar", \
3133
"--TEST_CLASS=MessageSender"]
Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,32 @@
11
matrix:
2-
image:
3-
- Dockerfile
42
scenarios:
5-
send-process:
6-
chaos: "true"
3+
composite:
4+
testDurationMin: 15
5+
processingConcurrency: 50
6+
sendRate: 160
7+
receiverTestClass: MessageProcessor
8+
senderTestClass: MessageSenderAsync
9+
abandonRatio: 0.002
10+
noDispositionRatio: 0.001
11+
lockRenewalNeededRatio: 0.01
12+
processCallbackDurationMaxInMs: 10
13+
autoRenewLock: "true"
14+
prefetchCount: 0
15+
lockDurationInMs: 5000
16+
chaos: "false"
17+
tryTimeout: 60
18+
delayStartMin: 0
19+
azureSdkLogLevel: 3
20+
azureSdkTracingEnabled: true
21+
applicationInsightsSamplingRate: 100
22+
applicationInsightsLogLevel: "INFO"
23+
imageBuildDir: ..\..\..\
24+
happy-case:
25+
testDurationMin: 15
26+
processingConcurrency: 64
27+
sendRate: 900
28+
receiverTestClass: MessageProcessor
29+
senderTestClass: MessageSenderAsync
30+
lockDurationInMs: 5000
31+
processCallbackDurationMaxInMs: 10
732
imageBuildDir: ..\..\..\

sdk/servicebus/azure-messaging-servicebus-stress/src/main/java/com/azure/messaging/servicebus/stress/ServiceBusScenarioRunner.java

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33

44
package com.azure.messaging.servicebus.stress;
55

6+
import com.azure.core.util.logging.ClientLogger;
67
import com.azure.messaging.servicebus.stress.scenarios.ServiceBusScenario;
8+
import com.azure.messaging.servicebus.stress.util.RunResult;
79
import com.azure.messaging.servicebus.stress.util.ScenarioOptions;
810
import org.springframework.beans.factory.annotation.Autowired;
911
import org.springframework.boot.ApplicationArguments;
@@ -19,7 +21,7 @@
1921
*/
2022
@SpringBootApplication
2123
public class ServiceBusScenarioRunner implements ApplicationRunner {
22-
24+
private static final ClientLogger LOGGER = new ClientLogger(ServiceBusScenarioRunner.class);
2325
@Autowired
2426
protected ApplicationContext applicationContext;
2527

@@ -36,10 +38,27 @@ public static void main(String[] args) {
3638
* @param args the application arguments. it should contain "--TEST_CLASS='your scenarios class name'".
3739
*/
3840
@Override
39-
public void run(ApplicationArguments args) {
41+
public void run(ApplicationArguments args) throws InterruptedException {
4042
String scenarioName = Objects.requireNonNull(options.getTestClass(),
4143
"The test class should be provided, please add --TEST_CLASS=<your test class> as start argument");
4244
ServiceBusScenario scenario = (ServiceBusScenario) applicationContext.getBean(scenarioName);
43-
scenario.run();
45+
46+
scenario.beforeRun();
47+
RunResult result = RunResult.INCONCLUSIVE;
48+
try {
49+
result = scenario.run();
50+
} catch (Exception ex) {
51+
LOGGER.error("Error running scenario", ex);
52+
result = RunResult.ERROR;
53+
} finally {
54+
scenario.afterRun(result);
55+
scenario.close();
56+
}
57+
58+
if (result == RunResult.ERROR) {
59+
throw LOGGER.logExceptionAsError(new RuntimeException("Test ended with error"));
60+
} else if (result == RunResult.WARNING) {
61+
throw LOGGER.logExceptionAsError(new RuntimeException("Test ended with warning"));
62+
}
4463
}
4564
}

sdk/servicebus/azure-messaging-servicebus-stress/src/main/java/com/azure/messaging/servicebus/stress/scenarios/MessageProcessor.java

Lines changed: 127 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,19 @@
55

66
import com.azure.core.util.logging.ClientLogger;
77
import com.azure.messaging.servicebus.ServiceBusProcessorClient;
8+
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
89
import com.azure.messaging.servicebus.ServiceBusReceivedMessageContext;
10+
import com.azure.messaging.servicebus.stress.util.RunResult;
911
import org.springframework.beans.factory.annotation.Value;
1012
import org.springframework.stereotype.Component;
1113

1214
import java.time.Duration;
15+
import java.time.OffsetDateTime;
1316
import java.util.concurrent.ThreadLocalRandom;
17+
import java.util.concurrent.atomic.AtomicReference;
1418

1519
import static com.azure.messaging.servicebus.stress.scenarios.TestUtils.blockingWait;
20+
import static com.azure.messaging.servicebus.stress.scenarios.TestUtils.createMessagePayload;
1621
import static com.azure.messaging.servicebus.stress.scenarios.TestUtils.getProcessorBuilder;
1722

1823
/**
@@ -22,45 +27,147 @@
2227
public class MessageProcessor extends ServiceBusScenario {
2328
private static final ClientLogger LOGGER = new ClientLogger(MessageProcessor.class);
2429

25-
@Value("${DURATION_IN_MINUTES:15}")
26-
private int testDurationInMinutes;
27-
28-
// lock duration is 5 sec, so in some cases we'll do lock renewal
29-
@Value("${PROCESS_CALLBACK_DURATION_MAX_IN_SECONDS:7}")
30-
private int processMessageDurationMaxInSeconds;
30+
@Value("${PROCESS_CALLBACK_DURATION_MAX_IN_MS:50}")
31+
private int processMessageDurationMaxInMs;
3132

3233
@Value("${MAX_CONCURRENT_CALLS:100}")
3334
private int maxConcurrentCalls;
3435

3536
@Value("${PREFETCH_COUNT:0}")
3637
private int prefetchCount;
3738

39+
@Value("${ABANDON_RATIO:0}")
40+
private double abandonRatio;
41+
42+
@Value("${NO_DISPOSITION_RATIO:0}")
43+
private double noDispositionRatio;
44+
45+
@Value("${LOCK_RENEWAL_NEEDED_RATIO:0}")
46+
private double lockRenewalNeededRatio;
47+
48+
@Value("${LOCK_DURATION_IN_MS:30000}")
49+
private int lockDurationInMs;
50+
51+
@Value("${AUTO_RENEW_LOCK:true}")
52+
private boolean renewLock;
53+
54+
private byte[] expectedPayload;
55+
56+
private final AtomicReference<RunResult> runResult = new AtomicReference<>(RunResult.INCONCLUSIVE);
57+
58+
3859
@Override
39-
public void run() {
40-
ServiceBusProcessorClient processor = getProcessorBuilder(options)
41-
.maxAutoLockRenewDuration(Duration.ofSeconds(processMessageDurationMaxInSeconds + 1))
60+
public RunResult run() throws InterruptedException {
61+
expectedPayload = createMessagePayload(options.getMessageSize());
62+
63+
ServiceBusProcessorClient processor = toClose(getProcessorBuilder(options)
64+
.maxAutoLockRenewDuration(renewLock ? Duration.ofMinutes(5) : Duration.ZERO)
4265
.maxConcurrentCalls(maxConcurrentCalls)
4366
.prefetchCount(prefetchCount)
4467
.processMessage(this::process)
4568
.processError(err -> {
46-
throw LOGGER.logExceptionAsError(new RuntimeException(err.getException()));
69+
LOGGER.atError()
70+
.addKeyValue("source", err.getErrorSource())
71+
.log("processor error", err.getException());
72+
runResult.set(RunResult.ERROR);
4773
})
48-
.buildProcessorClient();
49-
74+
.buildProcessorClient());
5075
processor.start();
51-
blockingWait(Duration.ofMinutes(testDurationInMinutes));
52-
processor.close();
76+
77+
blockingWait(options.getTestDuration());
78+
79+
int activeMessages = getRemainingQueueMessages();
80+
for (int extraMinutes = 0; extraMinutes < 3 && activeMessages > 0; extraMinutes++) {
81+
blockingWait(Duration.ofMinutes(1));
82+
activeMessages = getRemainingQueueMessages();
83+
}
84+
85+
return activeMessages != 0 ? RunResult.WARNING : runResult.get();
5386
}
5487

5588
private void process(ServiceBusReceivedMessageContext messageContext) {
89+
ServiceBusReceivedMessage message = messageContext.getMessage();
90+
if (checkMessage(message)) {
91+
blockingWait(Duration.ofMillis(getWaitTime()));
92+
settleMessage(messageContext);
93+
}
94+
}
95+
96+
private int getWaitTime() {
97+
ThreadLocalRandom random = ThreadLocalRandom.current();
98+
if (random.nextDouble(1) < lockRenewalNeededRatio) {
99+
return lockDurationInMs + 1000;
100+
} else if (processMessageDurationMaxInMs != 0) {
101+
return random.nextInt(processMessageDurationMaxInMs);
102+
}
103+
104+
return 0;
105+
}
106+
107+
private boolean checkMessage(ServiceBusReceivedMessage message) {
108+
LOGGER.atInfo()
109+
.addKeyValue("messageId", message.getMessageId())
110+
.addKeyValue("traceparent", message.getApplicationProperties().get("traceparent"))
111+
.addKeyValue("deliveryCount", message.getDeliveryCount())
112+
.addKeyValue("lockToken", message.getLockToken())
113+
.addKeyValue("lockedUntil", message.getLockedUntil())
114+
.log("message received");
115+
116+
if (message.getLockedUntil().isBefore(OffsetDateTime.now())) {
117+
LOGGER.atError()
118+
.addKeyValue("messageId", message.getMessageId())
119+
.addKeyValue("deliveryCount", message.getDeliveryCount())
120+
.log("message lock expired");
121+
runResult.set(RunResult.ERROR);
122+
return false;
123+
}
124+
125+
byte[] payload = message.getBody().toBytes();
126+
if (payload.length != expectedPayload.length) {
127+
LOGGER.atError()
128+
.addKeyValue("messageId", message.getMessageId())
129+
.addKeyValue("actualSize", payload.length)
130+
.addKeyValue("expectedSize", expectedPayload.length)
131+
.log("message corrupted");
132+
runResult.set(RunResult.ERROR);
133+
}
134+
135+
for (int i = 0; i < payload.length; i++) {
136+
if (payload[i] != expectedPayload[i]) {
137+
LOGGER.atError()
138+
.addKeyValue("messageId", message.getMessageId())
139+
.addKeyValue("index", i)
140+
.addKeyValue("actual", payload[i])
141+
.addKeyValue("expected", expectedPayload[i])
142+
.log("message corrupted");
143+
runResult.set(RunResult.ERROR);
144+
}
145+
}
146+
147+
return true;
148+
}
149+
150+
private void settleMessage(ServiceBusReceivedMessageContext messageContext) {
151+
String operation = "ignored";
56152
try {
57-
if (processMessageDurationMaxInSeconds != 0) {
58-
int processTimeMs = ThreadLocalRandom.current().nextInt(processMessageDurationMaxInSeconds * 1000);
59-
Thread.sleep(processTimeMs);
153+
double random = ThreadLocalRandom.current().nextDouble(1);
154+
if (random < abandonRatio) {
155+
operation = "abandoned";
156+
messageContext.abandon();
157+
} else if (random >= abandonRatio + noDispositionRatio) {
158+
operation = "completed";
159+
messageContext.complete();
60160
}
61-
messageContext.complete();
62-
} catch (Exception ex) {
63-
LOGGER.logThrowableAsWarning(ex);
161+
162+
LOGGER.atInfo()
163+
.addKeyValue("messageId", messageContext.getMessage().getMessageId())
164+
.addKeyValue("deliveryCount", messageContext.getMessage().getDeliveryCount())
165+
.log("message " + operation);
166+
} catch (RuntimeException ex) {
167+
runResult.set(RunResult.ERROR);
168+
LOGGER.atVerbose()
169+
.addKeyValue("messageId", messageContext.getMessage().getMessageId())
170+
.log("message settlement failed");
64171
}
65172
}
66173
}

sdk/servicebus/azure-messaging-servicebus-stress/src/main/java/com/azure/messaging/servicebus/stress/scenarios/MessageReceiver.java

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@
77
import com.azure.core.util.logging.ClientLogger;
88
import com.azure.messaging.servicebus.ServiceBusReceivedMessage;
99
import com.azure.messaging.servicebus.ServiceBusReceiverClient;
10+
import com.azure.messaging.servicebus.stress.util.RunResult;
1011
import org.springframework.beans.factory.annotation.Value;
1112
import org.springframework.stereotype.Component;
1213

14+
import java.time.Duration;
1315
import java.time.Instant;
14-
import java.time.temporal.ChronoUnit;
1516

17+
import static com.azure.messaging.servicebus.stress.scenarios.TestUtils.blockingWait;
1618
import static com.azure.messaging.servicebus.stress.scenarios.TestUtils.getReceiverBuilder;
1719

1820
/**
@@ -22,17 +24,15 @@
2224
public class MessageReceiver extends ServiceBusScenario {
2325
private static final ClientLogger LOGGER = new ClientLogger(MessageReceiver.class);
2426

25-
@Value("${DURATION_IN_MINUTES:15}")
26-
private int durationInMinutes;
27-
2827
@Value("${BATCH_SIZE:10}")
2928
private int batchSize;
3029

3130
@Override
32-
public void run() {
33-
long endAtEpochMillis = Instant.now().plus(durationInMinutes, ChronoUnit.MINUTES).toEpochMilli();
31+
public RunResult run() {
32+
RunResult result = RunResult.INCONCLUSIVE;
33+
long endAtEpochMillis = Instant.now().plus(options.getTestDuration()).toEpochMilli();
3434

35-
ServiceBusReceiverClient client = getReceiverBuilder(options, false).buildClient();
35+
ServiceBusReceiverClient client = toClose(getReceiverBuilder(options, false).buildClient());
3636

3737
while (Instant.now().toEpochMilli() < endAtEpochMillis) {
3838
IterableStream<ServiceBusReceivedMessage> receivedMessages = client.receiveMessages(batchSize);
@@ -43,25 +43,21 @@ public void run() {
4343
try {
4444
client.complete(receivedMessage);
4545
} catch (Throwable ex) {
46-
LOGGER.error("Completion error. messageId: {}, lockToken: {}",
47-
receivedMessage.getMessageId(),
48-
receivedMessage.getLockToken(),
49-
ex);
46+
LOGGER.atError()
47+
.addKeyValue("messageId", receivedMessage.getMessageId())
48+
.addKeyValue("lockToken", receivedMessage.getLockToken())
49+
.log("Completion error", ex);
50+
result = RunResult.ERROR;
5051
}
5152

5253
count++;
5354
}
5455

5556
if (count == 0) {
56-
try {
57-
// avoid busy looping
58-
Thread.sleep(100);
59-
} catch (InterruptedException e) {
60-
LOGGER.logExceptionAsError(new RuntimeException(e));
61-
}
57+
blockingWait(Duration.ofMillis(100));
6258
}
6359
}
6460

65-
client.close();
61+
return result;
6662
}
6763
}

0 commit comments

Comments
 (0)