Skip to content

Commit 94cd0a0

Browse files
committed
feat(kubelet): only returns logs that match the given stream
Signed-off-by: Jian Zeng <[email protected]>
1 parent 0793f65 commit 94cd0a0

File tree

5 files changed

+361
-31
lines changed

5 files changed

+361
-31
lines changed

pkg/kubelet/kubelet_pods.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1566,11 +1566,14 @@ func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, con
15661566
return err
15671567
}
15681568

1569-
// Do a zero-byte write to stdout before handing off to the container runtime.
1570-
// This ensures at least one Write call is made to the writer when copying starts,
1571-
// even if we then block waiting for log output from the container.
1572-
if _, err := stdout.Write([]byte{}); err != nil {
1573-
return err
1569+
// Since v1.32, stdout may be nil if the stream is not requested.
1570+
if stdout != nil {
1571+
// Do a zero-byte write to stdout before handing off to the container runtime.
1572+
// This ensures at least one Write call is made to the writer when copying starts,
1573+
// even if we then block waiting for log output from the container.
1574+
if _, err := stdout.Write([]byte{}); err != nil {
1575+
return err
1576+
}
15741577
}
15751578

15761579
return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)

pkg/kubelet/server/server.go

Lines changed: 42 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ import (
4141
oteltrace "go.opentelemetry.io/otel/trace"
4242
"google.golang.org/grpc"
4343
"k8s.io/klog/v2"
44-
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
4544
"k8s.io/utils/clock"
4645
netutils "k8s.io/utils/net"
46+
"k8s.io/utils/ptr"
4747

4848
v1 "k8s.io/api/core/v1"
4949
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -83,6 +83,7 @@ import (
8383
apisgrpc "k8s.io/kubernetes/pkg/kubelet/apis/grpc"
8484
"k8s.io/kubernetes/pkg/kubelet/apis/podresources"
8585
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
86+
"k8s.io/kubernetes/pkg/kubelet/metrics/collectors"
8687
"k8s.io/kubernetes/pkg/kubelet/prober"
8788
servermetrics "k8s.io/kubernetes/pkg/kubelet/server/metrics"
8889
"k8s.io/kubernetes/pkg/kubelet/server/stats"
@@ -723,6 +724,13 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
723724
response.WriteError(http.StatusBadRequest, fmt.Errorf(`{"message": "Unable to decode query."}`))
724725
return
725726
}
727+
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
728+
// Even with defaulters, logOptions.Stream can be nil if no arguments are provided at all.
729+
if logOptions.Stream == nil {
730+
// Default to "All" to maintain backward compatibility.
731+
logOptions.Stream = ptr.To(v1.LogStreamAll)
732+
}
733+
}
726734
logOptions.TypeMeta = metav1.TypeMeta{}
727735
if errs := validation.ValidatePodLogOptions(logOptions); len(errs) > 0 {
728736
response.WriteError(http.StatusUnprocessableEntity, fmt.Errorf(`{"message": "Invalid request."}`))
@@ -744,9 +752,40 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
744752
response.WriteError(http.StatusInternalServerError, fmt.Errorf("unable to convert %v into http.Flusher, cannot show logs", reflect.TypeOf(response)))
745753
return
746754
}
747-
fw := flushwriter.Wrap(response.ResponseWriter)
755+
756+
var (
757+
stdout io.Writer
758+
stderr io.Writer
759+
fw = flushwriter.Wrap(response.ResponseWriter)
760+
)
761+
if utilfeature.DefaultFeatureGate.Enabled(features.PodLogsQuerySplitStreams) {
762+
wantedStream := logOptions.Stream
763+
// No stream type specified, default to All
764+
if wantedStream == nil {
765+
allStream := v1.LogStreamAll
766+
wantedStream = &allStream
767+
}
768+
switch *wantedStream {
769+
case v1.LogStreamStdout:
770+
stdout, stderr = fw, nil
771+
case v1.LogStreamStderr:
772+
stdout, stderr = nil, fw
773+
case v1.LogStreamAll:
774+
stdout, stderr = fw, fw
775+
default:
776+
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("invalid stream type %q", *logOptions.Stream))
777+
return
778+
}
779+
} else {
780+
if logOptions.Stream != nil && *logOptions.Stream != v1.LogStreamAll {
781+
_ = response.WriteError(http.StatusBadRequest, fmt.Errorf("unable to return the given log stream: %q. Please enable PodLogsQuerySplitStreams feature gate in kubelet", *logOptions.Stream))
782+
return
783+
}
784+
stdout, stderr = fw, fw
785+
}
786+
748787
response.Header().Set("Transfer-Encoding", "chunked")
749-
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
788+
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, stdout, stderr); err != nil {
750789
response.WriteError(http.StatusBadRequest, err)
751790
return
752791
}

