Skip to content

Commit 167aa6d

Browse files
committed
Add more edge-case tests for .imap()/.imap_unodered
These tests mostly come from a similar PR adding `buffersize` param to `concurrent.futures.Executor.map` - https://github.com/python/cpython/pull/125663/files
1 parent 07da591 commit 167aa6d

File tree

1 file changed

+122
-58
lines changed

1 file changed

+122
-58
lines changed

Lib/test/_test_multiprocessing.py

Lines changed: 122 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -2953,64 +2953,6 @@ def test_imap(self):
29532953
self.assertEqual(next(it), i * i)
29542954
self.assertRaises(StopIteration, it.__next__)
29552955

2956-
def test_imap_fast_iterable_with_slow_task(self):
2957-
if self.TYPE != "threads":
2958-
self.skipTest("test not appropriate for {}".format(self.TYPE))
2959-
2960-
processes = 4
2961-
p = self.Pool(processes)
2962-
2963-
tasks_started_later = 2
2964-
last_produced_task_arg = Value("i")
2965-
2966-
def produce_args():
2967-
for arg in range(1, processes + tasks_started_later + 1):
2968-
last_produced_task_arg.value = arg
2969-
yield arg
2970-
2971-
it = p.imap(functools.partial(sqr, wait=0.2), produce_args())
2972-
2973-
next(it)
2974-
time.sleep(0.2)
2975-
# `iterable` should've been advanced only up by `processes` times,
2976-
# but in fact advances further (by `>=processes+1`).
2977-
# In this case, it advances to the maximum value.
2978-
self.assertGreater(last_produced_task_arg.value, processes + 1)
2979-
2980-
p.terminate()
2981-
p.join()
2982-
2983-
def test_imap_fast_iterable_with_slow_task_and_buffersize(self):
2984-
if self.TYPE != "threads":
2985-
self.skipTest("test not appropriate for {}".format(self.TYPE))
2986-
2987-
processes = 4
2988-
p = self.Pool(processes)
2989-
2990-
tasks_started_later = 2
2991-
last_produced_task_arg = Value("i")
2992-
2993-
def produce_args():
2994-
for arg in range(1, processes + tasks_started_later + 1):
2995-
last_produced_task_arg.value = arg
2996-
yield arg
2997-
2998-
it = p.imap(
2999-
functools.partial(sqr, wait=0.2),
3000-
produce_args(),
3001-
buffersize=processes,
3002-
)
3003-
3004-
time.sleep(0.2)
3005-
self.assertEqual(last_produced_task_arg.value, processes)
3006-
3007-
next(it)
3008-
time.sleep(0.2)
3009-
self.assertEqual(last_produced_task_arg.value, processes + 1)
3010-
3011-
p.terminate()
3012-
p.join()
3013-
30142956
def test_imap_handle_iterable_exception(self):
30152957
if self.TYPE == 'manager':
30162958
self.skipTest('test not appropriate for {}'.format(self.TYPE))
@@ -3101,6 +3043,128 @@ def test_imap_unordered_handle_iterable_exception(self):
31013043
self.assertIn(value, expected_values)
31023044
expected_values.remove(value)
31033045

