Skip to content

Commit 6f8dfdf

Browse files
jdefJames DeFelice
authored andcommitted
Fix docker/journald logging conformance
See issue kubernetes#86367
1 parent cad4460 commit 6f8dfdf

File tree

3 files changed

+166
-1
lines changed

3 files changed

+166
-1
lines changed

pkg/kubelet/dockershim/docker_legacy_service.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package dockershim
1818

1919
import (
2020
"context"
21+
"errors"
2122
"fmt"
2223
"io"
2324
"strconv"
@@ -29,6 +30,7 @@ import (
2930
"k8s.io/api/core/v1"
3031
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3132
kubetypes "k8s.io/apimachinery/pkg/types"
33+
"k8s.io/klog"
3234
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
3335
"k8s.io/kubernetes/pkg/kubelet/kuberuntime"
3436

@@ -76,12 +78,23 @@ func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, contain
7678
opts.Tail = strconv.FormatInt(*logOptions.TailLines, 10)
7779
}
7880

81+
if logOptions.LimitBytes != nil {
82+
// stdout and stderr share the total write limit
83+
max := *logOptions.LimitBytes
84+
stderr = limitedWriter(stderr, &max)
85+
stdout = limitedWriter(stdout, &max)
86+
}
7987
sopts := libdocker.StreamOptions{
8088
OutputStream: stdout,
8189
ErrorStream: stderr,
8290
RawTerminal: container.Config.Tty,
8391
}
84-
return d.client.Logs(containerID.ID, opts, sopts)
92+
err = d.client.Logs(containerID.ID, opts, sopts)
93+
if errors.Is(err, errMaximumWrite) {
94+
klog.V(2).Infof("finished logs, hit byte limit %d", *logOptions.LimitBytes)
95+
err = nil
96+
}
97+
return err
8598
}
8699

87100
// GetContainerLogTail attempts to read up to MaxContainerTerminationMessageLogLength

pkg/kubelet/dockershim/helpers.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@ limitations under the License.
1717
package dockershim
1818

1919
import (
20+
"errors"
2021
"fmt"
22+
"io"
2123
"regexp"
2224
"strconv"
2325
"strings"
26+
"sync/atomic"
2427

2528
dockertypes "github.com/docker/docker/api/types"
2629
dockercontainer "github.com/docker/docker/api/types/container"
@@ -393,3 +396,48 @@ type dockerOpt struct {
393396
func (d dockerOpt) GetKV() (string, string) {
394397
return d.key, d.value
395398
}
399+
400+
// writeLimiter limits the total output written across one or more streams.
401+
type writeLimiter struct {
402+
delegate io.Writer
403+
limit *int64
404+
}
405+
406+
func (w writeLimiter) Write(p []byte) (int, error) {
407+
if len(p) == 0 {
408+
return 0, nil
409+
}
410+
limit := atomic.LoadInt64(w.limit)
411+
if limit <= 0 {
412+
return 0, errMaximumWrite
413+
}
414+
var truncated bool
415+
if limit < int64(len(p)) {
416+
p = p[0:limit]
417+
truncated = true
418+
}
419+
n, err := w.delegate.Write(p)
420+
if n > 0 {
421+
newLimit := limit - int64(n)
422+
for !atomic.CompareAndSwapInt64(w.limit, limit, newLimit) {
423+
limit = atomic.LoadInt64(w.limit)
424+
newLimit = limit - int64(n)
425+
}
426+
}
427+
if err == nil && truncated {
428+
err = errMaximumWrite
429+
}
430+
return n, err
431+
}
432+
433+
func limitedWriter(w io.Writer, limit *int64) io.Writer {
434+
if w == nil {
435+
return nil
436+
}
437+
return &writeLimiter{
438+
delegate: w,
439+
limit: limit,
440+
}
441+
}
442+
443+
var errMaximumWrite = errors.New("maximum write")

pkg/kubelet/dockershim/helpers_test.go

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@ limitations under the License.
1717
package dockershim
1818

1919
import (
20+
"bytes"
21+
"errors"
2022
"fmt"
23+
"sync"
2124
"testing"
2225

2326
dockertypes "github.com/docker/docker/api/types"
@@ -332,3 +335,104 @@ func TestGenerateMountBindings(t *testing.T) {
332335

333336
assert.Equal(t, expectedResult, result)
334337
}
338+
339+
func TestLimitedWriter(t *testing.T) {
340+
max := func(x, y int64) int64 {
341+
if x > y {
342+
return x
343+
}
344+
return y
345+
}
346+
for name, tc := range map[string]struct {
347+
w bytes.Buffer
348+
toWrite string
349+
limit int64
350+
wants string
351+
wantsErr error
352+
}{
353+
"nil": {},
354+
"neg": {
355+
toWrite: "a",
356+
wantsErr: errMaximumWrite,
357+
limit: -1,
358+
},
359+
"1byte-over": {
360+
toWrite: "a",
361+
wantsErr: errMaximumWrite,
362+
},
363+
"1byte-maxed": {
364+
toWrite: "a",
365+
wants: "a",
366+
limit: 1,
367+
},
368+
"1byte-under": {
369+
toWrite: "a",
370+
wants: "a",
371+
limit: 2,
372+
},
373+
"6byte-over": {
374+
toWrite: "foobar",
375+
wants: "foo",
376+
limit: 3,
377+
wantsErr: errMaximumWrite,
378+
},
379+
"6byte-maxed": {
380+
toWrite: "foobar",
381+
wants: "foobar",
382+
limit: 6,
383+
},
384+
"6byte-under": {
385+
toWrite: "foobar",
386+
wants: "foobar",
387+
limit: 20,
388+
},
389+
} {
390+
t.Run(name, func(t *testing.T) {
391+
limit := tc.limit
392+
w := limitedWriter(&tc.w, &limit)
393+
n, err := w.Write([]byte(tc.toWrite))
394+
if int64(n) > max(0, tc.limit) {
395+
t.Fatalf("bytes written (%d) exceeds limit (%d)", n, tc.limit)
396+
}
397+
if (err != nil) != (tc.wantsErr != nil) {
398+
if err != nil {
399+
t.Fatal("unexpected error:", err)
400+
}
401+
t.Fatal("expected error:", err)
402+
}
403+
if err != nil {
404+
if !errors.Is(err, tc.wantsErr) {
405+
t.Fatal("expected error: ", tc.wantsErr, " instead of: ", err)
406+
}
407+
if !errors.Is(err, errMaximumWrite) {
408+
return
409+
}
410+
// check contents for errMaximumWrite
411+
}
412+
if s := tc.w.String(); s != tc.wants {
413+
t.Fatalf("expected %q instead of %q", tc.wants, s)
414+
}
415+
})
416+
}
417+
418+
// test concurrency. run this test a bunch of times to attempt to flush
419+
// out any data races or concurrency issues.
420+
for i := 0; i < 1000; i++ {
421+
var (
422+
b1, b2 bytes.Buffer
423+
limit = int64(10)
424+
w1 = limitedWriter(&b1, &limit)
425+
w2 = limitedWriter(&b2, &limit)
426+
ch = make(chan struct{})
427+
wg sync.WaitGroup
428+
)
429+
wg.Add(2)
430+
go func() { defer wg.Done(); <-ch; w1.Write([]byte("hello")) }()
431+
go func() { defer wg.Done(); <-ch; w2.Write([]byte("world")) }()
432+
close(ch)
433+
wg.Wait()
434+
if limit != 0 {
435+
t.Fatalf("expected max limit to be reached, instead of %d", limit)
436+
}
437+
}
438+
}

0 commit comments

Comments
 (0)