Skip to content
Draft
7 changes: 1 addition & 6 deletions faust/tables/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -706,11 +706,6 @@ async def _build_offsets(
# -- Update offsets
# Offsets may have been compacted, need to get to the recent ones
earliest = await consumer.earliest_offsets(*tps)
# FIXME To be consistent with the offset -1 logic
earliest = {
tp: offset - 1 if offset is not None else None
for tp, offset in earliest.items()
}

for tp in tps:
last_value = destination[tp]
Expand All @@ -723,7 +718,7 @@ async def _build_offsets(
elif new_value is None:
destination[tp] = last_value
else:
destination[tp] = max(last_value, new_value)
destination[tp] = max(last_value, new_value - 1)

if destination:
self.log.info(
Expand Down
2 changes: 1 addition & 1 deletion tests/unit/tables/test_recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ async def test__build_offsets_with_none(self, *, recovery, app) -> None:
destination = {TP1: None, TP2: 1, TP3: 8, TP4: -1}
await recovery._build_offsets(consumer, tps, destination, "some-title")
assert len(destination) == 4
assert destination[TP1] == -1
assert destination[TP1] == 0
assert destination[TP2] == 2
assert destination[TP3] == 8
assert destination[TP4] == -1
Expand Down
Loading