Current AioKafkaBroker.kick does:
await self._aiokafka_producer.send( # type: ignore
topic=topic_name,
value=message.message,
)
AIOKafkaProducer.send returns a future that completes with RecordMetadata or raises (e.g. RequestTimedOutError, auth/SSL failures). The first await only enqueues into the producer buffer; the returned future is never awaited. Consequences:
- Transport-level send errors are silently lost; kick returns success and middlewares (e.g. post_send) run as if the message reached Kafka.
- In real cases (e.g. Azure EventHub Kafka-compatible with request_timeout_ms=10000), we see RequestTimedOutError logged by aiokafka but not propagated to TaskIQ.
Request: Change "kick" to call "send_and_wait" so that sending failures can propagate:
await self._aiokafka_producer.send_and_wait( # type: ignore
topic=topic_name,
value=message.message,
)
This preserves async behavior (no thread blocking) but ensures kick only succeeds after the broker confirms receipt, and errors are observable to the caller/middlewares.