
[Bug]: Concurrent exception with ReadFromKafka YAML transform on PrismRunner #35185

@charlespnh

Description


What happened?

pipeline:
  type: chain
  transforms:
    - type: ReadFromKafka
      name: ReadFromMyTopic
      config:
        format: JSON
        schema: |
          {
            "type": "object",
            "properties": {
              "value": {"type": "string"}
            }
          }
        topic: test
        bootstrap_servers: kafka:9092
        auto_offset_reset_config: earliest

    - type: LogForTesting

Running this pipeline (Beam 2.65.0) on the Prism runner:

python -m apache_beam.yaml.main \
--yaml_pipeline_file=sdks/python/apache_beam/yaml/examples/transforms/io/kafka_read.yaml \
--runner PrismRunner

fails with a java.util.ConcurrentModificationException:

RuntimeError: Pipeline job-001 failed in state FAILED: bundle inst004 stage-002 failed:org.apache.beam.sdk.util.UserCodeException: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-2-thread-6, id: 33) otherThread(id: 32)
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn$Unbounded$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForWindowObservingSizedElementAndRestriction(FnApiDoFnRunner.java:1123)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$1500(FnApiDoFnRunner.java:145)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:658)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$4.accept(FnApiDoFnRunner.java:653)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:348)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:275)
        at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:211)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.multiplexElements(BeamFnDataInboundObserver.java:231)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver.awaitCompletion(BeamFnDataInboundObserver.java:185)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:543)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:150)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:115)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:163)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access. currentThread(name: pool-2-thread-6, id: 33) otherThread(id: 32)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquire(ClassicKafkaConsumer.java:1232)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.acquireAndEnsureOpen(ClassicKafkaConsumer.java:1213)
        at org.apache.kafka.clients.consumer.internals.ClassicKafkaConsumer.resume(ClassicKafkaConsumer.java:965)
        at org.apache.kafka.clients.consumer.KafkaConsumer.resume(KafkaConsumer.java:1524)
        at org.apache.beam.sdk.io.kafka.ReadFromKafkaDoFn.processElement(ReadFromKafkaDoFn.java:593)

The exception is raised regardless of whether any data is read from the topic.
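The trace shows the Kafka client rejecting a call to `resume()` from thread 33 because thread 32 is still inside the same `KafkaConsumer`. As far as we understand the client's `acquire()`/`release()` logic, this check is not a blocking lock. Instead, the consumer records which thread currently owns it (re-entrant for that thread) and throws immediately if a different thread calls in before ownership is released. The following Python sketch is a hypothetical model of that check, not Kafka or Beam code (`SingleThreadGuard`, `ConcurrentModificationError` and `demo` are illustrative names). It reproduces the failure mode when two threads share one consumer, which appears to be what happens inside `ReadFromKafkaDoFn` on Prism here.

```python
import threading

NO_CURRENT_THREAD = -1  # sentinel: no thread currently owns the consumer


class ConcurrentModificationError(RuntimeError):
    """Stand-in for java.util.ConcurrentModificationException (hypothetical)."""


class SingleThreadGuard:
    """Hypothetical model of the consumer's owner-thread check.

    It does not block. A second thread entering while the first still
    owns the guard fails fast, like the exception in the trace above.
    """

    def __init__(self):
        self._lock = threading.Lock()  # protects _owner and _refcount only
        self._owner = NO_CURRENT_THREAD
        self._refcount = 0

    def acquire(self):
        me = threading.get_ident()
        with self._lock:
            # Claim ownership if free; re-entry by the owning thread is allowed.
            if self._owner not in (NO_CURRENT_THREAD, me):
                raise ConcurrentModificationError(
                    f"not safe for multi-threaded access. currentThread(id: {me}) "
                    f"otherThread(id: {self._owner})")
            self._owner = me
            self._refcount += 1

    def release(self):
        with self._lock:
            self._refcount -= 1
            if self._refcount == 0:
                self._owner = NO_CURRENT_THREAD  # free for any thread again


def demo():
    """Thread A is mid-call when thread B calls in; returns B's errors."""
    guard = SingleThreadGuard()
    holding = threading.Event()
    done = threading.Event()
    errors = []

    def poller():
        guard.acquire()   # thread A enters a consumer call (e.g. poll)
        holding.set()
        done.wait()       # ...and has not returned yet
        guard.release()

    def resumer():
        holding.wait()
        try:
            guard.acquire()  # thread B calls in concurrently (e.g. resume)
            guard.release()
        except ConcurrentModificationError as exc:
            errors.append(exc)
        finally:
            done.set()

    a = threading.Thread(target=poller)
    b = threading.Thread(target=resumer)
    a.start()
    b.start()
    a.join()
    b.join()
    return errors


print(demo())
```

The model also shows why the same consumer works when its calls are serialized: once the owning thread releases, another thread can acquire it without error. It is the overlap of calls from two threads that triggers the exception.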

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata


Assignees

No one assigned

Type

No type

Projects

No projects

Relationships

None yet

Development

No branches or pull requests
