@@ -80,6 +80,7 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
80
80
shutdown : sd ,
81
81
containers : make (map [string ]* runc.Container ),
82
82
running : make (map [int ][]containerProcess ),
83
+ pendingExecs : make (map [* runc.Container ]int ),
83
84
exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
84
85
}
85
86
go s .processExits ()
@@ -113,8 +114,9 @@ type service struct {
113
114
114
115
containers map [string ]* runc.Container
115
116
116
- lifecycleMu sync.Mutex
117
- running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
117
+ lifecycleMu sync.Mutex
118
+ running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
119
+ pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
118
120
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
119
121
// dereferencing the subscription pointers must only be done while holding
120
122
// lifecycleMu.
@@ -129,26 +131,23 @@ type containerProcess struct {
129
131
}
130
132
131
133
// preStart prepares for starting a container process and handling its exit.
132
- // The container being started should be passed in as c when starting the
133
- // container init process for an already-created container. c should be nil when
134
- // creating a container or when starting an exec.
134
+ // The container being started should be passed in as c when starting the container
135
+ // init process for an already-created container. c should be nil when creating a
136
+ // container or when starting an exec.
135
137
//
136
138
// The returned handleStarted closure records that the process has started so
137
139
// that its exit can be handled efficiently. If the process has already exited,
138
- // it handles the exit immediately. handleStarted should be called after the
139
- // event announcing the start of the process has been published .
140
- // Note that handleStarted needs to be aware of whether s.mu is already held
141
- // when it is called. If s.mu has been held, we don't need to lock it when
142
- // calling handleProcessExit .
140
+ // it handles the exit immediately. In addition, if the process is an exec and
141
+ // its container's init process has already exited, that exit is also processed .
142
+ // handleStarted should be called after the event announcing the start of the
143
+ // process has been published. Note that s.lifecycleMu must not be held when
144
+ // calling handleStarted .
143
145
//
144
146
// The returned cleanup closure releases resources used to handle early exits.
145
147
// It must be called before the caller of preStart returns, otherwise severe
146
148
// memory leaks will occur.
147
- func (s * service ) preStart (c * runc.Container ) (handleStarted func (* runc.Container , process.Process , bool ), cleanup func ()) {
149
+ func (s * service ) preStart (c * runc.Container ) (handleStarted func (* runc.Container , process.Process ), cleanup func ()) {
148
150
exits := make (map [int ][]runcC.Exit )
149
-
150
- s .lifecycleMu .Lock ()
151
- defer s .lifecycleMu .Unlock ()
152
151
s .exitSubscribers [& exits ] = struct {}{}
153
152
154
153
if c != nil {
@@ -168,30 +167,65 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
168
167
}
169
168
}
170
169
171
- handleStarted = func (c * runc.Container , p process.Process , muLocked bool ) {
170
+ handleStarted = func (c * runc.Container , p process.Process ) {
172
171
var pid int
173
172
if p != nil {
174
173
pid = p .Pid ()
175
174
}
176
175
176
+ _ , init := p .(* process.Init )
177
177
s .lifecycleMu .Lock ()
178
+
179
+ var initExits []runcC.Exit
180
+ var initCps []containerProcess
181
+ if ! init {
182
+ s .pendingExecs [c ]--
183
+
184
+ initPid := c .Pid ()
185
+ iExits , initExited := exits [initPid ]
186
+ if initExited && s .pendingExecs [c ] == 0 {
187
+ // c's init process has exited before handleStarted was called and
188
+ // this is the last pending exec process start - we need to process
189
+ // the exit for the init process after processing this exec, so:
190
+ // - delete c from the s.pendingExecs map
191
+ // - keep the exits for the init pid to process later (after we process
192
+ // this exec's exits)
193
+ // - get the necessary containerProcesses for the init process (that we
194
+ // need to process the exits), and remove them from s.running (which we skipped
195
+ // doing in processExits).
196
+ delete (s .pendingExecs , c )
197
+ initExits = iExits
198
+ var skipped []containerProcess
199
+ for _ , initPidCp := range s .running [initPid ] {
200
+ if initPidCp .Container == c {
201
+ initCps = append (initCps , initPidCp )
202
+ } else {
203
+ skipped = append (skipped , initPidCp )
204
+ }
205
+ }
206
+ if len (skipped ) == 0 {
207
+ delete (s .running , initPid )
208
+ } else {
209
+ s .running [initPid ] = skipped
210
+ }
211
+ }
212
+ }
213
+
178
214
ees , exited := exits [pid ]
179
215
delete (s .exitSubscribers , & exits )
180
216
exits = nil
181
- if pid == 0 { // no-op
182
- s .lifecycleMu .Unlock ()
183
- } else if exited {
217
+ if pid == 0 || exited {
184
218
s .lifecycleMu .Unlock ()
185
219
for _ , ee := range ees {
186
- if muLocked {
187
- s .handleProcessExit (ee , c , p )
188
- } else {
189
- s .mu .Lock ()
190
- s .handleProcessExit (ee , c , p )
191
- s .mu .Unlock ()
220
+ s .handleProcessExit (ee , c , p )
221
+ }
222
+ for _ , eee := range initExits {
223
+ for _ , cp := range initCps {
224
+ s .handleProcessExit (eee , cp .Container , cp .Process )
192
225
}
193
226
}
194
227
} else {
228
+ // Process start was successful, add to `s.running`.
195
229
s .running [pid ] = append (s .running [pid ], containerProcess {
196
230
Container : c ,
197
231
Process : p ,
@@ -216,7 +250,9 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
216
250
s .mu .Lock ()
217
251
defer s .mu .Unlock ()
218
252
253
+ s .lifecycleMu .Lock ()
219
254
handleStarted , cleanup := s .preStart (nil )
255
+ s .lifecycleMu .Unlock ()
220
256
defer cleanup ()
221
257
222
258
container , err := runc .NewContainer (ctx , s .platform , r )
@@ -244,7 +280,7 @@ func (s *service) Create(ctx context.Context, r *taskAPI.CreateTaskRequest) (_ *
244
280
// could happen would also cause the container.Pid() call above to
245
281
// nil-deference panic.
246
282
proc , _ := container .Process ("" )
247
- handleStarted (container , proc , true )
283
+ handleStarted (container , proc )
248
284
249
285
return & taskAPI.CreateTaskResponse {
250
286
Pid : uint32 (container .Pid ()),
@@ -264,14 +300,19 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
264
300
}
265
301
266
302
var cinit * runc.Container
303
+ s .lifecycleMu .Lock ()
267
304
if r .ExecID == "" {
268
305
cinit = container
306
+ } else {
307
+ s .pendingExecs [container ]++
269
308
}
270
309
handleStarted , cleanup := s .preStart (cinit )
310
+ s .lifecycleMu .Unlock ()
271
311
defer cleanup ()
312
+
272
313
p , err := container .Start (ctx , r )
273
314
if err != nil {
274
- handleStarted (container , p , false )
315
+ handleStarted (container , p )
275
316
return nil , errdefs .ToGRPC (err )
276
317
}
277
318
@@ -311,7 +352,7 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
311
352
Pid : uint32 (p .Pid ()),
312
353
})
313
354
}
314
- handleStarted (container , p , false )
355
+ handleStarted (container , p )
315
356
return & taskAPI.StartResponse {
316
357
Pid : uint32 (p .Pid ()),
317
358
}, nil
@@ -635,14 +676,27 @@ func (s *service) processExits() {
635
676
// Handle the exit for a created/started process. If there's more than
636
677
// one, assume they've all exited. One of them will be the correct
637
678
// process.
638
- cps := s .running [e .Pid ]
639
- delete (s .running , e .Pid )
679
+ var cps , skipped []containerProcess
680
+ for _ , cp := range s .running [e .Pid ] {
681
+ if s .pendingExecs [cp .Container ] != 0 {
682
+ // This exit relates to a container for which we have pending execs. In
683
+ // order to ensure order between execs and the init process for a given
684
+ // container, skip processing this exit here and let the `handleStarted`
685
+ // closure for the pending exec publish it.
686
+ skipped = append (skipped , cp )
687
+ } else {
688
+ cps = append (cps , cp )
689
+ }
690
+ }
691
+ if len (skipped ) > 0 {
692
+ s .running [e .Pid ] = skipped
693
+ } else {
694
+ delete (s .running , e .Pid )
695
+ }
640
696
s .lifecycleMu .Unlock ()
641
697
642
698
for _ , cp := range cps {
643
- s .mu .Lock ()
644
699
s .handleProcessExit (e , cp .Container , cp .Process )
645
- s .mu .Unlock ()
646
700
}
647
701
}
648
702
}
0 commit comments