@@ -30,12 +30,14 @@ import (
30
30
"sync"
31
31
"time"
32
32
33
+ containerd "github.com/containerd/containerd/v2/client"
33
34
"github.com/fsnotify/fsnotify"
34
35
"github.com/muesli/cancelreader"
35
36
36
37
"github.com/containerd/containerd/v2/core/runtime/v2/logging"
37
38
"github.com/containerd/errdefs"
38
39
"github.com/containerd/log"
40
+ "github.com/containerd/nerdctl/v2/pkg/lockutil"
39
41
)
40
42
41
43
const (
@@ -149,7 +151,60 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) {
149
151
return logConfig , nil
150
152
}
151
153
152
- func loggingProcessAdapter (ctx context.Context , driver Driver , dataStore string , config * logging.Config ) error {
154
+ func getLockPath (dataStore , ns , id string ) string {
155
+ return filepath .Join (dataStore , "containers" , ns , id , "logger-lock" )
156
+ }
157
+
158
+ // WaitForLogger waits until the logger has finished executing and processing container logs
159
+ func WaitForLogger (dataStore , ns , id string ) error {
160
+ return lockutil .WithDirLock (getLockPath (dataStore , ns , id ), func () error {
161
+ return nil
162
+ })
163
+ }
164
+
165
+ func getContainerWait (ctx context.Context , address string , config * logging.Config ) (<- chan containerd.ExitStatus , error ) {
166
+ client , err := containerd .New (address , containerd .WithDefaultNamespace (config .Namespace ))
167
+ if err != nil {
168
+ return nil , err
169
+ }
170
+ con , err := client .LoadContainer (ctx , config .ID )
171
+ if err != nil {
172
+ return nil , err
173
+ }
174
+
175
+ task , err := con .Task (ctx , nil )
176
+ if err == nil {
177
+ return task .Wait (ctx )
178
+ }
179
+ if ! errdefs .IsNotFound (err ) {
180
+ return nil , err
181
+ }
182
+
183
+ // If task was not found, it's possible that the container runtime is still being created.
184
+ // Retry every 100ms.
185
+ ticker := time .NewTicker (100 * time .Millisecond )
186
+ defer ticker .Stop ()
187
+
188
+ for {
189
+ select {
190
+ case <- ctx .Done ():
191
+ return nil , errors .New ("timed out waiting for container task to start" )
192
+ case <- ticker .C :
193
+ task , err = con .Task (ctx , nil )
194
+ if err != nil {
195
+ if errdefs .IsNotFound (err ) {
196
+ continue
197
+ }
198
+ return nil , err
199
+ }
200
+ return task .Wait (ctx )
201
+ }
202
+ }
203
+ }
204
+
205
+ type ContainerWaitFunc func (ctx context.Context , address string , config * logging.Config ) (<- chan containerd.ExitStatus , error )
206
+
207
+ func loggingProcessAdapter (ctx context.Context , driver Driver , dataStore , address string , getContainerWait ContainerWaitFunc , config * logging.Config ) error {
153
208
if err := driver .PreProcess (ctx , dataStore , config ); err != nil {
154
209
return err
155
210
}
@@ -168,6 +223,20 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
168
223
stderrR .Cancel ()
169
224
}()
170
225
226
+ // initialize goroutines to copy stdout and stderr streams to a closable pipe
227
+ pipeStdoutR , pipeStdoutW := io .Pipe ()
228
+ pipeStderrR , pipeStderrW := io .Pipe ()
229
+ copyStream := func (reader io.Reader , writer * io.PipeWriter ) {
230
+ // copy using a buffer of size 32K
231
+ buf := make ([]byte , 32 << 10 )
232
+ _ , err := io .CopyBuffer (writer , reader , buf )
233
+ if err != nil {
234
+ log .G (ctx ).Errorf ("failed to copy stream: %s" , err )
235
+ }
236
+ }
237
+ go copyStream (stdoutR , pipeStdoutW )
238
+ go copyStream (stderrR , pipeStderrW )
239
+
171
240
var wg sync.WaitGroup
172
241
wg .Add (3 )
173
242
stdout := make (chan string , 10000 )
@@ -182,7 +251,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
182
251
for err == nil {
183
252
var s string
184
253
s , err = r .ReadString ('\n' )
185
-
186
254
if len (s ) > 0 {
187
255
dataChan <- strings .TrimSuffix (s , "\n " )
188
256
}
@@ -192,12 +260,24 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
192
260
}
193
261
}
194
262
}
195
- go processLogFunc (stdoutR , stdout )
196
- go processLogFunc (stderrR , stderr )
263
+ go processLogFunc (pipeStdoutR , stdout )
264
+ go processLogFunc (pipeStderrR , stderr )
197
265
go func () {
198
266
defer wg .Done ()
199
267
driver .Process (stdout , stderr )
200
268
}()
269
+ go func () {
270
+ // close pipeStdoutW and pipeStderrW upon container exit
271
+ defer pipeStdoutW .Close ()
272
+ defer pipeStderrW .Close ()
273
+
274
+ exitCh , err := getContainerWait (ctx , address , config )
275
+ if err != nil {
276
+ log .G (ctx ).Errorf ("failed to get container task wait channel: %v" , err )
277
+ return
278
+ }
279
+ <- exitCh
280
+ }()
201
281
wg .Wait ()
202
282
return driver .PostProcess ()
203
283
}
@@ -220,11 +300,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
220
300
if err != nil {
221
301
return err
222
302
}
223
- if err := ready (); err != nil {
303
+
304
+ loggerLock := getLockPath (dataStore , config .Namespace , config .ID )
305
+ f , err := os .Create (loggerLock )
306
+ if err != nil {
224
307
return err
225
308
}
226
-
227
- return loggingProcessAdapter (ctx , driver , dataStore , config )
309
+ defer f .Close ()
310
+
311
+ // the logger will obtain an exclusive lock on a file until the container is
312
+ // stopped and the driver has finished processing all output,
313
+ // so that waiting log viewers can be signalled when the process is complete.
314
+ return lockutil .WithDirLock (loggerLock , func () error {
315
+ if err := ready (); err != nil {
316
+ return err
317
+ }
318
+ // getContainerWait is extracted as parameter to allow mocking in tests.
319
+ return loggingProcessAdapter (ctx , driver , dataStore , logConfig .Address , getContainerWait , config )
320
+ })
228
321
} else if ! errors .Is (err , os .ErrNotExist ) {
229
322
// the file does not exist if the container was created with nerdctl < 0.20
230
323
return err
0 commit comments