Skip to content

Commit 26ea9b8

Browse files
committed
implement BroadcastView.execute
and add test coverage copy DirectView tests, which should all pass eventually. explicitly mark tests that don't pass as xfail. TODO for feature-parity: - single-target views should produce unpacked single-results - implement map - fix abort metadata - fix sent-message tracking
1 parent d72a731 commit 26ea9b8

File tree

3 files changed

+141
-29
lines changed

3 files changed

+141
-29
lines changed

ipyparallel/client/view.py

Lines changed: 73 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,34 @@ def activate(self, suffix=''):
864864
class BroadcastView(DirectView):
865865
is_coalescing = Bool(False)
866866

867+
def _init_metadata(self, s_idents):
868+
"""initialize request metadata"""
869+
return dict(
870+
targets=s_idents,
871+
is_broadcast=True,
872+
is_coalescing=self.is_coalescing,
873+
)
874+
875+
def _make_async_result(self, message_future, s_idents, **kwargs):
876+
original_msg_id = message_future.msg_id
877+
if not self.is_coalescing:
878+
futures = []
879+
for ident in s_idents:
880+
msg_and_target_id = f'{original_msg_id}_{ident}'
881+
future = self.client.create_message_futures(
882+
msg_and_target_id, async_result=True, track=True
883+
)
884+
self.client.outstanding.add(msg_and_target_id)
885+
self.outstanding.add(msg_and_target_id)
886+
futures.append(future[0])
887+
if original_msg_id in self.outstanding:
888+
self.outstanding.remove(original_msg_id)
889+
else:
890+
self.client.outstanding.add(original_msg_id)
891+
futures = message_future
892+
893+
return AsyncResult(self.client, futures, owner=True, **kwargs)
894+
867895
@sync_results
868896
@save_ids
869897
def _really_apply(
@@ -883,44 +911,62 @@ def _really_apply(
883911

884912
s_idents = [ident.decode("utf8") for ident in idents]
885913

886-
metadata = dict(
887-
targets=s_idents, is_broadcast=True, is_coalescing=self.is_coalescing
914+
metadata = self._init_metadata(s_idents)
915+
message_future = self.client.send_apply_request(
916+
self._socket, pf, pargs, pkwargs, track=track, metadata=metadata
917+
)
918+
ar = self._make_async_result(
919+
message_future, s_idents, fname=getname(f), targets=targets
888920
)
889-
if not self.is_coalescing:
890-
original_future = self.client.send_apply_request(
891-
self._socket, pf, pargs, pkwargs, track=track, metadata=metadata
892-
)
893-
original_msg_id = original_future.msg_id
894921

895-
for ident in s_idents:
896-
msg_and_target_id = f'{original_msg_id}_{ident}'
897-
future = self.client.create_message_futures(
898-
msg_and_target_id, async_result=True, track=True
899-
)
900-
self.client.outstanding.add(msg_and_target_id)
901-
self.outstanding.add(msg_and_target_id)
902-
futures.append(future[0])
903-
if original_msg_id in self.outstanding:
904-
self.outstanding.remove(original_msg_id)
905-
else:
906-
message_future = self.client.send_apply_request(
907-
self._socket, pf, pargs, pkwargs, track=track, metadata=metadata
908-
)
909-
self.client.outstanding.add(message_future.msg_id)
910-
futures = message_future
922+
if block:
923+
try:
924+
return ar.get()
925+
except KeyboardInterrupt:
926+
pass
927+
return ar
911928

912-
ar = AsyncResult(
913-
self.client, futures, fname=getname(f), targets=_targets, owner=True
929+
@sync_results
930+
@save_ids
931+
def execute(self, code, silent=True, targets=None, block=None):
932+
"""Executes `code` on `targets` in blocking or nonblocking manner.
933+
934+
``execute`` is always `bound` (affects engine namespace)
935+
936+
Parameters
937+
----------
938+
code : str
939+
the code string to be executed
940+
block : bool
941+
whether or not to wait until done to return
942+
default: self.block
943+
"""
944+
block = self.block if block is None else block
945+
targets = self.targets if targets is None else targets
946+
947+
_idents, _targets = self.client._build_targets(targets)
948+
s_idents = [ident.decode("utf8") for ident in _idents]
949+
950+
metadata = self._init_metadata(s_idents)
951+
message_future = self.client.send_execute_request(
952+
self._socket,
953+
code,
954+
silent=silent,
955+
metadata=metadata,
956+
)
957+
ar = self._make_async_result(
958+
message_future, s_idents, fname='execute', targets=_targets
914959
)
915960
if block:
916961
try:
917-
return ar.get()
962+
ar.get()
963+
ar.wait_for_output()
918964
except KeyboardInterrupt:
919965
pass
920966
return ar
921967

922968
def map(self, f, *sequences, **kwargs):
923-
pass
969+
raise NotImplementedError("BroadcastView.map not yet implemented")
924970

925971

926972
class LoadBalancedView(View):

ipyparallel/controller/broadcast_scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,11 @@ def dispatch_result(self, raw_msg):
125125
msg = self.session.deserialize(msg, content=False, copy=False)
126126
outgoing_id = idents[0]
127127

128-
except:
128+
except Exception:
129129
self.log.error(
130130
f'broadcast::Invalid broadcast msg: {raw_msg}', exc_info=True
131131
)
132132
return
133-
134133
original_msg_id = msg['metadata']['original_msg_id']
135134
is_coalescing = msg['metadata']['is_coalescing']
136135
if is_coalescing:
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""test BroadcastView objects"""
2+
import pytest
3+
4+
from . import test_view
5+
6+
7+
needs_map = pytest.mark.xfail(reason="map not yet implemented")
8+
9+
10+
@pytest.mark.usefixtures('ipython')
11+
class TestBroadcastView(test_view.TestView):
12+
def setUp(self):
13+
super().setUp()
14+
self._broadcast_view_used = False
15+
# use broadcast view for direct API
16+
real_direct_view = self.client.direct_view
17+
18+
def broadcast_or_direct(targets):
19+
if isinstance(targets, int):
20+
return real_direct_view(targets)
21+
else:
22+
self._broadcast_view_used = True
23+
return self.client.broadcast_view(targets)
24+
25+
self.client.direct_view = broadcast_or_direct
26+
27+
def tearDown(self):
28+
super().tearDown()
29+
# note that a test didn't use a broadcast view
30+
if not self._broadcast_view_used:
31+
pytest.skip("No broadcast view used")
32+
33+
@pytest.mark.xfail(reason="aborted replies missing metadata")
34+
def test_abort(self):
35+
pass
36+
37+
@pytest.mark.xfail(reason="aborted replies missing metadata")
38+
def test_abort_all(self):
39+
pass
40+
41+
@needs_map
42+
def test_map(self):
43+
pass
44+
45+
@needs_map
46+
def test_map_ref(self):
47+
pass
48+
49+
@needs_map
50+
def test_map_reference(self):
51+
pass
52+
53+
@needs_map
54+
def test_map_iterable(self):
55+
pass
56+
57+
@needs_map
58+
def test_map_empty_sequence(self):
59+
pass
60+
61+
@needs_map
62+
def test_map_numpy(self):
63+
pass
64+
65+
@pytest.mark.xfail(reason="Tracking gets disconnected from original message")
66+
def test_scatter_tracked(self):
67+
pass

0 commit comments

Comments
 (0)