@@ -36,15 +36,13 @@ def __init__(self, *, listening_addr: str='127.0.0.1',
36
36
self .listen_task = None
37
37
38
38
async def _wait (self , work ):
39
- work_task = asyncio .ensure_future (work , loop = self .loop )
40
- stop_event_task = asyncio .ensure_future (self .stop_event .wait (),
41
- loop = self .loop )
39
+ work_task = asyncio .ensure_future (work )
40
+ stop_event_task = asyncio .ensure_future (self .stop_event .wait ())
42
41
43
42
try :
44
43
await asyncio .wait (
45
44
[work_task , stop_event_task ],
46
- return_when = asyncio .FIRST_COMPLETED ,
47
- loop = self .loop )
45
+ return_when = asyncio .FIRST_COMPLETED )
48
46
49
47
if self .stop_event .is_set ():
50
48
raise StopServer ()
@@ -58,7 +56,8 @@ async def _wait(self, work):
58
56
59
57
def start (self ):
60
58
started = threading .Event ()
61
- self .thread = threading .Thread (target = self ._start , args = (started ,))
59
+ self .thread = threading .Thread (
60
+ target = self ._start_thread , args = (started ,))
62
61
self .thread .start ()
63
62
if not started .wait (timeout = 2 ):
64
63
raise RuntimeError ('fuzzer proxy failed to start' )
@@ -70,13 +69,14 @@ def stop(self):
70
69
def _stop (self ):
71
70
self .stop_event .set ()
72
71
73
- def _start (self , started_event ):
72
+ def _start_thread (self , started_event ):
74
73
self .loop = asyncio .new_event_loop ()
74
+ asyncio .set_event_loop (self .loop )
75
75
76
- self .connectivity = asyncio .Event (loop = self . loop )
76
+ self .connectivity = asyncio .Event ()
77
77
self .connectivity .set ()
78
- self .connectivity_loss = asyncio .Event (loop = self . loop )
79
- self .stop_event = asyncio .Event (loop = self . loop )
78
+ self .connectivity_loss = asyncio .Event ()
79
+ self .stop_event = asyncio .Event ()
80
80
81
81
if self .listening_port is None :
82
82
self .listening_port = cluster .find_available_port ()
@@ -92,15 +92,15 @@ def _start(self, started_event):
92
92
self .loop .close ()
93
93
94
94
async def _main (self , started_event ):
95
- self .listen_task = asyncio .ensure_future (self .listen (), loop = self . loop )
95
+ self .listen_task = asyncio .ensure_future (self .listen ())
96
96
# Notify the main thread that we are ready to go.
97
97
started_event .set ()
98
98
try :
99
99
await self .listen_task
100
100
finally :
101
101
for c in list (self .connections ):
102
102
c .close ()
103
- await asyncio .sleep (0.01 , loop = self . loop )
103
+ await asyncio .sleep (0.01 )
104
104
if hasattr (self .loop , 'remove_reader' ):
105
105
self .loop .remove_reader (self .sock .fileno ())
106
106
self .sock .close ()
@@ -176,15 +176,15 @@ def close(self):
176
176
177
177
async def handle (self ):
178
178
self .proxy_to_backend_task = asyncio .ensure_future (
179
- self .proxy_to_backend (), loop = self . loop )
179
+ self .proxy_to_backend ())
180
180
181
181
self .proxy_from_backend_task = asyncio .ensure_future (
182
- self .proxy_from_backend (), loop = self . loop )
182
+ self .proxy_from_backend ())
183
183
184
184
try :
185
185
await asyncio .wait (
186
186
[self .proxy_to_backend_task , self .proxy_from_backend_task ],
187
- loop = self . loop , return_when = asyncio .FIRST_COMPLETED )
187
+ return_when = asyncio .FIRST_COMPLETED )
188
188
189
189
finally :
190
190
# Asyncio fails to properly remove the readers and writers
@@ -201,17 +201,14 @@ async def handle(self):
201
201
202
202
async def _read (self , sock , n ):
203
203
read_task = asyncio .ensure_future (
204
- self .loop .sock_recv (sock , n ),
205
- loop = self .loop )
204
+ self .loop .sock_recv (sock , n ))
206
205
conn_event_task = asyncio .ensure_future (
207
- self .connectivity_loss .wait (),
208
- loop = self .loop )
206
+ self .connectivity_loss .wait ())
209
207
210
208
try :
211
209
await asyncio .wait (
212
210
[read_task , conn_event_task ],
213
- return_when = asyncio .FIRST_COMPLETED ,
214
- loop = self .loop )
211
+ return_when = asyncio .FIRST_COMPLETED )
215
212
216
213
if self .connectivity_loss .is_set ():
217
214
return None
@@ -225,15 +222,14 @@ async def _read(self, sock, n):
225
222
226
223
async def _write (self , sock , data ):
227
224
write_task = asyncio .ensure_future (
228
- self .loop .sock_sendall (sock , data ), loop = self . loop )
225
+ self .loop .sock_sendall (sock , data ))
229
226
conn_event_task = asyncio .ensure_future (
230
- self .connectivity_loss .wait (), loop = self . loop )
227
+ self .connectivity_loss .wait ())
231
228
232
229
try :
233
230
await asyncio .wait (
234
231
[write_task , conn_event_task ],
235
- return_when = asyncio .FIRST_COMPLETED ,
236
- loop = self .loop )
232
+ return_when = asyncio .FIRST_COMPLETED )
237
233
238
234
if self .connectivity_loss .is_set ():
239
235
return None
0 commit comments