Skip to content

Commit 4289e42

Browse files
committed
Add rollback support for failed cluster operations
- Introduced RollbackHandler base class and Upgrade rollback implementation - Integrated rollback trigger in FinalRollbackSubHandler based on handler failure - Annotated rollback state using Kopf progress annotations
1 parent ff0ab4d commit 4289e42

File tree

8 files changed

+621
-54
lines changed

8 files changed

+621
-54
lines changed

CHANGES.rst

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,10 @@ Changelog
55
Unreleased
66
----------
77

8+
* Implemented rollback framework for CrateDB cluster operations. Introduced
9+
``RollbackHandler`` and specialized subhandler ``RollbackUpgradeSubHandler``
10+
to support targeted rollback logic.
11+
812
2.49.0 (2025-07-14)
913
-------------------
1014

crate/operator/constants.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,13 @@ class BackupStorageProvider(str, enum.Enum):
101101

102102

103103
DEFAULT_BACKUP_STORAGE_PROVIDER = BackupStorageProvider.AWS
104+
105+
106+
class OperationType(str, enum.Enum):
107+
UPGRADE = "upgrade"
108+
SCALE = "scale"
109+
CREATE = "create"
110+
SUSPEND = "suspend"
111+
RESUME = "resume"
112+
CHANGE_COMPUTE = "change_compute"
113+
UNKNOWN = "unknown"

crate/operator/handlers/handle_update_cratedb.py

Lines changed: 117 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import datetime
2323
import hashlib
24+
import logging
2425
from typing import List
2526