pkg/kubelet/server/server_test.go

Lines changed: 250 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ import (
3737
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
3838
"github.com/stretchr/testify/assert"
3939
"github.com/stretchr/testify/require"
40+
"k8s.io/utils/ptr"
41+
4042
v1 "k8s.io/api/core/v1"
4143
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4244
"k8s.io/apimachinery/pkg/types"
@@ -50,7 +52,6 @@ import (
5052
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
5153
statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
5254
api "k8s.io/kubernetes/pkg/apis/core"
53-
"k8s.io/utils/ptr"
5455

5556
// Do some initialization to decode the query parameters correctly.
5657
"k8s.io/apiserver/pkg/server/healthz"
@@ -820,29 +821,41 @@ func TestContainerLogs(t *testing.T) {
820821
}
821822

822823
for desc, test := range tests {
823-
t.Run(desc, func(t *testing.T) {
824-
output := "foo bar"
825-
podNamespace := "other"
826-
podName := "foo"
827-
expectedPodName := getPodName(podName, podNamespace)
828-
expectedContainerName := "baz"
829-
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
830-
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, test.podLogOption, output)
831-
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
832-
if err != nil {
833-
t.Errorf("Got error GETing: %v", err)
834-
}
835-
defer resp.Body.Close()
824+
// To make sure the original behavior doesn't change no matter the feature PodLogsQuerySplitStreams is enabled or not.
825+
for _, enablePodLogsQuerySplitStreams := range []bool{true, false} {
826+
t.Run(fmt.Sprintf("%s (enablePodLogsQuerySplitStreams=%v)", desc, enablePodLogsQuerySplitStreams), func(t *testing.T) {
827+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, enablePodLogsQuerySplitStreams)
828+
expectedLogOptions := test.podLogOption.DeepCopy()
829+
if enablePodLogsQuerySplitStreams && expectedLogOptions.Stream == nil {
830+
// The HTTP handler will internally set the default stream value.
831+
expectedLogOptions.Stream = ptr.To(v1.LogStreamAll)
832+
}
836833

837-
body, err := io.ReadAll(resp.Body)
838-
if err != nil {
839-
t.Errorf("Error reading container logs: %v", err)
840-
}
841-
result := string(body)
842-
if result != output {
843-
t.Errorf("Expected: '%v', got: '%v'", output, result)
844-
}
845-
})
834+
output := "foo bar"
835+
podNamespace := "other"
836+
podName := "foo"
837+
expectedPodName := getPodName(podName, podNamespace)
838+
expectedContainerName := "baz"
839+
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
840+
setGetContainerLogsFunc(fw, t, expectedPodName, expectedContainerName, expectedLogOptions, output)
841+
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + test.query)
842+
if err != nil {
843+
t.Errorf("Got error GETing: %v", err)
844+
}
845+
defer func() {
846+
_ = resp.Body.Close()
847+
}()
848+
849+
body, err := io.ReadAll(resp.Body)
850+
if err != nil {
851+
t.Errorf("Error reading container logs: %v", err)
852+
}
853+
result := string(body)
854+
if result != output {
855+
t.Errorf("Expected: '%v', got: '%v'", output, result)
856+
}
857+
})
858+
}
846859
}
847860
}
848861

@@ -866,6 +879,220 @@ func TestContainerLogsWithInvalidTail(t *testing.T) {
866879
}
867880
}
868881

