Skip to content

Commit 83bfceb

Browse files
committed
Fix broken unit tests
1 parent 34ad462 commit 83bfceb

File tree

3 files changed

+18
-10
lines changed

3 files changed

+18
-10
lines changed

sdks/python/apache_beam/runners/interactive/recording_manager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,7 +492,6 @@ def _run_async_computation(
492492
if not self._wait_for_dependencies(pcolls_to_compute, async_result):
493493
raise RuntimeError('Dependency computation failed or was cancelled.')
494494

495-
self._env.mark_pcollection_computing(pcolls_to_compute)
496495
_LOGGER.info(
497496
'Starting asynchronous computation for %d PCollections.',
498497
len(pcolls_to_compute))
@@ -696,6 +695,7 @@ def compute_async(
696695
async_result = AsyncComputationResult(
697696
future, pcolls_to_compute, self.user_pipeline, self)
698697
self._async_computations[async_result._display_id] = async_result
698+
self._env.mark_pcollection_computing(pcolls_to_compute)
699699

700700
def task():
701701
try:

sdks/python/apache_beam/runners/interactive/recording_manager_test.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -993,12 +993,6 @@ def capture_task(task):
993993

994994
mock_submit.side_effect = capture_task
995995

996-
res = rm.compute_async({pcoll}, blocking=False)
997-
self.assertIs(res, mock_async_res_instance)
998-
mock_submit.assert_called_once()
999-
self.assertIsNotNone(task_submitted)
1000-
1001-
# Patch dependencies of _run_async_computation
1002996
with patch.object(
1003997
rm, '_wait_for_dependencies', return_value=True
1004998
), patch.object(
@@ -1008,10 +1002,22 @@ def capture_task(task):
10081002
'mark_pcollection_computing',
10091003
wraps=ie.current_env().mark_pcollection_computing,
10101004
) as wrapped_mark:
1011-
# Run the task to trigger the marks
1012-
task_submitted()
1005+
1006+
res = rm.compute_async({pcoll}, blocking=False)
10131007
wrapped_mark.assert_called_once_with({pcoll})
10141008

1009+
# Run the task to trigger the marks
1010+
self.assertIs(res, mock_async_res_instance)
1011+
mock_submit.assert_called_once()
1012+
self.assertIsNotNone(task_submitted)
1013+
1014+
with patch.object(
1015+
rm, '_wait_for_dependencies', return_value=True
1016+
), patch.object(
1017+
rm, '_execute_pipeline_fragment'
1018+
) as _:
1019+
task_submitted()
1020+
10151021
self.assertTrue(pcoll in ie.current_env().computing_pcollections)
10161022

10171023
def test_get_all_dependencies(self):

sdks/python/apache_beam/runners/interactive/utils_test.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,9 @@ def test_child_module_logger_can_override_logging_level(self, mock_emit):
244244
reason='[interactive] dependency is not installed.')
245245
class ProgressIndicatorTest(unittest.TestCase):
246246
def setUp(self):
247-
ie.new_env()
247+
with patch(
248+
'apache_beam.runners.interactive.cache_manager.CacheManager.cleanup'):
249+
ie.new_env()
248250

249251
@patch('IPython.get_ipython', new_callable=mock_get_ipython)
250252
@patch(

0 commit comments

Comments
 (0)