[Bug]: PrismRunner fails to execute pipeline that DirectRunner succeeds at #35867

@pritamdodeja

What happened?

I have a pipeline that works with DirectRunner (direct_num_workers: 0, direct_running_mode: multi_processing) without issues.
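
For readers comparing the two setups, here is a minimal sketch of how the configurations might be expressed as command-line style flags. The flag names are the standard Beam Python option names matching the settings quoted above; the helper `runner_flags` is hypothetical, and how the original script actually builds `opts` is not shown in this report.

```python
from typing import List


def runner_flags(runner: str) -> List[str]:
    """Return command-line style pipeline flags for the given runner name.

    Hypothetical helper for illustration only; not taken from the
    reporter's script.
    """
    flags = [f"--runner={runner}"]
    if runner == "DirectRunner":
        # The two DirectRunner settings quoted in the report.
        flags += [
            "--direct_num_workers=0",
            "--direct_running_mode=multi_processing",
        ]
    return flags


direct_flags = runner_flags("DirectRunner")
prism_flags = runner_flags("PrismRunner")

# With apache_beam installed, either list could be passed as
# PipelineOptions(flags=...) and then to beam.Pipeline(options=...).
print(direct_flags)
print(prism_flags)
```

Keeping the rest of the pipeline identical and changing only the runner flag is what makes the comparison in this report meaningful.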

PrismRunner (2.67) fails on the same pipeline with the errors below (output from several threads is interleaved):

Traceback (most recent call last):
  File "/home/pritamdodeja/demo/template/windowed_sequences.py", line 112, in <module>
    with beam.Pipeline(options=opts) as p:
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/pipeline.py", line 663, in __exit__
    self.result.wait_until_finish()
Exception in thread wait_until_finish_read:
Traceback (most recent call last):
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 568, in wait_until_finish
    self.run()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 953, in run
    raise self._runtime_exception
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 574, in _observe_state
    self._target(*self._args, **self._kwargs)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py", line 533, in read_messages
    for state_response in self._state_stream:
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
    for message in self._message_stream:
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
    return self._next()
    return self._next()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    raise self
    raise self
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-08-14T10:32:57.766869481-04:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.DEADLINE_EXCEEDED
        details = "Deadline Exceeded"
        debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-08-14T10:32:57.731394275-04:00"}"
>

Exception in thread run_worker_job-001[job]_ref_Environment_default_environment_1:
Traceback (most recent call last):
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 283, in run
Exception in thread read_state:
Traceback (most recent call last):
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 1016, in _bootstrap_inner
    self.run()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/threading.py", line 953, in run
    self._target(*self._args, **self._kwargs)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 1073, in pull_responses
    getattr(self, SdkHarness.REQUEST_METHOD_PREFIX + request_type)(
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 343, in _request_process_bundle_progress
    for response in responses:
    self._request_process_bundle_action(request)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 543, in __next__
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 352, in _request_process_bundle_action
    return self._next()
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/site-packages/grpc/_channel.py", line 969, in _next
    self._report_progress_executor.submit(task)
  File "/home/pritamdodeja/.pyenv/versions/3.10.14/lib/python3.10/concurrent/futures/thread.py", line 167, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
    raise self
RuntimeError: cannot schedule new futures after shutdown
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
        status = StatusCode.UNAVAILABLE
        details = "Socket closed"
        debug_error_string = "UNKNOWN:Error received from peer ipv6:%5B::1%5D:42955 {grpc_message:"Socket closed", grpc_status:14, created_time:"2025-08-14T10:32:58.053915407-04:00"}"
>
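
One of the errors above, `RuntimeError: cannot schedule new futures after shutdown`, comes from the Python standard library rather than from Beam itself. The sketch below only reproduces that message in isolation with a plain `ThreadPoolExecutor`; it assumes nothing about Beam internals and does not establish whether this error is a cause or a side effect of the gRPC failures in this report.

```python
from concurrent.futures import ThreadPoolExecutor

# Create an executor and shut it down, as would happen when a worker
# is being torn down.
executor = ThreadPoolExecutor(max_workers=1)
executor.shutdown(wait=True)

try:
    # Submitting work after shutdown is rejected by the executor.
    executor.submit(print, "progress report")
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)

print(error_message)
```

In the traceback, the rejected call is `self._report_progress_executor.submit(task)`, which suggests a progress request arrived after that executor had already been shut down.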

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
