Skip to content

Commit 7ee2faf

Browse files
committed
merge with master
Change-Id: Ida1ecd633464c59cce7ff9603de8670293d75ad0
2 parents bbcbfc5 + 607263b commit 7ee2faf

File tree

10 files changed

+411
-33
lines changed

10 files changed

+411
-33
lines changed

auth/src/main/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandler.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,11 @@ private int comparePolicyEntries(PolicyEntry o1, PolicyEntry o2) {
154154
// the decision deny has higher priority
155155
Decision d1 = o1.getDecision();
156156
Decision d2 = o2.getDecision();
157-
return d1 == Decision.DENY ? 1 : d2 == Decision.DENY ? -1 : 0;
157+
158+
if (d1 != d2) {
159+
return d1 == Decision.DENY ? -1 : 1;
160+
}
161+
return 0;
158162
}
159163

160164
private static void throwException(DefaultAuthorizationContext context, String detail) {

auth/src/test/java/org/apache/rocketmq/auth/authorization/chain/AclAuthorizationHandlerTest.java

Lines changed: 362 additions & 0 deletions
Large diffs are not rendered by default.

broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -676,9 +676,13 @@ private void initLagAndDlqMetrics() {
676676
.setDescription("Consumer lag messages")
677677
.ofLongs()
678678
.buildWithCallback(measurement -> {
679-
consumerLagCalculator.calculateLag(result ->
680-
measurement.record(result.lag, buildLagAttributes(result))
681-
);
679+
consumerLagCalculator.calculateLag(result -> {
680+
// Note: 'record' method uses HashMap which may cause
681+
// concurrent access issues when Pull thread executes Pop callbacks.
682+
synchronized (this) {
683+
measurement.record(result.lag, buildLagAttributes(result));
684+
}
685+
});
682686

683687
liteConsumerLagCalculator.calculateLiteLagCount(result ->
684688
measurement.record(result.lag, buildLagAttributes(result))

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@
135135
<caffeine.version>2.9.3</caffeine.version>
136136
<spring.version>5.3.27</spring.version>
137137
<okio-jvm.version>3.4.0</okio-jvm.version>
138-
<opentelemetry.version>1.29.0</opentelemetry.version>
139-
<opentelemetry-exporter-prometheus.version>1.29.0-alpha</opentelemetry-exporter-prometheus.version>
138+
<opentelemetry.version>1.44.1</opentelemetry.version>
139+
<opentelemetry-exporter-prometheus.version>1.44.1-alpha</opentelemetry-exporter-prometheus.version>
140140
<jul-to-slf4j.version>2.0.6</jul-to-slf4j.version>
141141
<s3.version>2.20.29</s3.version>
142142
<rocksdb.version>1.0.2</rocksdb.version>

proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -281,8 +281,9 @@ private CompletableFuture<PopResult> doPopLiteMessage(
281281
requestHeader, consumerGroup, topic, subscriptionData, popMessageResultFilter), this.executor);
282282
} catch (Throwable t) {
283283
future.completeExceptionally(t);
284+
FutureUtils.addExecutor(future, this.executor);
284285
}
285-
return FutureUtils.addExecutor(future, this.executor);
286+
return future;
286287
}
287288

288289
private void fillUniqIDIfNeed(MessageExt messageExt) {
@@ -426,8 +427,9 @@ public CompletableFuture<AckResult> changeInvisibleTime(ProxyContext ctx, Receip
426427
}, this.executor);
427428
} catch (Throwable t) {
428429
future.completeExceptionally(t);
430+
FutureUtils.addExecutor(future, this.executor);
429431
}
430-
return FutureUtils.addExecutor(future, this.executor);
432+
return future;
431433
}
432434

433435
protected String createHandle(String handleString, long commitLogOffset) {

proxy/src/main/java/org/apache/rocketmq/proxy/processor/ProducerProcessor.java

Lines changed: 16 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -102,29 +102,27 @@ public CompletableFuture<List<SendResult>> sendMessage(ProxyContext ctx, QueueSe
102102
messageList,
103103
requestHeader,
104104
timeoutMillis)
105-
.thenApplyAsync(sendResultList -> {
106-
for (SendResult sendResult : sendResultList) {
107-
int tranType = MessageSysFlag.getTransactionValue(requestHeader.getSysFlag());
108-
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
109-
tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
110-
StringUtils.isNotBlank(sendResult.getTransactionId())) {
111-
fillTransactionData(ctx, producerGroup, finalMessageQueue, sendResult, messageList);
105+
.whenCompleteAsync((sendResultList, throwable) -> {
106+
long endTimestamp = System.currentTimeMillis();
107+
if (throwable == null) {
108+
for (SendResult sendResult : sendResultList) {
109+
int tranType = MessageSysFlag.getTransactionValue(requestHeader.getSysFlag());
110+
if (SendStatus.SEND_OK.equals(sendResult.getSendStatus()) &&
111+
tranType == MessageSysFlag.TRANSACTION_PREPARED_TYPE &&
112+
StringUtils.isNotBlank(sendResult.getTransactionId())) {
113+
fillTransactionData(ctx, producerGroup, finalMessageQueue, sendResult, messageList);
114+
}
112115
}
116+
this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(), endTimestamp - beginTimestampFirst, false, true);
117+
} else {
118+
this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(), endTimestamp - beginTimestampFirst, true, false);
113119
}
114-
return sendResultList;
115-
}, this.executor)
116-
.whenComplete((result, exception) -> {
117-
long endTimestamp = System.currentTimeMillis();
118-
if (exception != null) {
119-
this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(), endTimestamp - beginTimestampFirst, true, false);
120-
} else {
121-
this.serviceManager.getTopicRouteService().updateFaultItem(finalMessageQueue.getBrokerName(),endTimestamp - beginTimestampFirst, false, true);
122-
}
123-
});
120+
}, this.executor);
124121
} catch (Throwable t) {
125122
future.completeExceptionally(t);
123+
FutureUtils.addExecutor(future, this.executor);
126124
}
127-
return FutureUtils.addExecutor(future, this.executor);
125+
return future;
128126
}
129127

