diff --git a/aiokafka/consumer/consumer.py b/aiokafka/consumer/consumer.py index 559fa35a..50b29535 100644 --- a/aiokafka/consumer/consumer.py +++ b/aiokafka/consumer/consumer.py @@ -336,6 +336,7 @@ def __init__( if loop.get_debug(): self._source_traceback = traceback.extract_stack(sys._getframe(1)) self._closed = False + self._started: bool = False if topics: topics = self._validate_topics(topics) @@ -446,6 +447,11 @@ async def start(self): await self._client.force_metadata_update() self._coordinator.assign_all_partitions(check_unknown=True) + self._started = True + + def _assert_started(self): + assert self._started, "The consumer needs to start() before it can be used" + async def _wait_topics(self): if self._subscription.subscription is not None: for topic in self._subscription.subscription.topics: @@ -510,6 +516,7 @@ async def stop(self): * Commit last consumed message if autocommit enabled * Leave group if used Consumer Groups """ + self._assert_started() if self._closed: return log.debug("Closing the KafkaConsumer.") @@ -575,6 +582,7 @@ async def commit(self, offsets=None): .. _kafka-python: https://github.com/dpkp/kafka-python """ + self._assert_started() if self._group_id is None: raise IllegalOperation("Requires group_id") @@ -615,6 +623,7 @@ async def committed(self, partition): Raises: IllegalOperation: If used with ``group_id == None`` """ + self._assert_started() if self._group_id is None: raise IllegalOperation("Requires group_id") @@ -633,6 +642,7 @@ async def topics(self): Returns: set: topics """ + self._assert_started() cluster = await self._client.fetch_all_metadata() return cluster.topics() @@ -669,6 +679,7 @@ async def position(self, partition): :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned partition """ + self._assert_started() while True: if not self._subscription.is_assigned(partition): raise IllegalStateError(f"Partition {partition} is not assigned") @@ -799,6 +810,7 @@ async def seek_to_beginning(self, *partitions): .. versionadded:: 0.3.0 """ + self._assert_started() if not all(isinstance(p, TopicPartition) for p in partitions): raise TypeError("partitions must be TopicPartition instances") @@ -839,6 +851,7 @@ async def seek_to_end(self, *partitions): .. versionadded:: 0.3.0 """ + self._assert_started() if not all(isinstance(p, TopicPartition) for p in partitions): raise TypeError("partitions must be TopicPartition instances") @@ -883,6 +896,7 @@ async def seek_to_committed(self, *partitions): :exc:`~aiokafka.errors.IllegalStateError` in case of unassigned partition """ + self._assert_started() if not all(isinstance(p, TopicPartition) for p in partitions): raise TypeError("partitions must be TopicPartition instances") @@ -939,6 +953,7 @@ async def offsets_for_times(self, timestamps): .. versionadded:: 0.3.0 """ + self._assert_started() if self._client.api_version <= (0, 10, 0): raise UnsupportedVersionError( "offsets_for_times API not supported" @@ -981,6 +996,7 @@ async def beginning_offsets(self, partitions): .. versionadded:: 0.3.0 """ + self._assert_started() if self._client.api_version <= (0, 10, 0): raise UnsupportedVersionError( "offsets_for_times API not supported" @@ -1018,6 +1034,7 @@ async def end_offsets(self, partitions): .. versionadded:: 0.3.0 """ + self._assert_started() if self._client.api_version <= (0, 10, 0): raise UnsupportedVersionError( "offsets_for_times API not supported" @@ -1149,6 +1166,7 @@ async def getone(self, *partitions) -> ConsumerRecord: print(message.offset, message.key, message.value) """ + self._assert_started() assert all(isinstance(k, TopicPartition) for k in partitions) if self._closed: raise ConsumerStoppedError() @@ -1196,6 +1214,7 @@ async def getmany( print(message.offset, message.key, message.value) """ + self._assert_started() assert all(isinstance(k, TopicPartition) for k in partitions) if self._closed: raise ConsumerStoppedError()