5
5
import pytest
6
6
7
7
from ..learner import Learner1D , Learner2D
8
- from ..runner import simple , BlockingRunner , AsyncRunner , SequentialExecutor
8
+ from ..runner import (simple , BlockingRunner , AsyncRunner , SequentialExecutor ,
9
+ with_ipyparallel , with_distributed )
9
10
10
11
11
12
def blocking_runner (learner , goal ):
@@ -19,15 +20,18 @@ def async_runner(learner, goal):
19
20
20
21
runners = [simple , blocking_runner , async_runner ]
21
22
23
+
22
24
def trivial_goal (learner ):
23
25
return learner .npoints > 10
24
26
27
+
25
28
@pytest .mark .parametrize ('runner' , runners )
26
29
def test_simple (runner ):
27
30
"""Test that the runners actually run."""
28
31
29
32
def f (x ):
30
33
return x
34
+
31
35
learner = Learner1D (f , (- 1 , 1 ))
32
36
runner (learner , lambda l : l .npoints > 10 )
33
37
assert len (learner .data ) > 10
@@ -54,3 +58,52 @@ async def f(x):
54
58
learner = Learner1D (f , (- 1 , 1 ))
55
59
runner = AsyncRunner (learner , trivial_goal )
56
60
asyncio .get_event_loop ().run_until_complete (runner .task )
61
+
62
+
63
+ ### Test with different executors
64
+
65
+ @pytest .fixture (scope = "session" )
66
+ def ipyparallel_executor ():
67
+ from ipyparallel import Client
68
+ import pexpect
69
+
70
+ child = pexpect .spawn ('ipcluster start -n 1' )
71
+ child .expect ('Engines appear to have started successfully' , timeout = 35 )
72
+ yield Client ()
73
+ if not child .terminate (force = True ):
74
+ raise RuntimeError ('Could not stop ipcluster' )
75
+
76
+
77
+ @pytest .fixture (scope = "session" )
78
+ def dask_executor ():
79
+ from distributed import LocalCluster , Client
80
+
81
+ client = Client (n_workers = 1 )
82
+ yield client
83
+ client .close ()
84
+
85
+
86
+ def linear (x ):
87
+ return x
88
+
89
+
90
+ def test_concurrent_futures_executor ():
91
+ from concurrent .futures import ProcessPoolExecutor
92
+ BlockingRunner (Learner1D (linear , (- 1 , 1 )), trivial_goal ,
93
+ executor = ProcessPoolExecutor (max_workers = 1 ))
94
+
95
+
96
+ @pytest .mark .skipif (not with_ipyparallel , reason = 'IPyparallel is not installed' )
97
+ def test_ipyparallel_executor (ipyparallel_executor ):
98
+ learner = Learner1D (linear , (- 1 , 1 ))
99
+ BlockingRunner (learner , trivial_goal ,
100
+ executor = ipyparallel_executor )
101
+ assert learner .npoints > 0
102
+
103
+
104
+ @pytest .mark .skipif (not with_distributed , reason = 'dask.distributed is not installed' )
105
+ def test_distributed_executor (dask_executor ):
106
+ learner = Learner1D (linear , (- 1 , 1 ))
107
+ BlockingRunner (learner , trivial_goal ,
108
+ executor = dask_executor )
109
+ assert learner .npoints > 0
0 commit comments