Skip to content

Commit 5cb8ac7

Browse files
committed
Add logging msg in SwitchingDirectRunner
1 parent d50898c commit 5cb8ac7

File tree

2 files changed

+7
-4
lines changed

2 files changed

+7
-4
lines changed

sdks/python/apache_beam/runners/direct/direct_runner.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@ def visit_transform(self, applied_ptransform):
206206
# Check whether all transforms used in the pipeline are supported by the
207207
# PrismRunner
208208
if _PrismRunnerSupportVisitor().accept(pipeline, self._is_interactive):
209-
_LOGGER.info('Running pipeline with PrismRunner.')
209+
_LOGGER.exception('Running pipeline with PrismRunner.')
210210
from apache_beam.runners.portability import prism_runner
211211
runner = prism_runner.PrismRunner()
212212

@@ -216,7 +216,7 @@ def visit_transform(self, applied_ptransform):
216216
# probably failed on job submission.
217217
if (PipelineState.is_terminal(pr.state) and
218218
pr.state != PipelineState.DONE):
219-
_LOGGER.info(
219+
_LOGGER.exception(
220220
'Pipeline failed on PrismRunner, falling back to DirectRunner.')
221221
runner = BundleBasedDirectRunner()
222222
else:
@@ -225,8 +225,8 @@ def visit_transform(self, applied_ptransform):
225225
# If prism fails in Preparing the portable job, then the PortableRunner
226226
# code raises an exception. Catch it, log it, and use the Direct runner
227227
# instead.
228-
_LOGGER.info('Exception with PrismRunner:\n %s\n' % (e))
229-
_LOGGER.info('Falling back to DirectRunner')
228+
_LOGGER.exception('Exception with PrismRunner:\n %s\n' % (e))
229+
_LOGGER.exception('Falling back to DirectRunner')
230230
runner = BundleBasedDirectRunner()
231231

232232
# Check whether all transforms used in the pipeline are supported by the
@@ -240,6 +240,7 @@ def visit_transform(self, applied_ptransform):
240240
provision_info = fn_runner.ExtendedProvisionInfo(
241241
beam_provision_api_pb2.ProvisionInfo(
242242
pipeline_options=encoded_options))
243+
_LOGGER.exception("Use FnApiRunner")
243244
runner = fn_runner.FnApiRunner(provision_info=provision_info)
244245

245246
return runner.run_pipeline(pipeline, options)

sdks/python/apache_beam/runners/worker/data_plane.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,8 @@ def __init__(self, data_buffer_time_limit_ms=0):
472472
self._closed = False
473473
self._exception = None # type: Optional[Exception]
474474

475+
_LOGGER.exception("start grpc data channel")
476+
475477
def close(self):
476478
# type: () -> None
477479
self._to_send.put(self._WRITES_FINISHED)

0 commit comments

Comments
 (0)