diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 4094fd1d8058..faa756d7c5c5 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -234,9 +234,11 @@ def process_encoded(self, encoded_windowed_values: bytes) -> None: decoded_value = self.windowed_coder_impl.decode_from_stream( input_stream, True) except Exception as exn: + coder = str(self.windowed_coder) + step = self.name_context.step_name raise ValueError( - "Error decoding input stream with coder " + - str(self.windowed_coder)) from exn + f"Error decoding input stream with coder {coder} in step {step}" + ) from exn self.output(decoded_value) def monitoring_infos(