Skip to content

Commit 1cbe7b6

Browse files
authored
Merge pull request kubernetes#124189 from toddtreece/kube-aggregator-proxy-tracing
Add tracing to kube-aggregator proxyHandler
2 parents 1ab06ef + 92da422 commit 1cbe7b6

File tree

3 files changed

+133
-0
lines changed

3 files changed

+133
-0
lines changed

staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ import (
3737
utilfeature "k8s.io/apiserver/pkg/util/feature"
3838
"k8s.io/client-go/kubernetes"
3939
"k8s.io/client-go/transport"
40+
"k8s.io/component-base/tracing"
4041
"k8s.io/component-base/version"
4142
v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
4243
v1helper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
@@ -169,6 +170,9 @@ type APIAggregator struct {
169170

170171
// rejectForwardingRedirects is whether to allow to forward redirect response
171172
rejectForwardingRedirects bool
173+
174+
// tracerProvider is used to wrap the proxy transport and handler with tracing
175+
tracerProvider tracing.TracerProvider
172176
}
173177

174178
// Complete fills in any fields not set that are required to have valid data. It's mutating the receiver.
@@ -239,6 +243,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
239243
openAPIV3Config: c.GenericConfig.OpenAPIV3Config,
240244
proxyCurrentCertKeyContent: func() (bytes []byte, bytes2 []byte) { return nil, nil },
241245
rejectForwardingRedirects: c.ExtraConfig.RejectForwardingRedirects,
246+
tracerProvider: c.GenericConfig.TracerProvider,
242247
}
243248

