@@ -17,7 +17,6 @@ limitations under the License.
17
17
package plugin
18
18
19
19
import (
20
- "context"
21
20
"fmt"
22
21
"io"
23
22
"os/exec"
@@ -144,15 +143,15 @@ func readFromReader(reader io.ReadCloser, maxBytes int64) ([]byte, error) {
144
143
}
145
144
146
145
func (p * Plugin ) run (rule cpmtypes.CustomRule ) (exitStatus cpmtypes.Status , output string ) {
147
- var ctx context. Context
148
- var cancel context. CancelFunc
146
+ isTimeout := false
147
+ isHung := false
149
148
149
+ var timeoutDuration time.Duration
150
150
if rule .Timeout != nil && * rule .Timeout < * p .config .PluginGlobalConfig .Timeout {
151
- ctx , cancel = context . WithTimeout ( context . Background (), * rule .Timeout )
151
+ timeoutDuration = * rule .Timeout
152
152
} else {
153
- ctx , cancel = context . WithTimeout ( context . Background (), * p .config .PluginGlobalConfig .Timeout )
153
+ timeoutDuration = * p .config .PluginGlobalConfig .Timeout
154
154
}
155
- defer cancel ()
156
155
157
156
cmd := util .Exec (rule .Path , rule .Args ... )
158
157
@@ -171,37 +170,6 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
171
170
return cpmtypes .Unknown , "Error in starting plugin. Please check the error log"
172
171
}
173
172
174
- waitChan := make (chan struct {})
175
- defer close (waitChan )
176
-
177
- var m sync.Mutex
178
- timeout := false
179
-
180
- go func () {
181
- select {
182
- case <- ctx .Done ():
183
- if ctx .Err () == context .Canceled {
184
- return
185
- }
186
- klog .Errorf ("Error in running plugin timeout %q" , rule .Path )
187
- if cmd .Process == nil || cmd .Process .Pid == 0 {
188
- klog .Errorf ("Error in cmd.Process check %q" , rule .Path )
189
- break
190
- }
191
-
192
- m .Lock ()
193
- timeout = true
194
- m .Unlock ()
195
-
196
- err := util .Kill (cmd )
197
- if err != nil {
198
- klog .Errorf ("Error in kill process %d, %v" , cmd .Process .Pid , err )
199
- }
200
- case <- waitChan :
201
- return
202
- }
203
- }()
204
-
205
173
var (
206
174
wg sync.WaitGroup
207
175
stdout []byte
@@ -221,14 +189,46 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
221
189
}()
222
190
// This will wait for the reads to complete. If the execution times out, the pipes
223
191
// will be closed and the wait group unblocks.
224
- wg .Wait ()
192
+ // If the plugin process or one of its sub-processes hangs due to GPU device errors or other reasons,
193
+ // wg.Wait() will be blocked forever, so we need to add a timeout to the wait group.
194
+ waitChan := make (chan struct {})
195
+ go func () {
196
+ wg .Wait ()
197
+ close (waitChan )
198
+ }()
199
+ select {
200
+ case <- waitChan :
201
+ // The reads are done.
202
+ break
203
+ case <- time .After (timeoutDuration ):
204
+ klog .Errorf ("Waiting for command output timed out when running plugin %q" , rule .Path )
205
+ isTimeout = true
206
+ err := util .Kill (cmd )
207
+ if err != nil {
208
+ klog .Errorf ("Error when killing process %d: %v" , cmd .Process .Pid , err )
209
+ } else {
210
+ klog .Infof ("Killed process %d successfully" , cmd .Process .Pid )
211
+ }
225
212
226
- if stdoutErr != nil {
213
+ // Check if the process is in D (uninterruptible sleep) state. If it is, the process is hung and cannot be killed.
214
+ // It also means that the plugin cannot report the correct status, so Unknown status is reported instead.
215
+ // On a GPU machine, a plugin whose Python script calls the pynvml API may hang in D state due to GPU device errors.
216
+ if util .IsProcessInDState (cmd .Process .Pid ) {
217
+ klog .Errorf ("Process %d is hung in D state" , cmd .Process .Pid )
218
+ isHung = true
219
+ }
220
+ }
221
+
222
+ if isHung {
223
+ return cpmtypes .Unknown , fmt .Sprintf ("Process is hung when running plugin %s" , rule .Path )
224
+ }
225
+
226
+ if ! isTimeout && stdoutErr != nil {
227
227
klog .Errorf ("Error reading stdout for plugin %q: error - %v" , rule .Path , err )
228
228
return cpmtypes .Unknown , "Error reading stdout for plugin. Please check the error log"
229
229
}
230
230
231
- if stderrErr != nil {
231
+ if ! isTimeout && stderrErr != nil {
232
232
klog .Errorf ("Error reading stderr for plugin %q: error - %v" , rule .Path , err )
233
233
return cpmtypes .Unknown , "Error reading stderr for plugin. Please check the error log"
234
234
}
@@ -240,16 +240,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
240
240
}
241
241
}
242
242
243
- // trim suffix useless bytes
244
- output = string (stdout )
245
- output = strings .TrimSpace (output )
246
-
247
- m .Lock ()
248
- cmdKilled := timeout
249
- m .Unlock ()
250
-
251
- if cmdKilled {
252
- output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), output )
243
+ stderrStr := ""
244
+ if isTimeout {
245
+ output = fmt .Sprintf ("Timeout when running plugin %q: state - %s. output - %q" , rule .Path , cmd .ProcessState .String (), "" )
246
+ } else {
247
+ // trim leading and trailing whitespace
248
+ output = strings .TrimSpace (string (stdout ))
249
+ stderrStr = strings .TrimSpace (string (stderr ))
253
250
}
254
251
255
252
// cut at position max_output_length if stdout is longer than max_output_length bytes
@@ -260,13 +257,13 @@ func (p *Plugin) run(rule cpmtypes.CustomRule) (exitStatus cpmtypes.Status, outp
260
257
exitCode := cmd .ProcessState .Sys ().(syscall.WaitStatus ).ExitStatus ()
261
258
switch exitCode {
262
259
case 0 :
263
- logPluginStderr (rule , string ( stderr ) , 3 )
260
+ logPluginStderr (rule , stderrStr , 3 )
264
261
return cpmtypes .OK , output
265
262
case 1 :
266
- logPluginStderr (rule , string ( stderr ) , 0 )
263
+ logPluginStderr (rule , stderrStr , 0 )
267
264
return cpmtypes .NonOK , output
268
265
default :
269
- logPluginStderr (rule , string ( stderr ) , 0 )
266
+ logPluginStderr (rule , stderrStr , 0 )
270
267
return cpmtypes .Unknown , output
271
268
}
272
269
}
0 commit comments