[Bug]: Error reading RabbitMQ messages with headers containing List values with nested objects #32885

@devfreitag

What happened?

Apache Beam version: 2.59.0
Java version: 17

This is a follow-up to issue #31397, where I reported an error when reading RabbitMQ messages whose headers contain a List value. That case was resolved, but I've now hit a new error that occurs when a header value contains a nested object.

This works:

headers = {
    'abc': 'abc-123456',
    'xyz': 'xyz-123456'
}

This doesn't work:

headers = {
    'abc': 'abc-123456',
    'xyzDict': {
        'xyz': 'xyz-123456'
    }
}

Stacktrace:

Exception in thread "main" java.lang.IllegalArgumentException: Forbidden IOException when writing to OutputStream
	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:89)
	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:70)
	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:55)
	at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:168)
	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:118)
	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:49)
	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:115)
	at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:145)
	at org.apache.beam.runners.direct.DirectTransformExecutor.processElements(DirectTransformExecutor.java:165)
	at org.apache.beam.runners.direct.DirectTransformExecutor.run(DirectTransformExecutor.java:129)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
	at java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1187)
	at java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:350)
Caused by: java.io.NotSerializableException: com.rabbitmq.client.impl.LongStringHelper$ByteArrayLongString
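
The trace above suggests that the header map goes through Java serialization and fails on a non-Serializable value nested inside it. The following is a minimal sketch of that mechanism and of one possible workaround, not the fix in Beam itself. `HeaderSanitizer`, `sanitize`, `isJavaSerializable`, and `FakeLongString` are hypothetical names introduced only for illustration. `FakeLongString` stands in for the RabbitMQ client's `ByteArrayLongString` so the example runs without the client on the classpath, and it assumes that calling `toString()` on the real value yields the header text.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class HeaderSanitizer {

    /** Hypothetical stand-in for a non-Serializable header value (e.g. a LongString). */
    static final class FakeLongString {
        private final String value;

        FakeLongString(String value) {
            this.value = value;
        }

        @Override
        public String toString() {
            return value;
        }
    }

    /** Recursively copies a header value, replacing non-Serializable leaves with toString(). */
    static Object sanitize(Object value) {
        if (value instanceof Map) {
            Map<String, Object> copy = new HashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                copy.put(String.valueOf(e.getKey()), sanitize(e.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object item : (List<?>) value) {
                copy.add(sanitize(item));
            }
            return copy;
        }
        if (value == null || value instanceof Serializable) {
            return value;
        }
        // Assumption: the textual form is an acceptable substitute for the raw value.
        return value.toString();
    }

    /** Returns false if Java serialization rejects the value with NotSerializableException. */
    static boolean isJavaSerializable(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // Same shape as the failing headers: a nested map holding a non-Serializable value.
        Map<String, Object> nested = new HashMap<>();
        nested.put("xyz", new FakeLongString("xyz-123456"));
        Map<String, Object> headers = new HashMap<>();
        headers.put("abc", "abc-123456");
        headers.put("xyzDict", nested);

        System.out.println("raw headers serializable: " + isJavaSerializable(headers));
        System.out.println("sanitized headers serializable: " + isJavaSerializable(sanitize(headers)));
    }
}
```

In this sketch the raw map is rejected and the sanitized copy is accepted, because every nested Map and List is rebuilt with Serializable leaves. Whether converting values to strings is acceptable depends on how the pipeline consumes the headers.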

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