130128
public CompletableFuture<String> recallMessage(ProxyContext ctx, String topic,

store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2538,7 +2538,7 @@ public void doReput() {
25382538
if (!isCommitLogAvailable) {
25392539
currentReputTimestamp = System.currentTimeMillis();
25402540
}
2541-
for (boolean doNext = true; isCommitLogAvailable && doNext; ) {
2541+
for (boolean doNext = true; isCommitLogAvailable() && doNext; ) {
25422542

25432543
SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);
25442544

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/core/MessageStoreFetcherImpl.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -210,8 +210,12 @@ public CompletableFuture<GetMessageResult> getMessageFromCacheAsync(
210210
// Pop revive will cause a large number of random reads,
211211
// so the amount of pre-fetch message num needs to be reduced.
212212
int fetchSize = maxCount == 1 ? 32 : storeConfig.getReadAheadMessageCountThreshold();
213+
214+
// In the current design, when the min offset cache expires,
215+
// this method may trigger an RPC call, causing buffer fetch thread starvation
213216
return fetchMessageThenPutToCache(flatFile, queueOffset, fetchSize)
214-
.thenApply(maxOffset -> getMessageFromCache(flatFile, queueOffset, maxCount, messageFilter));
217+
.thenApplyAsync(maxOffset -> getMessageFromCache(flatFile, queueOffset, maxCount, messageFilter),
218+
messageStore.getStoreExecutor().commonExecutor);
215219
}
216220

217221
public CompletableFuture<GetMessageResultExt> getMessageFromTieredStoreAsync(
@@ -438,6 +442,10 @@ public CompletableFuture<QueryMessageResult> queryMessageAsync(
438442
if (flatFile == null) {
439443
continue;
440444
}
445+
if (indexItem.getOffset() < flatFile.getCommitLogMinOffset() ||
446+
indexItem.getOffset() > flatFile.getCommitLogMaxOffset()) {
447+
continue;
448+
}
441449
CompletableFuture<SelectMappedBufferResult> getMessageFuture = flatFile
442450
.getCommitLogAsync(indexItem.getOffset(), indexItem.getSize())
443451
.thenApply(messageBuffer -> new SelectMappedBufferResult(

tieredstore/src/main/java/org/apache/rocketmq/tieredstore/index/IndexStoreService.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Map;
2929
import java.util.Set;
3030
import java.util.concurrent.CompletableFuture;
31-
import java.util.concurrent.ConcurrentHashMap;
3231
import java.util.concurrent.ConcurrentNavigableMap;
3332
import java.util.concurrent.ConcurrentSkipListMap;
3433
import java.util.concurrent.TimeUnit;
@@ -238,15 +237,15 @@ public CompletableFuture<List<IndexItem>> queryAsync(
238237
ConcurrentNavigableMap<Long, IndexFile> pendingMap =
239238
this.timeStoreTable.subMap(beginTime, true, endTime, true);
240239
List<CompletableFuture<Void>> futureList = new ArrayList<>(pendingMap.size());
241-
ConcurrentHashMap<String /* queueId-offset */, IndexItem> result = new ConcurrentHashMap<>();
240+
ConcurrentSkipListMap<String /* queueId-offset */, IndexItem> result = new ConcurrentSkipListMap<>();
242241

243242
for (Map.Entry<Long, IndexFile> entry : pendingMap.descendingMap().entrySet()) {
244243
CompletableFuture<Void> completableFuture = entry.getValue()
245244
.queryAsync(topic, key, maxCount, beginTime, endTime)
246245
.thenAccept(itemList -> itemList.forEach(indexItem -> {
247246
if (result.size() < maxCount) {
248247
result.put(String.format(
249-
"%d-%d", indexItem.getQueueId(), indexItem.getOffset()), indexItem);
248+
"%d-%20d", indexItem.getQueueId(), indexItem.getOffset()), indexItem);
250249
}
251250
}));
252251
futureList.add(completableFuture);
@@ -349,7 +348,8 @@ public void destroyExpiredFile(long expireTimestamp) {
349348
flatAppendFile.destroyExpiredFile(expireTimestamp);
350349
timeStoreTable.entrySet().removeIf(entry ->
351350
IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()) &&
352-
entry.getKey() < flatAppendFile.getMinTimestamp());
351+
(flatAppendFile.getFileSegmentList().isEmpty() ||
352+
entry.getKey() < flatAppendFile.getMinTimestamp()));
353353
int tableSize = (int) timeStoreTable.entrySet().stream()
354354
.filter(entry -> IndexFile.IndexStatusEnum.UPLOAD.equals(entry.getValue().getFileStatus()))
355355
.count();

tools/src/main/java/org/apache/rocketmq/tools/command/CommandUtil.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ public static String fetchMasterAddrByBrokerName(final MQAdminExt adminExt,
144144
return addr;
145145
}
146146
}
147-
throw new Exception(String.format("No broker address for broker name %s.%n", brokerData));
147+
throw new Exception(String.format("No broker address for broker name %s.", brokerName));
148148
}
149149

150150
public static Set<String> fetchMasterAndSlaveAddrByBrokerName(final MQAdminExt adminExt, final String brokerName)

0 commit comments

Comments
 (0)