Skip to content

Merge branch 'release-2.70' into users/damccorm/extra-cp

8d8fcaa
Select commit
Loading
Failed to load commit list.
Sign in for the full log view
Merged

Cherry-pick: Fix python postcommit (#36977) #36985

Merge branch 'release-2.70' into users/damccorm/extra-cp
8d8fcaa
Select commit
Loading
Failed to load commit list.
GitHub Actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, main) failed Dec 3, 2025 in 0s

3 fail, 124 skipped, 450 pass in 19m 0s

  2 files    2 suites   19m 0s ⏱️
577 tests 450 ✅ 124 💤 3 ❌
581 runs  450 ✅ 128 💤 3 ❌

Results for commit 8d8fcaa.

Annotations

Check warning on line 0 in apache_beam.ml.transforms.embeddings.huggingface_test.SentenceTransformerEmbeddingsTest

See this annotation in the file changed.

@github-actions github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, main)

test_sentence_transformer_image_embeddings (apache_beam.ml.transforms.embeddings.huggingface_test.SentenceTransformerEmbeddingsTest) failed

sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-ml_no_xdist.xml [took 5m 0s]
Raw output
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-12-03T22:16:49.897408616+00:00"}"
>
self = <apache_beam.ml.transforms.embeddings.huggingface_test.SentenceTransformerEmbeddingsTest testMethod=test_sentence_transformer_image_embeddings>

    @unittest.skipIf(Image is None, 'Pillow is not installed.')
    def test_sentence_transformer_image_embeddings(self):
      embedding_config = SentenceTransformerEmbeddings(
          model_name=IMAGE_MODEL_NAME,
          columns=[test_query_column],
          image_model=True)
      img = self.generateRandomImage(256)
>     with beam.Pipeline() as pipeline:

apache_beam/ml/transforms/embeddings/huggingface_test.py:301: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/pipeline.py:648: in __exit__
    self.result.wait_until_finish()
target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py:571: in wait_until_finish
    raise self._runtime_exception
target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py:580: in _observe_state
    for state_response in self._state_stream:
target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/grpc/_channel.py:543: in __next__
    return self._next()
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exc...ved from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-12-03T22:16:49.897408616+00:00"}"
>

    def _next(self) -> Any:
        with self._state.condition:
            if self._state.code is None:
                event_handler = _event_handler(
                    self._state, self._response_deserializer
                )
                self._state.due.add(cygrpc.OperationType.receive_message)
                operating = self._call.operate(
                    (cygrpc.ReceiveMessageOperation(_EMPTY_FLAGS),),
                    event_handler,
                )
                if not operating:
                    self._state.due.remove(cygrpc.OperationType.receive_message)
            elif self._state.code is grpc.StatusCode.OK:
                raise StopIteration()
            else:
                raise self
    
            def _response_ready():
                return self._state.response is not None or (
                    cygrpc.OperationType.receive_message not in self._state.due
                    and self._state.code is not None
                )
    
            _common.wait(self._state.condition.wait, _response_ready)
            if self._state.response is not None:
                response = self._state.response
                self._state.response = None
                return response
            elif cygrpc.OperationType.receive_message not in self._state.due:
                if self._state.code is grpc.StatusCode.OK:
                    raise StopIteration()
                elif self._state.code is not None:
>                   raise self
E                   grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
E                   	status = StatusCode.DEADLINE_EXCEEDED
E                   	details = "Deadline Exceeded"
E                   	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"Deadline Exceeded", grpc_status:4, created_time:"2025-12-03T22:16:49.897408616+00:00"}"
E                   >

target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/grpc/_channel.py:969: _MultiThreadedRendezvous

Check warning on line 0 in apache_beam.ml.transforms.embeddings.huggingface_test.SentenceTransformerEmbeddingsTest

See this annotation in the file changed.

@github-actions github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, main)

test_sentence_transformer_images_with_str_data_types (apache_beam.ml.transforms.embeddings.huggingface_test.SentenceTransformerEmbeddingsTest) failed

sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-ml_no_xdist.xml [took 5m 0s]
Raw output
AssertionError: "Embeddings can only be generated" does not match "<_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-12-03T22:21:51.008819279+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>"
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
	status = StatusCode.DEADLINE_EXCEEDED
	details = "Deadline Exceeded"
	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-12-03T22:21:51.008819279+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
>

During handling of the above exception, another exception occurred:

self = <apache_beam.ml.transforms.embeddings.huggingface_test.SentenceTransformerEmbeddingsTest testMethod=test_sentence_transformer_images_with_str_data_types>

    def test_sentence_transformer_images_with_str_data_types(self):
      embedding_config = SentenceTransformerEmbeddings(
          model_name=IMAGE_MODEL_NAME,
          columns=[test_query_column],
          image_model=True)
>     with self.assertRaisesRegex(Exception, "Embeddings can only be generated"):
E     AssertionError: "Embeddings can only be generated" does not match "<_MultiThreadedRendezvous of RPC that terminated with:
E     	status = StatusCode.DEADLINE_EXCEEDED
E     	details = "Deadline Exceeded"
E     	debug_error_string = "UNKNOWN:Error received from peer  {created_time:"2025-12-03T22:21:51.008819279+00:00", grpc_status:4, grpc_message:"Deadline Exceeded"}"
E     >"

apache_beam/ml/transforms/embeddings/huggingface_test.py:321: AssertionError

Check warning on line 0 in apache_beam.ml.inference.sklearn_inference_test.SkLearnRunInferenceTest

See this annotation in the file changed.

@github-actions github-actions / Python 3.10 Test Results (self-hosted, ubuntu-20.04, main)

test_pipeline_pandas_custom_batching (apache_beam.ml.inference.sklearn_inference_test.SkLearnRunInferenceTest) failed

