Skip to content

[Bug]: Prism panic when TestStream is followed by a stateful dofn #36965

@shunping

Description

@shunping

What happened?

This is observed in some of our test workflow. Even though it is not causing the test to fail, I believe it is only because of luck, aka our tests do not cover this case.

Thanks @aIbrahiim for bringing this up in PR #36927.

To reproduce, we can use the following code:

import logging
import apache_beam as beam
from apache_beam.coders import VarIntCoder
from apache_beam.transforms.userstate import ReadModifyWriteStateSpec
from apache_beam.utils.timestamp import Timestamp
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.testing.test_stream import TestStream

logging.basicConfig(level=logging.INFO)
#logging.basicConfig(level=logging.WARNING)

class MyDoFn(beam.DoFn):
  COUNT = ReadModifyWriteStateSpec('count', VarIntCoder())

  def __init__(self):
    pass

  def process(
      self,
      element,
      count_state=beam.DoFn.StateParam(COUNT),
  ):
    print(element)
    count = count_state.read()
    if not count:
      count = 1
    else:
      count += 1

    count_state.write(count)
    yield count




options = PipelineOptions([
    "--streaming",
    "--environment_type=LOOPBACK",
    "--runner=PrismRunner",
    "--prism_log_kind=dev",
    #"--prism_beam_version_override=v2.66.0"
    # "--runner=PortableRunner",
    # "--job_endpoint=localhost:8073",
])

with beam.Pipeline(options=options) as p:
  now = Timestamp.now()
  _ = (
      p | TestStream().add_elements([(1, 2), (3, 4)])
      | 'MyDoFn' >> beam.ParDo(MyDoFn())
      | beam.LogElements(
          prefix="result=",
          level=logging.WARNING,
          with_timestamp=True,
          with_window=True,
          use_epoch_time=True))

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions