@@ -72,15 +72,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
72
72
}
73
73
go ep .Run (ctx )
74
74
s := & service {
75
- context : ctx ,
76
- events : make (chan interface {}, 128 ),
77
- ec : reaper .Default .Subscribe (),
78
- ep : ep ,
79
- shutdown : sd ,
80
- containers : make (map [string ]* runc.Container ),
81
- running : make (map [int ][]containerProcess ),
82
- pendingExecs : make (map [* runc.Container ]int ),
83
- exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
75
+ context : ctx ,
76
+ events : make (chan interface {}, 128 ),
77
+ ec : reaper .Default .Subscribe (),
78
+ ep : ep ,
79
+ shutdown : sd ,
80
+ containers : make (map [string ]* runc.Container ),
81
+ running : make (map [int ][]containerProcess ),
82
+ runningExecs : make (map [* runc.Container ]int ),
83
+ execCountSubscribers : make (map [* runc.Container ]chan <- int ),
84
+ containerInitExit : make (map [* runc.Container ]runcC.Exit ),
85
+ exitSubscribers : make (map [* map [int ][]runcC.Exit ]struct {}),
84
86
}
85
87
go s .processExits ()
86
88
runcC .Monitor = reaper .Default
@@ -115,7 +117,19 @@ type service struct {
115
117
116
118
lifecycleMu sync.Mutex
117
119
running map [int ][]containerProcess // pid -> running process, guarded by lifecycleMu
118
- pendingExecs map [* runc.Container ]int // container -> num pending execs, guarded by lifecycleMu
120
+ runningExecs map [* runc.Container ]int // container -> num running execs, guarded by lifecycleMu
121
+ // container -> subscription to exec exits/changes to s.runningExecs[container],
122
+ // guarded by lifecycleMu
123
+ execCountSubscribers map [* runc.Container ]chan <- int
124
+ // container -> init exits, guarded by lifecycleMu
125
+ // Used to stash container init process exits, so that we can hold them
126
+ // until after we've made sure to publish all the container's exec exits.
127
+ // Also used to prevent new execs from being started if the
128
+ // container's init process (read: pid, not [process.Init]) has already been
129
+ // reaped by the shim.
130
+ // Note that this flag gets updated before the container's [process.Init.Status]
131
+ // is transitioned to "stopped".
132
+ containerInitExit map [* runc.Container ]runcC.Exit
119
133
// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
120
134
// dereferencing the subscription pointers must only be done while holding
121
135
// lifecycleMu.
@@ -136,8 +150,7 @@ type containerProcess struct {
136
150
//
137
151
// The returned handleStarted closure records that the process has started so
138
152
// that its exit can be handled efficiently. If the process has already exited,
139
- // it handles the exit immediately. In addition, if the process is an exec and
140
- // its container's init process has already exited, that exit is also processed.
153
+ // it handles the exit immediately.
141
154
// handleStarted should be called after the event announcing the start of the
142
155
// process has been published. Note that s.lifecycleMu must not be held when
143
156
// calling handleStarted.
@@ -172,44 +185,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
172
185
pid = p .Pid ()
173
186
}
174
187
175
- _ , init := p .(* process.Init )
176
188
s .lifecycleMu .Lock ()
177
189
178
- var initExits []runcC.Exit
179
- var initCps []containerProcess
180
- if ! init {
181
- s .pendingExecs [c ]--
182
-
183
- initPid := c .Pid ()
184
- iExits , initExited := exits [initPid ]
185
- if initExited && s .pendingExecs [c ] == 0 {
186
- // c's init process has exited before handleStarted was called and
187
- // this is the last pending exec process start - we need to process
188
- // the exit for the init process after processing this exec, so:
189
- // - delete c from the s.pendingExecs map
190
- // - keep the exits for the init pid to process later (after we process
191
- // this exec's exits)
192
- // - get the necessary containerProcesses for the init process (that we
193
- // need to process the exits), and remove them from s.running (which we skipped
194
- // doing in processExits).
195
- delete (s .pendingExecs , c )
196
- initExits = iExits
197
- var skipped []containerProcess
198
- for _ , initPidCp := range s .running [initPid ] {
199
- if initPidCp .Container == c {
200
- initCps = append (initCps , initPidCp )
201
- } else {
202
- skipped = append (skipped , initPidCp )
203
- }
204
- }
205
- if len (skipped ) == 0 {
206
- delete (s .running , initPid )
207
- } else {
208
- s .running [initPid ] = skipped
209
- }
210
- }
211
- }
212
-
213
190
ees , exited := exits [pid ]
214
191
delete (s .exitSubscribers , & exits )
215
192
exits = nil
@@ -218,11 +195,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
218
195
for _ , ee := range ees {
219
196
s .handleProcessExit (ee , c , p )
220
197
}
221
- for _ , eee := range initExits {
222
- for _ , cp := range initCps {
223
- s .handleProcessExit (eee , cp .Container , cp .Process )
224
- }
225
- }
226
198
} else {
227
199
// Process start was successful, add to `s.running`.
228
200
s .running [pid ] = append (s .running [pid ], containerProcess {
@@ -303,14 +275,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
303
275
if r .ExecID == "" {
304
276
cinit = container
305
277
} else {
306
- s .pendingExecs [container ]++
278
+ if _ , initExited := s .containerInitExit [container ]; initExited {
279
+ s .lifecycleMu .Unlock ()
280
+ return nil , errdefs .ToGRPCf (errdefs .ErrFailedPrecondition , "container %s init process is not running" , container .ID )
281
+ }
282
+ s .runningExecs [container ]++
307
283
}
308
284
handleStarted , cleanup := s .preStart (cinit )
309
285
s .lifecycleMu .Unlock ()
310
286
defer cleanup ()
311
287
312
288
p , err := container .Start (ctx , r )
313
289
if err != nil {
290
+ // If we failed to even start the process, s.runningExecs
291
+ // won't get decremented in s.handleProcessExit. We still need
292
+ // to update it.
293
+ if r .ExecID != "" {
294
+ s .lifecycleMu .Lock ()
295
+ s .runningExecs [container ]--
296
+ if ch , ok := s .execCountSubscribers [container ]; ok {
297
+ ch <- s .runningExecs [container ]
298
+ }
299
+ s .lifecycleMu .Unlock ()
300
+ }
314
301
handleStarted (container , p )
315
302
return nil , errdefs .ToGRPC (err )
316
303
}
@@ -675,28 +662,23 @@ func (s *service) processExits() {
675
662
// Handle the exit for a created/started process. If there's more than
676
663
// one, assume they've all exited. One of them will be the correct
677
664
// process.
678
- var cps , skipped []containerProcess
665
+ var cps []containerProcess
679
666
for _ , cp := range s .running [e .Pid ] {
680
667
_ , init := cp .Process .(* process.Init )
681
- if init && s .pendingExecs [cp .Container ] != 0 {
682
- // This exit relates to a container for which we have pending execs. In
683
- // order to ensure order between execs and the init process for a given
684
- // container, skip processing this exit here and let the `handleStarted`
685
- // closure for the pending exec publish it.
686
- skipped = append (skipped , cp )
687
- } else {
688
- cps = append (cps , cp )
668
+ if init {
669
+ s .containerInitExit [cp .Container ] = e
689
670
}
671
+ cps = append (cps , cp )
690
672
}
691
- if len (skipped ) > 0 {
692
- s .running [e .Pid ] = skipped
693
- } else {
694
- delete (s .running , e .Pid )
695
- }
673
+ delete (s .running , e .Pid )
696
674
s .lifecycleMu .Unlock ()
697
675
698
676
for _ , cp := range cps {
699
- s .handleProcessExit (e , cp .Container , cp .Process )
677
+ if ip , ok := cp .Process .(* process.Init ); ok {
678
+ s .handleInitExit (e , cp .Container , ip )
679
+ } else {
680
+ s .handleProcessExit (e , cp .Container , cp .Process )
681
+ }
700
682
}
701
683
}
702
684
}
@@ -705,18 +687,60 @@ func (s *service) send(evt interface{}) {
705
687
s .events <- evt
706
688
}
707
689
708
- // s.mu must be locked when calling handleProcessExit
709
- func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
710
- if ip , ok := p .(* process.Init ); ok {
711
- // Ensure all children are killed
712
- if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
713
- if err := ip .KillAll (s .context ); err != nil {
714
- logrus .WithError (err ).WithField ("id" , ip .ID ()).
715
- Error ("failed to kill init's children" )
716
- }
690
+ // handleInitExit processes container init process exits.
691
+ // This is handled separately from non-init exits, because there
692
+ // are some extra invariants we want to ensure in this case, namely:
693
+ // - for a given container, the init process exit MUST be the last exit published.
694
+ // This is achieved by:
695
+ // - killing all running container processes (if the container has a shared pid
696
+ // namespace, otherwise all other processes have been reaped already).
697
+ // - waiting for the container's running exec counter to reach 0.
698
+ // - finally, publishing the init exit.
699
+ func (s * service ) handleInitExit (e runcC.Exit , c * runc.Container , p * process.Init ) {
700
+ // kill all running container processes
701
+ if runc .ShouldKillAllOnExit (s .context , c .Bundle ) {
702
+ if err := p .KillAll (s .context ); err != nil {
703
+ logrus .WithError (err ).WithField ("id" , p .ID ()).
704
+ Error ("failed to kill init's children" )
717
705
}
718
706
}
719
707
708
+ s .lifecycleMu .Lock ()
709
+ numRunningExecs := s .runningExecs [c ]
710
+ if numRunningExecs == 0 {
711
+ delete (s .runningExecs , c )
712
+ s .lifecycleMu .Unlock ()
713
+ s .handleProcessExit (e , c , p )
714
+ return
715
+ }
716
+
717
+ events := make (chan int , numRunningExecs )
718
+ s .execCountSubscribers [c ] = events
719
+
720
+ s .lifecycleMu .Unlock ()
721
+
722
+ go func () {
723
+ defer func () {
724
+ s .lifecycleMu .Lock ()
725
+ defer s .lifecycleMu .Unlock ()
726
+ delete (s .execCountSubscribers , c )
727
+ delete (s .runningExecs , c )
728
+ }()
729
+
730
+ // wait for running processes to exit
731
+ for {
732
+ if runningExecs := <- events ; runningExecs == 0 {
733
+ break
734
+ }
735
+ }
736
+
737
+ // all running processes have exited now, and no new
738
+ // ones can start, so we can publish the init exit
739
+ s .handleProcessExit (e , c , p )
740
+ }()
741
+ }
742
+
743
+ func (s * service ) handleProcessExit (e runcC.Exit , c * runc.Container , p process.Process ) {
720
744
p .SetExited (e .Status )
721
745
s .send (& eventstypes.TaskExit {
722
746
ContainerID : c .ID ,
@@ -725,6 +749,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
725
749
ExitStatus : uint32 (e .Status ),
726
750
ExitedAt : p .ExitedAt (),
727
751
})
752
+ if _ , init := p .(* process.Init ); ! init {
753
+ s .lifecycleMu .Lock ()
754
+ s .runningExecs [c ]--
755
+ if ch , ok := s .execCountSubscribers [c ]; ok {
756
+ ch <- s .runningExecs [c ]
757
+ }
758
+ s .lifecycleMu .Unlock ()
759
+ }
728
760
}
729
761
730
762
func (s * service ) getContainerPids (ctx context.Context , container * runc.Container ) ([]uint32 , error ) {
0 commit comments