AIOKafkaConsumer.getone infinite loop. #712

@TEH30P

Description

Describe the bug

If the network goes down while await AIOKafkaConsumer.getone() is in progress, the await never completes.
Even wrapping the call in asyncio.wait_for does not help.
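
For illustration, the call that hangs, reduced to a minimal sketch (consumer stands for a started AIOKafkaConsumer, as in the full example below):

import asyncio
import aiokafka

async def fetch_one(consumer: aiokafka.AIOKafkaConsumer):
    # Expected: wait_for cancels getone() after 60 s and raises asyncio.TimeoutError.
    # Observed: once the network goes down, this await never returns.
    return await asyncio.wait_for(consumer.getone(), timeout=60)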

Expected behaviour

Some kind of exception should be raised instead of waiting forever.
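
What I would expect to be able to write instead, sketched below (KafkaConnectionError is only one plausible error type here; any exception that signals the outage would do):

import asyncio
from aiokafka.errors import KafkaConnectionError

async def fetch_one_or_none(consumer):
    try:
        return await asyncio.wait_for(consumer.getone(), timeout=60)
    except (asyncio.TimeoutError, KafkaConnectionError):
        # Either exception would let the application notice the outage and react.
        return None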

Environment (please complete the following information):

  • aiokafka version:
#python -c "import aiokafka; print(aiokafka.__version__)"
0.7.0
  • kafka-python version (python -c "import kafka; print(kafka.__version__)"):
#python -c "import kafka; print(kafka.__version__)"
2.0.2
  • Kafka Broker version (kafka-topics.sh --version):

Unable to run this command.

  • Other information (Confluent Cloud version, etc.):
#python --version
Python 3.8.6
#hostnamectl 
   Static hostname: fws.tz.local
         Icon name: computer-vm
           Chassis: vm
        Machine ID: 00000000000000000000000000000000
           Boot ID: 00000000000000000000000000000000
    Virtualization: vmware
  Operating System: Fedora 32 (Workstation Edition)
       CPE OS Name: cpe:/o:fedoraproject:fedora:32
            Kernel: Linux 5.10.7-100.fc32.x86_64
      Architecture: x86-64

Reproducible example

import logging as m_log
import sys as m_sys
import io as m_io
import os as m_os
import asyncio as m_aio
import aiokafka as m_ak
import signal as m_sig


gv_log: m_log.Logger = m_log.getLogger(__name__)


class MyConsumerCommitter:
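    # Tracks the last processed offset per TopicPartition and persists it to a
    # local text file, one "offset partition topic" line per partition.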
    def __init__(self, iv_consumer: m_ak.AIOKafkaConsumer, i_cache_file: str):
        self.l_partition = dict()
        self._consumer = iv_consumer
        self._cache_file = i_cache_file
        self._changed = False

    def _save(self) -> None:
        with m_io.open(self._cache_file, mode='r', encoding='utf-8', newline='\n') as v_rd:
            l_line_new: list = []
            l_partition_new: dict = self.l_partition.copy()

            for v_line in v_rd:
                l_line_parsed = v_line.split(maxsplit=2)
                v_tp_committed = m_ak.TopicPartition(l_line_parsed[2], int(l_line_parsed[1]))

                if v_tp_committed in self.l_partition:
                    l_line_parsed[0] = str(self.l_partition[v_tp_committed])
                    del l_partition_new[v_tp_committed]

                l_line_new.append(str(' ').join(l_line_parsed))
            l_line_new += \
                [f'{v_offset} {v_tp_committed.partition} {v_tp_committed.topic}' \
                for v_tp_committed, v_offset in l_partition_new.items()]

        with m_io.open(self._cache_file, mode='w', encoding='utf-8', newline='\n') as v_wr:
            v_wr.writelines(l_line_new)

    def _load(self) -> None:
        with m_io.open(self._cache_file, mode='r', encoding='utf-8', newline='\n') as v_rd:
            for v_line in v_rd:
                l_line_parsed = v_line.split(maxsplit=2)
                v_tp_committed = m_ak.TopicPartition(l_line_parsed[2], int(l_line_parsed[1]))

                if v_tp_committed in self.l_partition:
                    self.l_partition[v_tp_committed] = int(l_line_parsed[0])

    def partition_assign(self, il_partition) -> None:
        gv_log.info(f'Rebalancer: assigned {len(il_partition)}, {il_partition}')
        self._changed = True
        for v_tp in il_partition:
            if v_tp not in self.l_partition:
                self.l_partition[v_tp] = 0

        self._load()

        for v_tp in il_partition:
            self._consumer.seek(v_tp, self.l_partition[v_tp] + 1)

    def partition_revoke(self, il_partition) -> None:
        gv_log.info(f'Rebalancer: revoked {len(il_partition)}, {il_partition}')
        self._changed = True
        for v_tp in il_partition:
            del self.l_partition[v_tp]

    def is_changed(self) -> bool:
        # Report True at most once per change, then reset the flag.
        if self._changed:
            self._changed = False
            return True
        return False

    def offset_set(self, iv_topic, iv_partition, iv_offset) -> None:
        self.l_partition[m_ak.TopicPartition(iv_topic, iv_partition)] = iv_offset
        self._save()

    def offset_get(self, iv_topic, iv_partition) -> int:
        return self.l_partition[m_ak.TopicPartition(iv_topic, iv_partition)]


gv_running: bool = True

def on_signal_exit():
    gv_log.info("on signal to exit")
    global gv_running
    gv_running = False


async def consume():
    global gv_running
    v_loop = m_aio.get_running_loop()
    v_loop.add_signal_handler(m_sig.SIGTERM, on_signal_exit)
    v_loop.add_signal_handler(m_sig.SIGINT, on_signal_exit)
    gv_log.info(m_sys.argv)

    v_kc = m_ak.AIOKafkaConsumer(
        bootstrap_servers=['kafka1.tz.local:9092']
        , group_id='user1'
        , security_protocol='SASL_PLAINTEXT'
        , sasl_mechanism='PLAIN'
        , sasl_plain_username='user1'
        , sasl_plain_password='mUATUysl9BxpbdI'
        , auto_offset_reset='earliest'
        , enable_auto_commit=False)

    try:
        gv_log.info(f'{__file__} Begin')
        v_co_cache = MyConsumerCommitter(v_kc, m_os.path.splitext(__file__)[0] + '.commit.txt')

        gv_log.info(f'Kafka: +start')
        await v_kc.start()
        gv_log.info(f'Kafka: -start')
        #v_kc.subscribe(topics=['simple-01'], listener=MyConsumerRebalanceListener(v_co_cache))
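        # Build an anonymous ConsumerRebalanceListener subclass on the fly and
        # forward its rebalance callbacks to the local committer.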
        v_kc.subscribe(topics=['simple-01'],
                       listener=type('', (m_ak.ConsumerRebalanceListener,), {
                           'on_partitions_assigned': lambda self, assigned: v_co_cache.partition_assign(assigned),
                           'on_partitions_revoked': lambda self, revoked: v_co_cache.partition_revoke(revoked)
                           })()
        )

        l_end_offset: dict = {}
        gv_log.info(f'Init done')

        if v_co_cache.is_changed():
            gv_log.info(f'Kafka: on rebalance [early]')
            for v_tp, v_offs in v_co_cache.l_partition.items():
                v_kc.seek(v_tp, v_offs + 1)

        v_cnt: int = 10

        while gv_running and v_cnt:
            gv_log.info(f'+loop')
            v_cnt -= 1
            gv_log.info(f'Kafka: +getone')
            # !!!TODO: This is the point where the network must be taken down to reproduce the hang.
            v_msg = await m_aio.wait_for(v_kc.getone(), timeout=60)
            gv_log.info(f'Kafka: -getone')

            if v_co_cache.is_changed():
                gv_log.info(f'Kafka: on rebalance')
                for v_tp, v_offs in v_co_cache.l_partition.items():
                    gv_log.info(f'Kafka: seek')
                    v_kc.seek(v_tp, v_offs + 1)
                continue

            gv_log.info(f': consumed {v_msg.partition}:{v_msg.offset}')

            v_co_cache.offset_set('simple-01', v_msg.partition, v_msg.offset)
            await m_aio.sleep(1)
            gv_log.info(f': -loop [{gv_running}]')

    except BaseException as x:
        gv_log.error(f'Err({x.__repr__()})')
        raise
    finally:
        gv_log.info(f'Kafka: +stop')
        await v_kc.stop()
        gv_log.info(f'Kafka: -stop')
        gv_log.info(f'End')

m_log.basicConfig(filename=(m_os.extsep.join(__file__.split(m_os.extsep)[:-1] + ['log'])), filemode='w', level=m_log.DEBUG)
m_aio.run(consume())

Part of the log showing the messages written at the start of the infinite loop:

INFO:__main__:Kafka: +getone
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='simple-01', partition=0) at offset 17902478
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka1.tz.local port=9092> Request 7: FetchRequest_v4(replica_id=-1, max_wait_time=500, min_bytes=1, max_bytes=52428800, isolation_level=0, topics=[(topic='simple-01', partitions=[(partition=0, offset=17902478, max_bytes=1048576)])])
DEBUG:aiokafka.consumer.group_coordinator:Heartbeat: user1[5] aiokafka-0.7.0-66cefff7-c085-48a9-ac55-8d2010cd0667
DEBUG:aiokafka.conn:<AIOKafkaConnection host=kafka1.tz.local port=9092> Request 6: HeartbeatRequest_v1(group='user1', generation_id=5, member_id='aiokafka-0.7.0-66cefff7-c085-48a9-ac55-8d2010cd0667')
DEBUG:aiokafka.conn:Closing connection at kafka1.tz.local:9092
ERROR:aiokafka.consumer.fetcher:Failed fetch messages from 0: [Error 7] RequestTimedOutError
DEBUG:aiokafka:Initiating connection to node 0 at kafka1.tz.local:9092
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='simple-01', partition=0) at offset 17902478
DEBUG:aiokafka:Initiating connection to node 0 at kafka1.tz.local:9092
ERROR:aiokafka:Unable connect to node with id 0: [Errno -2] Name or service not known
ERROR:aiokafka:Unable to update metadata from [0]
ERROR:aiokafka:Unable connect to node with id 0: [Errno -2] Name or service not known
ERROR:aiokafka.consumer.fetcher:Failed fetch messages from 0: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 0).
DEBUG:aiokafka:Initiating connection to node 0 at kafka1.tz.local:9092
ERROR:aiokafka:Unable connect to node with id 0: [Errno -2] Name or service not known
ERROR:aiokafka:Unable to update metadata from [0]
DEBUG:aiokafka.consumer.fetcher:Adding fetch request for partition TopicPartition(topic='simple-01', partition=0) at offset 17902478
DEBUG:aiokafka:Initiating connection to node 0 at kafka1.tz.local:9092
ERROR:aiokafka:Unable connect to node with id 0: [Errno -2] Name or service not known
ERROR:aiokafka.consumer.fetcher:Failed fetch messages from 0: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 0).
