Skip to content

Commit b5db8a0

Browse files
fix: ensure aiokafka commit is called with kafka.structs.TopicPartition (#539) (#541)
1 parent 87a80a9 commit b5db8a0

File tree

2 files changed

+27
-4
lines changed

2 files changed

+27
-4
lines changed

faust/transport/drivers/aiokafka.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -711,13 +711,15 @@ async def commit(self, offsets: Mapping[TP, int]) -> bool:
711711
async def _commit(self, offsets: Mapping[TP, int]) -> bool:
712712
consumer = self._ensure_consumer()
713713
now = monotonic()
714+
commitable_offsets = {
715+
tp: offset for tp, offset in offsets.items() if tp in self.assignment()
716+
}
714717
try:
715718
aiokafka_offsets = {
716-
tp: OffsetAndMetadata(offset, "")
717-
for tp, offset in offsets.items()
718-
if tp in self.assignment()
719+
ensure_aiokafka_TP(tp): OffsetAndMetadata(offset, "")
720+
for tp, offset in commitable_offsets.items()
719721
}
720-
self.tp_last_committed_at.update({tp: now for tp in aiokafka_offsets})
722+
self.tp_last_committed_at.update({tp: now for tp in commitable_offsets})
721723
await consumer.commit(aiokafka_offsets)
722724
except CommitFailedError as exc:
723725
if "already rebalanced" in str(exc):
@@ -1621,3 +1623,17 @@ def credentials_to_aiokafka_auth(
16211623
}
16221624
else:
16231625
return {"security_protocol": "PLAINTEXT"}
1626+
1627+
1628+
def ensure_aiokafka_TP(tp: TP) -> _TopicPartition:
1629+
"""Convert Faust ``TP`` to aiokafka ``TopicPartition``."""
1630+
return (
1631+
tp
1632+
if isinstance(tp, _TopicPartition)
1633+
else _TopicPartition(tp.topic, tp.partition)
1634+
)
1635+
1636+
1637+
def ensure_aiokafka_TPset(tps: Iterable[TP]) -> Set[_TopicPartition]:
1638+
"""Convert set of Faust ``TP`` to aiokafka ``TopicPartition``."""
1639+
return {ensure_aiokafka_TP(tp) for tp in tps}

tests/unit/transport/drivers/test_aiokafka.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
ThreadedProducer,
3535
Transport,
3636
credentials_to_aiokafka_auth,
37+
ensure_aiokafka_TPset,
3738
server_list,
3839
)
3940
from faust.types import TP
@@ -2038,3 +2039,9 @@ def test_credentials_to_aiokafka(credentials, ssl_context, expected):
20382039
def test_credentials_to_aiokafka__invalid():
20392040
with pytest.raises(ImproperlyConfigured):
20402041
credentials_to_aiokafka_auth(object())
2042+
2043+
2044+
def test_ensure_aiokafka_TPset():
2045+
actual = ensure_aiokafka_TPset({TP(topic="foo", partition=0)})
2046+
assert actual == {TopicPartition("foo", 0)}
2047+
assert all(isinstance(tp, TopicPartition) for tp in actual)

0 commit comments

Comments
 (0)