Description
Problem Overview
When a consumer is configured with CONSUME_FROM_LAST_OFFSET and its consumer group starts for the first time, messages may still be pulled from offset 0L. This is not the expected behavior.
I built a demo to reproduce this situation.
First, I sent some messages with a producer:

Then I configured the consumer with CONSUME_FROM_LAST_OFFSET.

On the first startup, the consumer should not receive the messages sent earlier, but they are still consumed from the beginning:

Locating the problem
Before a consumer group pulls messages from a queue, it determines the initial offset to consume from according to the startup mode:

● In clustering mode, the offset is read from the broker. (This is the mode affected by the problem above.)
● In broadcasting mode, the locally recorded offset is read, and -1 is returned if none is found.
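To make the two paths concrete, here is a minimal, self-contained Java sketch of how a client might choose its start offset under CONSUME_FROM_LAST_OFFSET. It does not use the RocketMQ client; the names StartOffsetSketch and startOffset are hypothetical, and it assumes (roughly mirroring the client's behavior) that a stored offset of -1 means "none found" and makes the consumer skip to the queue's max offset, while any non-negative value is trusted as a real stored position.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.ToLongFunction;

// Hypothetical sketch of start-offset selection for CONSUME_FROM_LAST_OFFSET.
// Not the RocketMQ client implementation.
class StartOffsetSketch {
    enum MessageModel { CLUSTERING, BROADCASTING }

    // Offsets recorded locally (broadcasting mode), keyed by queue id.
    static final Map<Integer, Long> localStore = new HashMap<>();

    /**
     * @param brokerLookup offset the broker reports for a queue, or -1 when it
     *                     reports that nothing was found (assumption)
     * @param maxOffset    current max offset of the queue
     */
    static long startOffset(MessageModel model, int queueId,
                            ToLongFunction<Integer> brokerLookup, long maxOffset) {
        long stored = (model == MessageModel.CLUSTERING)
                ? brokerLookup.applyAsLong(queueId)        // clustering: ask the broker
                : localStore.getOrDefault(queueId, -1L);   // broadcasting: local record, -1 if absent
        if (stored >= 0) {
            return stored; // any non-negative value is treated as a stored position
        }
        // Simplification: every negative value is treated as "first start, no offset",
        // so existing messages are skipped.
        return maxOffset;
    }

    public static void main(String[] args) {
        // The broker replies 0 although the group never committed anything.
        long fromZero = startOffset(MessageModel.CLUSTERING, 0, q -> 0L, 100L);
        // The broker reports "not found", modeled as -1.
        long fromMax = startOffset(MessageModel.CLUSTERING, 0, q -> -1L, 100L);
        System.out.println(fromZero + " " + fromMax); // prints "0 100"
    }
}
```

The sketch shows why a reply of 0 is harmful: the client cannot tell it apart from a genuinely committed offset of 0, so the CONSUME_FROM_LAST_OFFSET fallback never runs.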
On the broker side, ConsumerManageProcessor#queryConsumerOffset looks up the consume offset by consumer group, topic and queueId:
● If the offsetTable has an entry for this consumer group, topic and queueId, the recorded consume offset is returned.
● If there is no entry in the offsetTable, the broker obtains the minimum offset of the consume queue, and when that minimum offset is less than or equal to 0 and offset 0 is still in memory, it returns 0. So as long as the consume queue has not yet cleaned up any expired files, the minimum offset is 0 and still in memory, 0 is returned, and the client, which treats this as a stored offset, starts consuming from the very beginning.
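The lookup described above can be modeled in a few lines. This is a hypothetical, simplified Java sketch, not the broker's code: the class, fields and methods are illustrative, "not found" is represented by the sentinel -1 in place of a ResponseCode.QUERY_NOT_FOUND response, and the "topic@group" key format of the offset table is assumed for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative model of the broker-side offset lookup; names are hypothetical.
class BrokerOffsetQuerySketch {
    // Stands in for a response with code ResponseCode.QUERY_NOT_FOUND.
    static final long NOT_FOUND = -1L;

    // offsetTable modeled as "topic@group" -> (queueId -> committed offset).
    final Map<String, Map<Integer, Long>> offsetTable = new HashMap<>();
    long minOffsetInQueue;      // smallest offset still kept by the consume queue
    boolean offsetZeroInMemory; // whether offset 0 is still in memory rather than only on disk

    void commit(String group, String topic, int queueId, long offset) {
        offsetTable.computeIfAbsent(topic + "@" + group, k -> new HashMap<>()).put(queueId, offset);
    }

    long queryConsumerOffset(String group, String topic, int queueId) {
        Map<Integer, Long> byQueue = offsetTable.get(topic + "@" + group);
        Long recorded = (byQueue == null) ? null : byQueue.get(queueId);
        if (recorded != null) {
            return recorded;   // the group has a recorded consume offset: return it
        }
        if (minOffsetInQueue <= 0 && offsetZeroInMemory) {
            return 0L;         // nothing recorded, yet 0 is reported as if it were stored
        }
        return NOT_FOUND;      // nothing recorded and the queue no longer starts at 0
    }

    public static void main(String[] args) {
        BrokerOffsetQuerySketch broker = new BrokerOffsetQuerySketch();
        broker.minOffsetInQueue = 0L;      // no expired files cleaned up yet
        broker.offsetZeroInMemory = true;
        System.out.println(broker.queryConsumerOffset("newGroup", "demoTopic", 0)); // prints 0

        broker.minOffsetInQueue = 500L;    // older files have been cleaned up
        System.out.println(broker.queryConsumerOffset("newGroup", "demoTopic", 0)); // prints -1
    }
}
```

In this model the same brand-new group gets 0 on a fresh queue but "not found" on an older one, which matches the observation that the problem only appears before expired files are cleaned up.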

Solution
When querying the consume offset, set ZeroIfNotFound to false in the request header.
Then, when no previous record is found, the broker reports that no offset was found instead of returning 0.
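As a rough end-to-end illustration, the Java sketch below combines a broker lookup that honors a zeroIfNotFound flag with the client's fallback. It is a hypothetical model, not RocketMQ code: the flag parameter only mirrors the ZeroIfNotFound request-header field, "not found" is the sentinel -1, and the client rule (use a non-negative reply as-is, otherwise start at the max offset) is an assumed simplification of CONSUME_FROM_LAST_OFFSET.

```java
// Hypothetical end-to-end sketch of the fix; names are illustrative only.
class ZeroIfNotFoundSketch {
    static final long NOT_FOUND = -1L; // stands in for a QUERY_NOT_FOUND response

    // Broker side: recorded == null means the offsetTable has no entry.
    static long brokerQuery(Long recorded, long minOffset, boolean zeroInMemory,
                            boolean zeroIfNotFound) {
        if (recorded != null) {
            return recorded;
        }
        if (!zeroIfNotFound) {
            return NOT_FOUND;  // the fix: never report 0 for a group with no record
        }
        if (minOffset <= 0 && zeroInMemory) {
            return 0L;         // original behavior
        }
        return NOT_FOUND;
    }

    // Client side: simplified CONSUME_FROM_LAST_OFFSET fallback.
    static long clientStartOffset(long brokerReply, long maxOffset) {
        return brokerReply >= 0 ? brokerReply : maxOffset;
    }

    public static void main(String[] args) {
        long maxOffset = 100L; // 100 messages were produced before the group started
        long before = clientStartOffset(brokerQuery(null, 0L, true, true), maxOffset);
        long after = clientStartOffset(brokerQuery(null, 0L, true, false), maxOffset);
        System.out.println(before + " -> " + after); // prints "0 -> 100"
    }
}
```

With the flag set to false, the new group skips the 100 earlier messages, while a group that already has a recorded offset is unaffected.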

Another Question
The solution above fixes this problem, but I still have some doubts about the original implementation.
In ConsumerManageProcessor#queryConsumerOffset, why do we need to set the offset to 0L when minOffset <= 0 and offset 0L is in memory? Could we simply return a response with the code set to ResponseCode.QUERY_NOT_FOUND?
