11#!/usr/bin/env python
22#
3- # Copyright CEA/DAM/DIF (2010-2015 )
3+ # Copyright CEA/DAM/DIF (2010-2016 )
44# Contributor: Henri DOREAU <[email protected] > 55# Contributor: Stephane THIELL <[email protected] > 66#
@@ -71,18 +71,26 @@ def gateway_excepthook(exc_type, exc_value, tb):
7171
7272class WorkerTreeResponder (EventHandler ):
7373 """Gateway WorkerTree handler"""
74+
7475 def __init__ (self , task , gwchan , srcwkr ):
7576 EventHandler .__init__ (self )
7677 self .gwchan = gwchan # gateway channel
7778 self .srcwkr = srcwkr # id of distant parent WorkerTree
7879 self .worker = None # local WorkerTree instance
79- # For messages grooming
80- qdelay = task .info ("grooming_delay" )
81- self .timer = task .timer (qdelay , self , qdelay , autoclose = True )
80+ self .retcodes = {} # self-managed retcodes
8281 self .logger = logging .getLogger (__name__ )
83- self .logger .debug ("WorkerTreeResponder: initialized" )
84- # self-managed retcodes
85- self .retcodes = {}
82+
83+ # Grooming initialization
84+ self .timer = None
85+ qdelay = task .info ("grooming_delay" )
86+ if qdelay > 1.0e-3 :
87+ # Enable messages and rc grooming - enable msgtree (#181)
88+ task .set_default ("stdout_msgtree" , True )
89+ task .set_default ("stderr_msgtree" , True )
90+ # create auto-closing timer object for grooming
91+ self .timer = task .timer (qdelay , self , qdelay , autoclose = True )
92+
93+ self .logger .debug ("WorkerTreeResponder initialized grooming=%f" , qdelay )
8694
8795 def ev_start (self , worker ):
8896 self .logger .debug ("WorkerTreeResponder: ev_start" )
@@ -96,14 +104,14 @@ def ev_timer(self, timer):
96104
97105 # check for grooming opportunities for stdout/stderr
98106 for msg_elem , nodes in self .worker .iter_errors ():
99- logger .debug ("iter(stderr): %s: %d bytes" % \
100- ( nodes , len (msg_elem .message () )))
101- self .gwchan .send (StdErrMessage (nodes , msg_elem .message (), \
107+ logger .debug ("iter(stderr): %s: %d bytes" , nodes ,
108+ len (msg_elem .message ()))
109+ self .gwchan .send (StdErrMessage (nodes , msg_elem .message (),
102110 self .srcwkr ))
103111 for msg_elem , nodes in self .worker .iter_buffers ():
104- logger .debug ("iter(stdout): %s: %d bytes" % \
105- ( nodes , len (msg_elem .message () )))
106- self .gwchan .send (StdOutMessage (nodes , msg_elem .message (), \
112+ logger .debug ("iter(stdout): %s: %d bytes" , nodes ,
113+ len (msg_elem .message ()))
114+ self .gwchan .send (StdOutMessage (nodes , msg_elem .message (),
107115 self .srcwkr ))
108116 # empty internal MsgTree buffers
109117 self .worker .flush_buffers ()
@@ -113,39 +121,58 @@ def ev_timer(self, timer):
113121 # retcodes to parent node, instead of doing it at ev_hup (no msg
114122 # aggregation) or at ev_close (no parent node live updates)
115123 for rc , nodes in self .retcodes .iteritems ():
116- self .logger .debug ("iter(rc): %s: rc=%d" % ( nodes , rc ) )
124+ self .logger .debug ("iter(rc): %s: rc=%d" , nodes , rc )
117125 self .gwchan .send (RetcodeMessage (nodes , rc , self .srcwkr ))
118126 self .retcodes .clear ()
119127
128+ def ev_read (self , worker ):
129+ """message received on stdout"""
130+ if self .timer is None :
131+ self .gwchan .send (StdOutMessage (worker .current_node ,
132+ worker .current_msg ,
133+ self .srcwkr ))
134+
120135 def ev_error (self , worker ):
121- self .logger .debug ("WorkerTreeResponder: ev_error %s" % \
122- worker .current_errmsg )
136+ """message received on stderr"""
137+ self .logger .debug ("WorkerTreeResponder: ev_error %s %s" ,
138+ worker .current_node ,
139+ worker .current_errmsg )
140+ if self .timer is None :
141+ self .gwchan .send (StdErrMessage (worker .current_node ,
142+ worker .current_errmsg ,
143+ self .srcwkr ))
123144
124145 def ev_timeout (self , worker ):
125146 """Received timeout event: some nodes did timeout"""
126- self .gwchan .send (TimeoutMessage ( \
127- NodeSet ._fromlist1 (worker .iter_keys_timeout ()), self .srcwkr ))
147+ msg = TimeoutMessage (NodeSet ._fromlist1 (worker .iter_keys_timeout ()),
148+ self .srcwkr )
149+ self .gwchan .send (msg )
128150
129151 def ev_hup (self , worker ):
130152 """Received end of command from one node"""
131- if worker .current_rc in self .retcodes :
132- self .retcodes [worker .current_rc ].add (worker .current_node )
153+ if self .timer is None :
154+ self .gwchan .send (RetcodeMessage (worker .current_node ,
155+ worker .current_rc ,
156+ self .srcwkr ))
133157 else :
134- self .retcodes [worker .current_rc ] = NodeSet (worker .current_node )
158+ # retcode grooming
159+ if worker .current_rc in self .retcodes :
160+ self .retcodes [worker .current_rc ].add (worker .current_node )
161+ else :
162+ self .retcodes [worker .current_rc ] = NodeSet (worker .current_node )
135163
136164 def ev_close (self , worker ):
137165 """End of CTL responder"""
138166 self .logger .debug ("WorkerTreeResponder: ev_close" )
139- # finalize grooming
140- self .ev_timer (None )
141- self .timer .invalidate ()
167+ if self .timer is not None :
168+ # finalize grooming
169+ self .ev_timer (None )
170+ self .timer .invalidate ()
142171
143172
144173class GatewayChannel (Channel ):
145174 """high level logic for gateways"""
146175 def __init__ (self , task ):
147- """
148- """
149176 Channel .__init__ (self , error_response = True )
150177 self .task = task
151178 self .nodename = None
@@ -222,7 +249,7 @@ def recv_cfg(self, msg):
222249 # topology
223250 task_self ().topology = self .topology = msg .data_decode ()
224251 self .logger .debug ('decoded propagation tree' )
225- self .logger .debug ('\n %s' % self .topology )
252+ self .logger .debug ('\n %s' , self .topology )
226253 self .setup = True
227254 self ._ack (msg )
228255
@@ -273,7 +300,7 @@ def recv_ctl(self, msg):
273300 self ._ack (msg )
274301 elif msg .action == 'write' :
275302 data = msg .data_decode ()
276- self .logger .debug ('GatewayChannel write: %d bytes' , \
303+ self .logger .debug ('GatewayChannel write: %d bytes' ,
277304 len (data ['buf' ]))
278305 self .propagation .write (data ['buf' ])
279306 self ._ack (msg )
@@ -314,7 +341,7 @@ def gateway_main():
314341 sys .excepthook = gateway_excepthook
315342
316343 logger .debug ('Starting gateway on %s' , host )
317- logger .debug ("environ=%s" % os .environ )
344+ logger .debug ("environ=%s" , os .environ )
318345
319346
320347 set_nonblock_flag (sys .stdin .fileno ())
@@ -323,9 +350,9 @@ def gateway_main():
323350
324351 task = task_self ()
325352
326- # Pre-enable MsgTree buffering on gateway (FIXME)
327- task .set_default ("stdout_msgtree" , True )
328- task .set_default ("stderr_msgtree" , True )
353+ # Disable MsgTree buffering, it is enabled later when needed
354+ task .set_default ("stdout_msgtree" , False )
355+ task .set_default ("stderr_msgtree" , False )
329356
330357 if sys .stdin .isatty ():
331358 logger .critical ('Gateway failure: sys.stdin.isatty() is True' )
@@ -345,10 +372,10 @@ def gateway_main():
345372 except EngineAbortException , exc :
346373 logger .debug ('EngineAbortException' )
347374 except IOError , exc :
348- logger .debug ('Broken pipe (%s)' % exc )
375+ logger .debug ('Broken pipe (%s)' , exc )
349376 raise
350377 except Exception , exc :
351- logger .exception ('Gateway failure: %s' % exc )
378+ logger .exception ('Gateway failure: %s' , exc )
352379 logger .debug ('-------- The End --------' )
353380
354381if __name__ == '__main__' :
0 commit comments