diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index e5c66bfacd3c8..a3a1a7a9647fa 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -48,6 +48,7 @@ import ( _ "k8s.io/kubernetes/pkg/apis/core/install" "k8s.io/kubernetes/pkg/apis/core/validation" hashutil "k8s.io/kubernetes/pkg/util/hash" + "k8s.io/kubernetes/pkg/util/httptrace" taintutils "k8s.io/kubernetes/pkg/util/taints" "k8s.io/utils/integer" @@ -448,12 +449,12 @@ func (r RealControllerRevisionControl) PatchControllerRevision(namespace, name s // created as an interface to allow testing. type PodControlInterface interface { // CreatePods creates new pods according to the spec. - CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error + CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error // CreatePodsOnNode creates a new pod according to the spec on the specified node, // and sets the ControllerRef. - CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error // CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller. - CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error + CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error // DeletePod deletes the pod identified by podID. DeletePod(namespace string, podID string, object runtime.Object) error // PatchPod patches the pod. 
@@ -518,22 +519,22 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error { return nil } -func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { - return r.createPods("", namespace, template, object, nil) +func (r RealPodControl) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { + return r.createPods(ctx, "", namespace, template, object, nil) } -func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { +func (r RealPodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error { if err := validateControllerRef(controllerRef); err != nil { return err } - return r.createPods("", namespace, template, controllerObject, controllerRef) + return r.createPods(ctx, "", namespace, template, controllerObject, controllerRef) } -func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (r RealPodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { if err := validateControllerRef(controllerRef); err != nil { return err } - return r.createPods(nodeName, namespace, template, object, controllerRef) + return r.createPods(ctx, nodeName, namespace, template, object, controllerRef) } func (r RealPodControl) PatchPod(namespace, name string, data []byte) error { @@ -566,7 +567,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec return pod, nil } -func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object 
runtime.Object, controllerRef *metav1.OwnerReference) error { +func (r RealPodControl) createPods(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { pod, err := GetPodFromTemplate(template, object, controllerRef) if err != nil { return err @@ -577,7 +578,8 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT if len(labels.Set(pod.Labels)) == 0 { return fmt.Errorf("unable to create pods, no labels") } - newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + httptrace.SpanContextToAnnotations(ctx, &pod.Annotations) + newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{}) if err != nil { // only send an event if the namespace isn't terminating if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) { @@ -638,7 +640,7 @@ func (f *FakePodControl) PatchPod(namespace, name string, data []byte) error { return nil } -func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error { +func (f *FakePodControl) CreatePods(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object) error { f.Lock() defer f.Unlock() f.CreateCallCount++ @@ -652,7 +654,7 @@ func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec, return nil } -func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (f *FakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() f.CreateCallCount++ @@ -667,7 +669,7 @@ func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1. 
return nil } -func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (f *FakePodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() f.CreateCallCount++ diff --git a/pkg/controller/controller_utils_test.go b/pkg/controller/controller_utils_test.go index 625695f3aaad7..1caadd744c4b4 100644 --- a/pkg/controller/controller_utils_test.go +++ b/pkg/controller/controller_utils_test.go @@ -298,7 +298,7 @@ func TestCreatePods(t *testing.T) { controllerSpec := newReplicationController(1) // Make sure createReplica sends a POST to the apiserver with a pod from the controllers pod template - err := podControl.CreatePods(ns, controllerSpec.Spec.Template, controllerSpec) + err := podControl.CreatePods(context.Background(), ns, controllerSpec.Spec.Template, controllerSpec) assert.NoError(t, err, "unexpected error: %v", err) expectedPod := v1.Pod{ diff --git a/pkg/controller/daemon/daemon_controller.go b/pkg/controller/daemon/daemon_controller.go index a0fc135156ece..70c2065c4e705 100644 --- a/pkg/controller/daemon/daemon_controller.go +++ b/pkg/controller/daemon/daemon_controller.go @@ -968,7 +968,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity( podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix]) - err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate, + err := dsc.podControl.CreatePodsWithControllerRef(context.Background(), ds.Namespace, podTemplate, ds, metav1.NewControllerRef(ds, controllerKind)) if err != nil { diff --git a/pkg/controller/daemon/daemon_controller_test.go b/pkg/controller/daemon/daemon_controller_test.go index 9878da29e5804..abfacaea00b01 100644 --- 
a/pkg/controller/daemon/daemon_controller_test.go +++ b/pkg/controller/daemon/daemon_controller_test.go @@ -236,10 +236,10 @@ func newFakePodControl() *fakePodControl { } } -func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (f *fakePodControl) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() - if err := f.FakePodControl.CreatePodsOnNode(nodeName, namespace, template, object, controllerRef); err != nil { + if err := f.FakePodControl.CreatePodsOnNode(context.Background(), nodeName, namespace, template, object, controllerRef); err != nil { return fmt.Errorf("failed to create pod on node %q", nodeName) } @@ -267,10 +267,10 @@ func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template * return nil } -func (f *fakePodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (f *fakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { f.Lock() defer f.Unlock() - if err := f.FakePodControl.CreatePodsWithControllerRef(namespace, template, object, controllerRef); err != nil { + if err := f.FakePodControl.CreatePodsWithControllerRef(context.Background(), namespace, template, object, controllerRef); err != nil { return fmt.Errorf("failed to create pod for DaemonSet") } diff --git a/pkg/controller/deployment/sync.go b/pkg/controller/deployment/sync.go index 3c7e808b24bbf..474fbd7230b77 100644 --- a/pkg/controller/deployment/sync.go +++ b/pkg/controller/deployment/sync.go @@ -24,12 +24,13 @@ import ( "strconv" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 
"k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util" + "k8s.io/kubernetes/pkg/util/httptrace" labelsutil "k8s.io/kubernetes/pkg/util/labels" ) @@ -72,6 +73,9 @@ func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaS // These conditions are needed so that we won't accidentally report lack of progress for resumed deployments // that were paused for longer than progressDeadlineSeconds. func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations()) + if !deploymentutil.HasProgressDeadline(d) { return nil } @@ -98,7 +102,7 @@ func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error } var err error - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) return err } @@ -136,6 +140,9 @@ const ( // 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas. // Note that the pod-template-hash will be added to adopted RSes and pods. 
func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations()) + existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList) // Calculate the max revision number among all old RSes @@ -155,7 +162,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds if annotationsUpdated || minReadySecondsNeedsUpdate { rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds - return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{}) + return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) } // Should use the revision in existingNewRS's annotation, since it set by before @@ -173,7 +180,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old if needsUpdate { var err error - if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil { + if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil { return nil, err } } @@ -220,7 +227,8 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old // hash collisions. If there is any other error, we need to report it in the status of // the Deployment. alreadyExists := false - createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{}) + + createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{}) switch { // We may end up hitting this due to a slow cache or a fast resync of the Deployment. 
case errors.IsAlreadyExists(err): @@ -252,7 +260,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old *d.Status.CollisionCount++ // Update the collisionCount for the Deployment and let it requeue by returning the original // error. - _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) if dErr == nil { klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount) } @@ -268,7 +276,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old // We don't really care about this error at this point, since we have a bigger issue to report. // TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account // these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568 - _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg) return nil, err @@ -285,7 +293,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old needsUpdate = true } if needsUpdate { - _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}) + _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}) } return createdRS, err } @@ -409,6 +417,8 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSe } func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, 
*apps.ReplicaSet, error) { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations()) sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale @@ -420,7 +430,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in rsCopy := rs.DeepCopy() *(rsCopy.Spec.Replicas) = newScale deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment)) - rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{}) + rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{}) if err == nil && sizeNeedsUpdate { scaled = true dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale) @@ -433,6 +443,9 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in // where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept // around by default 1) for historical reasons and 2) for the ability to rollback a deployment. 
func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), deployment.GetAnnotations()) + if !deploymentutil.HasRevisionHistoryLimit(deployment) { return nil } @@ -458,7 +471,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep continue } klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name) - if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { + if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) { // Return error instead of aggregating and continuing DELETEs on the theory // that we may be overloading the api server. return err @@ -470,6 +483,9 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep // syncDeploymentStatus checks if the status is up-to-date and sync it if necessary func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations()) + newStatus := calculateStatus(allRSs, newRS, d) if reflect.DeepEqual(d.Status, newStatus) { @@ -478,7 +494,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newDeployment := d newDeployment.Status = newStatus - _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{}) + _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{}) return err } diff --git a/pkg/controller/job/job_controller.go 
b/pkg/controller/job/job_controller.go index 70c7504834ba7..57530f7ef95ba 100644 --- a/pkg/controller/job/job_controller.go +++ b/pkg/controller/job/job_controller.go @@ -26,7 +26,7 @@ import ( "time" batch "k8s.io/api/batch/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -776,7 +776,7 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc for i := int32(0); i < batchSize; i++ { go func() { defer wait.Done() - err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind)) + err := jm.podControl.CreatePodsWithControllerRef(context.Background(), job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind)) if err != nil { if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { // If the namespace is being torn down, we can safely ignore diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go index f52eab6ca924b..1e18b561f50dd 100644 --- a/pkg/controller/replicaset/replica_set.go +++ b/pkg/controller/replicaset/replica_set.go @@ -37,7 +37,7 @@ import ( "time" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -60,6 +60,7 @@ import ( "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/util/httptrace" "k8s.io/utils/integer" ) @@ -539,6 +540,9 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool { // Does NOT modify . // It will requeue the replica set in case of an error while creating/deleting pods. 
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations()) + diff := len(filteredPods) - int(*(rs.Spec.Replicas)) rsKey, err := controller.KeyFunc(rs) if err != nil { @@ -566,7 +570,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps // after one of its pods fails. Conveniently, this also prevents the // event spam that those failures would generate. successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error { - err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) + + err := rsc.podControl.CreatePodsWithControllerRef(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind)) if err != nil { if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) { // if the namespace is being terminated, we don't have to do @@ -711,10 +716,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error { } func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations()) + // If any adoptions are attempted, we should first recheck for deletion with // an uncached quorum read sometime after listing Pods (see #42639). 
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) { - fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{}) + fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{}) if err != nil { return nil, err } diff --git a/pkg/controller/replication/conversion.go b/pkg/controller/replication/conversion.go index 0214a47967847..ace98ffff8ede 100644 --- a/pkg/controller/replication/conversion.go +++ b/pkg/controller/replication/conversion.go @@ -29,7 +29,7 @@ import ( apps "k8s.io/api/apps/v1" autoscalingv1 "k8s.io/api/autoscaling/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" @@ -323,22 +323,22 @@ type podControlAdapter struct { controller.PodControlInterface } -func (pc podControlAdapter) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { +func (pc podControlAdapter) CreatePods(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object) error { // This is not used by RSC. return errors.New("CreatePods() is not implemented for podControlAdapter") } -func (pc podControlAdapter) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (pc podControlAdapter) CreatePodsOnNode(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { // This is not used by RSC. 
return errors.New("CreatePodsOnNode() is not implemented for podControlAdapter") } -func (pc podControlAdapter) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { +func (pc podControlAdapter) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error { rc, err := convertRStoRC(object.(*apps.ReplicaSet)) if err != nil { return err } - return pc.PodControlInterface.CreatePodsWithControllerRef(namespace, template, rc, controllerRef) + return pc.PodControlInterface.CreatePodsWithControllerRef(ctx, namespace, template, rc, controllerRef) } func (pc podControlAdapter) DeletePod(namespace string, podID string, object runtime.Object) error { diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go index b995d0892f5b2..565c0be43143b 100644 --- a/pkg/controller/statefulset/stateful_pod_control.go +++ b/pkg/controller/statefulset/stateful_pod_control.go @@ -22,7 +22,7 @@ import ( "strings" apps "k8s.io/api/apps/v1" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" errorutils "k8s.io/apimachinery/pkg/util/errors" @@ -32,6 +32,7 @@ import ( corelisters "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/record" "k8s.io/client-go/util/retry" + "k8s.io/kubernetes/pkg/util/httptrace" ) // StatefulPodControlInterface defines the interface that StatefulSetController uses to create, update, and delete Pods, @@ -73,13 +74,17 @@ type realStatefulPodControl struct { } func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations()) + // Create the Pod's PVCs prior to 
creating the Pod if err := spc.createPersistentVolumeClaims(set, pod); err != nil { spc.recordPodEvent("create", set, pod, err) return err } // If we created the PVCs attempt to create the Pod - _, err := spc.client.CoreV1().Pods(set.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}) + + _, err := spc.client.CoreV1().Pods(set.Namespace).Create(ctx, pod, metav1.CreateOptions{}) // sink already exists errors if apierrors.IsAlreadyExists(err) { return err @@ -89,6 +94,9 @@ func (spc *realStatefulPodControl) CreateStatefulPod(set *apps.StatefulSet, pod } func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations()) + attemptedUpdate := false err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { // assume the Pod is consistent @@ -115,7 +123,7 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod attemptedUpdate = true // commit the update, retrying on conflicts - _, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{}) + _, updateErr := spc.client.CoreV1().Pods(set.Namespace).Update(ctx, pod, metav1.UpdateOptions{}) if updateErr == nil { return nil } @@ -136,7 +144,10 @@ func (spc *realStatefulPodControl) UpdateStatefulPod(set *apps.StatefulSet, pod } func (spc *realStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error { - err := spc.client.CoreV1().Pods(set.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations()) + + err := spc.client.CoreV1().Pods(set.Namespace).Delete(ctx, pod.Name, metav1.DeleteOptions{}) spc.recordPodEvent("delete", set, pod, err) return err } @@ -179,12 +190,15 @@ func (spc *realStatefulPodControl) 
recordClaimEvent(verb string, set *apps.State // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with // set's Spec. func (spc *realStatefulPodControl) createPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error { + // Get span from annotations and set to ctx + ctx := httptrace.SpanContextFromAnnotations(context.Background(), set.GetAnnotations()) + var errs []error for _, claim := range getPersistentVolumeClaims(set, pod) { pvc, err := spc.pvcLister.PersistentVolumeClaims(claim.Namespace).Get(claim.Name) switch { case apierrors.IsNotFound(err): - _, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(context.TODO(), &claim, metav1.CreateOptions{}) + _, err := spc.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Create(ctx, &claim, metav1.CreateOptions{}) if err != nil { errs = append(errs, fmt.Errorf("failed to create PVC %s: %s", claim.Name, err)) } diff --git a/pkg/util/httptrace/httptrace.go b/pkg/util/httptrace/httptrace.go new file mode 100644 index 0000000000000..5d840852028f9 --- /dev/null +++ b/pkg/util/httptrace/httptrace.go @@ -0,0 +1,116 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package httptrace + +import ( + "bytes" + "context" + "encoding/base64" + "encoding/binary" + "fmt" + "reflect" + + "go.opentelemetry.io/otel" + apitrace "go.opentelemetry.io/otel/api/trace" + "go.opentelemetry.io/otel/label" +) + +type contextKeyType int + +// avoid using the char `/` in the string +const initialTraceIDAnnotationKey string = "trace.kubernetes.io.initial" + +// avoid using the char `/` in the string +const spanContextAnnotationKey string = "trace.kubernetes.io.span.context" + +const initialTraceIDBaggageKey label.Key = "Initial-Trace-Id" + +// SpanContextFromAnnotations extracts the span context from annotations +func SpanContextFromAnnotations(ctx context.Context, annotations map[string]string) context.Context { + // get init trace id from annotations + ctx = otel.ContextWithBaggageValues( + ctx, + label.KeyValue{ + Key: initialTraceIDBaggageKey, + Value: label.StringValue(annotations[initialTraceIDAnnotationKey]), + }, + ) + spanContext, _ := decodeSpanContext(annotations[spanContextAnnotationKey]) + // get span context from annotations + return SpanContextToContext(ctx, spanContext) +} + +// SpanContextToAnnotations injects the span context into annotations +func SpanContextToAnnotations(ctx context.Context, annotations *map[string]string) { + if *annotations == nil { + *annotations = make(map[string]string) + } + // get init trace id from ctx, inject into annotations + if otel.BaggageValue(ctx, initialTraceIDBaggageKey).AsString() != "" { + (*annotations)[initialTraceIDAnnotationKey] = otel.BaggageValue(ctx, initialTraceIDBaggageKey).AsString() + } + // get spancontext from ctx, inject into annotations + spanContext := SpanContextFromContext(ctx) + if !reflect.DeepEqual(spanContext, apitrace.EmptySpanContext()) { + encodeSpanContext, _ := encodedSpanContext(spanContext) + (*annotations)[spanContextAnnotationKey] = encodeSpanContext + } +} + +// SpanContextToContext sets the span context into a golang context +func SpanContextToContext(ctx context.Context, spanContext 
apitrace.SpanContext) context.Context { + return apitrace.ContextWithRemoteSpanContext(ctx, spanContext) +} + +// SpanContextFromContext gets the span context from a golang context +func SpanContextFromContext(ctx context.Context) apitrace.SpanContext { + return apitrace.RemoteSpanContextFromContext(ctx) +} + +// encodedSpanContext encodes a span context into a base64 string +func encodedSpanContext(spanContext apitrace.SpanContext) (string, error) { + if reflect.DeepEqual(spanContext, apitrace.SpanContext{}) { + return "", fmt.Errorf("span context is nil") + } + // encode to byte + buffer := new(bytes.Buffer) + err := binary.Write(buffer, binary.LittleEndian, spanContext) + if err != nil { + return "", err + } + // encode to string + return base64.StdEncoding.EncodeToString(buffer.Bytes()), nil +} + +// decodeSpanContext decodes a base64-encoded span context string back into a SpanContext +func decodeSpanContext(encodedSpanContext string) (apitrace.SpanContext, error) { + // decode to byte + byteList := make([]byte, base64.StdEncoding.DecodedLen(len(encodedSpanContext))) + l, err := base64.StdEncoding.Decode(byteList, []byte(encodedSpanContext)) + if err != nil { + return apitrace.EmptySpanContext(), err + } + byteList = byteList[:l] + // decode to span context + buffer := bytes.NewBuffer(byteList) + spanContext := apitrace.SpanContext{} + err = binary.Read(buffer, binary.LittleEndian, &spanContext) + if err != nil { + return apitrace.EmptySpanContext(), err + } + return spanContext, nil +}