sdks/python/test-suites/tox/py310/build/srcs/sdks/python/pytest_py310-ml.xml [took 2s]
Raw output
RuntimeError: Pipeline job-015 failed in state FAILED: bundle inst003 stage-005 failed:Traceback (most recent call last):
  File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 912, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 1057, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    self.process_method(*args_for_process, **kwargs_for_process),
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1933, in process
    return self._run_inference(batch, inference_args)
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1903, in _run_inference
    raise e
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1889, in _run_inference
    result_generator = self._model_handler.run_inference(
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference.py", line 310, in run_inference
    predictions, splits = self._model_inference_fn(model, batch, inference_args)
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py", line 453, in batch_validator_pandas_inference_fn
    raise Exception(
Exception: Expected batch of size 5, received batch of size 3

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
    response = task()
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
    lambda: self.create_worker().do_instruction(request), request)
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
    return getattr(self, request_type)(
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 707, in process_bundle
    bundle_processor.process_bundle(instruction_id))
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1310, in process_bundle
    op.finish()
  File "apache_beam/runners/worker/operations.py", line 987, in apache_beam.runners.worker.operations.DoOperation.finish
    def finish(self):
  File "apache_beam/runners/worker/operations.py", line 990, in apache_beam.runners.worker.operations.DoOperation.finish
    with self.scoped_finish_state:
  File "apache_beam/runners/worker/operations.py", line 991, in apache_beam.runners.worker.operations.DoOperation.finish
    self.dofn_runner.finish()
  File "apache_beam/runners/common.py", line 1576, in apache_beam.runners.common.DoFnRunner.finish
    self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
  File "apache_beam/runners/common.py", line 1557, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 1588, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise exn
  File "apache_beam/runners/common.py", line 1555, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
    bundle_method()
  File "apache_beam/runners/common.py", line 616, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    def invoke_finish_bundle(self):
  File "apache_beam/runners/common.py", line 621, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
    self.output_handler.finish_bundle_outputs(
  File "apache_beam/runners/common.py", line 1847, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
    self.consumer.process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process
    delayed_applications = self.dofn_runner.process(o)
  File "apache_beam/runners/common.py", line 1500, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn, windowed_value)
  File "apache_beam/runners/common.py", line 1609, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise new_exn
  File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
    return self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 912, in apache_beam.runners.common.PerWindowInvoker.invoke_process
    self._invoke_process_per_window(
  File "apache_beam/runners/common.py", line 1057, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
    self.process_method(*args_for_process, **kwargs_for_process),
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1933, in process
    return self._run_inference(batch, inference_args)
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1903, in _run_inference
    raise e
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1889, in _run_inference
    result_generator = self._model_handler.run_inference(
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference.py", line 310, in run_inference
    predictions, splits = self._model_inference_fn(model, batch, inference_args)
  File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py", line 453, in batch_validator_pandas_inference_fn
    raise Exception(
Exception: Expected batch of size 5, received batch of size 3 [while running 'RunInference/BeamML_RunInference']
self = <apache_beam.ml.inference.sklearn_inference_test.SkLearnRunInferenceTest testMethod=test_pipeline_pandas_custom_batching>

    def test_pipeline_pandas_custom_batching(self):
      temp_file_name = self.tmpdir + os.sep + 'pickled_file'
      with open(temp_file_name, 'wb') as file:
        pickle.dump(build_pandas_pipeline(), file)
    
      def batch_validator_pandas_inference_fn(
          model: BaseEstimator,
          batch: Sequence[numpy.ndarray],
          inference_args: Optional[dict[str, Any]] = None) -> Any:
        if len(batch) != 5:
          raise Exception(
              f'Expected batch of size 5, received batch of size {len(batch)}')
        return _default_pandas_inference_fn(model, batch, inference_args)
    
>     with TestPipeline() as pipeline:

apache_beam/ml/inference/sklearn_inference_test.py:457: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/pipeline.py:646: in __exit__
    self.result = self.run()
target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/testing/test_pipeline.py:122: in run
    state = result.wait_until_finish(duration=self.timeout)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <apache_beam.runners.portability.portable_runner.PipelineResult object at 0x7b5e3818c550>
duration = None

    def wait_until_finish(self, duration=None):
      """
      :param duration: The maximum time in milliseconds to wait for the result of
      the execution. If None or zero, will wait until the pipeline finishes.
      :return: The result of the pipeline, i.e. PipelineResult.
      """
      last_error_text = None
    
      def read_messages() -> None:
        nonlocal last_error_text
        previous_state = -1
        for message in self._message_stream:
          if message.HasField('message_response'):
            mr = message.message_response
            logging.log(MESSAGE_LOG_LEVELS[mr.importance], "%s", mr.message_text)
            if mr.importance == beam_job_api_pb2.JobMessage.JOB_MESSAGE_ERROR:
              last_error_text = mr.message_text
          else:
            current_state = message.state_response.state
            if current_state != previous_state:
              _LOGGER.info(
                  "Job state changed to %s",
                  self.runner_api_state_to_pipeline_state(current_state))
              previous_state = current_state
          self._messages.append(message)
    
      message_thread = threading.Thread(
          target=read_messages, name='wait_until_finish_read')
      message_thread.daemon = True
      message_thread.start()
    
      if duration:
        state_thread = threading.Thread(
            target=functools.partial(self._observe_state, message_thread),
            name='wait_until_finish_state_observer')
        state_thread.daemon = True
        state_thread.start()
        start_time = time.time()
        duration_secs = duration / 1000
        while (time.time() - start_time < duration_secs and
               state_thread.is_alive()):
          time.sleep(1)
      else:
        self._observe_state(message_thread)
    
      if self._runtime_exception:
>       raise self._runtime_exception
E       RuntimeError: Pipeline job-015 failed in state FAILED: bundle inst003 stage-005 failed:Traceback (most recent call last):
E         File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 912, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1057, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.process_method(*args_for_process, **kwargs_for_process),
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1933, in process
E           return self._run_inference(batch, inference_args)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1903, in _run_inference
E           raise e
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1889, in _run_inference
E           result_generator = self._model_handler.run_inference(
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference.py", line 310, in run_inference
E           predictions, splits = self._model_inference_fn(model, batch, inference_args)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py", line 453, in batch_validator_pandas_inference_fn
E           raise Exception(
E       Exception: Expected batch of size 5, received batch of size 3
E       
E       During handling of the above exception, another exception occurred:
E       
E       Traceback (most recent call last):
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 316, in _execute
E           response = task()
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 390, in <lambda>
E           lambda: self.create_worker().do_instruction(request), request)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 669, in do_instruction
E           return getattr(self, request_type)(
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/sdk_worker.py", line 707, in process_bundle
E           bundle_processor.process_bundle(instruction_id))
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/worker/bundle_processor.py", line 1310, in process_bundle
E           op.finish()
E         File "apache_beam/runners/worker/operations.py", line 987, in apache_beam.runners.worker.operations.DoOperation.finish
E           def finish(self):
E         File "apache_beam/runners/worker/operations.py", line 990, in apache_beam.runners.worker.operations.DoOperation.finish
E           with self.scoped_finish_state:
E         File "apache_beam/runners/worker/operations.py", line 991, in apache_beam.runners.worker.operations.DoOperation.finish
E           self.dofn_runner.finish()
E         File "apache_beam/runners/common.py", line 1576, in apache_beam.runners.common.DoFnRunner.finish
E           self._invoke_bundle_method(self.do_fn_invoker.invoke_finish_bundle)
E         File "apache_beam/runners/common.py", line 1557, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
E           self._reraise_augmented(exn)
E         File "apache_beam/runners/common.py", line 1588, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E           raise exn
E         File "apache_beam/runners/common.py", line 1555, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
E           bundle_method()
E         File "apache_beam/runners/common.py", line 616, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
E           def invoke_finish_bundle(self):
E         File "apache_beam/runners/common.py", line 621, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
E           self.output_handler.finish_bundle_outputs(
E         File "apache_beam/runners/common.py", line 1847, in apache_beam.runners.common._OutputHandler.finish_bundle_outputs
E           self.main_receivers.receive(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 263, in apache_beam.runners.worker.operations.SingletonElementConsumerSet.receive
E           self.consumer.process(windowed_value)
E         File "apache_beam/runners/worker/operations.py", line 954, in apache_beam.runners.worker.operations.DoOperation.process
E           with self.scoped_process_state:
E         File "apache_beam/runners/worker/operations.py", line 955, in apache_beam.runners.worker.operations.DoOperation.process
E           delayed_applications = self.dofn_runner.process(o)
E         File "apache_beam/runners/common.py", line 1500, in apache_beam.runners.common.DoFnRunner.process
E           self._reraise_augmented(exn, windowed_value)
E         File "apache_beam/runners/common.py", line 1609, in apache_beam.runners.common.DoFnRunner._reraise_augmented
E           raise new_exn
E         File "apache_beam/runners/common.py", line 1498, in apache_beam.runners.common.DoFnRunner.process
E           return self.do_fn_invoker.invoke_process(windowed_value)
E         File "apache_beam/runners/common.py", line 912, in apache_beam.runners.common.PerWindowInvoker.invoke_process
E           self._invoke_process_per_window(
E         File "apache_beam/runners/common.py", line 1057, in apache_beam.runners.common.PerWindowInvoker._invoke_process_per_window
E           self.process_method(*args_for_process, **kwargs_for_process),
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1933, in process
E           return self._run_inference(batch, inference_args)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1903, in _run_inference
E           raise e
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/base.py", line 1889, in _run_inference
E           result_generator = self._model_handler.run_inference(
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference.py", line 310, in run_inference
E           predictions, splits = self._model_inference_fn(model, batch, inference_args)
E         File "/runner/_work/beam/beam/sdks/python/test-suites/tox/py310/build/srcs/sdks/python/apache_beam/ml/inference/sklearn_inference_test.py", line 453, in batch_validator_pandas_inference_fn
E           raise Exception(
E       Exception: Expected batch of size 5, received batch of size 3 [while running 'RunInference/BeamML_RunInference']

target/.tox-py310-ml/py310-ml/lib/python3.10/site-packages/apache_beam/runners/portability/portable_runner.py:571: RuntimeError