|  | 
|  | 1 | +import itertools | 
|  | 2 | +from multiprocessing import Manager | 
| 1 | 3 | import threading | 
| 2 | 4 | import time | 
| 3 | 5 | import weakref | 
| @@ -69,6 +71,50 @@ def test_map_timeout(self): | 
| 69 | 71 | 
 | 
| 70 | 72 |         self.assertEqual([None, None], results) | 
| 71 | 73 | 
 | 
|  | 74 | +    def test_map_args(self): | 
|  | 75 | +        with self.assertRaisesRegex( | 
|  | 76 | +            ValueError, "buffersize must be None or >= 1." | 
|  | 77 | +        ): | 
|  | 78 | +            self.executor.map(bool, [], buffersize=0) | 
|  | 79 | +        with self.assertRaisesRegex( | 
|  | 80 | +            ValueError, "cannot specify both buffersize and timeout." | 
|  | 81 | +        ): | 
|  | 82 | +            self.executor.map(bool, [], timeout=1, buffersize=1) | 
|  | 83 | + | 
|  | 84 | +    def test_map_infinite_iterable(self): | 
|  | 85 | +        self.assertEqual( | 
|  | 86 | +            next(iter(self.executor.map(str, itertools.count(1), buffersize=1))), | 
|  | 87 | +            "1", | 
|  | 88 | +        ) | 
|  | 89 | + | 
|  | 90 | +    def test_map_buffersize(self): | 
|  | 91 | +        manager = Manager() | 
|  | 92 | + | 
|  | 93 | +        for buffersize, iterable_size in [ | 
|  | 94 | +            (1, 5), | 
|  | 95 | +            (5, 5), | 
|  | 96 | +            (10, 5), | 
|  | 97 | +        ]: | 
|  | 98 | +            iterable = range(iterable_size) | 
|  | 99 | +            processed_elements = manager.list() | 
|  | 100 | + | 
|  | 101 | +            iterator = self.executor.map( | 
|  | 102 | +                processed_elements.append, iterable, buffersize=buffersize | 
|  | 103 | +            ) | 
|  | 104 | +            time.sleep(1)  # wait for buffered futures to finish | 
|  | 105 | +            self.assertSetEqual( | 
|  | 106 | +                set(processed_elements), | 
|  | 107 | +                set(range(min(buffersize, iterable_size))), | 
|  | 108 | +            ) | 
|  | 109 | +            next(iterator) | 
|  | 110 | +            time.sleep(1)  # wait for the created future to finish | 
|  | 111 | +            self.assertSetEqual( | 
|  | 112 | +                set(processed_elements), | 
|  | 113 | +                set(range(min(buffersize + 1, iterable_size))), | 
|  | 114 | +            ) | 
|  | 115 | + | 
|  | 116 | + | 
|  | 117 | + | 
| 72 | 118 |     def test_shutdown_race_issue12456(self): | 
| 73 | 119 |         # Issue #12456: race condition at shutdown where trying to post a | 
| 74 | 120 |         # sentinel in the call queue blocks (the queue is full while processes | 
|  | 
0 commit comments