Skip to content

Commit e341d5b

Browse files
authored
Merge pull request kubernetes#125626 from linxiulei/fix_routine_log
Fix httplog not logging watch duration
2 parents ff8834c + 06c7058 commit e341d5b

File tree

6 files changed

+139
-27
lines changed

6 files changed

+139
-27
lines changed

staging/src/k8s.io/apiserver/pkg/endpoints/handlers/get.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ import (
4141
"k8s.io/apiserver/pkg/endpoints/request"
4242
"k8s.io/apiserver/pkg/features"
4343
"k8s.io/apiserver/pkg/registry/rest"
44-
genericfilters "k8s.io/apiserver/pkg/server/filters"
44+
"k8s.io/apiserver/pkg/server/routine"
4545
utilfeature "k8s.io/apiserver/pkg/util/feature"
4646
"k8s.io/component-base/tracing"
4747
"k8s.io/klog/v2"
@@ -285,7 +285,7 @@ func ListResource(r rest.Lister, rw rest.Watcher, scope *RequestScope, forceWatc
285285
}
286286

287287
// Run watch serving in a separate goroutine to allow freeing current stack memory
288-
t := genericfilters.TaskFrom(req.Context())
288+
t := routine.TaskFrom(req.Context())
289289
if t != nil {
290290
t.Func = serve
291291
} else {

staging/src/k8s.io/apiserver/pkg/server/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ import (
6464
genericfilters "k8s.io/apiserver/pkg/server/filters"
6565
"k8s.io/apiserver/pkg/server/healthz"
6666
"k8s.io/apiserver/pkg/server/routes"
67+
"k8s.io/apiserver/pkg/server/routine"
6768
serverstore "k8s.io/apiserver/pkg/server/storage"
6869
storagevalue "k8s.io/apiserver/pkg/storage/value"
6970
"k8s.io/apiserver/pkg/storageversion"
@@ -1061,7 +1062,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
10611062
// handler in current goroutine to minimize the stack memory usage. It must be
10621063
// after WithPanicRecover() to be protected from panics.
10631064
if c.FeatureGate.Enabled(genericfeatures.APIServingWithRoutine) {
1064-
handler = genericfilters.WithRoutine(handler, c.LongRunningFunc)
1065+
handler = routine.WithRoutine(handler, c.LongRunningFunc)
10651066
}
10661067
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
10671068
handler = genericapifilters.WithRequestReceivedTimestamp(handler)

staging/src/k8s.io/apiserver/pkg/server/filters/routine_test.go renamed to staging/src/k8s.io/apiserver/pkg/server/filters/wrap_test.go

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
Copyright 2023 The Kubernetes Authors.
2+
Copyright 2024 The Kubernetes Authors.
33
44
Licensed under the Apache License, Version 2.0 (the "License");
55
you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@ import (
2525

2626
"k8s.io/apimachinery/pkg/util/sets"
2727
"k8s.io/apiserver/pkg/endpoints/request"
28+
"k8s.io/apiserver/pkg/server/routine"
2829
"k8s.io/klog/v2"
2930
)
3031

@@ -42,7 +43,7 @@ func TestPropogatingPanic(t *testing.T) {
4243
APIPrefixes: sets.NewString("api", "apis"),
4344
GrouplessAPIPrefixes: sets.NewString("api"),
4445
}
45-
ts := httptest.NewServer(WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
46+
ts := httptest.NewServer(routine.WithRoutine(WithPanicRecovery(handler, resolver), func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
4647
defer ts.Close()
4748
_, err := http.Get(ts.URL)
4849
if err == nil {
@@ -57,23 +58,3 @@ func TestPropogatingPanic(t *testing.T) {
5758
t.Errorf("unexpected out captured actual = %v", capturedOutput)
5859
}
5960
}
60-
61-
func TestExecutionWithRoutine(t *testing.T) {
62-
var executed bool
63-
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
64-
t := TaskFrom(r.Context())
65-
t.Func = func() {
66-
executed = true
67-
}
68-
})
69-
ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
70-
defer ts.Close()
71-
72-
_, err := http.Get(ts.URL)
73-
if err != nil {
74-
t.Errorf("got unexpected error on request: %v", err)
75-
}
76-
if !executed {
77-
t.Error("expected to execute")
78-
}
79-
}

staging/src/k8s.io/apiserver/pkg/server/httplog/httplog.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ import (
3131
"k8s.io/apiserver/pkg/endpoints/metrics"
3232
"k8s.io/apiserver/pkg/endpoints/request"
3333
"k8s.io/apiserver/pkg/endpoints/responsewriter"
34+
"k8s.io/apiserver/pkg/server/routine"
3435
"k8s.io/klog/v2"
3536
)
3637

@@ -125,10 +126,26 @@ func withLogging(handler http.Handler, stackTracePred StacktracePred, shouldLogR
125126
rl := newLoggedWithStartTime(req, w, startTime)
126127
rl.StacktraceWhen(stackTracePred)
127128
req = req.WithContext(context.WithValue(ctx, respLoggerContextKey, rl))
128-
defer rl.Log()
129+
130+
var logFunc func()
131+
logFunc = rl.Log
132+
defer func() {
133+
if logFunc != nil {
134+
logFunc()
135+
}
136+
}()
129137

130138
w = responsewriter.WrapForHTTP1Or2(rl)
131139
handler.ServeHTTP(w, req)
140+
141+
// We need to ensure that the request is logged after it is processed.
142+
// In case the request is executed in a separate goroutine created via
143+
// WithRoutine handler in the handler chain (i.e. above handler.ServeHTTP()
144+
// would return request is completely responsed), we want the logging to
145+
// happen in that goroutine too, so we append it to the task.
146+
if routine.AppendTask(ctx, &routine.Task{Func: rl.Log}) {
147+
logFunc = nil
148+
}
132149
})
133150
}
134151

staging/src/k8s.io/apiserver/pkg/server/filters/routine.go renamed to staging/src/k8s.io/apiserver/pkg/server/routine/routine.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
package filters
17+
package routine
1818

1919
import (
2020
"context"
@@ -35,6 +35,20 @@ func WithTask(parent context.Context, t *Task) context.Context {
3535
return request.WithValue(parent, taskKey, t)
3636
}
3737

38+
// AppendTask appends a task executed after completion of existing task.
39+
// It is a no-op if there is no existing task.
40+
func AppendTask(ctx context.Context, t *Task) bool {
41+
if existTask := TaskFrom(ctx); existTask != nil && existTask.Func != nil {
42+
existFunc := existTask.Func
43+
existTask.Func = func() {
44+
existFunc()
45+
t.Func()
46+
}
47+
return true
48+
}
49+
return false
50+
}
51+
3852
func TaskFrom(ctx context.Context) *Task {
3953
t, _ := ctx.Value(taskKey).(*Task)
4054
return t
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/*
2+
Copyright 2023 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package routine
18+
19+
import (
20+
"net/http"
21+
"net/http/httptest"
22+
"testing"
23+
24+
"k8s.io/apiserver/pkg/endpoints/request"
25+
)
26+
27+
func TestExecutionWithRoutine(t *testing.T) {
28+
var executed bool
29+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
30+
t := TaskFrom(r.Context())
31+
t.Func = func() {
32+
executed = true
33+
}
34+
})
35+
ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
36+
defer ts.Close()
37+
38+
_, err := http.Get(ts.URL)
39+
if err != nil {
40+
t.Errorf("got unexpected error on request: %v", err)
41+
}
42+
if !executed {
43+
t.Error("expected to execute")
44+
}
45+
}
46+
47+
func TestAppendTask(t *testing.T) {
48+
tests := []struct {
49+
name string
50+
existingTask bool
51+
taskAppended bool
52+
shouldExecute bool
53+
}{
54+
{
55+
name: "append task when existing",
56+
existingTask: true,
57+
taskAppended: true,
58+
shouldExecute: true,
59+
},
60+
{
61+
name: "not append task when no existing tasks",
62+
existingTask: false,
63+
taskAppended: false,
64+
shouldExecute: false,
65+
},
66+
}
67+
for _, test := range tests {
68+
t.Run(test.name, func(t *testing.T) {
69+
var executed, appended bool
70+
taskToAppend := func() {
71+
executed = true
72+
}
73+
handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
74+
ctx := r.Context()
75+
if test.existingTask {
76+
t := TaskFrom(ctx)
77+
t.Func = func() {}
78+
}
79+
80+
appended = AppendTask(ctx, &Task{taskToAppend})
81+
})
82+
ts := httptest.NewServer(WithRoutine(handler, func(_ *http.Request, _ *request.RequestInfo) bool { return true }))
83+
defer ts.Close()
84+
85+
_, err := http.Get(ts.URL)
86+
if err != nil {
87+
t.Errorf("got unexpected error on request: %v", err)
88+
}
89+
90+
if test.taskAppended != appended {
91+
t.Errorf("expected taskAppended: %t, got: %t", test.taskAppended, executed)
92+
}
93+
94+
if test.shouldExecute != executed {
95+
t.Errorf("expected shouldExecute: %t, got: %t", test.shouldExecute, executed)
96+
}
97+
})
98+
}
99+
}

0 commit comments

Comments
 (0)