From e1239228c2ab1080bb4efb3a8b6d2cbf0f78cf2a Mon Sep 17 00:00:00 2001
From: zouyu
Date: Wed, 21 Oct 2020 16:25:28 +0800
Subject: [PATCH] Add tracing with OpenTelemetry

Signed-off-by: zouyu
---
 cmd/kube-apiserver/apiserver.go               |   4 +
 pkg/controller/controller_utils.go            |  16 +-
 pkg/controller/daemon/daemon_controller.go    |   2 +-
 .../daemon/daemon_controller_test.go          |   4 +-
 pkg/controller/deployment/sync.go             |  38 ++--
 pkg/controller/job/job_controller.go          |   4 +-
 pkg/controller/replicaset/replica_set.go      |  14 +-
 pkg/controller/replication/conversion.go      |   6 +-
 .../statefulset/stateful_pod_control.go       |  24 ++-
 pkg/util/httptrace/httptrace.go               | 162 ++++++++++++++++++
 pkg/util/httptrace/httptrace_test.go          |  17 ++
 pkg/util/httptrace/httptrace_type.go          | 106 ++++++++++++
 .../src/k8s.io/apiserver/pkg/server/config.go |   2 +
 staging/src/k8s.io/client-go/rest/request.go  |   6 +
 14 files changed, 370 insertions(+), 35 deletions(-)
 create mode 100644 pkg/util/httptrace/httptrace.go
 create mode 100644 pkg/util/httptrace/httptrace_test.go
 create mode 100644 pkg/util/httptrace/httptrace_type.go

diff --git a/cmd/kube-apiserver/apiserver.go b/cmd/kube-apiserver/apiserver.go
index 97ac595d836eb..33407863c6451 100644
--- a/cmd/kube-apiserver/apiserver.go
+++ b/cmd/kube-apiserver/apiserver.go
@@ -27,6 +27,7 @@ import (
 	_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
 	_ "k8s.io/component-base/metrics/prometheus/version"  // for version metric registration
 	"k8s.io/kubernetes/cmd/kube-apiserver/app"
+	"k8s.io/kubernetes/pkg/util/httptrace"
 )
 
 func main() {
@@ -34,6 +35,9 @@ func main() {
 
 	command := app.NewAPIServerCommand()
 
+	shutdown := httptrace.InitTracer()
+	defer shutdown()
+
 	// TODO: once we switch everything over to Cobra commands, we can go back to calling
 	// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
 	// normalize func and add the go flag set by hand.
diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index e5c66bfacd3c8..0f9012abb69ad 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -453,7 +453,7 @@ type PodControlInterface interface {
 	// and sets the ControllerRef.
 	CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
 	// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
-	CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+	CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
 	// DeletePod deletes the pod identified by podID.
 	DeletePod(namespace string, podID string, object runtime.Object) error
 	// PatchPod patches the pod.
@@ -519,21 +519,21 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error {
 }
 
 func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
-	return r.createPods("", namespace, template, object, nil)
+	return r.createPods(context.Background(), "", namespace, template, object, nil)
 }
 
-func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
+func (r RealPodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
 	if err := validateControllerRef(controllerRef); err != nil {
 		return err
 	}
-	return r.createPods("", namespace, template, controllerObject, controllerRef)
+	return r.createPods(ctx, "", namespace, template, controllerObject, controllerRef)
 }
 
 func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
 	if err := validateControllerRef(controllerRef); err != nil {
 		return err
 	}
-	return r.createPods(nodeName, namespace, template, object, controllerRef)
+	return r.createPods(context.Background(), nodeName, namespace, template, object, controllerRef)
 }
 
 func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
@@ -566,7 +566,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec
 	return pod, nil
 }
 
-func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+func (r RealPodControl) createPods(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
 	pod, err := GetPodFromTemplate(template, object, controllerRef)
 	if err != nil {
 		return err
 	}
@@ -577,7 +577,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
 	if len(labels.Set(pod.Labels)) == 0 {
 		return fmt.Errorf("unable to create pods, no labels")
 	}
-	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+	newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
 	if err != nil {
 		// only send an event if the namespace isn't terminating
 		if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
@@ -652,7 +652,7 @@ func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec,
 	return nil
 }
 
-func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+func (f *FakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
 	f.Lock()
 	defer f.Unlock()
 	f.CreateCallCount++
diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go
index a0fc135156ece..70c2065c4e705 100644
--- a/pkg/controller/daemon/daemon_controller.go
+++ b/pkg/controller/daemon/daemon_controller.go
@@ -968,7 +968,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
 			podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
 				podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])
 
-			err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
+			err := dsc.podControl.CreatePodsWithControllerRef(context.Background(), ds.Namespace, podTemplate,
 				ds, metav1.NewControllerRef(ds, controllerKind))
 
 			if err != nil {
diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go
index 9878da29e5804..678cd232db4d1 100644
--- a/pkg/controller/daemon/daemon_controller_test.go
+++ b/pkg/controller/daemon/daemon_controller_test.go
@@ -267,10 +267,10 @@ func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *
 	return nil
 }
 
-func (f *fakePodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+func (f *fakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
 	f.Lock()
 	defer f.Unlock()
-	if err := f.FakePodControl.CreatePodsWithControllerRef(namespace, template, object, controllerRef); err != nil {
+	if err := f.FakePodControl.CreatePodsWithControllerRef(context.Background(), namespace, template, object, controllerRef); err != nil {
 		return fmt.Errorf("failed to create pod for DaemonSet")
 	}
 
diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go
index 3c7e808b24bbf..474fbd7230b77 100644
--- a/pkg/controller/deployment/sync.go
+++ b/pkg/controller/deployment/sync.go
@@ -24,12 +24,13 @@ import (
 	"strconv"
 
 	apps "k8s.io/api/apps/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/klog/v2"
 	"k8s.io/kubernetes/pkg/controller"
 	deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
+	"k8s.io/kubernetes/pkg/util/httptrace"
 	labelsutil "k8s.io/kubernetes/pkg/util/labels"
 )
 
@@ -72,6 +73,9 @@ func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaS
 // These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
 // that were paused for longer than progressDeadlineSeconds.
 func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
 	if !deploymentutil.HasProgressDeadline(d) {
 		return nil
 	}
@@ -98,7 +102,7 @@ func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error
 	}
 
 	var err error
-	_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+	_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 	return err
 }
 
@@ -136,6 +140,9 @@ const (
 // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
 // Note that the pod-template-hash will be added to adopted RSes and pods.
 func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
 	existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)
 
 	// Calculate the max revision number among all old RSes
@@ -155,7 +162,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 		minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
 		if annotationsUpdated || minReadySecondsNeedsUpdate {
 			rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
-			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+			return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
 		}
 
 		// Should use the revision in existingNewRS's annotation, since it set by before
@@ -173,7 +180,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 
 		if needsUpdate {
 			var err error
-			if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil {
+			if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
 				return nil, err
 			}
 		}
@@ -220,7 +227,8 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 	// hash collisions. If there is any other error, we need to report it in the status of
 	// the Deployment.
 	alreadyExists := false
-	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
+
+	createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
 	switch {
 	// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
 	case errors.IsAlreadyExists(err):
@@ -252,7 +260,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 		*d.Status.CollisionCount++
 		// Update the collisionCount for the Deployment and let it requeue by returning the original
 		// error.
-		_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+		_, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 		if dErr == nil {
 			klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
 		}
@@ -268,7 +276,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 		// We don't really care about this error at this point, since we have a bigger issue to report.
 		// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
 		// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
-		_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+		_, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 	}
 	dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
 	return nil, err
@@ -285,7 +293,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
 		needsUpdate = true
 	}
 	if needsUpdate {
-		_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+		_, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
 	}
 	return createdRS, err
 }
@@ -409,6 +417,8 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSe
 }
 
 func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())
 
 	sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale
 
@@ -420,7 +430,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
 		rsCopy := rs.DeepCopy()
 		*(rsCopy.Spec.Replicas) = newScale
 		deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
-		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+		rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
 		if err == nil && sizeNeedsUpdate {
 			scaled = true
 			dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
@@ -433,6 +443,9 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
 // where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept
 // around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
 func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), deployment.GetAnnotations())
+
 	if !deploymentutil.HasRevisionHistoryLimit(deployment) {
 		return nil
 	}
@@ -458,7 +471,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep
 			continue
 		}
 		klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
-		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
+		if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
 			// Return error instead of aggregating and continuing DELETEs on the theory
 			// that we may be overloading the api server.
 			return err
@@ -470,6 +483,9 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep
 
 // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
 func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
 	newStatus := calculateStatus(allRSs, newRS, d)
 
 	if reflect.DeepEqual(d.Status, newStatus) {
@@ -478,7 +494,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet,
 	newDeployment := d
 	newDeployment.Status = newStatus
-	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
+	_, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
 	return err
 }
diff --git a/pkg/controller/job/job_controller.go b/pkg/controller/job/job_controller.go
index 70c7504834ba7..57530f7ef95ba 100644
--- a/pkg/controller/job/job_controller.go
+++ b/pkg/controller/job/job_controller.go
@@ -26,7 +26,7 @@ import (
 	"time"
 
 	batch "k8s.io/api/batch/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -776,7 +776,7 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
 		for i := int32(0); i < batchSize; i++ {
 			go func() {
 				defer wait.Done()
-				err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
+				err := jm.podControl.CreatePodsWithControllerRef(context.Background(), job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
 				if err != nil {
 					if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
 						// If the namespace is being torn down, we can safely ignore
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index f52eab6ca924b..1e18b561f50dd 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -37,7 +37,7 @@ import (
 	"time"
 
 	apps "k8s.io/api/apps/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -60,6 +60,7 @@ import (
 	"k8s.io/klog/v2"
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/controller"
+	"k8s.io/kubernetes/pkg/util/httptrace"
 	"k8s.io/utils/integer"
 )
 
@@ -539,6 +540,9 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool {
 // Does NOT modify <filteredPods>.
 // It will requeue the replica set in case of an error while creating/deleting pods.
 func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())
+
 	diff := len(filteredPods) - int(*(rs.Spec.Replicas))
 	rsKey, err := controller.KeyFunc(rs)
 	if err != nil {
@@ -566,7 +570,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
 		// after one of its pods fails. Conveniently, this also prevents the
 		// event spam that those failures would generate.
 		successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
-			err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
+
+			err := rsc.podControl.CreatePodsWithControllerRef(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
 			if err != nil {
 				if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
 					// if the namespace is being terminated, we don't have to do
@@ -711,10 +716,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
 }
 
 func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())
+
 	// If any adoptions are attempted, we should first recheck for deletion with
 	// an uncached quorum read sometime after listing Pods (see #42639).
 	canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
-		fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})
+		fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{})
 		if err != nil {
 			return nil, err
 		}
diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go
index 0214a47967847..deb4430871bef 100644
--- a/pkg/controller/replication/conversion.go
+++ b/pkg/controller/replication/conversion.go
@@ -29,7 +29,7 @@ import (
 
 	apps "k8s.io/api/apps/v1"
 	autoscalingv1 "k8s.io/api/autoscaling/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -333,12 +333,12 @@ func (pc podControlAdapter) CreatePodsOnNode(nodeName, namespace string, templat
 	return errors.New("CreatePodsOnNode() is not implemented for podControlAdapter")
 }
 
-func (pc podControlAdapter) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+func (pc podControlAdapter) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
 	rc, err := convertRStoRC(object.(*apps.ReplicaSet))
 	if err != nil {
 		return err
 	}
-	return pc.PodControlInterface.CreatePodsWithControllerRef(namespace, template, rc, controllerRef)
+	return pc.PodControlInterface.CreatePodsWithControllerRef(ctx, namespace, template, rc, controllerRef)
 }
 
 func (pc podControlAdapter) DeletePod(namespace string, podID string, object runtime.Object) error {
diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go
index b995d0892f5b2..565c0be43143b 100644
--- a/pkg/controller/statefulset/stateful_pod_control.go
+++ b/pkg/controller/statefulset/stateful_pod_control.go
@@ -22,7 +22,7 @@ import (
 	"strings"
 
 	apps "k8s.io/api/apps/v1"
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	errorutils "k8s.io/apimachinery/pkg/util/errors"
@@ -32,6 +32,7 @@ import (
 	corelisters "k8s.io/client-go/listers/core/v1"
 	"k8s.io/client-go/tools/record"
 	"k8s.io/client-go/util/retry"
+	"k8s.io/kubernetes/pkg/util/httptrace"
 )
 
 // StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods,
@@ -73,13 +74,17 @@ type realStatefulPodControl struct {
 }
 
 func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations())
+
 	// Create the Pod's PVCs prior to creating the Pod
 	if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
 		spc.recordPodEvent("create", set, pod, err)
 		return err
 	}
 	// If we created the PVCs attempt to create the Pod
-	_, err := spc.client.CoreV1().Pods(set.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+
+	_, err := spc.client.CoreV1().Pods(set.Namespace).Create(ctx, pod, metav1.CreateOptions{})
 	// sink already exists errors
 	if apierrors.IsAlreadyExists(err) {
 		return err
@@ -89,6 +94,9 @@ func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod
 }
 
 func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations())
+
 	attemptedUpdate := false
 	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
 		// assume the Pod is consistent
@@ -115,7 +123,7 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod
 		attemptedUpdate = true
 		// commit the update, retrying on conflicts
-		_, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
+		_, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(ctx, pod, metav1.UpdateOptions{})
 		if updateErr == nil {
 			return nil
 		}
@@ -136,7 +144,10 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod
 }
 
 func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error {
-	err := spc.client.CoreV1().Pods(set.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{})
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations())
+
+	err := spc.client.CoreV1().Pods(set.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{})
 	spc.recordPodEvent("delete", set, pod, err)
 	return err
 }
@@ -179,12 +190,15 @@ func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.State
 // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
 // set's Spec.
 func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
+	// Get the span context from the object's annotations and attach it to ctx
+	ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations())
+
 	var errs []error
 	for _, claim := range getPersistentVolumeClaims(set, pod) {
 		pvc, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name)
 		switch {
 		case apierrors.IsNotFound(err):
-			_, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), &claim, metav1.CreateOptions{})
+			_, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(ctx, &claim, metav1.CreateOptions{})
 			if err != nil {
 				errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err))
 			}
diff --git a/pkg/util/httptrace/httptrace.go b/pkg/util/httptrace/httptrace.go
new file mode 100644
index 0000000000000..801cfe4c112db
--- /dev/null
+++ b/pkg/util/httptrace/httptrace.go
@@ -0,0 +1,162 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package httptrace
+
+import (
+	"bytes"
+	"context"
+	"encoding/base64"
+	"encoding/binary"
+	"log"
+	"net/http"
+
+	"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/api/global"
+	apitrace "go.opentelemetry.io/otel/api/trace"
+	"go.opentelemetry.io/otel/exporters/stdout"
+	"go.opentelemetry.io/otel/label"
+	"go.opentelemetry.io/otel/propagators"
+	sdktrace "go.opentelemetry.io/otel/sdk/trace"
+)
+
+type contextKeyType int
+
+// avoid the `/` character here: annotation keys allow at most one slash, as the prefix separator
+const initialTraceIDAnnotationKey string = "trace.kubernetes.io.initial"
+
+// avoid the `/` character here: annotation keys allow at most one slash, as the prefix separator
+const spanContextAnnotationKey string = "trace.kubernetes.io.span.context"
+
+var tracePropagator propagators.TraceContext
+var baggagePropagator propagators.Baggage
+
+const initialTraceIDBaggageKey label.Key = "Initial-Trace-Id"
+
+// InitTracer installs the global tracer provider and returns a shutdown func that flushes buffered spans
+func InitTracer() func() {
+	var err error
+	exp, err := stdout.NewExporter(stdout.WithPrettyPrint())
+	if err != nil {
+		log.Panicf("failed to initialize stdout exporter %v\n", err)
+		return nil
+	}
+	bsp := sdktrace.NewBatchSpanProcessor(exp)
+	tp := sdktrace.NewTracerProvider(
+		sdktrace.WithConfig(
+			sdktrace.Config{
+				DefaultSampler: sdktrace.NeverSample(),
+			},
+		),
+		sdktrace.WithSpanProcessor(bsp),
+	)
+	global.SetTracerProvider(tp)
+	return bsp.Shutdown
+}
+
+// WithTracingHandler injects the span extracted from incoming headers into the HTTP request context
+func WithTracingHandler(handler http.Handler) http.Handler {
+	return otelhttp.NewHandler(
+		&httpTraceHandler{
+			Handler: &handler,
+		},
+		"trace",
+		otelhttp.WithPropagators(tracePropagator),
+	)
+}
+
+// SpanContextToRequest injects the span context carried by ctx into the HTTP request headers
+func SpanContextToRequest(ctx context.Context, req *http.Request) {
+	spanContext := SpanContextFromContext(ctx)
+	span := httpTraceSpan{
+		spanContext: spanContext,
+	}
+	// inject the span context into the request headers
+	tracePropagator.Inject(apitrace.ContextWithSpan(context.Background(), span), req.Header)
+	// inject the initial trace id into the request headers
+	baggagePropagator.Inject(ctx, req.Header)
+}
+
+// SpanContextFromRequestHeader extracts the span context from the HTTP request headers
+func SpanContextFromRequestHeader(req *http.Request) context.Context {
+	// get the span context from the request headers
+	ctx := tracePropagator.Extract(req.Context(), req.Header)
+	// get the initial trace id from the request headers
+	return baggagePropagator.Extract(ctx, req.Header)
+}
+
+// SpanContextFromRequestContext extracts the span context from the HTTP request context
+func SpanContextFromRequestContext(req *http.Request) context.Context {
+	// get the span context from the current span
+	spanContext := SpanFromContext(req.Context()).SpanContext()
+	// inject the span context into the golang context
+	ctx := SpanContextToContext(req.Context(), spanContext)
+	// get the initial trace id from the request headers
+	return baggagePropagator.Extract(ctx, req.Header)
+}
+
+// SpanContextFromAnnotations builds a context carrying the span context stored in the given annotations
+func SpanContextFromAnnotations(ctx context.Context, annotations map[string]string) context.Context {
+	// get the initial trace id from the annotations
+	ctx = otel.ContextWithBaggageValues(
+		ctx,
+		label.KeyValue{
+			Key:   initialTraceIDBaggageKey,
+			Value: label.StringValue(annotations[initialTraceIDAnnotationKey]),
+		},
+	)
+	// get the span context from the annotations
+	spanContext, err := decodeSpanContext(annotations[spanContextAnnotationKey])
+	if err != nil {
+		return ctx
+	}
+	return SpanContextToContext(ctx, spanContext)
+}
+
+// decodeSpanContext decodes a base64, binary-encoded string back into a span context
+func decodeSpanContext(encodedSpanContext string) (apitrace.SpanContext, error) {
+	// decode to bytes
+	byteList := make([]byte, base64.StdEncoding.DecodedLen(len(encodedSpanContext)))
+	l, err := base64.StdEncoding.Decode(byteList, []byte(encodedSpanContext))
+	if err != nil {
+		return apitrace.EmptySpanContext(), err
+	}
+	byteList = byteList[:l]
+	// decode to a span context
+	buffer := bytes.NewBuffer(byteList)
+	spanContext := apitrace.SpanContext{}
+	err = binary.Read(buffer, binary.LittleEndian, &spanContext)
+	if err != nil {
+		return apitrace.EmptySpanContext(), err
+	}
+	return spanContext, nil
+}
+
+// SpanFromContext returns the span stored in the golang context
+func SpanFromContext(ctx context.Context) apitrace.Span {
+	return apitrace.SpanFromContext(ctx)
+}
+
+// SpanContextToContext stores spanContext as the remote span context of the returned golang context
+func SpanContextToContext(ctx context.Context, spanContext apitrace.SpanContext) context.Context {
+	return apitrace.ContextWithRemoteSpanContext(ctx, spanContext)
+}
+
+// SpanContextFromContext returns the remote span context stored in the golang context
+func SpanContextFromContext(ctx context.Context) apitrace.SpanContext {
+	return apitrace.RemoteSpanContextFromContext(ctx)
+}
diff --git a/pkg/util/httptrace/httptrace_test.go b/pkg/util/httptrace/httptrace_test.go
new file mode 100644
index 0000000000000..4bf30c92b66b9
--- /dev/null
+++ b/pkg/util/httptrace/httptrace_test.go
@@ -0,0 +1,17 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package httptrace
diff --git a/pkg/util/httptrace/httptrace_type.go b/pkg/util/httptrace/httptrace_type.go
new file mode 100644
index 0000000000000..520c889ebe9a9
--- /dev/null
+++ b/pkg/util/httptrace/httptrace_type.go
@@ -0,0 +1,106 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package httptrace
+
+import (
+	"context"
+	"net/http"
+	"time"
+
+	apitrace "go.opentelemetry.io/otel/api/trace"
+	"go.opentelemetry.io/otel/codes"
+	"go.opentelemetry.io/otel/label"
+)
+
+type httpTraceHandler struct {
+	Handler *http.Handler
+}
+
+func (h *httpTraceHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+	// get the span context from the request context
+	ctx := SpanContextFromRequestContext(r)
+	// call the next handler
+	(*h.Handler).ServeHTTP(w, r.WithContext(ctx))
+}
+
+type httpTraceSpan struct {
+	spanContext apitrace.SpanContext
+}
+
+// Tracer returns the tracer that created this span; httpTraceSpan only carries a span context, so it has none.
+func (span httpTraceSpan) Tracer() apitrace.Tracer {
+	return nil
+}
+
+// End completes the span. No updates are allowed to the span after it
+// ends. The only exception is setting the status of the span.
+func (span httpTraceSpan) End(options ...apitrace.SpanOption) {
+	return
+}
+
+// AddEvent adds an event to the span.
+func (span httpTraceSpan) AddEvent(ctx context.Context, name string, attrs ...label.KeyValue) {
+	return
+}
+
+// AddEventWithTimestamp adds an event with a custom timestamp
+// to the span.
+func (span httpTraceSpan) AddEventWithTimestamp(ctx context.Context, timestamp time.Time, name string, attrs ...label.KeyValue) {
+	return
+}
+
+// IsRecording returns true if the span is active and recording events is enabled.
+func (span httpTraceSpan) IsRecording() bool {
+	return false
+}
+
+// RecordError records an error as a span event.
+func (span httpTraceSpan) RecordError(ctx context.Context, err error, opts ...apitrace.ErrorOption) {
+	return
+}
+
+// SpanContext returns the span context of the span. The returned SpanContext is usable
+// even after the span ends.
+func (span httpTraceSpan) SpanContext() apitrace.SpanContext {
+	return span.spanContext
+}
+
+// SetStatus sets the status of the span in the form of a code
+// and a message. SetStatus overrides the value of previous
+// calls to SetStatus on the Span.
+//
+// The default span status is OK, so it is not necessary to
+// explicitly set an OK status on successful Spans unless it
+// is to add an OK message or to override a previous status on the Span.
+func (span httpTraceSpan) SetStatus(code codes.Code, msg string) {
+	return
+}
+
+// SetName sets the name of the span.
+func (span httpTraceSpan) SetName(name string) {
+	return
+}
+
+// SetAttributes sets the span attributes.
+func (span httpTraceSpan) SetAttributes(kv ...label.KeyValue) {
+	return
+}
+
+// SetAttribute sets a single span attribute, with type inference.
+func (span httpTraceSpan) SetAttribute(k string, v interface{}) {
+	return
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go
index 6ac81588cb7cd..67db16465607c 100644
--- a/staging/src/k8s.io/apiserver/pkg/server/config.go
+++ b/staging/src/k8s.io/apiserver/pkg/server/config.go
@@ -68,6 +68,7 @@ import (
 	"k8s.io/component-base/logs"
 	"k8s.io/klog/v2"
 	openapicommon "k8s.io/kube-openapi/pkg/common"
+	"k8s.io/kubernetes/pkg/util/httptrace"
 	utilsnet "k8s.io/utils/net"
 
 	// install apis
@@ -692,6 +693,7 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
 	handler = genericapifilters.WithCacheControl(handler)
 	handler = genericapifilters.WithRequestReceivedTimestamp(handler)
 	handler = genericfilters.WithPanicRecovery(handler)
+	handler = httptrace.WithTracingHandler(handler)
 	return handler
 }
 
diff --git a/staging/src/k8s.io/client-go/rest/request.go b/staging/src/k8s.io/client-go/rest/request.go
index 2f747a28f0126..4660befe656a4 100644
--- a/staging/src/k8s.io/client-go/rest/request.go
+++ b/staging/src/k8s.io/client-go/rest/request.go
@@ -33,6 +33,9 @@ import (
 	"sync"
 	"time"
 
+	"go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace"
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/propagators"
 	"golang.org/x/net/http2"
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -869,6 +872,9 @@ func (r *Request) request(ctx context.Context, fn func(*http.Request, *http.Resp
 		req = req.WithContext(ctx)
 		req.Header = r.headers
 
+		props := otelhttptrace.WithPropagators(otel.NewCompositeTextMapPropagator(propagators.TraceContext{}, propagators.Baggage{}))
+		otelhttptrace.Inject(req.Context(), req, props)
+
 		r.backoff.Sleep(r.backoff.CalculateBackoff(r.URL()))
 		if retries > 0 {
 			// We are retrying the request that we already send to apiserver
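
Note for reviewers: the controllers above read a span context out of object annotations (SpanContextFromAnnotations / decodeSpanContext), but the writer side that stamps those annotations is not part of this patch. A minimal sketch of a matching encoder, mirroring the wire format decodeSpanContext expects (binary little-endian serialization of the fixed-size SpanContext struct, then base64). encodeSpanContext and SpanContextToAnnotations are illustrative names, not functions from this patch:

// encodeSpanContext is the inverse of decodeSpanContext: it serializes the
// fixed-size apitrace.SpanContext struct and base64-encodes the result.
func encodeSpanContext(spanContext apitrace.SpanContext) (string, error) {
	buffer := new(bytes.Buffer)
	if err := binary.Write(buffer, binary.LittleEndian, spanContext); err != nil {
		return "", err
	}
	return base64.StdEncoding.EncodeToString(buffer.Bytes()), nil
}

// SpanContextToAnnotations is a hypothetical writer-side helper: whatever
// component creates or mutates an object would stamp the annotation that
// SpanContextFromAnnotations later consumes.
func SpanContextToAnnotations(ctx context.Context, annotations map[string]string) error {
	encoded, err := encodeSpanContext(SpanContextFromContext(ctx))
	if err != nil {
		return err
	}
	annotations[spanContextAnnotationKey] = encoded
	return nil
}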
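
httptrace_test.go is added empty. Assuming the hypothetical encodeSpanContext above, a natural first test would round-trip a span context through the annotation encoding (field and constant names follow the otel API version vendored by this patch):

package httptrace

import (
	"testing"

	apitrace "go.opentelemetry.io/otel/api/trace"
)

func TestSpanContextRoundTrip(t *testing.T) {
	want := apitrace.SpanContext{
		TraceID:    apitrace.ID{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16},
		SpanID:     apitrace.SpanID{1, 2, 3, 4, 5, 6, 7, 8},
		TraceFlags: apitrace.FlagsSampled,
	}
	encoded, err := encodeSpanContext(want)
	if err != nil {
		t.Fatalf("encode: %v", err)
	}
	got, err := decodeSpanContext(encoded)
	if err != nil {
		t.Fatalf("decode: %v", err)
	}
	if got != want {
		t.Errorf("round-trip mismatch: got %+v, want %+v", got, want)
	}
}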
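
For context, the propagation chain this patch establishes: client-go injects W3C traceparent and baggage headers on every outgoing request (request.go), the apiserver handler chain extracts them into the request context (WithTracingHandler in config.go), and controllers re-attach the span context read from object annotations when they call the API again. A toy wiring of the server- and client-side pieces, under the same vendored otel versions; the port and path here are made up:

package main

import (
	"context"
	"net/http"

	"k8s.io/kubernetes/pkg/util/httptrace"
)

func main() {
	// Install the global tracer provider; flush buffered spans on exit.
	shutdown := httptrace.InitTracer()
	defer shutdown()

	// Server side: extract incoming trace headers into the request context.
	http.Handle("/healthz", httptrace.WithTracingHandler(http.HandlerFunc(
		func(w http.ResponseWriter, r *http.Request) {
			w.WriteHeader(http.StatusOK)
		})))

	// Client side: re-inject a context's span context into a hand-built request,
	// mirroring what the client-go hunk does for every API call.
	req, err := http.NewRequest(http.MethodGet, "http://localhost:8080/healthz", nil)
	if err == nil {
		httptrace.SpanContextToRequest(context.Background(), req)
	}

	_ = http.ListenAndServe(":8080", nil)
}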