Skip to content

Commit e984023

Browse files
authored
[ISSUE #9816] Fix calculate consumer lag with opentelemetry (#9873)
1 parent f33f626 commit e984023

File tree

3 files changed

+40
-20
lines changed

3 files changed

+40
-20
lines changed

broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopCommandCallback.java

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,31 @@
1717

1818
package org.apache.rocketmq.broker.longpolling;
1919

20+
import java.util.concurrent.CompletableFuture;
2021
import java.util.function.BiConsumer;
21-
import java.util.function.Consumer;
2222
import org.apache.rocketmq.broker.metrics.ConsumerLagCalculator;
2323
import org.apache.rocketmq.remoting.CommandCallback;
2424

2525
public class PopCommandCallback implements CommandCallback {
2626

2727
private final BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
28-
Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer;
29-
28+
CompletableFuture<ConsumerLagCalculator.CalculateLagResult>> biConsumer;
3029
private final ConsumerLagCalculator.ProcessGroupInfo info;
31-
private final Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder;
32-
30+
private final CompletableFuture<ConsumerLagCalculator.CalculateLagResult> future;
3331

3432
public PopCommandCallback(
3533
BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
36-
Consumer<ConsumerLagCalculator.CalculateLagResult>> biConsumer,
34+
CompletableFuture<ConsumerLagCalculator.CalculateLagResult>> biConsumer,
3735
ConsumerLagCalculator.ProcessGroupInfo info,
38-
Consumer<ConsumerLagCalculator.CalculateLagResult> lagRecorder) {
36+
CompletableFuture<ConsumerLagCalculator.CalculateLagResult> future) {
3937

4038
this.biConsumer = biConsumer;
4139
this.info = info;
42-
this.lagRecorder = lagRecorder;
40+
this.future = future;
4341
}
4442

4543
@Override
4644
public void accept() {
47-
biConsumer.accept(info, lagRecorder);
45+
biConsumer.accept(info, future);
4846
}
4947
}

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -673,14 +673,8 @@ private void initLagAndDlqMetrics() {
673673
consumerLagMessages = brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_MESSAGES)
674674
.setDescription("Consumer lag messages")
675675
.ofLongs()
676-
.buildWithCallback(measurement ->
677-
consumerLagCalculator.calculateLag(result -> {
678-
// Note: 'record' method uses HashMap which may cause
679-
// concurrent access issues when Pull thread executes Pop callbacks.
680-
synchronized (this) {
681-
measurement.record(result.lag, buildLagAttributes(result));
682-
}
683-
}));
676+
.buildWithCallback(measurement -> consumerLagCalculator.calculateLag(result ->
677+
measurement.record(result.lag, buildLagAttributes(result))));
684678

685679
consumerLagLatency = brokerMeter.gaugeBuilder(GAUGE_CONSUMER_LAG_LATENCY)
686680
.setDescription("Consumer lag time")

broker/src/main/java/org/apache/rocketmq/broker/metrics/ConsumerLagCalculator.java

Lines changed: 31 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,13 @@
1616
*/
1717
package org.apache.rocketmq.broker.metrics;
1818

19+
import java.util.ArrayList;
20+
import java.util.List;
1921
import java.util.Map;
2022
import java.util.Set;
23+
import java.util.concurrent.CompletableFuture;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.function.BiConsumer;
2126
import java.util.function.Consumer;
2227
import java.util.stream.Collectors;
2328
import org.apache.rocketmq.broker.BrokerController;
@@ -211,20 +216,43 @@ private void processAllGroup(Consumer<ProcessGroupInfo> consumer) {
211216
}
212217

213218
public void calculateLag(Consumer<CalculateLagResult> lagRecorder) {
219+
220+
List<CompletableFuture<CalculateLagResult>> futures = new ArrayList<>();
221+
222+
BiConsumer<ConsumerLagCalculator.ProcessGroupInfo,
223+
CompletableFuture<ConsumerLagCalculator.CalculateLagResult>> biConsumer =
224+
(info, future) -> calculate(info, future::complete);
225+
214226
processAllGroup(info -> {
215227
if (info.group == null || info.topic == null) {
216228
return;
217229
}
218-
230+
CompletableFuture<CalculateLagResult> future = new CompletableFuture<>();
219231
if (info.isPop && brokerConfig.isEnableNotifyBeforePopCalculateLag()) {
220232
if (popLongPollingService.notifyMessageArriving(info.topic, -1, info.group,
221-
true, null, 0, null, null, new PopCommandCallback(this::calculate, info, lagRecorder))) {
233+
true, null, 0, null, null,
234+
new PopCommandCallback(biConsumer, info, future))) {
235+
futures.add(future);
222236
return;
223237
}
224238
}
225-
226239
calculate(info, lagRecorder);
227240
});
241+
242+
// Set the maximum wait time to 10 seconds to avoid indefinite blocking
243+
// in case of a fast fail that causes the future to not complete its execution.
244+
try {
245+
CompletableFuture.allOf(futures.toArray(
246+
new CompletableFuture[0])).get(10, TimeUnit.SECONDS);
247+
248+
futures.forEach(future -> {
249+
if (future.isDone() && !future.isCompletedExceptionally()) {
250+
lagRecorder.accept(future.join());
251+
}
252+
});
253+
} catch (Exception e) {
254+
LOGGER.error("Calculate lag timeout after 10 seconds", e);
255+
}
228256
}
229257

230258
public void calculate(ProcessGroupInfo info, Consumer<CalculateLagResult> lagRecorder) {

0 commit comments

Comments
 (0)