244249
// used later to filter the served resource by those that have expired.
@@ -518,6 +523,7 @@ func (s *APIAggregator) AddAPIService(apiService *v1.APIService) error {
518523
proxyTransportDial: s.proxyTransportDial,
519524
serviceResolver: s.serviceResolver,
520525
rejectForwardingRedirects: s.rejectForwardingRedirects,
526+
tracerProvider: s.tracerProvider,
521527
}
522528
proxyHandler.updateAPIService(apiService)
523529
if s.openAPIAggregationController != nil {

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,13 @@ import (
2727
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
2828
endpointmetrics "k8s.io/apiserver/pkg/endpoints/metrics"
2929
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
30+
genericfeatures "k8s.io/apiserver/pkg/features"
31+
utilfeature "k8s.io/apiserver/pkg/util/feature"
3032
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
3133
apiserverproxyutil "k8s.io/apiserver/pkg/util/proxy"
3234
"k8s.io/apiserver/pkg/util/x509metrics"
3335
"k8s.io/client-go/transport"
36+
"k8s.io/component-base/tracing"
3437
"k8s.io/klog/v2"
3538
apiregistrationv1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
3639
apiregistrationv1apihelper "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1/helper"
@@ -59,6 +62,9 @@ type proxyHandler struct {
5962

6063
// reject to forward redirect response
6164
rejectForwardingRedirects bool
65+
66+
// tracerProvider is used to wrap the proxy transport and handler with tracing
67+
tracerProvider tracing.TracerProvider
6268
}
6369

6470
type proxyHandlingInfo struct {
@@ -155,6 +161,11 @@ func (r *proxyHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
155161

156162
proxyRoundTripper = transport.NewAuthProxyRoundTripper(user.GetName(), user.GetGroups(), user.GetExtra(), proxyRoundTripper)
157163

164+
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) && !upgrade {
165+
tracingWrapper := tracing.WrapperFor(r.tracerProvider)
166+
proxyRoundTripper = tracingWrapper(proxyRoundTripper)
167+
}
168+
158169
// If we are upgrading, then the upgrade path tries to use this request with the TLS config we provide, but it does
159170
// NOT use the proxyRoundTripper. It's a direct dial that bypasses the proxyRoundTripper. This means that we have to
160171
// attach the "correct" user headers to the request ahead of time.

staging/src/k8s.io/kube-aggregator/pkg/apiserver/handler_proxy_test.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,17 @@ import (
4040

4141
"golang.org/x/net/websocket"
4242

43+
"go.opentelemetry.io/otel/propagation"
44+
sdktrace "go.opentelemetry.io/otel/sdk/trace"
45+
"go.opentelemetry.io/otel/sdk/trace/tracetest"
46+
"go.opentelemetry.io/otel/trace"
4347
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
4448
"k8s.io/apimachinery/pkg/types"
4549
utilnet "k8s.io/apimachinery/pkg/util/net"
4650
"k8s.io/apimachinery/pkg/util/proxy"
4751
"k8s.io/apimachinery/pkg/util/sets"
4852
"k8s.io/apiserver/pkg/authentication/user"
53+
"k8s.io/apiserver/pkg/endpoints/filters"
4954
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
5055
"k8s.io/apiserver/pkg/server/egressselector"
5156
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
@@ -54,6 +59,7 @@ import (
5459
"k8s.io/component-base/metrics/legacyregistry"
5560
apiregistration "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
5661
"k8s.io/utils/pointer"
62+
"k8s.io/utils/ptr"
5763
)
5864

5965
type targetHTTPHandler struct {
@@ -774,6 +780,116 @@ func TestGetContextForNewRequest(t *testing.T) {
774780

775781
}
776782

783+
func TestTracerProvider(t *testing.T) {
784+
fakeRecorder := tracetest.NewSpanRecorder()
785+
otelTracer := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(fakeRecorder))
786+
target := &targetHTTPHandler{}
787+
user := &user.DefaultInfo{
788+
Name: "username",
789+
Groups: []string{"one", "two"},
790+
}
791+
path := "/request/path"
792+
apiService := &apiregistration.APIService{
793+
ObjectMeta: metav1.ObjectMeta{Name: "v1.foo"},
794+
Spec: apiregistration.APIServiceSpec{
795+
Service: &apiregistration.ServiceReference{Name: "test-service", Namespace: "test-ns", Port: ptr.To(int32(443))},
796+
Group: "foo",
797+
Version: "v1",
798+
CABundle: testCACrt,
799+
},
800+
Status: apiregistration.APIServiceStatus{
801+
Conditions: []apiregistration.APIServiceCondition{
802+
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
803+
},
804+
},
805+
}
806+
targetServer := httptest.NewUnstartedServer(target)
807+
serviceCert := svcCrt
808+
if cert, err := tls.X509KeyPair(serviceCert, svcKey); err != nil {
809+
t.Fatalf("TestTracerProvider: failed to parse key pair: %v", err)
810+
} else {
811+
targetServer.TLS = &tls.Config{Certificates: []tls.Certificate{cert}}
812+
}
813+
targetServer.StartTLS()
814+
defer targetServer.Close()
815+
816+
serviceResolver := &mockedRouter{destinationHost: targetServer.Listener.Addr().String()}
817+
handler := &proxyHandler{
818+
localDelegate: http.NewServeMux(),
819+
serviceResolver: serviceResolver,
820+
proxyCurrentCertKeyContent: func() ([]byte, []byte) { return emptyCert(), emptyCert() },
821+
tracerProvider: otelTracer,
822+
}
823+
824+
server := httptest.NewServer(contextHandler(filters.WithTracing(handler, otelTracer), user))
825+
defer server.Close()
826+
827+
handler.updateAPIService(apiService)
828+
curr := handler.handlingInfo.Load().(proxyHandlingInfo)
829+
handler.handlingInfo.Store(curr)
830+
var propagator propagation.TraceContext
831+
req, err := http.NewRequest(http.MethodGet, server.URL+path, nil)
832+
if err != nil {
833+
t.Errorf("expected new request: %v", err)
834+
return
835+
}
836+
837+
t.Logf("Sending request: %v", req)
838+
_, err = http.DefaultClient.Do(req)
839+
if err != nil {
840+
t.Errorf("http request failed: %v", err)
841+
return
842+
}
843+
844+
t.Log("Ensure the target server received the traceparent header")
845+
id, ok := target.headers["Traceparent"]
846+
if !ok {
847+
t.Error("expected traceparent header")
848+
return
849+
}
850+
851+
t.Log("Get the span context from the traceparent header")
852+
h := http.Header{
853+
"Traceparent": id,
854+
}
855+
ctx := propagator.Extract(context.Background(), propagation.HeaderCarrier(h))
856+
span := trace.SpanFromContext(ctx)
857+
858+
t.Log("Ensure that the span context is valid and remote")
859+
if !span.SpanContext().IsValid() {
860+
t.Error("expected valid span context")
861+
return
862+
}
863+
864+
if !span.SpanContext().IsRemote() {
865+
t.Error("expected remote span context")
866+
return
867+
}
868+
869+
t.Log("Ensure that the span ID and trace ID match the expected values")
870+
expectedSpanCtx := fakeRecorder.Ended()[0].SpanContext()
871+
if expectedSpanCtx.TraceID() != span.SpanContext().TraceID() {
872+
t.Errorf("expected trace id to match. expected: %v, but got %v", expectedSpanCtx.TraceID(), span.SpanContext().TraceID())
873+
return
874+
}
875+
876+
if expectedSpanCtx.SpanID() != span.SpanContext().SpanID() {
877+
t.Errorf("expected span id to match. expected: %v, but got: %v", expectedSpanCtx.SpanID(), span.SpanContext().SpanID())
878+
return
879+
}
880+
881+
t.Log("Ensure that the expected spans were recorded when sending a request through the proxy")
882+
expectedSpanNames := []string{"HTTP GET", "GET"}
883+
spanNames := []string{}
884+
for _, span := range fakeRecorder.Ended() {
885+
spanNames = append(spanNames, span.Name())
886+
}
887+
if e, a := expectedSpanNames, spanNames; !reflect.DeepEqual(e, a) {
888+
t.Errorf("expected span names %v, got %v", e, a)
889+
return
890+
}
891+
}
892+
777893
func TestNewRequestForProxyWithAuditID(t *testing.T) {
778894
tests := []struct {
779895
name string

0 commit comments

Comments
 (0)