99
1010# Import ld_eventsource from parent directory
1111sys .path .insert (1 , os .path .join (sys .path [0 ], '..' ))
12- from ld_eventsource import *
13- from ld_eventsource .actions import *
14- from ld_eventsource .config import *
12+ from ld_eventsource import * # noqa: E402
13+ from ld_eventsource .actions import * # noqa: E402
14+ from ld_eventsource .config import * # noqa: E402
1515
1616http_client = urllib3 .PoolManager ()
1717
@@ -39,54 +39,59 @@ def run(self):
3939 connect = ConnectStrategy .http (
4040 url = stream_url ,
4141 headers = self .options .get ("headers" ),
42- urllib3_request_options = None if self .options .get ("readTimeoutMs" ) is None else {
43- "timeout" : urllib3 .Timeout (read = millis_to_seconds (self .options .get ("readTimeoutMs" )))
44- }
45- )
42+ urllib3_request_options = (
43+ None
44+ if self .options .get ("readTimeoutMs" ) is None
45+ else {
46+ "timeout" : urllib3 .Timeout (
47+ read = millis_to_seconds (self .options .get ("readTimeoutMs" ))
48+ )
49+ }
50+ ),
51+ )
4652 sse = SSEClient (
4753 connect ,
48- initial_retry_delay = millis_to_seconds (self .options .get ("initialDelayMs" )),
54+ initial_retry_delay = millis_to_seconds (
55+ self .options .get ("initialDelayMs" )
56+ ),
4957 last_event_id = self .options .get ("lastEventId" ),
50- error_strategy = ErrorStrategy .from_lambda (lambda _ :
51- (ErrorStrategy .FAIL if self .closed else ErrorStrategy .CONTINUE , None )),
52- logger = self .log
58+ error_strategy = ErrorStrategy .from_lambda (
59+ lambda _ : (
60+ ErrorStrategy .FAIL if self .closed else ErrorStrategy .CONTINUE ,
61+ None ,
62+ )
63+ ),
64+ logger = self .log ,
5365 )
5466 self .sse = sse
5567 for item in sse .all :
5668 if isinstance (item , Event ):
5769 self .log .info ('Received event from stream (%s)' , item .event )
58- self .send_message ({
59- 'kind' : 'event' ,
60- 'event' : {
61- 'type' : item .event ,
62- 'data' : item .data ,
63- 'id' : item .last_event_id
70+ self .send_message (
71+ {
72+ 'kind' : 'event' ,
73+ 'event' : {
74+ 'type' : item .event ,
75+ 'data' : item .data ,
76+ 'id' : item .last_event_id ,
77+ },
6478 }
65- } )
79+ )
6680 elif isinstance (item , Comment ):
6781 self .log .info ('Received comment from stream: %s' , item .comment )
68- self .send_message ({
69- 'kind' : 'comment' ,
70- 'comment' : item .comment
71- })
82+ self .send_message ({'kind' : 'comment' , 'comment' : item .comment })
7283 elif isinstance (item , Fault ):
7384 if self .closed :
7485 break
7586 # item.error will be None if this is just an EOF rather than an I/O error or HTTP error.
7687 # Currently the test harness does not expect us to send an error message in that case.
7788 if item .error :
7889 self .log .info ('Received error from stream: %s' % item .error )
79- self .send_message ({
80- 'kind' : 'error' ,
81- 'error' : str (item .error )
82- })
90+ self .send_message ({'kind' : 'error' , 'error' : str (item .error )})
8391 except Exception as e :
8492 self .log .info ('Received error from stream: %s' , e )
8593 self .log .info (traceback .format_exc ())
86- self .send_message ({
87- 'kind' : 'error' ,
88- 'error' : str (e )
89- })
94+ self .send_message ({'kind' : 'error' , 'error' : str (e )})
9095
9196 def do_command (self , command : str ) -> bool :
9297 self .log .info ('Test service sent command: %s' % command )
@@ -105,9 +110,9 @@ def send_message(self, message):
105110 resp = http_client .request (
106111 'POST' ,
107112 callback_url ,
108- headers = {'Content-Type' : 'application/json' },
109- body = json .dumps (message )
110- )
113+ headers = {'Content-Type' : 'application/json' },
114+ body = json .dumps (message ),
115+ )
111116 if resp .status >= 300 and not self .closed :
112117 self .log .error ('Callback request returned HTTP error %d' , resp .status )
113118 except Exception as e :
0 commit comments