4 changes: 4 additions & 0 deletions cmd/kube-apiserver/apiserver.go
@@ -27,13 +27,17 @@ import (
_ "k8s.io/component-base/metrics/prometheus/clientgo" // load all the prometheus client-go plugins
_ "k8s.io/component-base/metrics/prometheus/version" // for version metric registration
"k8s.io/kubernetes/cmd/kube-apiserver/app"
"k8s.io/kubernetes/pkg/util/httptrace"
)

func main() {
rand.Seed(time.Now().UnixNano())

command := app.NewAPIServerCommand()

+ shutdown := httptrace.InitTracer()
+ defer shutdown()
+
// TODO: once we switch everything over to Cobra commands, we can go back to calling
// utilflag.InitFlags() (by removing its pflag.Parse() call). For now, we have to set the
// normalize func and add the go flag set by hand.
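Note: main() now initializes the tracer before running the command, and the deferred shutdown flushes spans on exit. The pkg/util/httptrace package itself is not part of this diff; assuming it wraps the OpenTelemetry Go SDK, InitTracer would look roughly like the sketch below (the stdout exporter and every name here are assumptions for illustration, not code from this PR):

```go
// Hypothetical sketch of pkg/util/httptrace's InitTracer (not shown in this diff).
package httptrace

import (
	"context"

	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/exporters/stdout/stdouttrace"
	"go.opentelemetry.io/otel/propagation"
	sdktrace "go.opentelemetry.io/otel/sdk/trace"
)

// InitTracer registers a global tracer provider and returns a shutdown
// function suitable for `defer shutdown()` in main.
func InitTracer() func() {
	// Exporter choice is an assumption; a real deployment would likely
	// export to an OTLP or Jaeger endpoint rather than stdout.
	exporter, err := stdouttrace.New(stdouttrace.WithPrettyPrint())
	if err != nil {
		panic(err) // fail fast during process start-up
	}
	tp := sdktrace.NewTracerProvider(sdktrace.WithBatcher(exporter))
	otel.SetTracerProvider(tp)
	// W3C trace-context propagation lets span contexts round-trip through
	// HTTP headers and object annotations.
	otel.SetTextMapPropagator(propagation.TraceContext{})
	return func() { _ = tp.Shutdown(context.Background()) }
}
```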
16 changes: 8 additions & 8 deletions pkg/controller/controller_utils.go
@@ -453,7 +453,7 @@ type PodControlInterface interface {
// and sets the ControllerRef.
CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// CreatePodsWithControllerRef creates new pods according to the spec, and sets object as the pod's controller.
- CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
+ CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error
// DeletePod deletes the pod identified by podID.
DeletePod(namespace string, podID string, object runtime.Object) error
// PatchPod patches the pod.
@@ -519,21 +519,21 @@ func validateControllerRef(controllerRef *metav1.OwnerReference) error {
}

func (r RealPodControl) CreatePods(namespace string, template *v1.PodTemplateSpec, object runtime.Object) error {
- return r.createPods("", namespace, template, object, nil)
+ return r.createPods(context.Background(), "", namespace, template, object, nil)
}

- func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (r RealPodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, controllerObject runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
- return r.createPods("", namespace, template, controllerObject, controllerRef)
+ return r.createPods(ctx, "", namespace, template, controllerObject, controllerRef)
}

func (r RealPodControl) CreatePodsOnNode(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
if err := validateControllerRef(controllerRef); err != nil {
return err
}
- return r.createPods(nodeName, namespace, template, object, controllerRef)
+ return r.createPods(context.Background(), nodeName, namespace, template, object, controllerRef)
}

func (r RealPodControl) PatchPod(namespace, name string, data []byte) error {
@@ -566,7 +566,7 @@ func GetPodFromTemplate(template *v1.PodTemplateSpec, parentObject runtime.Objec
return pod, nil
}

func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
func (r RealPodControl) createPods(ctx context.Context, nodeName, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
pod, err := GetPodFromTemplate(template, object, controllerRef)
if err != nil {
return err
@@ -577,7 +577,7 @@ func (r RealPodControl) createPods(nodeName, namespace string, template *v1.PodT
if len(labels.Set(pod.Labels)) == 0 {
return fmt.Errorf("unable to create pods, no labels")
}
- newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(context.TODO(), pod, metav1.CreateOptions{})
+ newPod, err := r.KubeClient.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
if err != nil {
// only send an event if the namespace isn't terminating
if !apierrors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
@@ -652,7 +652,7 @@ func (f *FakePodControl) CreatePods(namespace string, spec *v1.PodTemplateSpec,
return nil
}

- func (f *FakePodControl) CreatePodsWithControllerRef(namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *FakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, spec *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
f.CreateCallCount++
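Widening PodControlInterface means every implementation, including fakes in other packages, must change in lockstep. A compile-time assertion turns a missed implementation into a build error rather than a late test failure; an illustrative addition (not part of this PR):

```go
// Compile-time checks that both implementations still satisfy the widened
// PodControlInterface (illustrative only; not in this diff).
var (
	_ PodControlInterface = RealPodControl{}
	_ PodControlInterface = &FakePodControl{}
)
```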
2 changes: 1 addition & 1 deletion pkg/controller/daemon/daemon_controller.go
@@ -968,7 +968,7 @@ func (dsc *DaemonSetsController) syncNodes(ds *apps.DaemonSet, podsToDelete, nod
podTemplate.Spec.Affinity = util.ReplaceDaemonSetPodNodeNameNodeAffinity(
podTemplate.Spec.Affinity, nodesNeedingDaemonPods[ix])

- err := dsc.podControl.CreatePodsWithControllerRef(ds.Namespace, podTemplate,
+ err := dsc.podControl.CreatePodsWithControllerRef(context.Background(), ds.Namespace, podTemplate,
ds, metav1.NewControllerRef(ds, controllerKind))

if err != nil {
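Here the DaemonSet controller passes context.Background(), so pods created by syncNodes start a fresh trace rather than joining one propagated from the DaemonSet object. If end-to-end propagation is wanted on this path too, the analogous call would presumably mirror the ReplicaSet/Deployment changes below (hypothetical, not in this diff):

```go
// Hypothetical variant: extract the span context from the DaemonSet's
// annotations before creating its pods, as the other controllers do.
ctx := httptrace.SpanContextFromAnnotations(context.Background(), ds.GetAnnotations())
err := dsc.podControl.CreatePodsWithControllerRef(ctx, ds.Namespace, podTemplate,
	ds, metav1.NewControllerRef(ds, controllerKind))
```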
4 changes: 2 additions & 2 deletions pkg/controller/daemon/daemon_controller_test.go
@@ -267,10 +267,10 @@ func (f *fakePodControl) CreatePodsOnNode(nodeName, namespace string, template *
return nil
}

- func (f *fakePodControl) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (f *fakePodControl) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
f.Lock()
defer f.Unlock()
- if err := f.FakePodControl.CreatePodsWithControllerRef(namespace, template, object, controllerRef); err != nil {
+ if err := f.FakePodControl.CreatePodsWithControllerRef(context.Background(), namespace, template, object, controllerRef); err != nil {
return fmt.Errorf("failed to create pod for DaemonSet")
}

38 changes: 27 additions & 11 deletions pkg/controller/deployment/sync.go
@@ -24,12 +24,13 @@ import (
"strconv"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/klog/v2"
"k8s.io/kubernetes/pkg/controller"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/util/httptrace"
labelsutil "k8s.io/kubernetes/pkg/util/labels"
)

@@ -72,6 +72,9 @@ func (dc *DeploymentController) sync(d *apps.Deployment, rsList []*apps.ReplicaS
// These conditions are needed so that we won't accidentally report lack of progress for resumed deployments
// that were paused for longer than progressDeadlineSeconds.
func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
if !deploymentutil.HasProgressDeadline(d) {
return nil
}
@@ -98,7 +102,7 @@ func (dc *DeploymentController) checkPausedConditions(d *apps.Deployment) error
}

var err error
- _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
return err
}

@@ -136,6 +140,9 @@ const (
// 3. If there's no existing new RS and createIfNotExisted is true, create one with appropriate revision number (maxOldRevision + 1) and replicas.
// Note that the pod-template-hash will be added to adopted RSes and pods.
func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, oldRSs []*apps.ReplicaSet, createIfNotExisted bool) (*apps.ReplicaSet, error) {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
existingNewRS := deploymentutil.FindNewReplicaSet(d, rsList)

// Calculate the max revision number among all old RSes
@@ -155,7 +162,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
minReadySecondsNeedsUpdate := rsCopy.Spec.MinReadySeconds != d.Spec.MinReadySeconds
if annotationsUpdated || minReadySecondsNeedsUpdate {
rsCopy.Spec.MinReadySeconds = d.Spec.MinReadySeconds
- return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+ return dc.client.AppsV1().ReplicaSets(rsCopy.ObjectMeta.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
}

// Should use the revision in existingNewRS's annotation, since it set by before
@@ -173,7 +180,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old

if needsUpdate {
var err error
- if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{}); err != nil {
+ if _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{}); err != nil {
return nil, err
}
}
@@ -220,7 +227,8 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
// hash collisions. If there is any other error, we need to report it in the status of
// the Deployment.
alreadyExists := false
- createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(context.TODO(), &newRS, metav1.CreateOptions{})
+
+ createdRS, err := dc.client.AppsV1().ReplicaSets(d.Namespace).Create(ctx, &newRS, metav1.CreateOptions{})
switch {
// We may end up hitting this due to a slow cache or a fast resync of the Deployment.
case errors.IsAlreadyExists(err):
@@ -252,7 +260,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
*d.Status.CollisionCount++
// Update the collisionCount for the Deployment and let it requeue by returning the original
// error.
- _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, dErr := dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
if dErr == nil {
klog.V(2).Infof("Found a hash collision for deployment %q - bumping collisionCount (%d->%d) to resolve it", d.Name, preCollisionCount, *d.Status.CollisionCount)
}
@@ -268,7 +276,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
// We don't really care about this error at this point, since we have a bigger issue to report.
// TODO: Identify which errors are permanent and switch DeploymentIsFailed to take into account
// these reasons as well. Related issue: https://github.com/kubernetes/kubernetes/issues/18568
- _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, _ = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
dc.eventRecorder.Eventf(d, v1.EventTypeWarning, deploymentutil.FailedRSCreateReason, msg)
return nil, err
@@ -285,7 +293,7 @@ func (dc *DeploymentController) getNewReplicaSet(d *apps.Deployment, rsList, old
needsUpdate = true
}
if needsUpdate {
- _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(context.TODO(), d, metav1.UpdateOptions{})
+ _, err = dc.client.AppsV1().Deployments(d.Namespace).UpdateStatus(ctx, d, metav1.UpdateOptions{})
}
return createdRS, err
}
@@ -409,6 +417,8 @@ func (dc *DeploymentController) scaleReplicaSetAndRecordEvent(rs *apps.ReplicaSe
}

func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale int32, deployment *apps.Deployment, scalingOperation string) (bool, *apps.ReplicaSet, error) {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())

sizeNeedsUpdate := *(rs.Spec.Replicas) != newScale

@@ -420,7 +430,7 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
rsCopy := rs.DeepCopy()
*(rsCopy.Spec.Replicas) = newScale
deploymentutil.SetReplicasAnnotations(rsCopy, *(deployment.Spec.Replicas), *(deployment.Spec.Replicas)+deploymentutil.MaxSurge(*deployment))
- rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(context.TODO(), rsCopy, metav1.UpdateOptions{})
+ rs, err = dc.client.AppsV1().ReplicaSets(rsCopy.Namespace).Update(ctx, rsCopy, metav1.UpdateOptions{})
if err == nil && sizeNeedsUpdate {
scaled = true
dc.eventRecorder.Eventf(deployment, v1.EventTypeNormal, "ScalingReplicaSet", "Scaled %s replica set %s to %d", scalingOperation, rs.Name, newScale)
@@ -433,6 +443,9 @@ func (dc *DeploymentController) scaleReplicaSet(rs *apps.ReplicaSet, newScale in
// where N=d.Spec.RevisionHistoryLimit. Old replica sets are older versions of the podtemplate of a deployment kept
// around by default 1) for historical reasons and 2) for the ability to rollback a deployment.
func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, deployment *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), deployment.GetAnnotations())
+
if !deploymentutil.HasRevisionHistoryLimit(deployment) {
return nil
}
@@ -458,7 +471,7 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep
continue
}
klog.V(4).Infof("Trying to cleanup replica set %q for deployment %q", rs.Name, deployment.Name)
- if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(context.TODO(), rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
+ if err := dc.client.AppsV1().ReplicaSets(rs.Namespace).Delete(ctx, rs.Name, metav1.DeleteOptions{}); err != nil && !errors.IsNotFound(err) {
// Return error instead of aggregating and continuing DELETEs on the theory
// that we may be overloading the api server.
return err
@@ -470,6 +483,9 @@ func (dc *DeploymentController) cleanupDeployment(oldRSs []*apps.ReplicaSet, dep

// syncDeploymentStatus checks if the status is up-to-date and sync it if necessary
func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet, newRS *apps.ReplicaSet, d *apps.Deployment) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), d.GetAnnotations())
+
newStatus := calculateStatus(allRSs, newRS, d)

if reflect.DeepEqual(d.Status, newStatus) {
Expand All @@ -478,7 +494,7 @@ func (dc *DeploymentController) syncDeploymentStatus(allRSs []*apps.ReplicaSet,

newDeployment := d
newDeployment.Status = newStatus
- _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(context.TODO(), newDeployment, metav1.UpdateOptions{})
+ _, err := dc.client.AppsV1().Deployments(newDeployment.Namespace).UpdateStatus(ctx, newDeployment, metav1.UpdateOptions{})
return err
}

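The pattern repeated throughout sync.go — httptrace.SpanContextFromAnnotations(context.Background(), obj.GetAnnotations()) — is the core of the PR: the span context rides on the object's annotations, so a controller that picks the object up later (possibly in another process) can parent its API calls to the trace that created it. The helper's body is not in this diff; a minimal sketch, assuming W3C trace-context keys (e.g. a traceparent annotation) and OpenTelemetry's propagation API:

```go
// Hypothetical sketch of SpanContextFromAnnotations (not shown in this diff).
package httptrace

import (
	"context"

	"go.opentelemetry.io/otel/propagation"
)

// SpanContextFromAnnotations extracts a propagated span context from an
// object's annotations into ctx; if no trace headers are present, the
// returned context is effectively unchanged.
func SpanContextFromAnnotations(ctx context.Context, annotations map[string]string) context.Context {
	if annotations == nil {
		return ctx
	}
	// MapCarrier treats the annotations map as a TextMapCarrier, so the
	// standard W3C propagator can read "traceparent"/"tracestate" keys.
	return propagation.TraceContext{}.Extract(ctx, propagation.MapCarrier(annotations))
}
```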
4 changes: 2 additions & 2 deletions pkg/controller/job/job_controller.go
@@ -26,7 +26,7 @@ import (
"time"

batch "k8s.io/api/batch/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -776,7 +776,7 @@ func (jm *Controller) manageJob(activePods []*v1.Pod, succeeded int32, job *batc
for i := int32(0); i < batchSize; i++ {
go func() {
defer wait.Done()
- err := jm.podControl.CreatePodsWithControllerRef(job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
+ err := jm.podControl.CreatePodsWithControllerRef(context.Background(), job.Namespace, &job.Spec.Template, job, metav1.NewControllerRef(job, controllerKind))
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// If the namespace is being torn down, we can safely ignore
14 changes: 11 additions & 3 deletions pkg/controller/replicaset/replica_set.go
@@ -37,7 +37,7 @@ import (
"time"

apps "k8s.io/api/apps/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -60,6 +60,7 @@ import (
"k8s.io/klog/v2"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/httptrace"
"k8s.io/utils/integer"
)

@@ -539,6 +540,9 @@ func (rsc *ReplicaSetController) processNextWorkItem() bool {
// Does NOT modify <filteredPods>.
// It will requeue the replica set in case of an error while creating/deleting pods.
func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps.ReplicaSet) error {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())
+
diff := len(filteredPods) - int(*(rs.Spec.Replicas))
rsKey, err := controller.KeyFunc(rs)
if err != nil {
@@ -566,7 +570,8 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *apps
// after one of its pods fails. Conveniently, this also prevents the
// event spam that those failures would generate.
successfulCreations, err := slowStartBatch(diff, controller.SlowStartInitialBatchSize, func() error {
- err := rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
+
+ err = rsc.podControl.CreatePodsWithControllerRef(ctx, rs.Namespace, &rs.Spec.Template, rs, metav1.NewControllerRef(rs, rsc.GroupVersionKind))
if err != nil {
if errors.HasStatusCause(err, v1.NamespaceTerminatingCause) {
// if the namespace is being terminated, we don't have to do
@@ -711,10 +716,13 @@ func (rsc *ReplicaSetController) syncReplicaSet(key string) error {
}

func (rsc *ReplicaSetController) claimPods(rs *apps.ReplicaSet, selector labels.Selector, filteredPods []*v1.Pod) ([]*v1.Pod, error) {
+ // Get span from annotations and set to ctx
+ ctx := httptrace.SpanContextFromAnnotations(context.Background(), rs.GetAnnotations())
+
// If any adoptions are attempted, we should first recheck for deletion with
// an uncached quorum read sometime after listing Pods (see #42639).
canAdoptFunc := controller.RecheckDeletionTimestamp(func() (metav1.Object, error) {
- fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(context.TODO(), rs.Name, metav1.GetOptions{})
+ fresh, err := rsc.kubeClient.AppsV1().ReplicaSets(rs.Namespace).Get(ctx, rs.Name, metav1.GetOptions{})
if err != nil {
return nil, err
}
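Extraction only pays off if something injected the span context into the annotations in the first place (for example, when the deployment controller stamps the ReplicaSets and pod templates it creates). The write-side counterpart, again as an assumed sketch rather than code from this PR:

```go
// Hypothetical write-side helper (not in this diff): copy the current span
// context into an object's annotations so a later reconciliation can
// Extract it with SpanContextFromAnnotations.
package httptrace

import (
	"context"

	"go.opentelemetry.io/otel/propagation"
)

func InjectSpanToAnnotations(ctx context.Context, annotations map[string]string) map[string]string {
	if annotations == nil {
		annotations = map[string]string{}
	}
	// Writes "traceparent"/"tracestate" keys into the annotations map.
	propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(annotations))
	return annotations
}
```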
6 changes: 3 additions & 3 deletions pkg/controller/replication/conversion.go
@@ -29,7 +29,7 @@ import (

apps "k8s.io/api/apps/v1"
autoscalingv1 "k8s.io/api/autoscaling/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@@ -333,12 +333,12 @@ func (pc podControlAdapter) CreatePodsOnNode(nodeName, namespace string, templat
return errors.New("CreatePodsOnNode() is not implemented for podControlAdapter")
}

- func (pc podControlAdapter) CreatePodsWithControllerRef(namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
+ func (pc podControlAdapter) CreatePodsWithControllerRef(ctx context.Context, namespace string, template *v1.PodTemplateSpec, object runtime.Object, controllerRef *metav1.OwnerReference) error {
rc, err := convertRStoRC(object.(*apps.ReplicaSet))
if err != nil {
return err
}
- return pc.PodControlInterface.CreatePodsWithControllerRef(namespace, template, rc, controllerRef)
+ return pc.PodControlInterface.CreatePodsWithControllerRef(ctx, namespace, template, rc, controllerRef)
}

func (pc podControlAdapter) DeletePod(namespace string, podID string, object runtime.Object) error {