Skip to content

Commit 00dedc6

Browse files
authored
Merge pull request #462 from minrk/cl-7-pre
7.0 prerelease
2 parents d72a731 + f4722b7 commit 00dedc6

File tree

6 files changed

+156
-31
lines changed

6 files changed

+156
-31
lines changed

docs/source/changelog.rst

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,19 @@ Changelog
55

66
Changes in IPython Parallel
77

8+
7.0.0
9+
-----
10+
11+
**prerelease** there are some big things coming! This is a first prerelease to get some things out there for testing
12+
13+
- **Require Python 3.6**
14+
- Fix compatibility issues with ipykernel 6, jupyter-client 7
15+
- New prototype BroadcastScheduler with vastly improved scaling in 'do-on-all' operations,
16+
c/o Tom-Olav Bøyum's Master's thesis.
17+
- Add :meth:`.AsyncResult.stream_output` context manager for streaming output.
18+
Stream output by default in parallel magics.
19+
- The repo has been updated to use pre-commit, black, myst, and friends and GitHub Actions for CI, but this should not affect users, only making it a bit nicer for contributors.
20+
821
6.3.0
922
-----
1023

ipyparallel/client/view.py

Lines changed: 73 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -864,6 +864,34 @@ def activate(self, suffix=''):
864864
class BroadcastView(DirectView):
865865
is_coalescing = Bool(False)
866866

