@@ -73,15 +73,17 @@ func NewTaskService(ctx context.Context, publisher shim.Publisher, sd shutdown.S
 	}
 	go ep.Run(ctx)
 	s := &service{
-		context:         ctx,
-		events:          make(chan interface{}, 128),
-		ec:              reaper.Default.Subscribe(),
-		ep:              ep,
-		shutdown:        sd,
-		containers:      make(map[string]*runc.Container),
-		running:         make(map[int][]containerProcess),
-		pendingExecs:    make(map[*runc.Container]int),
-		exitSubscribers: make(map[*map[int][]runcC.Exit]struct{}),
+		context:              ctx,
+		events:               make(chan interface{}, 128),
+		ec:                   reaper.Default.Subscribe(),
+		ep:                   ep,
+		shutdown:             sd,
+		containers:           make(map[string]*runc.Container),
+		running:              make(map[int][]containerProcess),
+		runningExecs:         make(map[*runc.Container]int),
+		execCountSubscribers: make(map[*runc.Container]chan<- int),
+		containerInitExit:    make(map[*runc.Container]runcC.Exit),
+		exitSubscribers:      make(map[*map[int][]runcC.Exit]struct{}),
 	}
 	go s.processExits()
 	runcC.Monitor = reaper.Default
@@ -116,7 +118,19 @@ type service struct {
 
 	lifecycleMu  sync.Mutex
 	running      map[int][]containerProcess // pid -> running process, guarded by lifecycleMu
-	pendingExecs map[*runc.Container]int    // container -> num pending execs, guarded by lifecycleMu
+	runningExecs map[*runc.Container]int    // container -> num running execs, guarded by lifecycleMu
+	// container -> subscription to exec exits/changes to s.runningExecs[container],
+	// guarded by lifecycleMu
+	execCountSubscribers map[*runc.Container]chan<- int
+	// container -> init exit, guarded by lifecycleMu.
+	// Used to stash container init process exits, so that we can hold them
+	// until after we've made sure to publish all the container's exec exits.
+	// Also used to prevent new execs from being started if the container's
+	// init process (read: pid, not [process.Init]) has already been reaped
+	// by the shim.
+	// Note that this map is updated before the container's [process.Init.Status]
+	// is transitioned to "stopped".
+	containerInitExit map[*runc.Container]runcC.Exit
 	// Subscriptions to exits for PIDs. Adding/deleting subscriptions and
 	// dereferencing the subscription pointers must only be done while holding
 	// lifecycleMu.
@@ -137,8 +151,7 @@ type containerProcess struct {
 //
 // The returned handleStarted closure records that the process has started so
 // that its exit can be handled efficiently. If the process has already exited,
-// it handles the exit immediately. In addition, if the process is an exec and
-// its container's init process has already exited, that exit is also processed.
+// it handles the exit immediately.
 // handleStarted should be called after the event announcing the start of the
 // process has been published. Note that s.lifecycleMu must not be held when
 // calling handleStarted.
@@ -173,44 +186,8 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 		pid = p.Pid()
 	}
 
-	_, init := p.(*process.Init)
 	s.lifecycleMu.Lock()
 
-	var initExits []runcC.Exit
-	var initCps []containerProcess
-	if !init {
-		s.pendingExecs[c]--
-
-		initPid := c.Pid()
-		iExits, initExited := exits[initPid]
-		if initExited && s.pendingExecs[c] == 0 {
-			// c's init process has exited before handleStarted was called and
-			// this is the last pending exec process start - we need to process
-			// the exit for the init process after processing this exec, so:
-			// - delete c from the s.pendingExecs map
-			// - keep the exits for the init pid to process later (after we process
-			//   this exec's exits)
-			// - get the necessary containerProcesses for the init process (that we
-			//   need to process the exits), and remove them from s.running (which we skipped
-			//   doing in processExits).
-			delete(s.pendingExecs, c)
-			initExits = iExits
-			var skipped []containerProcess
-			for _, initPidCp := range s.running[initPid] {
-				if initPidCp.Container == c {
-					initCps = append(initCps, initPidCp)
-				} else {
-					skipped = append(skipped, initPidCp)
-				}
-			}
-			if len(skipped) == 0 {
-				delete(s.running, initPid)
-			} else {
-				s.running[initPid] = skipped
-			}
-		}
-	}
-
 	ees, exited := exits[pid]
 	delete(s.exitSubscribers, &exits)
 	exits = nil
@@ -219,11 +196,6 @@ func (s *service) preStart(c *runc.Container) (handleStarted func(*runc.Containe
 		for _, ee := range ees {
 			s.handleProcessExit(ee, c, p)
 		}
-		for _, eee := range initExits {
-			for _, cp := range initCps {
-				s.handleProcessExit(eee, cp.Container, cp.Process)
-			}
-		}
 	} else {
 		// Process start was successful, add to `s.running`.
 		s.running[pid] = append(s.running[pid], containerProcess{
@@ -304,14 +276,29 @@ func (s *service) Start(ctx context.Context, r *taskAPI.StartRequest) (*taskAPI.
 	if r.ExecID == "" {
 		cinit = container
 	} else {
-		s.pendingExecs[container]++
+		if _, initExited := s.containerInitExit[container]; initExited {
+			s.lifecycleMu.Unlock()
+			return nil, errdefs.ToGRPCf(errdefs.ErrFailedPrecondition, "container %s init process is not running", container.ID)
+		}
+		s.runningExecs[container]++
 	}
 	handleStarted, cleanup := s.preStart(cinit)
 	s.lifecycleMu.Unlock()
 	defer cleanup()
 
 	p, err := container.Start(ctx, r)
 	if err != nil {
+		// If we failed to even start the process, s.runningExecs
+		// won't get decremented in s.handleProcessExit. We still need
+		// to update it.
+		if r.ExecID != "" {
+			s.lifecycleMu.Lock()
+			s.runningExecs[container]--
+			if ch, ok := s.execCountSubscribers[container]; ok {
+				ch <- s.runningExecs[container]
+			}
+			s.lifecycleMu.Unlock()
+		}
 		handleStarted(container, p)
 		return nil, errdefs.ToGRPC(err)
 	}
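
The error-path bookkeeping matters more than it looks: if `container.Start` fails, `handleProcessExit` will never run for this exec, and a `handleInitExit` waiter may already be subscribed and counting on one more decrement. A standalone sketch of that interaction, with illustrative names (the shim's per-container maps and `lifecycleMu` are collapsed into locals here):

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var (
		mu           sync.Mutex
		runningExecs = 1                 // the exec that Start counted...
		events       = make(chan int, 1) // ...and the waiter's buffered channel
	)

	published := make(chan struct{})
	go func() {
		// Plays the role of handleInitExit's waiter goroutine: the init
		// exit may only be published once the exec counter reaches zero.
		for n := range events {
			if n == 0 {
				break
			}
		}
		fmt.Println("init exit published")
		close(published)
	}()

	// container.Start failed, so no exit event will ever be reaped for this
	// exec; unwind the counter inline and notify, as the patch does.
	mu.Lock()
	runningExecs--
	events <- runningExecs // buffered, so safe while holding the lock
	mu.Unlock()

	<-published
}
```

Decrementing without the send would leave the waiter blocked forever, and the container's init exit would never be published.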
@@ -676,28 +663,23 @@ func (s *service) processExits() {
 		// Handle the exit for a created/started process. If there's more than
 		// one, assume they've all exited. One of them will be the correct
 		// process.
-		var cps, skipped []containerProcess
+		var cps []containerProcess
 		for _, cp := range s.running[e.Pid] {
 			_, init := cp.Process.(*process.Init)
-			if init && s.pendingExecs[cp.Container] != 0 {
-				// This exit relates to a container for which we have pending execs. In
-				// order to ensure order between execs and the init process for a given
-				// container, skip processing this exit here and let the `handleStarted`
-				// closure for the pending exec publish it.
-				skipped = append(skipped, cp)
-			} else {
-				cps = append(cps, cp)
+			if init {
+				s.containerInitExit[cp.Container] = e
 			}
+			cps = append(cps, cp)
 		}
-		if len(skipped) > 0 {
-			s.running[e.Pid] = skipped
-		} else {
-			delete(s.running, e.Pid)
-		}
+		delete(s.running, e.Pid)
 		s.lifecycleMu.Unlock()
 
 		for _, cp := range cps {
-			s.handleProcessExit(e, cp.Container, cp.Process)
+			if ip, ok := cp.Process.(*process.Init); ok {
+				s.handleInitExit(e, cp.Container, ip)
+			} else {
+				s.handleProcessExit(e, cp.Container, cp.Process)
+			}
 		}
 	}
 }
@@ -706,18 +688,60 @@ func (s *service) send(evt interface{}) {
 	s.events <- evt
 }
 
-// s.mu must be locked when calling handleProcessExit
-func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
-	if ip, ok := p.(*process.Init); ok {
-		// Ensure all children are killed
-		if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
-			if err := ip.KillAll(s.context); err != nil {
-				logrus.WithError(err).WithField("id", ip.ID()).
-					Error("failed to kill init's children")
-			}
+// handleInitExit processes container init process exits.
+// This is handled separately from non-init exits, because there
+// are some extra invariants we want to ensure in this case, namely:
+// - for a given container, the init process exit MUST be the last exit published
+// This is achieved by:
+// - killing all running container processes (if the container has a shared pid
+//   namespace, otherwise all other processes have been reaped already).
+// - waiting for the container's running exec counter to reach 0.
+// - finally, publishing the init exit.
+func (s *service) handleInitExit(e runcC.Exit, c *runc.Container, p *process.Init) {
+	// kill all running container processes
+	if runc.ShouldKillAllOnExit(s.context, c.Bundle) {
+		if err := p.KillAll(s.context); err != nil {
+			logrus.WithError(err).WithField("id", p.ID()).
+				Error("failed to kill init's children")
 		}
 	}
 
+	s.lifecycleMu.Lock()
+	numRunningExecs := s.runningExecs[c]
+	if numRunningExecs == 0 {
+		delete(s.runningExecs, c)
+		s.lifecycleMu.Unlock()
+		s.handleProcessExit(e, c, p)
+		return
+	}
+
+	events := make(chan int, numRunningExecs)
+	s.execCountSubscribers[c] = events
+
+	s.lifecycleMu.Unlock()
+
+	go func() {
+		defer func() {
+			s.lifecycleMu.Lock()
+			defer s.lifecycleMu.Unlock()
+			delete(s.execCountSubscribers, c)
+			delete(s.runningExecs, c)
+		}()
+
+		// wait for running processes to exit
+		for {
+			if runningExecs := <-events; runningExecs == 0 {
+				break
+			}
+		}
+
+		// all running processes have exited now, and no new
+		// ones can start, so we can publish the init exit
+		s.handleProcessExit(e, c, p)
+	}()
+}
+
+func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.Process) {
 	p.SetExited(e.Status)
 	s.send(&eventstypes.TaskExit{
 		ContainerID: c.ID,
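
`handleInitExit` runs on the `processExits` goroutine, so it must not block: when execs are still running it registers the buffered channel and hands the wait to a new goroutine, keeping reaper events flowing for other containers. A standalone sketch of the resulting ordering guarantee, with illustrative names (`shim`, `execExited`, `initExited` are not the real API):

```go
package main

import (
	"fmt"
	"sync"
)

type shim struct {
	mu           sync.Mutex
	runningExecs int        // stands in for s.runningExecs[c]
	subscriber   chan<- int // stands in for s.execCountSubscribers[c]
}

// execExited publishes an exec exit, then decrements and notifies,
// mirroring the tail of handleProcessExit in the patch.
func (s *shim) execExited(id string) {
	fmt.Println("published exec exit:", id)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.runningExecs--
	if s.subscriber != nil {
		s.subscriber <- s.runningExecs
	}
}

// initExited mirrors handleInitExit: publish immediately if no execs are
// running, otherwise subscribe and let a goroutine wait for the drain.
func (s *shim) initExited(done chan<- struct{}) {
	s.mu.Lock()
	n := s.runningExecs
	if n == 0 {
		s.mu.Unlock()
		fmt.Println("published init exit")
		close(done)
		return
	}
	// Capacity n: in the patch no new exec can start once the init exit is
	// recorded, so at most n decrements are sent and no send ever blocks
	// while the lock is held.
	events := make(chan int, n)
	s.subscriber = events
	s.mu.Unlock()

	go func() { // the caller (processExits) returns immediately
		for {
			if left := <-events; left == 0 {
				break
			}
		}
		fmt.Println("published init exit") // always last for this container
		close(done)
	}()
}

func main() {
	s := &shim{runningExecs: 2}
	done := make(chan struct{})
	s.initExited(done) // init pid reaped first...
	go s.execExited("exec-1")
	go s.execExited("exec-2")
	<-done // ...but its exit is published after both exec exits
}
```

The real patch additionally deletes the `runningExecs` and `execCountSubscribers` entries in a deferred cleanup, so the per-container state doesn't leak once the init exit is out.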
@@ -726,6 +750,14 @@ func (s *service) handleProcessExit(e runcC.Exit, c *runc.Container, p process.P
 		ExitStatus: uint32(e.Status),
 		ExitedAt:   protobuf.ToTimestamp(p.ExitedAt()),
 	})
+	if _, init := p.(*process.Init); !init {
+		s.lifecycleMu.Lock()
+		s.runningExecs[c]--
+		if ch, ok := s.execCountSubscribers[c]; ok {
+			ch <- s.runningExecs[c]
+		}
+		s.lifecycleMu.Unlock()
+	}
 }
 
 func (s *service) getContainerPids(ctx context.Context, container *runc.Container) ([]uint32, error) {