Consumer not reconnecting on lost session on proxy connectivity #1233

@fd8s0

This behaviour does not seem to occur without a Kafka proxy. When connecting to the brokers directly, we observe that the stream always recovers by itself.

Example code:

import org.apache.kafka.clients.consumer.ConsumerConfig
import zio._
import zio.kafka.consumer.{Consumer, ConsumerSettings, RebalanceListener, Subscription}
import zio.kafka.serde.{Deserializer, Serializer}

object ZioKafkaProxyTest extends ZIOAppDefault {

  private val Props = Map(
    ConsumerConfig.GROUP_ID_CONFIG                 -> "zio-kafka-test",
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG        -> "earliest"
  )

  override def run: ZIO[Any with ZIOAppArgs with Scope, Any, Any] = {
    for {
      consumer <- Consumer.make(
        ConsumerSettings(List("kafka-proxy:9192"))
          .withProperties(Props)
          .withPollTimeout(1.second)
          .withRebalanceListener(RebalanceListener(
            (assigned, _) => ZIO.logInfo(s"assigned: ${assigned.map(_.partition())}"),
            (revoked, _) => ZIO.logInfo(s"revoked: ${revoked.map(_.partition())}"),
            (lost, _) => ZIO.logInfo(s"lost: ${lost.map(_.partition())}"),
          ))
      )
      _ <-
        consumer.plainStream(Subscription.topics("topic"), Deserializer.string, Deserializer.string, 2)
          .mapZIO(m => ZIO.logInfo(m.value))
          .runDrain
    } yield ExitCode.success
  }

}

In version 2.3.2 it goes something like this:

timestamp=2024-04-30T16:58:09.273167Z level=INFO thread=#zio-fiber-19 message="assigned: Set(0)" 
proxy stopped
wait for session timeout
proxy start
timestamp=2024-04-30T16:58:40.757980Z level=INFO thread=#zio-fiber-90 message="lost: Set(0)" 
timestamp=2024-04-30T16:58:43.769893Z level=INFO thread=#zio-fiber-97 message="assigned: Set(0)"
stream resumes

In version 2.7.4 it goes:

timestamp=2024-04-30T15:26:46.500919Z level=INFO thread=#zio-fiber-18 message="assigned: Set(0)"
stopped proxy
wait for session timeout
started proxy
timestamp=2024-04-30T15:29:32.603663Z level=INFO thread=#zio-fiber-22 message="lost: Set(0)" 
stream hangs indefinitely
on shutdown:
2024-04-30 16:31:57 ERROR ConsumerCoordinator:1201 - [Consumer clientId=consumer-zio-kafka-test-1, groupId=zio-kafka-test] LeaveGroup request with Generation{generationId=-1, memberId='consumer-zio-kafka-test-1-e2669255-c590-4ac7-9457-a585dfbf3969', protocol='null'} failed with error: The coordinator is not aware of this member.
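
To shorten the "wait for session timeout" step when reproducing, the session timeout can be set explicitly on the consumer. This is only a sketch: the object name `ReproTimeouts` and the values below are hypothetical and were not part of the runs above, and the session timeout is assumed to fall within the range the broker allows.

```scala
import org.apache.kafka.clients.consumer.ConsumerConfig

object ReproTimeouts {
  // Hypothetical values, chosen only to make the session expire sooner
  // while the proxy is down. They are not taken from the original runs.
  val TimeoutProps: Map[String, String] = Map(
    // Time without heartbeats after which the coordinator drops the member.
    ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG    -> "10000",
    // Heartbeat interval; kept well below the session timeout.
    ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG -> "3000"
  )
}
```

In the example code this would be merged into the existing properties, e.g. `.withProperties(Props ++ ReproTimeouts.TimeoutProps)`.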