3046+
def test_imap_and_imap_unordered_buffersize_type_validation(self):
3047+
for method_name in ("imap", "imap_unordered"):
3048+
for buffersize in ("foo", 2.0):
3049+
with (
3050+
self.subTest(method=method_name, buffersize=buffersize),
3051+
self.assertRaisesRegex(
3052+
TypeError, "buffersize must be an integer or None"
3053+
),
3054+
):
3055+
method = getattr(self.pool, method_name)
3056+
method(str, range(4), buffersize=buffersize)
3057+
3058+
def test_imap_and_imap_unordered_buffersize_value_validation(self):
3059+
for method_name in ("imap", "imap_unordered"):
3060+
for buffersize in (0, -1):
3061+
with (
3062+
self.subTest(method=method_name, buffersize=buffersize),
3063+
self.assertRaisesRegex(
3064+
ValueError, "buffersize must be None or > 0"
3065+
),
3066+
):
3067+
method = getattr(self.pool, method_name)
3068+
method(str, range(4), buffersize=buffersize)
3069+
3070+
def test_imap_and_imap_unordered_when_buffer_is_full(self):
3071+
if self.TYPE != "threads":
3072+
self.skipTest("test not appropriate for {}".format(self.TYPE))
3073+
3074+
for method_name in ("imap", "imap_unordered"):
3075+
with self.subTest(method=method_name):
3076+
processes = 4
3077+
p = self.Pool(processes)
3078+
last_produced_task_arg = Value("i")
3079+
3080+
def produce_args():
3081+
for arg in itertools.count(1):
3082+
last_produced_task_arg.value = arg
3083+
yield arg
3084+
3085+
method = getattr(p, method_name)
3086+
it = method(functools.partial(sqr, wait=0.2), produce_args())
3087+
3088+
time.sleep(0.2)
3089+
# `iterable` could've been advanced only `processes` times,
3090+
# but in fact it advances further (`> processes`) because of
3091+
# not waiting for workers or user code to catch up.
3092+
self.assertGreater(last_produced_task_arg.value, processes)
3093+
3094+
next(it)
3095+
time.sleep(0.2)
3096+
self.assertGreater(last_produced_task_arg.value, processes + 1)
3097+
3098+
next(it)
3099+
time.sleep(0.2)
3100+
self.assertGreater(last_produced_task_arg.value, processes + 2)
3101+
3102+
p.terminate()
3103+
p.join()
3104+
3105+
def test_imap_and_imap_unordered_buffersize_when_buffer_is_full(self):
3106+
if self.TYPE != "threads":
3107+
self.skipTest("test not appropriate for {}".format(self.TYPE))
3108+
3109+
for method_name in ("imap", "imap_unordered"):
3110+
with self.subTest(method=method_name):
3111+
processes = 4
3112+
p = self.Pool(processes)
3113+
last_produced_task_arg = Value("i")
3114+
3115+
def produce_args():
3116+
for arg in itertools.count(1):
3117+
last_produced_task_arg.value = arg
3118+
yield arg
3119+
3120+
method = getattr(p, method_name)
3121+
it = method(
3122+
functools.partial(sqr, wait=0.2),
3123+
produce_args(),
3124+
buffersize=processes,
3125+
)
3126+
3127+
time.sleep(0.2)
3128+
self.assertEqual(last_produced_task_arg.value, processes)
3129+
3130+
next(it)
3131+
time.sleep(0.2)
3132+
self.assertEqual(last_produced_task_arg.value, processes + 1)
3133+
3134+
next(it)
3135+
time.sleep(0.2)
3136+
self.assertEqual(last_produced_task_arg.value, processes + 2)
3137+
3138+
p.terminate()
3139+
p.join()
3140+
3141+
def test_imap_and_imap_unordered_buffersize_on_infinite_iterable(self):
3142+
if self.TYPE != "threads":
3143+
self.skipTest("test not appropriate for {}".format(self.TYPE))
3144+
3145+
for method_name in ("imap", "imap_unordered"):
3146+
with self.subTest(method=method_name):
3147+
p = self.Pool(4)
3148+
method = getattr(p, method_name)
3149+
3150+
res = method(str, itertools.count(), buffersize=2)
3151+
3152+
self.assertEqual(next(res, None), "0")
3153+
self.assertEqual(next(res, None), "1")
3154+
self.assertEqual(next(res, None), "2")
3155+
3156+
p.terminate()
3157+
p.join()
3158+
3159+
def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self):
3160+
for method_name in ("imap", "imap_unordered"):
3161+
with self.subTest(method=method_name):
3162+
method = getattr(self.pool, method_name)
3163+
3164+
res = method(str, [], buffersize=2)
3165+
3166+
self.assertIsNone(next(res, None))
3167+
31043168
def test_make_pool(self):
31053169
expected_error = (RemoteError if self.TYPE == 'manager'
31063170
else ValueError)

0 commit comments

Comments
 (0)