[Bug]: Eliminate field sorting in Beam schema-aware transforms which can result in compatibility issues #36496

@chamikaramj

What happened?

Tests are failing because the Kafka IO schema-transform cannot be constructed from a Beam Row that was created with an older Beam version. This construction step is part of upgrading the Kafka transform for pipelines that use old Beam versions.

Stack-trace:

INFO 2025-10-13T15:36:48.915770337Z E1013 15:36:48.792550 14 managed_transforms_worker_main.cc:272] Failed to upgrade using the expansion service manager: INTERNAL: Expansion request failed: java.lang.IllegalStateException: Expected field number 3 for field + redistributeNumKeys instead got 2
INFO 2025-10-13T15:36:48.915774910Z at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState(Preconditions.java:512)
INFO 2025-10-13T15:36:48.915782126Z at org.apache.beam.sdk.schemas.AutoValueSchema.validateFieldNumbers(AutoValueSchema.java:80)
INFO 2025-10-13T15:36:48.915786418Z at org.apache.beam.sdk.schemas.AutoValueSchema.access$000(AutoValueSchema.java:38)
INFO 2025-10-13T15:36:48.915790340Z at org.apache.beam.sdk.schemas.AutoValueSchema$AbstractGetterTypeSupplier.get(AutoValueSchema.java:68)
INFO 2025-10-13T15:36:48.915795972Z at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:107)
INFO 2025-10-13T15:36:48.915802211Z at org.apache.beam.sdk.schemas.utils.StaticSchemaInference.schemaFromClass(StaticSchemaInference.java:89)
INFO 2025-10-13T15:36:48.915806181Z at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.schemaFromJavaBeanClass(JavaBeanUtils.java:79)
INFO 2025-10-13T15:36:48.915810433Z at org.apache.beam.sdk.schemas.AutoValueSchema.schemaFor(AutoValueSchema.java:146)
INFO 2025-10-13T15:36:48.915815235Z at org.apache.beam.sdk.schemas.annotations.DefaultSchema$DefaultSchemaProvider.schemaFor(DefaultSchema.java:148)
INFO 2025-10-13T15:36:48.915837462Z at org.apache.beam.sdk.schemas.SchemaRegistry.lambda$getSchema$0(SchemaRegistry.java:245)
INFO 2025-10-13T15:36:48.915842880Z at org.apache.beam.sdk.schemas.SchemaRegistry.getProviderResult(SchemaRegistry.java:331)
INFO 2025-10-13T15:36:48.915847526Z at org.apache.beam.sdk.schemas.SchemaRegistry.getSchema(SchemaRegistry.java:245)
INFO 2025-10-13T15:36:48.915853469Z at org.apache.beam.sdk.schemas.SchemaRegistry.getSchema(SchemaRegistry.java:233)
INFO 2025-10-13T15:36:48.915857238Z at org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider.configurationSchema(TypedSchemaTransformProvider.java:96)
INFO 2025-10-13T15:36:48.915860690Z at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider$ManagedSchemaTransform.<init>(ManagedSchemaTransformProvider.java:163)
INFO 2025-10-13T15:36:48.915865414Z at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.from(ManagedSchemaTransformProvider.java:152)
INFO 2025-10-13T15:36:48.915869461Z at org.apache.beam.sdk.managed.ManagedSchemaTransformProvider.from(ManagedSchemaTransformProvider.java:57)
INFO 2025-10-13T15:36:48.915873575Z at org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider.from(TypedSchemaTransformProvider.java:111)
INFO 2025-10-13T15:36:48.915878155Z at org.apache.beam.sdk.expansion.service.ExpansionServiceSchemaTransformProvider.getTransform(ExpansionServiceSchemaTransformProvider.java:137)
INFO 2025-10-13T15:36:48.915882745Z at org.apache.beam.sdk.expansion.service.ExpansionService$TransformProviderForPayloadTranslator.getTransform(ExpansionService.java:263)
INFO 2025-10-13T15:36:48.915888115Z at org.apache.beam.sdk.expansion.service.TransformProvider.apply(TransformProvider.java:121)
INFO 2025-10-13T15:36:48.915891688Z at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:657)
INFO 2025-10-13T15:36:48.915896062Z at org.apache.beam.sdk.expansion.service.ExpansionService.expand(ExpansionService.java:758)
INFO 2025-10-13T15:36:48.915914630Z at org.apache.beam.model.expansion.v1.ExpansionServiceGrpc$MethodHandlers.invoke(ExpansionServiceGrpc.java:306)
INFO 2025-10-13T15:36:48.915951892Z at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182)
INFO 2025-10-13T15:36:48.915960032Z at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:356)
INFO 2025-10-13T15:36:48.915964459Z at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:861)
INFO 2025-10-13T15:36:48.915968683Z at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
INFO 2025-10-13T15:36:48.915972398Z at org.apache.beam.vendor.grpc.v1p69p0.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
INFO 2025-10-13T15:36:48.915976930Z at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
INFO 2025-10-13T15:36:48.915998564Z at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
INFO 2025-10-13T15:36:48.916002886Z at java.base/java.lang.Thread.run(Thread.java:829)
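
The error message above is consistent with a field changing position between versions when fields are ordered by name. The following is a minimal, self-contained sketch of that effect, not Beam's actual implementation: the class `FieldOrderSketch`, its methods, and every field name except `redistributeNumKeys` are hypothetical and chosen only for illustration. It assumes an older version with four configuration fields and a newer version that adds one more.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class FieldOrderSketch {

  /** Returns a copy of the declared fields in alphabetical order. */
  public static List<String> sortedFields(List<String> declared) {
    List<String> sorted = new ArrayList<>(declared);
    Collections.sort(sorted);
    return sorted;
  }

  /** Positional check in the spirit of the failing precondition (not Beam's code). */
  public static void checkFieldNumber(List<String> fields, String name, int expected) {
    int actual = fields.indexOf(name);
    if (actual != expected) {
      throw new IllegalStateException(
          "Expected field number " + expected + " for field " + name + " instead got " + actual);
    }
  }

  public static void main(String[] args) {
    // Hypothetical field lists; only redistributeNumKeys comes from the stack trace.
    List<String> oldFields = List.of("bootstrapServers", "format", "redistributeNumKeys", "topic");
    List<String> newFields = new ArrayList<>(oldFields);
    newFields.add("maxReadTimeSeconds"); // hypothetical field added in a newer version

    // Sorting by name moves redistributeNumKeys once the new field is present.
    int oldIndex = sortedFields(oldFields).indexOf("redistributeNumKeys");
    int newIndex = sortedFields(newFields).indexOf("redistributeNumKeys");
    System.out.println("sorted, old version: " + oldIndex);
    System.out.println("sorted, new version: " + newIndex);

    // Without sorting, appending a field keeps existing positions stable.
    System.out.println(
        "declaration order, new version: " + newFields.indexOf("redistributeNumKeys"));

    try {
      // The newer code expects its own sorted position but receives the older layout.
      checkFieldNumber(sortedFields(oldFields), "redistributeNumKeys", newIndex);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());
    }
  }
}
```

In this sketch the sorted position of `redistributeNumKeys` shifts from 2 to 3 once the extra field is added, while declaration order with appended fields leaves it at 2. That is the kind of stability the issue title asks for by eliminating sorting.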

Added more context to #36295.

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
