Skip to content

Commit e7eb7c9

Browse files
committed
Handle the non-interactve_env case and import error for unit tests
1 parent a9a2d2b commit e7eb7c9

File tree

4 files changed

+67
-7
lines changed

4 files changed

+67
-7
lines changed

sdks/python/apache_beam/runners/interactive/display/pipeline_graph.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,12 @@
2525
import collections
2626
import logging
2727
import threading
28+
from typing import Any
2829
from typing import DefaultDict
2930
from typing import Dict
3031
from typing import Iterator
3132
from typing import List
33+
from typing import Optional
3234
from typing import Tuple
3335
from typing import Union
3436

@@ -40,8 +42,12 @@
4042

4143
try:
4244
import pydot
45+
PydotDot = pydot.Dot
46+
PydotNode = pydot.Node
47+
PydotEdge = pydot.Edge
4348
except ImportError:
44-
pass
49+
pydot = None
50+
PydotDot = PydotNode = PydotEdge = Any
4551

4652
# pylint does not understand context
4753
# pylint:disable=dangerous-default-value
@@ -74,7 +80,7 @@ def __init__(
7480
rendered. See display.pipeline_graph_renderer for available options.
7581
"""
7682
self._lock = threading.Lock()
77-
self._graph: pydot.Dot = None
83+
self._graph: Optional[PydotDot] = None # type: ignore
7884
self._pipeline_instrument = None
7985
if isinstance(pipeline, beam.Pipeline):
8086
self._pipeline_instrument = inst.PipelineInstrument(
@@ -116,7 +122,11 @@ def __init__(
116122
self._renderer = pipeline_graph_renderer.get_renderer(render_option)
117123

118124
def get_dot(self) -> str:
119-
return self._get_graph().to_string()
125+
graph: Optional[PydotDot] = self._get_graph() # type: ignore
126+
if graph:
127+
return graph.to_string()
128+
else:
129+
return ""
120130

121131
def display_graph(self):
122132
"""Displays the graph generated."""
@@ -236,6 +246,10 @@ def _construct_graph(
236246
default_vertex_attrs: (Dict[str, str]) a dict of attributes
237247
default_edge_attrs: (Dict[str, str]) a dict of attributes
238248
"""
249+
if not pydot:
250+
self._graph = None
251+
return
252+
239253
with self._lock:
240254
self._graph = pydot.Dot()
241255

@@ -270,6 +284,10 @@ def _update_graph(self, vertex_dict=None, edge_dict=None):
270284
Or (Dict[(str, str), Dict[str, str]]) which maps vertex pairs to edge
271285
attributes
272286
"""
287+
graph: Optional[PydotDot] = self._get_graph() # type: ignore
288+
if not pydot or not graph:
289+
return
290+
273291
def set_attrs(ref, attrs):
274292
for attr_name, attr_val in attrs.items():
275293
ref.set(attr_name, attr_val)

sdks/python/apache_beam/runners/interactive/interactive_beam_test.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -696,6 +696,9 @@ def test_compute_blocking(self):
696696
collected = ib.collect(pcoll, raw_records=True)
697697
self.assertEqual(collected, data)
698698

699+
@unittest.skipIf(
700+
not ie.current_env().is_interactive_ready,
701+
'[interactive] dependency is not installed.')
699702
def test_compute_non_blocking(self):
700703
p = beam.Pipeline(ir.InteractiveRunner())
701704
data = list(range(5))
@@ -757,6 +760,9 @@ def test_compute_force_recompute(self):
757760
mock_evict.assert_called_once_with(p)
758761
self.assertTrue(pcoll in self.env.computed_pcollections)
759762

763+
@unittest.skipIf(
764+
not ie.current_env().is_interactive_ready,
765+
'[interactive] dependency is not installed.')
760766
def test_compute_non_blocking_exception(self):
761767
p = beam.Pipeline(ir.InteractiveRunner())
762768

@@ -777,6 +783,9 @@ def raise_error(elem):
777783
self.assertIsInstance(async_result.exception(), ValueError)
778784
self.assertFalse(pcoll in self.env.computed_pcollections)
779785

786+
@unittest.skipIf(
787+
not ie.current_env().is_interactive_ready,
788+
'[interactive] dependency is not installed.')
780789
@patch('apache_beam.runners.interactive.recording_manager.IS_IPYTHON', True)
781790
@patch('apache_beam.runners.interactive.recording_manager.display')
782791
@patch('apache_beam.runners.interactive.recording_manager.clear_output')
@@ -824,6 +833,9 @@ def test_compute_non_blocking_ipython_widgets(
824833
mock_clear_output.assert_called_once()
825834
async_result.result(timeout=60) # Let it finish
826835

836+
@unittest.skipIf(
837+
not ie.current_env().is_interactive_ready,
838+
'[interactive] dependency is not installed.')
827839
def test_compute_dependency_wait_true(self):
828840
p = beam.Pipeline(ir.InteractiveRunner())
829841
pcoll1 = p | 'Create1' >> beam.Create([1, 2, 3])
@@ -855,6 +867,9 @@ def test_compute_dependency_wait_true(self):
855867
async_res2.result(timeout=60)
856868
self.assertTrue(pcoll2 in self.env.computed_pcollections)
857869

870+
@unittest.skipIf(
871+
not ie.current_env().is_interactive_ready,
872+
'[interactive] dependency is not installed.')
858873
@patch.object(ie.InteractiveEnvironment, 'is_pcollection_computing')
859874
def test_compute_dependency_wait_false(self, mock_is_computing):
860875
p = beam.Pipeline(ir.InteractiveRunner())
@@ -878,6 +893,9 @@ def test_compute_dependency_wait_false(self, mock_is_computing):
878893
spy_execute.assert_called_with({pcoll2}, async_res2, ANY, ANY)
879894
self.assertTrue(pcoll2 in self.env.computed_pcollections)
880895

896+
@unittest.skipIf(
897+
not ie.current_env().is_interactive_ready,
898+
'[interactive] dependency is not installed.')
881899
def test_async_computation_result_cancel(self):
882900
p = beam.Pipeline(ir.InteractiveRunner())
883901
# A stream that never finishes to test cancellation
@@ -904,6 +922,9 @@ def test_async_computation_result_cancel(self):
904922
with self.assertRaises(TimeoutError):
905923
async_result.result(timeout=1) # It should not complete successfully
906924

925+
@unittest.skipIf(
926+
not ie.current_env().is_interactive_ready,
927+
'[interactive] dependency is not installed.')
907928
@patch(
908929
'apache_beam.runners.interactive.recording_manager.RecordingManager.'
909930
'_execute_pipeline_fragment')

sdks/python/apache_beam/runners/interactive/recording_manager_test.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,9 @@
4747
from apache_beam.utils.windowed_value import WindowedValue
4848

4949

50+
@unittest.skipIf(
51+
not ie.current_env().is_interactive_ready,
52+
'[interactive] dependency is not installed.')
5053
class AsyncComputationResultTest(unittest.TestCase):
5154
def setUp(self):
5255
self.mock_future = MagicMock(spec=Future)
@@ -962,6 +965,9 @@ def test_compute_async_blocking(self):
962965
mock_execute.assert_called_once()
963966
self.assertTrue(pcoll in ie.current_env().computed_pcollections)
964967

968+
@unittest.skipIf(
969+
not ie.current_env().is_interactive_ready,
970+
'[interactive] dependency is not installed.')
965971
@patch(
966972
'apache_beam.runners.interactive.recording_manager.AsyncComputationResult'
967973
)

sdks/python/apache_beam/runners/interactive/utils.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,18 @@
4949

5050
_LOGGER = logging.getLogger(__name__)
5151

52+
try:
53+
from google.cloud.exceptions import ClientError
54+
from google.cloud.exceptions import NotFound
55+
56+
from apache_beam.io.gcp.gcsio import create_storage_client # pylint: disable=ungrouped-imports
57+
GCP_LIBS_AVAILABLE = True
58+
except ImportError:
59+
GCP_LIBS_AVAILABLE = False
60+
ClientError = None
61+
NotFound = None
62+
create_storage_client = None
63+
5264
# Add line breaks to the IPythonLogHandler's HTML output.
5365
_INTERACTIVE_LOG_STYLE = """
5466
<style>
@@ -447,11 +459,14 @@ def assert_bucket_exists(bucket_name: str) -> None:
447459
448460
Logs a warning if the bucket cannot be verified to exist.
449461
"""
450-
try:
451-
from google.cloud.exceptions import ClientError
452-
from google.cloud.exceptions import NotFound
462+
if not GCP_LIBS_AVAILABLE:
463+
_LOGGER.warning(
464+
'google-cloud-storage is not installed. '
465+
'Unable to verify whether bucket %s exists.',
466+
bucket_name)
467+
return
453468

454-
from apache_beam.io.gcp.gcsio import create_storage_client
469+
try:
455470
storage_client = create_storage_client(PipelineOptions())
456471
storage_client.get_bucket(bucket_name)
457472
except ClientError as e:

0 commit comments

Comments
 (0)