882+
func TestContainerLogsWithSeparateStream(t *testing.T) {
883+
featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.PodLogsQuerySplitStreams, true)
884+
885+
type logEntry struct {
886+
stream string
887+
msg string
888+
}
889+
890+
fw := newServerTest()
891+
defer fw.testHTTPServer.Close()
892+
893+
var (
894+
streamStdout = v1.LogStreamStdout
895+
streamStderr = v1.LogStreamStderr
896+
streamAll = v1.LogStreamAll
897+
)
898+
899+
testCases := []struct {
900+
name string
901+
query string
902+
logs []logEntry
903+
expectedOutput string
904+
expectedLogOptions *v1.PodLogOptions
905+
}{
906+
{
907+
// Defaulters don't work if the query is empty.
908+
// See also https://github.com/kubernetes/kubernetes/issues/128589
909+
name: "empty query should return all logs",
910+
logs: []logEntry{
911+
{stream: v1.LogStreamStdout, msg: "foo\n"},
912+
{stream: v1.LogStreamStderr, msg: "bar\n"},
913+
},
914+
query: "",
915+
expectedLogOptions: &v1.PodLogOptions{
916+
Stream: &streamAll,
917+
},
918+
expectedOutput: "foo\nbar\n",
919+
},
920+
{
921+
name: "missing stream param should return all logs",
922+
logs: []logEntry{
923+
{stream: v1.LogStreamStdout, msg: "foo\n"},
924+
{stream: v1.LogStreamStderr, msg: "bar\n"},
925+
},
926+
query: "?limitBytes=100",
927+
expectedLogOptions: &v1.PodLogOptions{
928+
Stream: &streamAll,
929+
LimitBytes: ptr.To[int64](100),
930+
},
931+
expectedOutput: "foo\nbar\n",
932+
},
933+
{
934+
name: "only stdout logs",
935+
logs: []logEntry{
936+
{stream: v1.LogStreamStdout, msg: "out1\n"},
937+
{stream: v1.LogStreamStderr, msg: "err1\n"},
938+
{stream: v1.LogStreamStdout, msg: "out2\n"},
939+
},
940+
query: "?stream=Stdout",
941+
expectedLogOptions: &v1.PodLogOptions{
942+
Stream: &streamStdout,
943+
},
944+
expectedOutput: "out1\nout2\n",
945+
},
946+
{
947+
name: "only stderr logs",
948+
logs: []logEntry{
949+
{stream: v1.LogStreamStderr, msg: "err1\n"},
950+
{stream: v1.LogStreamStderr, msg: "err2\n"},
951+
{stream: v1.LogStreamStdout, msg: "out1\n"},
952+
},
953+
query: "?stream=Stderr",
954+
expectedLogOptions: &v1.PodLogOptions{
955+
Stream: &streamStderr,
956+
},
957+
expectedOutput: "err1\nerr2\n",
958+
},
959+
{
960+
name: "return all logs",
961+
logs: []logEntry{
962+
{stream: v1.LogStreamStdout, msg: "out1\n"},
963+
{stream: v1.LogStreamStderr, msg: "err1\n"},
964+
{stream: v1.LogStreamStdout, msg: "out2\n"},
965+
},
966+
query: "?stream=All",
967+
expectedLogOptions: &v1.PodLogOptions{
968+
Stream: &streamAll,
969+
},
970+
expectedOutput: "out1\nerr1\nout2\n",
971+
},
972+
{
973+
name: "stdout logs with legacy tail",
974+
logs: []logEntry{
975+
{stream: v1.LogStreamStdout, msg: "out1\n"},
976+
{stream: v1.LogStreamStderr, msg: "err1\n"},
977+
{stream: v1.LogStreamStdout, msg: "out2\n"},
978+
},
979+
query: "?stream=All&tail=1",
980+
expectedLogOptions: &v1.PodLogOptions{
981+
Stream: &streamAll,
982+
TailLines: ptr.To[int64](1),
983+
},
984+
expectedOutput: "out2\n",
985+
},
986+
{
987+
name: "return the last 2 lines of logs",
988+
logs: []logEntry{
989+
{stream: v1.LogStreamStdout, msg: "out1\n"},
990+
{stream: v1.LogStreamStderr, msg: "err1\n"},
991+
{stream: v1.LogStreamStdout, msg: "out2\n"},
992+
},
993+
query: "?stream=All&tailLines=2",
994+
expectedLogOptions: &v1.PodLogOptions{
995+
Stream: &streamAll,
996+
TailLines: ptr.To[int64](2),
997+
},
998+
expectedOutput: "err1\nout2\n",
999+
},
1000+
{
1001+
name: "return the first 6 bytes of the stdout log stream",
1002+
logs: []logEntry{
1003+
{stream: v1.LogStreamStderr, msg: "err1\n"},
1004+
{stream: v1.LogStreamStdout, msg: "out1\n"},
1005+
{stream: v1.LogStreamStderr, msg: "err2\n"},
1006+
{stream: v1.LogStreamStdout, msg: "out2\n"},
1007+
},
1008+
query: "?stream=Stdout&limitBytes=6",
1009+
expectedLogOptions: &v1.PodLogOptions{
1010+
Stream: &streamStdout,
1011+
LimitBytes: ptr.To[int64](6),
1012+
},
1013+
expectedOutput: "out1\no",
1014+
},
1015+
{
1016+
name: "invalid stream",
1017+
logs: []logEntry{
1018+
{stream: v1.LogStreamStderr, msg: "err1\n"},
1019+
{stream: v1.LogStreamStdout, msg: "out1\n"},
1020+
},
1021+
query: "?stream=invalid",
1022+
expectedLogOptions: nil,
1023+
expectedOutput: `{"message": "Invalid request."}`,
1024+
},
1025+
}
1026+
for _, tc := range testCases {
1027+
t.Run(tc.name, func(t *testing.T) {
1028+
podNamespace := "other"
1029+
podName := "foo"
1030+
expectedContainerName := "baz"
1031+
setPodByNameFunc(fw, podNamespace, podName, expectedContainerName)
1032+
fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
1033+
if !reflect.DeepEqual(tc.expectedLogOptions, logOptions) {
1034+
t.Errorf("expected %#v, got %#v", tc.expectedLogOptions, logOptions)
1035+
}
1036+
1037+
var dst io.Writer
1038+
tailLines := len(tc.logs)
1039+
if logOptions.TailLines != nil {
1040+
tailLines = int(*logOptions.TailLines)
1041+
}
1042+
1043+
remain := 0
1044+
if logOptions.LimitBytes != nil {
1045+
remain = int(*logOptions.LimitBytes)
1046+
} else {
1047+
for _, log := range tc.logs {
1048+
remain += len(log.msg)
1049+
}
1050+
}
1051+
1052+
logs := tc.logs[len(tc.logs)-tailLines:]
1053+
for _, log := range logs {
1054+
switch log.stream {
1055+
case v1.LogStreamStdout:
1056+
dst = stdout
1057+
case v1.LogStreamStderr:
1058+
dst = stderr
1059+
}
1060+
// Skip if the stream is not requested
1061+
if dst == nil {
1062+
continue
1063+
}
1064+
line := log.msg
1065+
if len(line) > remain {
1066+
line = line[:remain]
1067+
}
1068+
_, _ = io.WriteString(dst, line)
1069+
remain -= len(line)
1070+
if remain <= 0 {
1071+
return nil
1072+
}
1073+
}
1074+
return nil
1075+
}
1076+
resp, err := http.Get(fw.testHTTPServer.URL + "/containerLogs/" + podNamespace + "/" + podName + "/" + expectedContainerName + tc.query)
1077+
if err != nil {
1078+
t.Errorf("Got error GETing: %v", err)
1079+
}
1080+
defer func() {
1081+
_ = resp.Body.Close()
1082+
}()
1083+
1084+
body, err := io.ReadAll(resp.Body)
1085+
if err != nil {
1086+
t.Errorf("Error reading container logs: %v", err)
1087+
}
1088+
result := string(body)
1089+
if result != tc.expectedOutput {
1090+
t.Errorf("Expected: %q, got: %q", tc.expectedOutput, result)
1091+
}
1092+
})
1093+
}
1094+
}
1095+
8691096
func TestCheckpointContainer(t *testing.T) {
8701097
podNamespace := "other"
8711098
podName := "foo"

0 commit comments

Comments
 (0)