Skip to content

Commit 0f45702

Browse files
Re-try failed upgrades (#3893)
* Re-try failed upgrades Currently, the operator upgrades all OpenTelemetryCollector CRs on startup of the operator. If any upgrade fails (e.g. intermittent errors of the Kubernetes API server), it won't be re-tried until the operator is restarted. This commit moves the upgrade step to the reconcile loop. The operator reconciles all managed instances on startup, and in case of an error, re-tries the upgrade with exponential backoff. Additionally, I changed the event type from Error to Warning, because "Error" is not a valid event type. Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> * add description to changelog Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> * extract upgrade to separate methods, return and re-queue reconciliation on upgrade Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> * move NeedsUpgrade to upgrade package Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> * re-add ManagementState and UpgradeStrategy check to Upgrade function Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> * check NeedsUpgrade() in Upgrade() fn Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> * add unit tests, remove upgrade from status handling Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> * fix tests Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com> --------- Signed-off-by: Andreas Gerstmayr <agerstmayr@redhat.com>
1 parent 65af20a commit 0f45702

File tree

7 files changed

+340
-89
lines changed

7 files changed

+340
-89
lines changed

.chloggen/retry_upgrades.yaml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
2+
change_type: enhancement
3+
4+
# The name of the component, or a single word describing the area of concern, (e.g. collector, target allocator, auto-instrumentation, opamp, github action)
5+
component: collector
6+
7+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
8+
note: Re-try failed upgrades
9+
10+
# One or more tracking issues related to the change
11+
issues: [3515]
12+
13+
# (Optional) One or more lines of additional information to render under the primary note.
14+
# These lines will be padded with 2 spaces and then inserted directly into the document.
15+
# Use pipe (|) for multiline entries.
16+
subtext: |
17+
The upgrade process runs now in the reconcile loop.
18+
In case of an error, the operator re-tries the upgrade with exponential backoff.

controllers/opentelemetrycollector_controller.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package controllers
77
import (
88
"context"
99
"sort"
10+
"time"
1011

1112
"github.com/go-logr/logr"
1213
routev1 "github.com/openshift/api/route/v1"
@@ -39,6 +40,8 @@ import (
3940
"github.com/open-telemetry/opentelemetry-operator/internal/manifests/manifestutils"
4041
internalRbac "github.com/open-telemetry/opentelemetry-operator/internal/rbac"
4142
collectorStatus "github.com/open-telemetry/opentelemetry-operator/internal/status/collector"
43+
"github.com/open-telemetry/opentelemetry-operator/internal/version"
44+
"github.com/open-telemetry/opentelemetry-operator/pkg/collector/upgrade"
4245
"github.com/open-telemetry/opentelemetry-operator/pkg/constants"
4346
"github.com/open-telemetry/opentelemetry-operator/pkg/featuregate"
4447
)
@@ -60,6 +63,7 @@ type OpenTelemetryCollectorReconciler struct {
6063
log logr.Logger
6164
config config.Config
6265
reviewer *internalRbac.Reviewer
66+
upgrade *upgrade.VersionUpgrade
6367
}
6468

6569
// Params is the set of options to build a new OpenTelemetryCollectorReconciler.
@@ -70,6 +74,7 @@ type Params struct {
7074
Log logr.Logger
7175
Config config.Config
7276
Reviewer *internalRbac.Reviewer
77+
Version version.Version
7378
}
7479

7580
func (r *OpenTelemetryCollectorReconciler) findOtelOwnedObjects(ctx context.Context, params manifests.Params) (map[types.UID]client.Object, error) {
@@ -187,13 +192,21 @@ func (r *OpenTelemetryCollectorReconciler) getTargetAllocator(ctx context.Contex
187192

188193
// NewReconciler creates a new reconciler for OpenTelemetryCollector objects.
189194
func NewReconciler(p Params) *OpenTelemetryCollectorReconciler {
195+
up := &upgrade.VersionUpgrade{
196+
Client: p.Client,
197+
Log: p.Log.WithName("collector-upgrade"),
198+
Recorder: p.Recorder,
199+
Version: p.Version,
200+
}
201+
190202
r := &OpenTelemetryCollectorReconciler{
191203
Client: p.Client,
192204
log: p.Log,
193205
scheme: p.Scheme,
194206
config: p.Config,
195207
recorder: p.Recorder,
196208
reviewer: p.Reviewer,
209+
upgrade: up,
197210
}
198211
return r
199212
}
@@ -263,6 +276,15 @@ func (r *OpenTelemetryCollectorReconciler) Reconcile(ctx context.Context, req ct
263276
return ctrl.Result{}, nil
264277
}
265278

279+
if r.upgrade.NeedsUpgrade(instance) {
280+
err = r.upgrade.Upgrade(ctx, instance)
281+
if err != nil {
282+
return ctrl.Result{}, err
283+
}
284+
// if the OpenTelemetryCollector CR was upgraded (modified), return here and re-queue the reconcile event.
285+
return ctrl.Result{Requeue: true, RequeueAfter: 1 * time.Second}, nil
286+
}
287+
266288
// Add finalizer for this CR
267289
if !controllerutil.ContainsFinalizer(&instance, collectorFinalizer) {
268290
if controllerutil.AddFinalizer(&instance, collectorFinalizer) {

controllers/reconcile_test.go

Lines changed: 184 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ import (
4545
"github.com/open-telemetry/opentelemetry-operator/internal/config"
4646
"github.com/open-telemetry/opentelemetry-operator/internal/manifests"
4747
"github.com/open-telemetry/opentelemetry-operator/internal/naming"
48+
"github.com/open-telemetry/opentelemetry-operator/internal/version"
49+
"github.com/open-telemetry/opentelemetry-operator/pkg/collector/upgrade"
4850
"github.com/open-telemetry/opentelemetry-operator/pkg/featuregate"
4951
)
5052

@@ -1298,6 +1300,181 @@ service:
12981300
require.Error(t, clientErr)
12991301
}
13001302

1303+
func TestUpgrade(t *testing.T) {
1304+
testCtx, cancel := context.WithCancel(context.Background())
1305+
defer cancel()
1306+
1307+
reconciler := createTestReconcilerWithVersion(
1308+
t, testCtx,
1309+
config.New(
1310+
config.WithCollectorImage("default-collector"),
1311+
config.WithTargetAllocatorImage("default-ta-allocator"),
1312+
),
1313+
version.Version{OpenTelemetryCollector: upgrade.Latest.String()},
1314+
)
1315+
1316+
for _, tt := range []struct {
1317+
name string
1318+
input v1beta1.OpenTelemetryCollector
1319+
expectRequeue bool
1320+
expected check[v1beta1.OpenTelemetryCollector]
1321+
}{
1322+
{
1323+
name: "needs-upgrade",
1324+
input: v1beta1.OpenTelemetryCollector{
1325+
Spec: v1beta1.OpenTelemetryCollectorSpec{
1326+
OpenTelemetryCommonFields: v1beta1.OpenTelemetryCommonFields{
1327+
Args: map[string]string{
1328+
// this feature gate will be removed in the 0.110.0 upgrade step
1329+
"feature-gates": "-component.UseLocalHostAsDefaultHost",
1330+
},
1331+
},
1332+
Config: v1beta1.Config{
1333+
Receivers: v1beta1.AnyConfig{
1334+
Object: map[string]any{
1335+
"nop": map[string]any{},
1336+
},
1337+
},
1338+
Exporters: v1beta1.AnyConfig{
1339+
Object: map[string]any{
1340+
"nop": map[string]any{},
1341+
},
1342+
},
1343+
Service: v1beta1.Service{
1344+
Pipelines: map[string]*v1beta1.Pipeline{
1345+
"traces": {
1346+
Exporters: []string{"nop"},
1347+
Receivers: []string{"nop"},
1348+
},
1349+
},
1350+
},
1351+
},
1352+
},
1353+
Status: v1beta1.OpenTelemetryCollectorStatus{
1354+
Version: "0.109.0",
1355+
},
1356+
},
1357+
expectRequeue: true,
1358+
expected: func(t *testing.T, otelcol v1beta1.OpenTelemetryCollector) {
1359+
assert.Equal(t, upgrade.Latest.String(), otelcol.Status.Version)
1360+
// The '-component.UseLocalHostAsDefaultHost' feature gate was removed in the 0.110.0 upgrade step
1361+
assert.Equal(t, "", otelcol.Spec.OpenTelemetryCommonFields.Args["feature-gates"])
1362+
},
1363+
},
1364+
{
1365+
name: "up-to-date",
1366+
input: v1beta1.OpenTelemetryCollector{
1367+
Spec: v1beta1.OpenTelemetryCollectorSpec{
1368+
Config: v1beta1.Config{
1369+
Receivers: v1beta1.AnyConfig{
1370+
Object: map[string]any{
1371+
"nop": map[string]any{},
1372+
},
1373+
},
1374+
Exporters: v1beta1.AnyConfig{
1375+
Object: map[string]any{
1376+
"nop": map[string]any{},
1377+
},
1378+
},
1379+
Service: v1beta1.Service{
1380+
Pipelines: map[string]*v1beta1.Pipeline{
1381+
"traces": {
1382+
Exporters: []string{"nop"},
1383+
Receivers: []string{"nop"},
1384+
},
1385+
},
1386+
},
1387+
},
1388+
},
1389+
Status: v1beta1.OpenTelemetryCollectorStatus{
1390+
Version: upgrade.Latest.String(),
1391+
},
1392+
},
1393+
expectRequeue: false,
1394+
expected: func(t *testing.T, otelcol v1beta1.OpenTelemetryCollector) {
1395+
assert.Equal(t, upgrade.Latest.String(), otelcol.Status.Version)
1396+
},
1397+
},
1398+
{
1399+
name: "empty-version",
1400+
input: v1beta1.OpenTelemetryCollector{
1401+
Spec: v1beta1.OpenTelemetryCollectorSpec{
1402+
Config: v1beta1.Config{
1403+
Receivers: v1beta1.AnyConfig{
1404+
Object: map[string]any{
1405+
"nop": map[string]any{},
1406+
},
1407+
},
1408+
Exporters: v1beta1.AnyConfig{
1409+
Object: map[string]any{
1410+
"nop": map[string]any{},
1411+
},
1412+
},
1413+
Service: v1beta1.Service{
1414+
Pipelines: map[string]*v1beta1.Pipeline{
1415+
"traces": {
1416+
Exporters: []string{"nop"},
1417+
Receivers: []string{"nop"},
1418+
},
1419+
},
1420+
},
1421+
},
1422+
},
1423+
Status: v1beta1.OpenTelemetryCollectorStatus{
1424+
Version: "",
1425+
},
1426+
},
1427+
expectRequeue: false,
1428+
expected: func(t *testing.T, otelcol v1beta1.OpenTelemetryCollector) {
1429+
// updateCollectorStatus() sets version.OpenTelemetryCollector(), which is 0.0.0 in unit tests
1430+
assert.Equal(t, "0.0.0", otelcol.Status.Version)
1431+
},
1432+
},
1433+
} {
1434+
t.Run(tt.name, func(t *testing.T) {
1435+
otelcol := tt.input.DeepCopy()
1436+
otelcol.Namespace = "default"
1437+
otelcol.Name = tt.name
1438+
nsn := types.NamespacedName{Name: otelcol.Name, Namespace: otelcol.Namespace}
1439+
1440+
// Create otelcol CR
1441+
err := k8sClient.Create(testCtx, otelcol)
1442+
require.NoError(t, err)
1443+
1444+
// The status subresource must be updated separately
1445+
otelcol.Status = tt.input.Status
1446+
err = k8sClient.Status().Update(testCtx, otelcol)
1447+
require.NoError(t, err)
1448+
1449+
// Update cache.
1450+
// Otherwise, adding the finalizer in opentelemetrycollector_controller intermittently fails with:
1451+
// Operation cannot be fulfilled on opentelemetrycollectors.opentelemetry.io "...": the object has been modified; please apply your changes to the latest version and try again
1452+
_ = k8sClient.Get(ctx, nsn, otelcol)
1453+
1454+
// First reconcile
1455+
req := k8sreconcile.Request{NamespacedName: nsn}
1456+
reconcile, err := reconciler.Reconcile(testCtx, req)
1457+
require.NoError(t, err)
1458+
require.Equal(t, tt.expectRequeue, reconcile.Requeue)
1459+
1460+
// Second reconcile (if upgrade was run)
1461+
if reconcile.Requeue {
1462+
time.Sleep(reconcile.RequeueAfter)
1463+
1464+
reconcile, err = reconciler.Reconcile(testCtx, req)
1465+
require.NoError(t, err)
1466+
require.False(t, reconcile.Requeue)
1467+
}
1468+
1469+
// Compare upgraded CR with expected
1470+
upgraded := v1beta1.OpenTelemetryCollector{}
1471+
err = k8sClient.Get(ctx, nsn, &upgraded)
1472+
require.NoError(t, err)
1473+
tt.expected(t, upgraded)
1474+
})
1475+
}
1476+
}
1477+
13011478
func namespacedObjectName(name string, namespace string) types.NamespacedName {
13021479
return types.NamespacedName{
13031480
Namespace: namespace,
@@ -1353,7 +1530,7 @@ func IsOwnedBy(obj metav1.Object, owner *v1beta1.OpenTelemetryCollector) bool {
13531530
return isOwner
13541531
}
13551532

1356-
func createTestReconciler(t *testing.T, ctx context.Context, cfg config.Config) *controllers.OpenTelemetryCollectorReconciler {
1533+
func createTestReconcilerWithVersion(t *testing.T, ctx context.Context, cfg config.Config, v version.Version) *controllers.OpenTelemetryCollectorReconciler {
13571534
t.Helper()
13581535
// we need to set up caches for our reconciler
13591536
runtimeCluster, err := runtimecluster.New(restCfg, func(options *runtimecluster.Options) {
@@ -1372,6 +1549,7 @@ func createTestReconciler(t *testing.T, ctx context.Context, cfg config.Config)
13721549
Scheme: testScheme,
13731550
Recorder: record.NewFakeRecorder(20),
13741551
Config: cfg,
1552+
Version: v,
13751553
})
13761554
err = reconciler.SetupCaches(runtimeCluster)
13771555
require.NoError(t, err)
@@ -1380,6 +1558,11 @@ func createTestReconciler(t *testing.T, ctx context.Context, cfg config.Config)
13801558
return reconciler
13811559
}
13821560

1561+
func createTestReconciler(t *testing.T, ctx context.Context, cfg config.Config) *controllers.OpenTelemetryCollectorReconciler {
1562+
t.Helper()
1563+
return createTestReconcilerWithVersion(t, ctx, cfg, version.Get())
1564+
}
1565+
13831566
func sanitizeResourceName(name string) string {
13841567
sanitized := strings.ToLower(name)
13851568
re := regexp.MustCompile("[^a-z0-9-]")

internal/status/collector/handle.go

Lines changed: 6 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,19 +8,15 @@ import (
88
"fmt"
99

1010
"github.com/go-logr/logr"
11+
corev1 "k8s.io/api/core/v1"
1112
ctrl "sigs.k8s.io/controller-runtime"
1213
"sigs.k8s.io/controller-runtime/pkg/client"
1314

1415
"github.com/open-telemetry/opentelemetry-operator/apis/v1beta1"
1516
"github.com/open-telemetry/opentelemetry-operator/internal/manifests"
16-
"github.com/open-telemetry/opentelemetry-operator/internal/version"
17-
collectorupgrade "github.com/open-telemetry/opentelemetry-operator/pkg/collector/upgrade"
1817
)
1918

2019
const (
21-
eventTypeNormal = "Normal"
22-
eventTypeWarning = "Warning"
23-
2420
reasonError = "Error"
2521
reasonStatusFailure = "StatusFailure"
2622
reasonInfo = "Info"
@@ -30,32 +26,21 @@ const (
3026
func HandleReconcileStatus(ctx context.Context, log logr.Logger, params manifests.Params, otelcol v1beta1.OpenTelemetryCollector, err error) (ctrl.Result, error) {
3127
log.V(2).Info("updating collector status")
3228
if err != nil {
33-
params.Recorder.Event(&otelcol, eventTypeWarning, reasonError, err.Error())
29+
params.Recorder.Event(&otelcol, corev1.EventTypeWarning, reasonError, err.Error())
3430
return ctrl.Result{}, err
3531
}
36-
changed := otelcol.DeepCopy()
3732

38-
up := &collectorupgrade.VersionUpgrade{
39-
Log: params.Log,
40-
Version: version.Get(),
41-
Client: params.Client,
42-
Recorder: params.Recorder,
43-
}
44-
upgraded, upgradeErr := up.ManagedInstance(ctx, *changed)
45-
if upgradeErr != nil {
46-
// don't fail to allow setting the status
47-
log.V(2).Error(upgradeErr, "failed to upgrade the OpenTelemetry CR")
48-
}
49-
changed = &upgraded
33+
changed := otelcol.DeepCopy()
5034
statusErr := updateCollectorStatus(ctx, params.Client, changed)
35+
5136
if statusErr != nil {
52-
params.Recorder.Event(changed, eventTypeWarning, reasonStatusFailure, statusErr.Error())
37+
params.Recorder.Event(changed, corev1.EventTypeWarning, reasonStatusFailure, statusErr.Error())
5338
return ctrl.Result{}, statusErr
5439
}
5540
statusPatch := client.MergeFrom(&otelcol)
5641
if err := params.Client.Status().Patch(ctx, changed, statusPatch); err != nil {
5742
return ctrl.Result{}, fmt.Errorf("failed to apply status changes to the OpenTelemetry CR: %w", err)
5843
}
59-
params.Recorder.Event(changed, eventTypeNormal, reasonInfo, "applied status changes")
44+
params.Recorder.Event(changed, corev1.EventTypeNormal, reasonInfo, "applied status changes")
6045
return ctrl.Result{}, nil
6146
}

0 commit comments

Comments
 (0)