11import dataclasses
22import enum
33import logging
4- from datetime import datetime , timedelta
54import threading
5+ from collections import defaultdict
6+ from datetime import datetime , timedelta
67# Imports from typing are deprecated as of Python 3.9 but required for
78# compatibility with earlier versions
8- from typing import Dict , Iterable , Iterator , List , Optional , Set , Union , Collection
9- from collections import defaultdict
9+ from typing import ( Collection , Dict , Iterable , Iterator , List , Optional , Set ,
10+ Union )
1011
1112import confluent_kafka # type: ignore
1213import confluent_kafka .admin # type: ignore
1516from .errors import ErrorCallback , log_client_errors
1617from .oidc import set_oauth_cb
1718
19+
1820class LogicalOffset (enum .IntEnum ):
1921 BEGINNING = confluent_kafka .OFFSET_BEGINNING
2022 EARLIEST = confluent_kafka .OFFSET_BEGINNING
@@ -26,6 +28,7 @@ class LogicalOffset(enum.IntEnum):
2628
2729 INVALID = confluent_kafka .OFFSET_INVALID
2830
31+
2932class Consumer :
3033 conf : 'ConsumerConfig'
3134 _consumer : confluent_kafka .Consumer
@@ -99,14 +102,15 @@ def mark_done(self, msg: confluent_kafka.Message, asynchronous: bool = True):
99102 self ._consumer .commit (msg , asynchronous = False )
100103
101104 def _offsets_for_position (self , partitions : Collection [confluent_kafka .TopicPartition ],
102- position : Union [datetime , LogicalOffset ]) -> List [confluent_kafka .TopicPartition ]:
105+ position : Union [datetime , LogicalOffset ]) \
106+ -> List [confluent_kafka .TopicPartition ]:
103107 if isinstance (position , datetime ):
104108 offset = int (position .timestamp () * 1000 )
105109 elif isinstance (position , LogicalOffset ):
106110 offset = position
107111 else :
108112 raise TypeError ("Only datetime objects and logical offsets supported" )
109-
113+
110114 _partitions = [
111115 confluent_kafka .TopicPartition (topic = tp .topic , partition = tp .partition , offset = offset )
112116 for tp in partitions
@@ -248,6 +252,7 @@ def close(self):
248252 """ Close the consumer, ending its subscriptions. """
249253 self ._consumer .close ()
250254
255+
251256# Used to be called ConsumerStartPosition, though this was confusing because
252257# it only affects "auto.offset.reset" not the start position for a call to
253258# consume.
@@ -258,10 +263,12 @@ class ConsumerDefaultPosition(enum.Enum):
258263 def __str__ (self ):
259264 return self .name .lower ()
260265
266+
261267# Alias to the old name
262268# TODO: Remove alias on the next breaking release
263269ConsumerStartPosition = ConsumerDefaultPosition
264270
271+
265272@dataclasses .dataclass
266273class ConsumerConfig :
267274 broker_urls : List [str ]
0 commit comments