
Commit 6ba28be

Merge pull request #486 from minrk/imap
Implement LoadBalancedView.imap
2 parents ec30d97 + cc67ddd commit 6ba28be

File tree

2 files changed (+223, -29 lines)


ipyparallel/client/view.py

Lines changed: 140 additions & 19 deletions
@@ -1,11 +1,10 @@
 """Views of remote engines."""
 # Copyright (c) IPython Development Team.
 # Distributed under the terms of the Modified BSD License.
-from __future__ import absolute_import
-from __future__ import print_function
-
+import concurrent.futures
 import inspect
 import warnings
+from collections import deque
 from contextlib import contextmanager
 
 from decorator import decorator
@@ -1213,9 +1212,8 @@ def _really_apply(
 
     @sync_results
     @save_ids
-    def map(self, f, *sequences, **kwargs):
-        """``view.map(f, *sequences, block=self.block, chunksize=1, ordered=True)`` => list|AsyncMapResult
-        Parallel version of builtin `map`, load-balanced by this View.
+    def map(self, f, *sequences, block=None, chunksize=1, ordered=True):
+        """Parallel version of builtin `map`, load-balanced by this View.
 
         `block`, and `chunksize` can be specified by keyword only.
@@ -1231,10 +1229,6 @@ def map(self, f, *sequences, **kwargs):
             the sequences to be distributed and passed to `f`
         block : bool [default self.block]
             whether to wait for the result or not
-        track : bool
-            whether to create a MessageTracker to allow the user to
-            safely edit after arrays and buffers during non-copying
-            sends.
         chunksize : int [default 1]
             how many elements should be in each task.
         ordered : bool [default True]
@@ -1256,14 +1250,8 @@ def map(self, f, *sequences, **kwargs):
         """
 
         # default
-        block = kwargs.get('block', self.block)
-        chunksize = kwargs.get('chunksize', 1)
-        ordered = kwargs.get('ordered', True)
-
-        keyset = set(kwargs.keys())
-        extra_keys = keyset.difference_update(set(['block', 'chunksize']))
-        if extra_keys:
-            raise TypeError("Invalid kwargs: %s" % list(extra_keys))
+        if block is None:
+            block = self.block
 
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
@@ -1272,6 +1260,139 @@ def map(self, f, *sequences, **kwargs):
         )
         return pf.map(*sequences)
 
+    def imap(
+        self,
+        f,
+        *sequences,
+        ordered=True,
+        max_outstanding='auto',
+    ):
+        """Parallel version of lazily-evaluated `imap`, load-balanced by this View.
+
+        `ordered`, and `max_outstanding` can be specified by keyword only.
+
+        Unlike other map functions in IPython Parallel,
+        this one does not consume the full iterable before submitting work,
+        returning a single 'AsyncMapResult' representing the full computation.
+
+        Instead, it consumes iterables as they come, submitting up to `max_outstanding`
+        tasks to the cluster before waiting on results (default: one task per engine).
+        This allows it to work with infinite generators,
+        and avoid potentially expensive read-ahead for large streams of inputs
+        that may not fit in memory all at once.
+
+        .. versionadded: 7.0
+
+        Parameters
+        ----------
+        f : callable
+            function to be mapped
+        *sequences : one or more sequences of matching length
+            the sequences to be distributed and passed to `f`
+        ordered : bool [default True]
+            Whether the results should be yielded on a first-come-first-yield basis,
+            or preserve the order of submission.
+
+        max_outstanding : int [default len(engines)]
+            The maximum number of tasks to be outstanding.
+
+            max_outstanding=0 will greedily consume the whole generator
+            (map_async may be more efficient).
+
+            A limit of 1 should be strictly worse than running a local map,
+            as there will be no parallelism.
+
+            Use this to tune how greedily input generator should be consumed.
+
+        Returns
+        -------
+
+        lazily-evaluated generator, yielding results of `f` on each item of sequences.
+        Yield-order depends on `ordered` argument.
+        """
+
+        assert len(sequences) > 0, "must have some sequences to map onto!"
+
+        if max_outstanding == 'auto':
+            max_outstanding = len(self)
+
+        pf = PrePickled(f)
+
+        if ordered:
+            outstanding = deque()
+
+            def wait_for_ready():
+                ar = outstanding.popleft()
+                return [ar]
+
+            def should_yield():
+                # ordered: yield first result if it's ready
+                if outstanding[0].ready():
+                    return True
+
+                if max_outstanding == 0:
+                    # no limit
+                    return False
+
+                # or if we've reached capacity (only counting still-outstanding computations)
+                # not counting locally available, but not yet yielded results
+                # TODO: should we limit the local?
+                # if consumers are much slower than producers,
+                # this can fill up local memory
+                return sum(not ar.ready() for ar in outstanding) >= max_outstanding
+
+        else:
+            outstanding = []
+
+            def wait_for_ready():
+                # unordered, yield whatever finishes first, as soon as it's ready
+                done, outstanding[:] = concurrent.futures.wait(
+                    outstanding, return_when=concurrent.futures.FIRST_COMPLETED
+                )
+                return done
+
+            def should_yield():
+                # unordered, we are ready to yield if any result is ready
+                if any(ar.ready() for ar in outstanding):
+                    return True
+
+                if max_outstanding == 0:
+                    # no limit
+                    return False
+
+                # or wait if we are full
+                if len(outstanding) >= max_outstanding:
+                    return True
+                return False
+
+        # zip is a lazy iterator
+        for args in zip(*sequences):
+            # submit one work item
+            ar = self.apply_async(pf, *args)
+            outstanding.append(ar)
+            # count 'pending' tasks
+            # yield first result if it's ready
+            # *or* the number of outstanding tasks has reached our limit
+            # yielding immediately means
+            if should_yield():
+                for ready_ar in wait_for_ready():
+                    yield ready_ar.get()
+
+            # we've filled the buffer, wait for at least one result before continuing
+            if len(outstanding) == max_outstanding:
+                for ready_ar in wait_for_ready():
+                    yield ready_ar.get()
+
+        # yield any remaining results
+        if ordered:
+            for ar in outstanding:
+                yield ar.get()
+        else:
+            while outstanding:
+                done, outstanding = concurrent.futures.wait(outstanding)
+                for ar in done:
+                    yield ar.get()
+
     def register_joblib_backend(self, name='ipyparallel', make_default=False):
         """Register this View as a joblib parallel backend
 
@@ -1319,7 +1440,7 @@ def map(self, func, *iterables, **kwargs):
         if 'timeout' in kwargs:
             warnings.warn("timeout unsupported in ViewExecutor.map")
             kwargs.pop('timeout')
-        for r in self.view.map_async(func, *iterables, **kwargs):
+        for r in self.view.imap(func, *iterables, **kwargs):
             yield r
 
     def shutdown(self, wait=True):
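
For context, the sketch below shows how the new `imap` added above might be used once this change is installed. It is a minimal sketch, not part of the commit: it assumes a cluster is already running and reachable via the default `ipp.Client()`, and the task function and the limit of 4 outstanding tasks are illustrative only.

import itertools

import ipyparallel as ipp

# Assumes a running cluster reachable with default connection settings.
client = ipp.Client()
view = client.load_balanced_view()


def slow_square(x):
    import time

    time.sleep(0.1)  # stand-in for real work
    return x * x


# imap pulls from the (here: infinite) input lazily, keeping at most
# `max_outstanding` tasks in flight, and yields results as they resolve.
for result in view.imap(slow_square, itertools.count(), max_outstanding=4):
    print(result)
    if result >= 100:
        # stop consuming the infinite stream
        break

Because `ordered=True` is the default, results arrive in submission order; the generator never reads further ahead in `itertools.count()` than the outstanding-task limit allows.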

ipyparallel/tests/test_lbview.py

Lines changed: 83 additions & 10 deletions
@@ -1,18 +1,12 @@
 # -*- coding: utf-8 -*-
 """test LoadBalancedView objects"""
-import sys
 import time
+from itertools import count
 
-import pytest
-import zmq
-
-import ipyparallel as pmod
+import ipyparallel as ipp
 from .clienttest import ClusterTestCase
 from .clienttest import crash
-from .clienttest import skip_without
-from .clienttest import wait
 from ipyparallel import error
-from ipyparallel.tests import add_engines
 
 
 class TestLoadBalancedView(ClusterTestCase):
@@ -89,7 +83,7 @@ def slow_f(x):
         reference = list(map(f, data))
 
         amr = self.view.map_async(slow_f, data, ordered=False)
-        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        self.assertTrue(isinstance(amr, ipp.AsyncMapResult))
         # check individual elements, retrieved as they come
         # list comprehension uses __iter__
         astheycame = [r for r in amr]
@@ -113,7 +107,7 @@ def slow_f(x):
         reference = list(map(f, data))
 
         amr = self.view.map_async(slow_f, data)
-        self.assertTrue(isinstance(amr, pmod.AsyncMapResult))
+        self.assertTrue(isinstance(amr, ipp.AsyncMapResult))
         # check individual elements, retrieved as they come
         # list(amr) uses __iter__
         astheycame = list(amr)
@@ -131,6 +125,85 @@ def test_map_iterable(self):
         r = view.map_sync(lambda x: x, arr)
         self.assertEqual(r, list(arr))
 
+    def test_imap_max_outstanding(self):
+        view = self.view
+
+        source = count()
+
+        def task(i):
+            import time
+
+            time.sleep(0.1)
+            return i
+
+        gen = view.imap(task, source, max_outstanding=5)
+        # should submit at least max_outstanding
+        first_result = next(gen)
+        assert len(view.history) == 5
+        # retrieving results should first result
+        second_result = next(gen)
+        assert 6 <= len(view.history) <= 8
+        self.client.wait(timeout=self.timeout)
+
+    def test_imap_infinite(self):
+        view = self.view
+
+        source = count()
+
+        def task(i):
+            import time
+
+            time.sleep(0.1)
+            return i
+
+        gen = view.imap(task, source, max_outstanding=2)
+        results = []
+        for i in gen:
+            results.append(i)
+            if i >= 3:
+                break
+
+        assert len(results) == 4
+
+        # wait
+        self.client.wait(timeout=self.timeout)
+        # verify that max_outstanding wasn't exceeded
+        assert 5 <= len(self.view.history) < 8
+
+    def test_imap_unordered(self):
+        self.minimum_engines(4)
+        view = self.view
+
+        source = count()
+
+        def yield_up_and_down(n):
+            for i in range(n):
+                if i % 4 == 0:
+                    yield 1 + i / 100
+                else:
+                    yield i / 100
+
+        def task(t):
+            import time
+
+            time.sleep(t)
+            return t
+
+        gen = view.imap(task, yield_up_and_down(10), max_outstanding=2, ordered=False)
+        results = []
+        for i, t in enumerate(gen):
+            results.append(t)
+            if i >= 2:
+                break
+        assert len(results) == 3
+        print(results)
+        assert all([r < 1 for r in results])
+
+        # wait
+        self.client.wait(timeout=self.timeout)
+        # verify that max_outstanding wasn't exceeded
+        assert 4 <= len(self.view.history) <= 6
+
     def test_abort(self):
         view = self.view
         ar = self.client[:].apply_async(time.sleep, 0.5)
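
The unordered test above relies on fast tasks finishing before slower ones submitted earlier. A minimal sketch of that `ordered=False` mode, under the same assumptions as the earlier example (a running cluster reachable via `ipp.Client()`, at least two engines, illustrative sleep durations):

import ipyparallel as ipp

client = ipp.Client()
view = client.load_balanced_view()


def sleep_and_return(t):
    import time

    time.sleep(t)
    return t


durations = [0.5, 0.05, 0.3, 0.01]
# With ordered=False, results are yielded first-come-first-yield:
# short tasks come back before longer ones that were submitted earlier,
# while max_outstanding still bounds how many tasks are in flight.
for t in view.imap(sleep_and_return, durations, ordered=False, max_outstanding=2):
    print("task slept for", t)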
