Skip to content

Commit 27b91ec

Browse files
committed
ensure logger completion after container exit
Introduced mechanisms to ensure logger completion after container exit using file locks and added robust handling for log streams. Updated and added tests to validate functionality, including log behavior with running, non-terminated, and restarted containers. Signed-off-by: fahed dorgaa <[email protected]>
1 parent 2feac61 commit 27b91ec

File tree

4 files changed

+162
-8
lines changed

4 files changed

+162
-8
lines changed

cmd/nerdctl/container/container_logs_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,55 @@ func TestLogsWithFailingContainer(t *testing.T) {
164164
base.Cmd("rm", "-f", containerName).AssertOK()
165165
}
166166

167+
func TestLogsWithRunningContainer(t *testing.T) {
168+
t.Parallel()
169+
base := testutil.NewBase(t)
170+
containerName := testutil.Identifier(t)
171+
defer base.Cmd("rm", "-f", containerName).Run()
172+
expected := make([]string, 10)
173+
for i := 0; i < 10; i++ {
174+
expected[i] = fmt.Sprint(i + 1)
175+
}
176+
177+
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
178+
"sh", "-euc", "for i in `seq 1 10`; do echo $i; sleep 1; done").AssertOK()
179+
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
180+
}
181+
182+
func TestLogsWithoutNewlineOrEOF(t *testing.T) {
183+
if runtime.GOOS != "linux" {
184+
t.Skip("FIXME: test does not work on Windows yet because containerd doesn't send an exit event appropriately after task exit on Windows")
185+
}
186+
t.Parallel()
187+
base := testutil.NewBase(t)
188+
containerName := testutil.Identifier(t)
189+
defer base.Cmd("rm", "-f", containerName).Run()
190+
expected := []string{"Hello World!", "There is no newline"}
191+
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
192+
"printf", "'Hello World!\nThere is no newline'").AssertOK()
193+
time.Sleep(3 * time.Second)
194+
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
195+
}
196+
197+
func TestLogsAfterRestartingContainer(t *testing.T) {
198+
if runtime.GOOS != "linux" {
199+
t.Skip("FIXME: test does not work on Windows yet. Restarting a container fails with: failed to create shim task: hcs::CreateComputeSystem <id>: The requested operation for attach namespace failed.: unknown")
200+
}
201+
t.Parallel()
202+
base := testutil.NewBase(t)
203+
containerName := testutil.Identifier(t)
204+
defer base.Cmd("rm", "-f", containerName).Run()
205+
base.Cmd("run", "-d", "--name", containerName, testutil.CommonImage,
206+
"printf", "'Hello World!\nThere is no newline'").AssertOK()
207+
expected := []string{"Hello World!", "There is no newline"}
208+
time.Sleep(3 * time.Second)
209+
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
210+
// restart and check logs again
211+
base.Cmd("start", containerName)
212+
time.Sleep(3 * time.Second)
213+
base.Cmd("logs", "-f", containerName).AssertOutContainsAll(expected...)
214+
}
215+
167216
func TestLogsWithForegroundContainers(t *testing.T) {
168217
testCase := nerdtest.Setup()
169218
// dual logging is not supported on Windows

pkg/cmd/container/logs.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,10 @@ func Logs(ctx context.Context, client *containerd.Client, container string, opti
9191
// Setup goroutine to send stop event if container task finishes:
9292
go func() {
9393
<-waitCh
94+
// Wait for logger to process remaining logs after container exit
95+
if err = logging.WaitForLogger(dataStore, l[labels.Namespace], found.Container.ID()); err != nil {
96+
log.G(ctx).WithError(err).Error("failed to wait for logger shutdown")
97+
}
9498
log.G(ctx).Debugf("container task has finished, sending kill signal to log viewer")
9599
stopChannel <- os.Interrupt
96100
}()

pkg/logging/logging.go

Lines changed: 100 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,14 @@ import (
3030
"sync"
3131
"time"
3232

33+
containerd "github.com/containerd/containerd/v2/client"
3334
"github.com/fsnotify/fsnotify"
3435
"github.com/muesli/cancelreader"
3536

3637
"github.com/containerd/containerd/v2/core/runtime/v2/logging"
3738
"github.com/containerd/errdefs"
3839
"github.com/containerd/log"
40+
"github.com/containerd/nerdctl/v2/pkg/lockutil"
3941
)
4042

4143
const (
@@ -149,7 +151,60 @@ func LoadLogConfig(dataStore, ns, id string) (LogConfig, error) {
149151
return logConfig, nil
150152
}
151153

152-
func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string, config *logging.Config) error {
154+
// getLockPath returns the path of the "logger-lock" entry that lives in the
// per-container directory <dataStore>/containers/<ns>/<id>.
func getLockPath(dataStore, ns, id string) string {
	containerDir := filepath.Join(dataStore, "containers", ns, id)
	return filepath.Join(containerDir, "logger-lock")
}
157+
158+
// WaitForLogger waits until the logger has finished executing and processing container logs
159+
func WaitForLogger(dataStore, ns, id string) error {
160+
return lockutil.WithDirLock(getLockPath(dataStore, ns, id), func() error {
161+
return nil
162+
})
163+
}
164+
165+
func getContainerWait(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
166+
client, err := containerd.New(address, containerd.WithDefaultNamespace(config.Namespace))
167+
if err != nil {
168+
return nil, err
169+
}
170+
con, err := client.LoadContainer(ctx, config.ID)
171+
if err != nil {
172+
return nil, err
173+
}
174+
175+
task, err := con.Task(ctx, nil)
176+
if err == nil {
177+
return task.Wait(ctx)
178+
}
179+
if !errdefs.IsNotFound(err) {
180+
return nil, err
181+
}
182+
183+
// If task was not found, it's possible that the container runtime is still being created.
184+
// Retry every 100ms.
185+
ticker := time.NewTicker(100 * time.Millisecond)
186+
defer ticker.Stop()
187+
188+
for {
189+
select {
190+
case <-ctx.Done():
191+
return nil, errors.New("timed out waiting for container task to start")
192+
case <-ticker.C:
193+
task, err = con.Task(ctx, nil)
194+
if err != nil {
195+
if errdefs.IsNotFound(err) {
196+
continue
197+
}
198+
return nil, err
199+
}
200+
return task.Wait(ctx)
201+
}
202+
}
203+
}
204+
205+
// ContainerWaitFunc is the signature of a function that, given a containerd
// address and a logging config, returns a channel delivering a container's
// exit status. getContainerWait has this signature; loggingProcessAdapter
// takes a ContainerWaitFunc as a parameter so that it can be mocked in tests.
type ContainerWaitFunc func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error)
206+
207+
func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore, address string, getContainerWait ContainerWaitFunc, config *logging.Config) error {
153208
if err := driver.PreProcess(ctx, dataStore, config); err != nil {
154209
return err
155210
}
@@ -168,6 +223,20 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
168223
stderrR.Cancel()
169224
}()
170225

226+
// initialize goroutines to copy stdout and stderr streams to a closable pipe
227+
pipeStdoutR, pipeStdoutW := io.Pipe()
228+
pipeStderrR, pipeStderrW := io.Pipe()
229+
copyStream := func(reader io.Reader, writer *io.PipeWriter) {
230+
// copy using a buffer of size 32K
231+
buf := make([]byte, 32<<10)
232+
_, err := io.CopyBuffer(writer, reader, buf)
233+
if err != nil {
234+
log.G(ctx).Errorf("failed to copy stream: %s", err)
235+
}
236+
}
237+
go copyStream(stdoutR, pipeStdoutW)
238+
go copyStream(stderrR, pipeStderrW)
239+
171240
var wg sync.WaitGroup
172241
wg.Add(3)
173242
stdout := make(chan string, 10000)
@@ -182,7 +251,6 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
182251
for err == nil {
183252
var s string
184253
s, err = r.ReadString('\n')
185-
186254
if len(s) > 0 {
187255
dataChan <- strings.TrimSuffix(s, "\n")
188256
}
@@ -192,12 +260,24 @@ func loggingProcessAdapter(ctx context.Context, driver Driver, dataStore string,
192260
}
193261
}
194262
}
195-
go processLogFunc(stdoutR, stdout)
196-
go processLogFunc(stderrR, stderr)
263+
go processLogFunc(pipeStdoutR, stdout)
264+
go processLogFunc(pipeStderrR, stderr)
197265
go func() {
198266
defer wg.Done()
199267
driver.Process(stdout, stderr)
200268
}()
269+
go func() {
270+
// close pipeStdoutW and pipeStderrW upon container exit
271+
defer pipeStdoutW.Close()
272+
defer pipeStderrW.Close()
273+
274+
exitCh, err := getContainerWait(ctx, address, config)
275+
if err != nil {
276+
log.G(ctx).Errorf("failed to get container task wait channel: %v", err)
277+
return
278+
}
279+
<-exitCh
280+
}()
201281
wg.Wait()
202282
return driver.PostProcess()
203283
}
@@ -220,11 +300,24 @@ func loggerFunc(dataStore string) (logging.LoggerFunc, error) {
220300
if err != nil {
221301
return err
222302
}
223-
if err := ready(); err != nil {
303+
304+
loggerLock := getLockPath(dataStore, config.Namespace, config.ID)
305+
f, err := os.Create(loggerLock)
306+
if err != nil {
224307
return err
225308
}
226-
227-
return loggingProcessAdapter(ctx, driver, dataStore, config)
309+
defer f.Close()
310+
311+
// the logger will obtain an exclusive lock on a file until the container is
312+
// stopped and the driver has finished processing all output,
313+
// so that waiting log viewers can be signalled when the process is complete.
314+
return lockutil.WithDirLock(loggerLock, func() error {
315+
if err := ready(); err != nil {
316+
return err
317+
}
318+
// getContainerWait is extracted as parameter to allow mocking in tests.
319+
return loggingProcessAdapter(ctx, driver, dataStore, logConfig.Address, getContainerWait, config)
320+
})
228321
} else if !errors.Is(err, os.ErrNotExist) {
229322
// the file does not exist if the container was created with nerdctl < 0.20
230323
return err

pkg/logging/logging_test.go

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"testing"
2626
"time"
2727

28+
containerd "github.com/containerd/containerd/v2/client"
2829
"github.com/containerd/containerd/v2/core/runtime/v2/logging"
2930
)
3031

@@ -78,7 +79,14 @@ func TestLoggingProcessAdapter(t *testing.T) {
7879
ctx, cancel := context.WithCancel(context.Background())
7980
defer cancel()
8081

81-
err := loggingProcessAdapter(ctx, driver, "testDataStore", config)
82+
var getContainerWaitMock ContainerWaitFunc = func(ctx context.Context, address string, config *logging.Config) (<-chan containerd.ExitStatus, error) {
83+
exitChan := make(chan containerd.ExitStatus, 1)
84+
time.Sleep(50 * time.Millisecond)
85+
exitChan <- containerd.ExitStatus{}
86+
return exitChan, nil
87+
}
88+
89+
err := loggingProcessAdapter(ctx, driver, "testDataStore", "", getContainerWaitMock, config)
8290
if err != nil {
8391
t.Fatal(err)
8492
}

0 commit comments

Comments
 (0)