Skip to content

Commit 68b3485

Browse files
committed
fix unit test
Change-Id: Ic811d4e0af800f145fb94122985965e1b0569915
1 parent 17bc455 commit 68b3485

File tree

9 files changed

+32
-10
lines changed

9 files changed

+32
-10
lines changed

broker/src/main/java/org/apache/rocketmq/broker/config/v1/RocksDBConsumerOffsetManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,7 @@ public void commitOffset(String clientHost, String group, String topic, int queu
197197
}
198198
}
199199
if (versionChangeCounter.incrementAndGet() % brokerController.getBrokerConfig().getConsumerOffsetUpdateVersionStep() == 0) {
200-
long stateMachineVersion = brokerController.getMessageStore() != null ? brokerController.getMessageStore().getStateMachineVersion() : 0;
201-
dataVersion.nextVersion(stateMachineVersion);
200+
updateDataVersion();
202201
}
203202
if (!brokerController.getBrokerConfig().isPersistConsumerOffsetIncrementally()) {
204203
return;

broker/src/main/java/org/apache/rocketmq/broker/processor/AckMessageProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,7 +515,7 @@ protected RemotingCommand ackLite(AckMessageRequestHeader requestHeader, BatchAc
515515
String lmqName = LiteUtil.toLmqName(requestHeader.getTopic(), requestHeader.getLiteTopic());
516516
long ackOffset = requestHeader.getOffset();
517517
long maxOffset = this.brokerController.getLiteLifecycleManager().getMaxOffsetInQueue(lmqName);
518-
if (ackOffset > maxOffset) { // TODO moling. minOffset
518+
if (ackOffset > maxOffset) {
519519
POP_LOGGER.warn("ack lite offset illegal, {}, {}, {}", lmqName, ackOffset, maxOffset);
520520
response.setCode(ResponseCode.NO_MESSAGE);
521521
response.setRemark("ack offset illegal.");

broker/src/main/java/org/apache/rocketmq/broker/processor/ChangeInvisibleTimeProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,7 @@ protected CompletableFuture<RemotingCommand> processChangeInvisibleTimeForLite(
372372
}
373373
String lmqName = LiteUtil.toLmqName(requestHeader.getTopic(), requestHeader.getLiteTopic());
374374
long maxOffset = this.brokerController.getLiteLifecycleManager().getMaxOffsetInQueue(lmqName);
375-
if (requestHeader.getOffset() > maxOffset) { // TODO moling. minOffset
375+
if (requestHeader.getOffset() > maxOffset) {
376376
POP_LOGGER.warn("process lite offset illegal, {}, {}, {}", lmqName, requestHeader.getOffset(), maxOffset);
377377
response.setCode(ResponseCode.NO_MESSAGE);
378378
return CompletableFuture.completedFuture(response);

broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -372,7 +372,8 @@ public StringBuilder transformOrderCountInfo(StringBuilder orderCountInfo, int m
372372
}
373373
}
374374

375-
private void recordPopLiteMetrics(GetMessageResult result, String parentTopic, String group) {
375+
@VisibleForTesting
376+
protected void recordPopLiteMetrics(GetMessageResult result, String parentTopic, String group) {
376377
Attributes attributes = this.brokerController.getBrokerMetricsManager().newAttributesBuilder()
377378
.put(LABEL_TOPIC, parentTopic)
378379
.put(LABEL_CONSUMER_GROUP, group)

broker/src/test/java/org/apache/rocketmq/broker/metrics/LiteConsumerLagCalculatorTest.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,9 +285,9 @@ public void testGetLagCountTopK_NormalCase() {
285285
LiteConsumerLagCalculator spyCalculator = spy(liteConsumerLagCalculator);
286286

287287
// Mock getStoreTimestamp method on the spy
288-
doReturn(timestamp1).when(spyCalculator).getStoreTimestamp(lmqName1, consumerOffset1 + 1);
289-
doReturn(timestamp2).when(spyCalculator).getStoreTimestamp(lmqName2, consumerOffset2 + 1);
290-
doReturn(timestamp3).when(spyCalculator).getStoreTimestamp(lmqName3, consumerOffset3 + 1);
288+
doReturn(timestamp1).when(spyCalculator).getStoreTimestamp(lmqName1, consumerOffset1);
289+
doReturn(timestamp2).when(spyCalculator).getStoreTimestamp(lmqName2, consumerOffset2);
290+
doReturn(timestamp3).when(spyCalculator).getStoreTimestamp(lmqName3, consumerOffset3);
291291

292292
// Mock getMaxOffset method on the spy
293293
doReturn(100L).when(spyCalculator).getMaxOffset(lmqName1);

broker/src/test/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessorTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -731,7 +731,7 @@ public void testSearchOffsetByTimestampWithLiteTopic() throws Exception {
731731

732732
SearchOffsetRequestHeader requestHeader = new SearchOffsetRequestHeader();
733733
requestHeader.setTopic(topic);
734-
requestHeader.setQueueId(1);
734+
requestHeader.setQueueId(0);
735735
requestHeader.setTimestamp(timestamp);
736736
requestHeader.setLiteTopic(liteTopic);
737737
requestHeader.setBoundaryType(BoundaryType.LOWER);

broker/src/test/java/org/apache/rocketmq/broker/processor/LiteManagerProcessorTest.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
3434
import org.apache.rocketmq.broker.metrics.LiteConsumerLagCalculator;
3535
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
36+
import org.apache.rocketmq.broker.offset.ConsumerOrderInfoManager;
3637
import org.apache.rocketmq.broker.subscription.SubscriptionGroupManager;
3738
import org.apache.rocketmq.broker.topic.TopicConfigManager;
3839
import org.apache.rocketmq.common.BrokerConfig;
@@ -123,6 +124,9 @@ public class LiteManagerProcessorTest {
123124
@Mock
124125
private LiteEventDispatcher liteEventDispatcher;
125126

127+
@Mock
128+
private PopLiteMessageProcessor popLiteMessageProcessor;
129+
126130
private LiteManagerProcessor processor;
127131

128132
@Before
@@ -136,9 +140,17 @@ public void setUp() {
136140
when(brokerController.getLiteSubscriptionRegistry()).thenReturn(liteSubscriptionRegistry);
137141
when(brokerController.getBrokerMetricsManager()).thenReturn(brokerMetricsManager);
138142
when(brokerController.getLiteEventDispatcher()).thenReturn(liteEventDispatcher);
143+
when(brokerController.getPopLiteMessageProcessor()).thenReturn(popLiteMessageProcessor);
144+
when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
145+
146+
ConsumerOrderInfoManager consumerOrderInfoManager = new ConsumerOrderInfoManager(brokerController);
147+
when(popLiteMessageProcessor.getConsumerOrderInfoManager()).thenReturn(consumerOrderInfoManager);
139148

140149
when(messageStore.getQueueStore()).thenReturn(consumeQueueStore);
150+
when(consumeQueueStore.getConsumeQueueTable()).thenReturn(new ConcurrentHashMap<>());
141151
when(brokerMetricsManager.getLiteConsumerLagCalculator()).thenReturn(liteConsumerLagCalculator);
152+
153+
when(consumerOffsetManager.getOffsetTable()).thenReturn(new ConcurrentHashMap<>());
142154
}
143155

144156
@Test
@@ -700,6 +712,11 @@ public void testTriggerLiteDispatch() throws Exception {
700712
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.TRIGGER_LITE_DISPATCH, requestHeader);
701713
request.makeCustomHeaderToNet();
702714

715+
SubscriptionGroupConfig groupConfig = new SubscriptionGroupConfig();
716+
groupConfig.setGroupName(group);
717+
groupConfig.setLiteBindTopic("parent_topic");
718+
when(subscriptionGroupManager.findSubscriptionGroupConfig(group)).thenReturn(groupConfig);
719+
703720
RemotingCommand response = processor.triggerLiteDispatch(ctx, request);
704721

705722
assertNotNull(response);

broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,11 @@
5959
import static org.junit.Assert.assertNotNull;
6060
import static org.junit.Assert.assertNull;
6161
import static org.junit.Assert.assertTrue;
62+
import static org.mockito.ArgumentMatchers.any;
6263
import static org.mockito.ArgumentMatchers.anyInt;
6364
import static org.mockito.ArgumentMatchers.anyLong;
6465
import static org.mockito.ArgumentMatchers.anyString;
66+
import static org.mockito.Mockito.doNothing;
6567
import static org.mockito.Mockito.when;
6668
import static org.mockito.Mockito.verify;
6769
import static org.mockito.Mockito.times;
@@ -336,6 +338,8 @@ public void testHandleGetMessageResult_found() {
336338
getResult.getMessageQueueOffset().add(0L);
337339
getResult.getMessageQueueOffset().add(1L);
338340

341+
doNothing().when(popLiteMessageProcessor).recordPopLiteMetrics(any(), anyString(), anyString());
342+
339343
Pair<StringBuilder, GetMessageResult> result = popLiteMessageProcessor.handleGetMessageResult(
340344
getResult, "parentTopic", "group", "lmqName", System.currentTimeMillis(), 6000L, "attemptId");
341345

common/src/main/java/org/apache/rocketmq/common/TopicAttributes.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.util.HashMap;
2020
import java.util.Map;
21+
import java.util.concurrent.TimeUnit;
2122
import org.apache.rocketmq.common.attribute.Attribute;
2223
import org.apache.rocketmq.common.attribute.EnumAttribute;
2324
import org.apache.rocketmq.common.attribute.LongRangeAttribute;
@@ -56,7 +57,7 @@ public class TopicAttributes {
5657
"lite.topic.expiration",
5758
true,
5859
-1,
59-
Long.MAX_VALUE,
60+
TimeUnit.DAYS.toMinutes(30),
6061
-1
6162
);
6263

0 commit comments

Comments
 (0)