2627
import kopf
@@ -30,7 +31,7 @@
3031
ChangeComputeSubHandler,
3132
)
3233
from crate.operator.config import config
33-
from crate.operator.constants import CLUSTER_UPDATE_ID
34+
from crate.operator.constants import CLUSTER_UPDATE_ID, OperationType
3435
from crate.operator.expand_volume import ExpandVolumeSubHandler
3536
from crate.operator.operations import (
3637
DELAY_CRONJOB,
@@ -42,6 +43,7 @@
4243
SuspendClusterSubHandler,
4344
set_cronjob_delay,
4445
)
46+
from crate.operator.rollback import FinalRollbackSubHandler, RollbackUpgradeSubHandler
4547
from crate.operator.scale import ScaleSubHandler
4648
from crate.operator.upgrade import AfterUpgradeSubHandler, UpgradeSubHandler
4749
from crate.operator.utils.crd import has_compute_changed
@@ -53,9 +55,11 @@ async def update_cratedb(
5355
namespace: str,
5456
name: str,
5557
patch: kopf.Patch,
58+
body: kopf.Body,
5659
status: kopf.Status,
5760
diff: kopf.Diff,
5861
started: datetime.datetime,
62+
logger: logging.Logger,
5963
):
6064
"""
6165
Handle cluster updates.
@@ -101,6 +105,10 @@ async def update_cratedb(
101105
elif context.get("ref", "") != change_hash:
102106
context["ref"] = change_hash
103107

108+
rollback_handlers = [
109+
RollbackUpgradeSubHandler(namespace, name, body, patch, logger),
110+
]
111+
104112
# Determines whether the before_cluster_update and after_cluster_update handlers
105113
# will be registered
106114
do_before_update = True
@@ -112,32 +120,39 @@ async def update_cratedb(
112120
do_scale = False
113121
do_expand_volume = False
114122

123+
operation = OperationType.UNKNOWN
124+
115125
for _, field_path, old_spec, new_spec in diff:
116126
if field_path in {
117127
("spec", "cluster", "imageRegistry"),
118128
("spec", "cluster", "version"),
119129
}:
120130
do_upgrade = True
121131
do_restart = True
132+
operation = OperationType.UPGRADE
122133
elif field_path == ("spec", "nodes", "master", "replicas"):
123134
do_scale = True
135+
operation = OperationType.SCALE
124136
elif field_path == ("spec", "nodes", "data"):
125137
for node_spec_idx in range(len(old_spec)):
126138
old_spec = old_spec[node_spec_idx]
127139
new_spec = new_spec[node_spec_idx]
128140

129141
if old_spec.get("replicas") != new_spec.get("replicas"):
130142
do_scale = True
143+
operation = OperationType.SCALE
131144
# When resuming the cluster do not register before_update
132145
if old_spec.get("replicas") == 0:
133146
do_before_update = False
147+
operation = OperationType.RESUME
134148

135149
# Delay the cronjob re-enabling after resuming the cluster
136150
await set_cronjob_delay(patch)
137151

138152
# When suspending the cluster do not register after_update
139153
elif new_spec.get("replicas") == 0:
140154
do_after_update = False
155+
operation = OperationType.SUSPEND
141156

142157
# Do not re-enable the cronjobs if the cluster is suspended
143158
patch.status[DELAY_CRONJOB] = False
@@ -151,6 +166,7 @@ async def update_cratedb(
151166
do_after_update = False
152167
elif has_compute_changed(old_spec, new_spec):
153168
do_change_compute = True
169+
operation = OperationType.CHANGE_COMPUTE
154170
# pod resources won't change until each pod is recreated
155171
do_restart = True
156172

@@ -163,15 +179,33 @@ async def update_cratedb(
163179
):
164180
return
165181

182+
for handler in rollback_handlers:
183+
if handler.is_in_rollback():
184+
rollback_operation = handler.get_operation_type()
185+
if rollback_operation == operation:
186+
logger.info(
187+
f"Rollback in progress for {rollback_operation}: "
188+
f"{handler.annotation_key()}, skipping update."
189+
)
190+
handler.clear_rollback()
191+
return
192+
else:
193+
logger.info(
194+
f"Rollback active for {rollback_operation}, but current operation "
195+
f"is {operation}. Proceeding with update."
196+
)
197+
166198
depends_on: List[str] = []
167199

168200
if do_before_update:
169201
register_before_update_handlers(
170-
namespace, name, change_hash, context, depends_on
202+
namespace, name, change_hash, context, depends_on, operation
171203
)
172204

173205
if do_upgrade:
174-
register_upgrade_handlers(namespace, name, change_hash, context, depends_on)
206+
register_upgrade_handlers(
207+
namespace, name, change_hash, context, depends_on, operation
208+
)
175209

176210
# Delay the cronjob re-enabling after upgrading a cluster
177211
# It is called here to not mess up with the values stored in the status
@@ -180,7 +214,7 @@ async def update_cratedb(
180214

181215
if do_change_compute:
182216
register_change_compute_handlers(
183-
namespace, name, change_hash, context, depends_on
217+
namespace, name, change_hash, context, depends_on, operation
184218
)
185219

186220
if do_restart:
@@ -192,10 +226,13 @@ async def update_cratedb(
192226
depends_on,
193227
do_upgrade,
194228
do_change_compute,
229+
operation,
195230
)
196231

197232
if do_scale:
198-
register_scale_handlers(namespace, name, change_hash, context, depends_on)
233+
register_scale_handlers(
234+
namespace, name, change_hash, context, depends_on, operation
235+
)
199236

200237
if do_expand_volume:
201238
register_storage_expansion_handlers(
@@ -204,7 +241,7 @@ async def update_cratedb(
204241

205242
if do_after_update:
206243
register_after_update_handlers(
207-
namespace, name, change_hash, context, depends_on
244+
namespace, name, change_hash, context, depends_on, operation
208245
)
209246

210247
patch.status[CLUSTER_UPDATE_ID] = context
@@ -223,6 +260,21 @@ async def update_cratedb(
223260
backoff=get_backoff(),
224261
)
225262

263+
# Rollback operation in case of failed dependencies
264+
kopf.register(
265+
fn=FinalRollbackSubHandler(
266+
namespace,
267+
name,
268+
change_hash,
269+
context,
270+
depends_on=depends_on.copy(),
271+
run_on_dep_failures=True,
272+
operation=operation,
273+
)(),
274+
id="final_rollback",
275+
backoff=get_backoff(),
276+
)
277+
226278

227279
def register_storage_expansion_handlers(
228280
namespace: str, name: str, change_hash: str, context: dict, depends_on: list
@@ -278,10 +330,16 @@ def register_restart_handlers(
278330
depends_on: list,
279331
do_upgrade: bool,
280332
do_change_compute: bool,
333+
operation: OperationType,
281334
):
282335
kopf.register(
283336
fn=RestartSubHandler(
284-
namespace, name, change_hash, context, depends_on=depends_on.copy()
337+
namespace,
338+
name,
339+
change_hash,
340+
context,
341+
depends_on=depends_on.copy(),
342+
operation=operation,
285343
)(action=WebhookAction.UPGRADE if do_upgrade else WebhookAction.CHANGE_COMPUTE),
286344
id="restart",
287345
backoff=get_backoff(),
@@ -323,11 +381,21 @@ def register_restart_handlers(
323381

324382

325383
def register_change_compute_handlers(
326-
namespace: str, name: str, change_hash: str, context: dict, depends_on: list
384+
namespace: str,
385+
name: str,
386+
change_hash: str,
387+
context: dict,
388+
depends_on: list,
389+
operation: OperationType,
327390
):
328391
kopf.register(
329392
fn=ChangeComputeSubHandler(
330-
namespace, name, change_hash, context, depends_on=depends_on.copy()
393+
namespace,
394+
name,
395+
change_hash,
396+
context,
397+
depends_on=depends_on.copy(),
398+
operation=operation,
331399
)(),
332400
id="change_compute",
333401
backoff=get_backoff(),
@@ -336,11 +404,21 @@ def register_change_compute_handlers(
336404

337405

338406
def register_scale_handlers(
339-
namespace: str, name: str, change_hash: str, context: dict, depends_on: list
407+
namespace: str,
408+
name: str,
409+
change_hash: str,
410+
context: dict,
411+
depends_on: list,
412+
operation: OperationType,
340413
):
341414
kopf.register(
342415
fn=ScaleSubHandler(
343-
namespace, name, change_hash, context, depends_on=depends_on.copy()
416+
namespace,
417+
name,
418+
change_hash,
419+
context,
420+
depends_on=depends_on.copy(),
421+
operation=operation,
344422
)(),
345423
id="scale",
346424
backoff=get_backoff(),
@@ -349,11 +427,21 @@ def register_scale_handlers(
349427

350428

351429
def register_upgrade_handlers(
352-
namespace: str, name: str, change_hash: str, context: dict, depends_on: list
430+
namespace: str,
431+
name: str,
432+
change_hash: str,
433+
context: dict,
434+
depends_on: list,
435+
operation: OperationType,
353436
):
354437
kopf.register(
355438
fn=UpgradeSubHandler(
356-
namespace, name, change_hash, context, depends_on=depends_on.copy()
439+
namespace,
440+
name,
441+
change_hash,
442+
context,
443+
depends_on=depends_on.copy(),
444+
operation=operation,
357445
)(),
358446
id="upgrade",
359447
backoff=get_backoff(),
@@ -362,7 +450,12 @@ def register_upgrade_handlers(
362450

363451

364452
def register_after_update_handlers(
365-
namespace: str, name: str, change_hash: str, context: dict, depends_on: list
453+
namespace: str,
454+
name: str,
455+
change_hash: str,
456+
context: dict,
457+
depends_on: list,
458+
operation: OperationType,
366459
):
367460
kopf.register(
368461
fn=AfterClusterUpdateSubHandler(
@@ -372,6 +465,7 @@ def register_after_update_handlers(
372465
context,
373466
depends_on=depends_on.copy(),
374467
run_on_dep_failures=True,
468+
operation=operation,
375469
)(),
376470
id="after_cluster_update",
377471
backoff=get_backoff(),
@@ -380,10 +474,17 @@ def register_after_update_handlers(
380474

381475

382476
def register_before_update_handlers(
383-
namespace: str, name: str, change_hash: str, context: dict, depends_on: list
477+
namespace: str,
478+
name: str,
479+
change_hash: str,
480+
context: dict,
481+
depends_on: list,
482+
operation: OperationType,
384483
):
385484
kopf.register(
386-
fn=BeforeClusterUpdateSubHandler(namespace, name, change_hash, context)(),
485+
fn=BeforeClusterUpdateSubHandler(
486+
namespace, name, change_hash, context, operation=operation
487+
)(),
387488
id="before_cluster_update",
388489
backoff=get_backoff(),
389490
)

crate/operator/main.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,16 +161,18 @@ async def cluster_update(
161161
namespace: str,
162162
name: str,
163163
patch: kopf.Patch,
164+
body: kopf.Body,
164165
status: kopf.Status,
165166
diff: kopf.Diff,
166167
started: datetime.datetime,
168+
logger: logging.Logger,
167169
**_kwargs,
168170
):
169171
"""
170172
Handles updates to the CrateDB resource.
171173
"""
172174
await raise_on_namespace_terminating(namespace)
173-
await update_cratedb(namespace, name, patch, status, diff, started)
175+
await update_cratedb(namespace, name, patch, body, status, diff, started, logger)
174176

175177

176178
@kopf.on.update(

0 commit comments

Comments
 (0)