diff --git a/faust/tables/recovery.py b/faust/tables/recovery.py index 11737aef0..936b514ee 100644 --- a/faust/tables/recovery.py +++ b/faust/tables/recovery.py @@ -706,11 +706,6 @@ async def _build_offsets( # -- Update offsets # Offsets may have been compacted, need to get to the recent ones earliest = await consumer.earliest_offsets(*tps) - # FIXME To be consistent with the offset -1 logic - earliest = { - tp: offset - 1 if offset is not None else None - for tp, offset in earliest.items() - } for tp in tps: last_value = destination[tp] @@ -723,7 +718,7 @@ async def _build_offsets( elif new_value is None: destination[tp] = last_value else: - destination[tp] = max(last_value, new_value) + destination[tp] = max(last_value, new_value - 1) if destination: self.log.info( diff --git a/tests/unit/tables/test_recovery.py b/tests/unit/tables/test_recovery.py index 81fcad8d5..2790d6a29 100644 --- a/tests/unit/tables/test_recovery.py +++ b/tests/unit/tables/test_recovery.py @@ -249,7 +249,7 @@ async def test__build_offsets_with_none(self, *, recovery, app) -> None: destination = {TP1: None, TP2: 1, TP3: 8, TP4: -1} await recovery._build_offsets(consumer, tps, destination, "some-title") assert len(destination) == 4 - assert destination[TP1] == -1 + assert destination[TP1] == 0 assert destination[TP2] == 2 assert destination[TP3] == 8 assert destination[TP4] == -1