@@ -73,16 +73,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
73
73
}
74
74
go ep .Run (ctx )
75
75
s := & service {
76
- context : ctx ,
77
- events : make (chan interface {}, 128 ),
78
- ec : reaper .Default .Subscribe (),
79
- ep : ep ,
80
- shutdown : sd ,
81
- containers : make (map [string ]* runc.Container ),
82
- running : make (map [int ][]containerProcess ),
83
- pendingExecs : make (map [* runc.Container ]int ),
84
- execable : make (map [* runc.Container ]bool ),
85
- exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
76
+ context : ctx ,
77
+ events : make (chan interface {}, 128 ),
78
+ ec : reaper .Default .Subscribe (),
79
+ ep : ep ,
80
+ shutdown : sd ,
81
+ containers : make (map [string ]* runc.Container ),
82
+ running : make (map [int ][]containerProcess ),
83
+ runningExecs : make (map [* runc.Container ]int ),
84
+ execCountSubscribers : make (map [* runc.Container ]chan <- int ),
85
+ containerInitExit : make (map [* runc.Container ]runcC.Exit ),
86
+ exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
86
87
}
87
88
go s .processExits ()
88
89
runcC .Monitor = reaper .Default
@@ -117,13 +118,19 @@ type service struct {
117
118
118
119
lifecycleMu sync.Mutex
119
120
running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
120
- pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
121
- // container -> execs can be started, guarded by lifecycleMu.
122
- // Execs can be started if the container's init process (read: pid, not [process.Init])
123
- // has been started and not yet reaped by the shim.
121
+ runningExecs map [* runc.Container ]int // container -> num running execs, guarded by lifecycleMu
122
+ // container -> subscription to exec exits/changes to s.runningExecs[container],
123
+ // guarded by lifecycleMu
124
+ execCountSubscribers map [* runc.Container ]chan <- int
125
+ // container -> init exits, guarded by lifecycleMu
126
+ // Used to stash container init process exits, so that we can hold them
127
+ // until after we've made sure to publish all the container's exec exits.
128
+ // Also used to prevent new execs from being started if the
129
+ // container's init process (read: pid, not [process.Init]) has already been
130
+ // reaped by the shim.
124
131
// Note that this map gets updated before the container's [process.Init.Status]
125
132
// is transitioned to "stopped".
126
- execable map [* runc.Container ]bool
133
+ containerInitExit map [* runc.Container ]runcC. Exit
127
134
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
128
135
// dereferencing the subscription pointers must only be done while holding
129
136
// lifecycleMu.
@@ -144,8 +151,7 @@ type containerProcess struct {
144
151
//
145
152
// The returned handleStarted closure records that the process has started so
146
153
// that its exit can be handled efficiently. If the process has already exited,
147
- // it handles the exit immediately. In addition, if the process is an exec and
148
- // its container's init process has already exited, that exit is also processed.
154
+ // it handles the exit immediately.
149
155
// handleStarted should be called after the event announcing the start of the
150
156
// process has been published. Note that s.lifecycleMu must not be held when
151
157
// calling handleStarted.
@@ -180,44 +186,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
180
186
pid = p .Pid ()
181
187
}
182
188
183
- _ , init := p .(* process.Init )
184
189
s .lifecycleMu .Lock ()
185
190
186
- var initExits []runcC.Exit
187
- var initCps []containerProcess
188
- if ! init {
189
- s .pendingExecs [c ]--
190
-
191
- initPid := c .Pid ()
192
- iExits , initExited := exits [initPid ]
193
- if initExited && s .pendingExecs [c ] == 0 {
194
- // c's init process has exited before handleStarted was called and
195
- // this is the last pending exec process start - we need to process
196
- // the exit for the init process after processing this exec, so:
197
- // - delete c from the s.pendingExecs map
198
- // - keep the exits for the init pid to process later (after we process
199
- // this exec's exits)
200
- // - get the necessary containerProcesses for the init process (that we
201
- // need to process the exits), and remove them from s.running (which we skipped
202
- // doing in processExits).
203
- delete (s .pendingExecs , c )
204
- initExits = iExits
205
- var skipped []containerProcess
206
- for _ , initPidCp := range s .running [initPid ] {
207
- if initPidCp .Container == c {
208
- initCps = append (initCps , initPidCp )
209
- } else {
210
- skipped = append (skipped , initPidCp )
211
- }
212
- }
213
- if len (skipped ) == 0 {
214
- delete (s .running , initPid )
215
- } else {
216
- s .running [initPid ] = skipped
217
- }
218
- }
219
- }
220
-
221
191
ees , exited := exits [pid ]
222
192
delete (s .exitSubscribers , & exits )
223
193
exits = nil
@@ -226,20 +196,12 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
226
196
for _ , ee := range ees {
227
197
s .handleProcessExit (ee , c , p )
228
198
}
229
- for _ , eee := range initExits {
230
- for _ , cp := range initCps {
231
- s .handleProcessExit (eee , cp .Container , cp .Process )
232
- }
233
- }
234
199
} else {
235
200
// Process start was successful, add to `s.running`.
236
201
s .running [pid ] = append (s .running [pid ], containerProcess {
237
202
Container : c ,
238
203
Process : p ,
239
204
})
240
- if init {
241
- s .execable [c ] = true
242
- }
243
205
s .lifecycleMu .Unlock ()
244
206
}
245
207
}
@@ -314,18 +276,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
314
276
if r .ExecID == "" {
315
277
cinit = container
316
278
} else {
317
- if ! s . execable [container ] {
279
+ if _ , initExited := s . containerInitExit [container ]; initExited {
318
280
s .lifecycleMu .Unlock ()
319
281
return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container %s init process is not running" , container .ID )
320
282
}
321
- s .pendingExecs [container ]++
283
+ s .runningExecs [container ]++
322
284
}
323
285
handleStarted , cleanup := s .preStart (cinit )
324
286
s .lifecycleMu .Unlock ()
325
287
defer cleanup ()
326
288
327
289
p , err := container .Start (ctx , r )
328
290
if err != nil {
291
+ // If we failed to even start the process, s.runningExecs
292
+ // won't get decremented in s.handleProcessExit. We still need
293
+ // to update it.
294
+ if r .ExecID != "" {
295
+ s .lifecycleMu .Lock ()
296
+ s .runningExecs [container ]--
297
+ if ch , ok := s .execCountSubscribers [container ]; ok {
298
+ ch <- s .runningExecs [container ]
299
+ }
300
+ s .lifecycleMu .Unlock ()
301
+ }
329
302
handleStarted (container , p )
330
303
return nil , errdefs .ToGRPC (err )
331
304
}
@@ -690,31 +663,23 @@ func (s *service) processExits() {
690
663
// Handle the exit for a created/started process. If there's more than
691
664
// one, assume they've all exited. One of them will be the correct
692
665
// process.
693
- var cps , skipped []containerProcess
666
+ var cps []containerProcess
694
667
for _ , cp := range s .running [e .Pid ] {
695
668
_ , init := cp .Process .(* process.Init )
696
669
if init {
697
- delete (s .execable , cp .Container )
698
- }
699
- if init && s .pendingExecs [cp .Container ] != 0 {
700
- // This exit relates to a container for which we have pending execs. In
701
- // order to ensure order between execs and the init process for a given
702
- // container, skip processing this exit here and let the `handleStarted`
703
- // closure for the pending exec publish it.
704
- skipped = append (skipped , cp )
705
- } else {
706
- cps = append (cps , cp )
670
+ s .containerInitExit [cp .Container ] = e
707
671
}
672
+ cps = append (cps , cp )
708
673
}
709
- if len (skipped ) > 0 {
710
- s .running [e .Pid ] = skipped
711
- } else {
712
- delete (s .running , e .Pid )
713
- }
674
+ delete (s .running , e .Pid )
714
675
s .lifecycleMu .Unlock ()
715
676
716
677
for _ , cp := range cps {
717
- s .handleProcessExit (e , cp .Container , cp .Process )
678
+ if ip , ok := cp .Process .(* process.Init ); ok {
679
+ s .handleInitExit (e , cp .Container , ip )
680
+ } else {
681
+ s .handleProcessExit (e , cp .Container , cp .Process )
682
+ }
718
683
}
719
684
}
720
685
}
@@ -723,17 +688,60 @@ func (s *service) send(evt interface{}) {
723
688
s .events <- evt
724
689
}
725
690
726
- func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
727
- if ip , ok := p .(* process.Init ); ok {
728
- // Ensure all children are killed
729
- if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
730
- if err := ip .KillAll (s .context ); err != nil {
731
- logrus .WithError (err ).WithField ("id" , ip .ID ()).
732
- Error ("failed to kill init's children" )
733
- }
691
// handleInitExit processes container init process exits.
// This is handled separately from non-init exits, because there
// are some extra invariants we want to ensure in this case, namely:
// - for a given container, the init process exit MUST be the last exit published
// This is achieved by:
// - killing all running container processes (if the container has a shared pid
//   namespace, otherwise all other processes have been reaped already).
// - waiting for the container's running exec counter to reach 0.
// - finally, publishing the init exit.
func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
	// kill all running container processes
	if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
		if err := p.KillAll(s.context); err != nil {
			logrus.WithError(err).WithField("id", p.ID()).
				Error("failed to kill init's children")
		}
	}

	s.lifecycleMu.Lock()
	numRunningExecs := s.runningExecs[c]
	if numRunningExecs == 0 {
		// No execs outstanding: the init exit can be published immediately
		// on this goroutine, no subscription needed.
		delete(s.runningExecs, c)
		s.lifecycleMu.Unlock()
		s.handleProcessExit(e, c, p)
		return
	}

	// Buffer the channel for every outstanding exec so that senders
	// (which notify while holding lifecycleMu) can never block on it.
	events := make(chan int, numRunningExecs)
	// Register the subscription before releasing lifecycleMu so that no
	// exec-count change between now and the goroutine starting is missed.
	s.execCountSubscribers[c] = events

	s.lifecycleMu.Unlock()

	// Defer the publish to a goroutine so the exits channel reader is not
	// blocked while we wait for the container's execs to drain.
	go func() {
		defer func() {
			s.lifecycleMu.Lock()
			defer s.lifecycleMu.Unlock()
			delete(s.execCountSubscribers, c)
			delete(s.runningExecs, c)
		}()

		// wait for running processes to exit
		for {
			if runningExecs := <-events; runningExecs == 0 {
				break
			}
		}

		// all running processes have exited now, and no new
		// ones can start, so we can publish the init exit
		s.handleProcessExit(e, c, p)
	}()
}
743
+
744
+ func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
737
745
p .SetExited (e .Status )
738
746
s .send (& eventstypes.TaskExit {
739
747
ContainerID : c .ID ,
@@ -742,6 +750,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
742
750
ExitStatus : uint32 (e .Status ),
743
751
ExitedAt : protobuf .ToTimestamp (p .ExitedAt ()),
744
752
})
753
+ if _ , init := p .(* process.Init ); ! init {
754
+ s .lifecycleMu .Lock ()
755
+ s .runningExecs [c ]--
756
+ if ch , ok := s .execCountSubscribers [c ]; ok {
757
+ ch <- s .runningExecs [c ]
758
+ }
759
+ s .lifecycleMu .Unlock ()
760
+ }
745
761
}
746
762
747
763
func (s * service ) getContainerPids (ctx context.Context , container * runc.Container ) ([]uint32 , error ) {
0 commit comments