867+
def _init_metadata(self, s_idents):
868+
"""initialize request metadata"""
869+
return dict(
870+
targets=s_idents,
871+
is_broadcast=True,
872+
is_coalescing=self.is_coalescing,
873+
)
874+
875+
def _make_async_result(self, message_future, s_idents, **kwargs):
876+
original_msg_id = message_future.msg_id
877+
if not self.is_coalescing:
878+
futures = []
879+
for ident in s_idents:
880+
msg_and_target_id = f'{original_msg_id}_{ident}'
881+
future = self.client.create_message_futures(
882+
msg_and_target_id, async_result=True, track=True
883+
)
884+
self.client.outstanding.add(msg_and_target_id)
885+
self.outstanding.add(msg_and_target_id)
886+
futures.append(future[0])
887+
if original_msg_id in self.outstanding:
888+
self.outstanding.remove(original_msg_id)
889+
else:
890+
self.client.outstanding.add(original_msg_id)
891+
futures = message_future
892+
893+
return AsyncResult(self.client, futures, owner=True, **kwargs)
894+
867895
@sync_results
868896
@save_ids
869897
def _really_apply(
@@ -883,44 +911,62 @@ def _really_apply(
883911

884912
s_idents = [ident.decode("utf8") for ident in idents]
885913

886-
metadata = dict(
887-
targets=s_idents, is_broadcast=True, is_coalescing=self.is_coalescing
914+
metadata = self._init_metadata(s_idents)
915+
message_future = self.client.send_apply_request(
916+
self._socket, pf, pargs, pkwargs, track=track, metadata=metadata
917+
)
918+
ar = self._make_async_result(
919+
message_future, s_idents, fname=getname(f), targets=targets
888920
)
889-
if not self.is_coalescing:
890-
original_future = self.client.send_apply_request(
891-
self._socket, pf, pargs, pkwargs, track=track, metadata=metadata
892-
)
893-
original_msg_id = original_future.msg_id
894921

895-
for ident in s_idents:
896-
msg_and_target_id = f'{original_msg_id}_{ident}'
897-
future = self.client.create_message_futures(
898-
msg_and_target_id, async_result=True, track=True
899-
)
900-
self.client.outstanding.add(msg_and_target_id)
901-
self.outstanding.add(msg_and_target_id)
902-
futures.append(future[0])
903-
if original_msg_id in self.outstanding:
904-
self.outstanding.remove(original_msg_id)
905-
else:
906-
message_future = self.client.send_apply_request(
907-
self._socket, pf, pargs, pkwargs, track=track, metadata=metadata
908-
)
909-
self.client.outstanding.add(message_future.msg_id)
910-
futures = message_future
922+
if block:
923+
try:
924+
return ar.get()
925+
except KeyboardInterrupt:
926+
pass
927+
return ar
911928

912-
ar = AsyncResult(
913-
self.client, futures, fname=getname(f), targets=_targets, owner=True
929+
@sync_results
930+
@save_ids
931+
def execute(self, code, silent=True, targets=None, block=None):
932+
"""Executes `code` on `targets` in blocking or nonblocking manner.
933+
934+
``execute`` is always `bound` (affects engine namespace)
935+
936+
Parameters
937+
----------
938+
code : str
939+
the code string to be executed
940+
block : bool
941+
whether or not to wait until done to return
942+
default: self.block
943+
"""
944+
block = self.block if block is None else block
945+
targets = self.targets if targets is None else targets
946+
947+
_idents, _targets = self.client._build_targets(targets)
948+
s_idents = [ident.decode("utf8") for ident in _idents]
949+
950+
metadata = self._init_metadata(s_idents)
951+
message_future = self.client.send_execute_request(
952+
self._socket,
953+
code,
954+
silent=silent,
955+
metadata=metadata,
956+
)
957+
ar = self._make_async_result(
958+
message_future, s_idents, fname='execute', targets=_targets
914959
)
915960
if block:
916961
try:
917-
return ar.get()
962+
ar.get()
963+
ar.wait_for_output()
918964
except KeyboardInterrupt:
919965
pass
920966
return ar
921967

922968
def map(self, f, *sequences, **kwargs):
923-
pass
969+
raise NotImplementedError("BroadcastView.map not yet implemented")
924970

925971

926972
class LoadBalancedView(View):

ipyparallel/controller/broadcast_scheduler.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -125,12 +125,11 @@ def dispatch_result(self, raw_msg):
125125
msg = self.session.deserialize(msg, content=False, copy=False)
126126
outgoing_id = idents[0]
127127

128-
except:
128+
except Exception:
129129
self.log.error(
130130
f'broadcast::Invalid broadcast msg: {raw_msg}', exc_info=True
131131
)
132132
return
133-
134133
original_msg_id = msg['metadata']['original_msg_id']
135134
is_coalescing = msg['metadata']['is_coalescing']
136135
if is_coalescing:

ipyparallel/tests/clienttest.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,8 @@ def setUp(self):
167167
self.engines = []
168168

169169
def tearDown(self):
170+
self.client[:].use_pickle()
171+
170172
# self.client.clear(block=True)
171173
# close fds:
172174
for e in filter(lambda e: e.poll() is not None, launchers):

ipyparallel/tests/test_joblib.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,6 @@ def test_default_backend(self):
3636
p = Parallel(backend='ipyparallel')
3737
assert p._backend._view.client is self.client
3838

39-
self.client[:].use_pickle()
40-
4139
def test_register_backend(self):
4240
view = self.client.load_balanced_view()
4341
view.register_joblib_backend('view')
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
"""test BroadcastView objects"""
2+
import pytest
3+
4+
from . import test_view
5+
6+
7+
needs_map = pytest.mark.xfail(reason="map not yet implemented")
8+
9+
10+
@pytest.mark.usefixtures('ipython')
11+
class TestBroadcastView(test_view.TestView):
12+
def setUp(self):
13+
super().setUp()
14+
self._broadcast_view_used = False
15+
# use broadcast view for direct API
16+
real_direct_view = self.client.direct_view
17+
18+
def broadcast_or_direct(targets):
19+
if isinstance(targets, int):
20+
return real_direct_view(targets)
21+
else:
22+
self._broadcast_view_used = True
23+
return self.client.broadcast_view(targets)
24+
25+
self.client.direct_view = broadcast_or_direct
26+
27+
def tearDown(self):
28+
super().tearDown()
29+
# note that a test didn't use a broadcast view
30+
if not self._broadcast_view_used:
31+
pytest.skip("No broadcast view used")
32+
33+
@pytest.mark.xfail(reason="aborted replies missing metadata")
34+
def test_abort(self):
35+
pass
36+
37+
@pytest.mark.xfail(reason="aborted replies missing metadata")
38+
def test_abort_all(self):
39+
pass
40+
41+
@needs_map
42+
def test_map(self):
43+
pass
44+
45+
@needs_map
46+
def test_map_ref(self):
47+
pass
48+
49+
@needs_map
50+
def test_map_reference(self):
51+
pass
52+
53+
@needs_map
54+
def test_map_iterable(self):
55+
pass
56+
57+
@needs_map
58+
def test_map_empty_sequence(self):
59+
pass
60+
61+
@needs_map
62+
def test_map_numpy(self):
63+
pass
64+
65+
@pytest.mark.xfail(reason="Tracking gets disconnected from original message")
66+
def test_scatter_tracked(self):
67+
pass

0 commit comments

Comments
 (0)