[Bug]: Timers fire incorrectly in Prism when bundle splitting happens #35771

@shunping

Description

What happened?

Running the following pipeline on Prism can fail with the error below.

```python
import logging
import time

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import TimerSpec, on_timer


logging.basicConfig(level=logging.INFO)

options = PipelineOptions([
    "--environment_type=LOOPBACK",
    "--runner=PrismRunner",
])

# Keyed (key, value) input: timers are scoped per key, so the DoFn needs KVs.
data = [(0, 0), (1, 1), (2, 0)]


class MyDoFn(beam.DoFn):
  # Event-time timer that fires when the input watermark reaches its timestamp.
  WINDOW_TIMER = TimerSpec('window_timer', TimeDomain.WATERMARK)

  def process(
      self,
      element,
      window=beam.DoFn.WindowParam,
      window_timer=beam.DoFn.TimerParam(WINDOW_TIMER),
  ):
    print("process is called")
    # Schedule the timer for the end of the element's window
    # (the global window here, since no windowing is applied).
    window_timer.set(window.end)

  @on_timer(WINDOW_TIMER)
  def on_timer(self, key=beam.DoFn.KeyParam):
    # Slow timer callback. Per this report, removing the sleep lets the
    # pipeline complete on Prism.
    time.sleep(1)
    yield "on_timer called on key %s" % str(key)


with beam.Pipeline(options=options) as p:
  _ = (
      p
      | beam.Create(data)
      | beam.ParDo(MyDoFn())
      | beam.LogElements(level=logging.WARNING))
```

Error message:

```
RuntimeError: Pipeline job-001 failed in state FAILED: nothing in progress and no refreshes with non zero pending elements: 2
ElementManager Now: 1754103888458 processingTimeEvents: map[] injectedBundles: []
stage-000 watermark in -inf out -inf upstream -inf from IMPULSE   pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
        sideInputs: [] outputCols: [ref_PCollection_PCollection_4] outputConsumers: [stage-001] sideConsumers: []
stage-001 watermark in -inf out -inf upstream -inf from stage-000 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
        sideInputs: [] outputCols: [ref_PCollection_PCollection_5] outputConsumers: [stage-002] sideConsumers: []
stage-002 watermark in -inf out -inf upstream -inf from stage-001 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
        sideInputs: [] outputCols: [] outputConsumers: [] sideConsumers: []
stage-003 watermark in +inf out +inf upstream +inf from IMPULSE   pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
        sideInputs: [] outputCols: [ref_PCollection_PCollection_1] outputConsumers: [stage-004] sideConsumers: []
stage-004 watermark in +inf out +inf upstream +inf from stage-003 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
        sideInputs: [] outputCols: [ref_PCollection_PCollection_2] outputConsumers: [stage-005] sideConsumers: []
stage-005 watermark in +inf out +inf upstream +inf from stage-004 pending [] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
        sideInputs: [] outputCols: [ref_PCollection_PCollection_8] outputConsumers: [stage-006] sideConsumers: []
stage-006 watermark in glo out glo upstream +inf from stage-005 pending [{Timer - Window [*], EventTime glo, Hold glo, "ref_AppliedPTransform_ParDo-MyDoFn-_14" "ts-window_timer" "" "\x02"} {Timer - Window [*], EventTime glo, Hold glo, "ref_AppliedPTransform_ParDo-MyDoFn-_14" "ts-window_timer" "" "\x01"}] byKey map[] inprogressKeys map[] byBundle map[] holds [] holdCounts map[] holdsInBundle map[] pttEvents map[] bundlesToInject []
        sideInputs: [] outputCols: [] outputConsumers: [] sideConsumers: []
```
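To pick the stuck stage out of a dump like the one above, a small stdlib-only helper can scan each stage line and report the stages whose pending list is non-empty. This is a hypothetical diagnostic sketch: `stuck_stages` and `STAGE_RE` are illustrative names, and the regex assumes the exact single-line layout shown in this error message, which is not a documented Prism interface.

```python
import re

# Matches one stage line of the ElementManager dump, e.g.
#   stage-006 watermark in glo out glo upstream +inf from stage-005 pending [...] byKey ...
# Assumption: the single-line layout shown in the error message above.
STAGE_RE = re.compile(
    r"(?P<stage>stage-\d+) watermark in (?P<wm_in>\S+) out (?P<wm_out>\S+) "
    r"upstream (?P<upstream>\S+) from (?P<src>\S+)\s+"
    r"pending \[(?P<pending>.*?)\] byKey"
)


def stuck_stages(dump):
    """Hypothetical helper: list stages that still hold pending elements.

    Returns (stage, input watermark, upstream watermark, pending count) tuples,
    where the count is the number of {...} entries in the stage's pending list.
    """
    stuck = []
    for line in dump.splitlines():
        m = STAGE_RE.match(line.strip())
        if m and m.group("pending"):
            pending_count = m.group("pending").count("{")
            stuck.append((m.group("stage"), m.group("wm_in"),
                          m.group("upstream"), pending_count))
    return stuck


# Two stage lines copied from the dump above (trailing fields trimmed).
dump = '''\
stage-005 watermark in +inf out +inf upstream +inf from stage-004 pending [] byKey map[]
stage-006 watermark in glo out glo upstream +inf from stage-005 pending [{Timer - Window [*], EventTime glo, Hold glo, "ref_AppliedPTransform_ParDo-MyDoFn-_14" "ts-window_timer" "" "\\x02"} {Timer - Window [*], EventTime glo, Hold glo, "ref_AppliedPTransform_ParDo-MyDoFn-_14" "ts-window_timer" "" "\\x01"}] byKey map[]
'''

print(stuck_stages(dump))
# [('stage-006', 'glo', '+inf', 2)]
```

On this input it singles out stage-006: its input watermark is glo while upstream is +inf, and it holds two pending timer entries, which lines up with "non zero pending elements: 2" in the error.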

If we comment out the time.sleep() call in on_timer(), the pipeline runs successfully. With the log level set to DEBUG, the logs show that bundle splits are reported.
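A minimal way to get the debug output mentioned above is to raise the repro's own log level from INFO to DEBUG. This sketch assumes the split messages reach Python's root logger, which is what the repro configures with logging.basicConfig; it does not use any Prism-specific option.

```python
import logging

# Assumption: the bundle-split messages are logged at DEBUG level and reach
# Python's root logger, as configured by the repro's logging.basicConfig call.
# force=True (Python 3.8+) removes handlers already attached to the root
# logger first, so the new level applies even if logging was set up earlier.
logging.basicConfig(level=logging.DEBUG, force=True)

logging.getLogger(__name__).debug("debug logging enabled")
```

In the repro, this would replace the logging.basicConfig(level=logging.INFO) line.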

The same pipeline runs successfully on both FnApiRunner and DataflowRunner.


Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
