Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,23 @@ public long queryOffset(String group, String topic, int queueId) {
}
}

@Override
public boolean hasOffsetRecord(final String group, final String topic) {
if (!MixAll.isLmq(topic)) {
return super.hasOffsetRecord(group, topic);
}

ByteBuf keyBuf = keyOfConsumerOffset(group, topic, 0);
try {
byte[] slice = configStorage.get(keyBuf.nioBuffer());
return slice != null;
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
keyBuf.release();
}
}

@Override
public void commitPullOffset(String clientHost, String group, String topic, int queueId, long offset) {
if (!MixAll.isLmq(topic)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,20 @@ public long queryOffset(final String group, final String topic, final int queueI
return -1L;
}

public boolean hasOffsetRecord(final String group, final String topic) {
String key = topic + TOPIC_GROUP_SEPARATOR + group;

if (this.brokerController.getBrokerConfig().isUseServerSideResetOffset()) {
Map<Integer, Long> reset = resetOffsetTable.get(key);
if (reset != null) {
return true;
}
}

ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
return map != null;
}

/**
* Query pull offset in pullOffsetTable
* @param group Consumer group
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,18 @@ public Map<Integer, Long> queryOffset(final String group, final String topic) {
return map;
}

@Override
public boolean hasOffsetRecord(final String group, final String topic) {
if (!MixAll.isLmq(group)) {
return super.hasOffsetRecord(group, topic);
}
// topic@group
String key = topic + TOPIC_GROUP_SEPARATOR + group;

Long offset = lmqOffsetTable.get(key);
return offset != null;
}

@Override
public void commitOffset(final String clientHost, final String group, final String topic, final int queueId,
final long offset) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -327,18 +327,20 @@ private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingC
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
} else {
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, do not set to zero, maybe this group boot first");
} else if (minOffset <= 0
&& this.brokerController.getMessageStore().checkInMemByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
if (this.brokerController.getConsumerOffsetManager().hasOffsetRecord(requestHeader.getConsumerGroup(), requestHeader.getTopic())) {
long minOffset =
this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),
requestHeader.getQueueId());
if (requestHeader.getSetZeroIfNotFound() != null && Boolean.FALSE.equals(requestHeader.getSetZeroIfNotFound())) {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, do not set to zero, maybe this group boot first");
} else if (minOffset <= 0
&& this.brokerController.getMessageStore().checkInMemByConsumeOffset(
requestHeader.getTopic(), requestHeader.getQueueId(), 0, 1)) {
responseHeader.setOffset(0L);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
}
} else {
response.setCode(ResponseCode.QUERY_NOT_FOUND);
response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public void testQueryConsumerOffset() throws RemotingCommandException, Execution

when(brokerController.getConsumerOffsetManager()).thenReturn(consumerOffsetManager);
when(consumerOffsetManager.queryOffset(anyString(),anyString(),anyInt())).thenReturn(0L);
when(consumerOffsetManager.hasOffsetRecord(anyString(), anyString())).thenReturn(true);
response = consumerManageProcessor.processRequest(handlerContext, request);
assertThat(response).isNotNull();
assertThat(response.getCode()).isEqualTo(ResponseCode.SUCCESS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,7 @@ public void testFilterConsumer() throws Exception {
producer.send("TagC", msgSize);
Assert.assertEquals("Not all sent succeeded", msgSize * 3, producer.getAllUndupMsgBody().size());
consumer.getListener().waitForMessageConsume(msgSize * 2, CONSUME_TIME);
assertThat(producer.getAllMsgBody())
.containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(),
consumer.getListener().getAllMsgBody()));
assertThat(producer.getAllMsgBody()).containsAllIn(VerifyUtils.getFilterdMessage(producer.getAllMsgBody(), consumer.getListener().getAllMsgBody()));

assertThat(consumer.getListener().getAllMsgBody().size()).isEqualTo(msgSize * 2);
}
Expand All @@ -90,7 +88,8 @@ public void testFilterPullConsumer() throws Exception {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer(group);
consumer.setNamesrvAddr(NAMESRV_ADDR);
consumer.start();
Thread.sleep(3000);
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().updateTopicRouteInfoFromNameServer(topic);
consumer.getDefaultMQPullConsumerImpl().getRebalanceImpl().getmQClientFactory().sendHeartbeatToAllBrokerWithLock();
producer.send("TagA", msgSize);
producer.send("TagB", msgSize);
producer.send("TagC", msgSize);
Expand All @@ -102,8 +101,7 @@ public void testFilterPullConsumer() throws Exception {
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pull(mq, selector, getMessageQueueOffset(mq), 32);
PullResult pullResult = consumer.pull(mq, selector, getMessageQueueOffset(mq), 32);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
Expand Down
Loading