2121
2222import asyncio
2323import logging
24+ import re
2425from typing import Any , Dict , List , Optional
2526
2627import kopf
27- from kubernetes_asyncio .client import AppsV1Api
28+ from kubernetes_asyncio .client import AppsV1Api , CoreV1Api
2829
2930from crate .operator .config import config
31+ from crate .operator .constants import INTERNAL_TABLES , LUCENE_MIN_VERSION_MAP
32+ from crate .operator .cratedb import connection_factory
3033from crate .operator .operations import get_total_nodes_count
3134from crate .operator .scale import get_container
3235from crate .operator .utils import crate , quorum
3336from crate .operator .utils .k8s_api_client import GlobalApiClient
3437from crate .operator .utils .kopf import StateBasedSubHandler
38+ from crate .operator .utils .kubeapi import get_host , get_system_user_password
3539from crate .operator .utils .version import CrateVersion
3640from crate .operator .webhooks import WebhookEvent , WebhookStatus , WebhookUpgradePayload
3741
@@ -267,6 +271,176 @@ async def upgrade_cluster(
267271 await asyncio .gather (* updates )
268272
269273
274+ def _get_major_or_error (version : CrateVersion ) -> int :
275+ """Helper to safely get the major version or raise an error."""
276+ if version .major is None :
277+ raise kopf .PermanentError (f"Invalid CrateDB version: { version } " )
278+ return version .major
279+
280+
281+ async def check_reindexing_tables (
282+ core : CoreV1Api ,
283+ namespace : str ,
284+ name : str ,
285+ body : kopf .Body ,
286+ old : kopf .Body ,
287+ logger : logging .Logger ,
288+ ):
289+ """
290+ Check if there are any tables that need re-indexing before a
291+ major version upgrade.
292+
293+ :param core: An instance of the Kubernetes Core V1 API.
294+ :param namespace: The Kubernetes namespace for the CrateDB cluster.
295+ :param name: The name for the ``CrateDB`` custom resource.
296+ :param body: The full body of the ``CrateDB`` custom resource per
297+ :class:`kopf.Body`.
298+ :param old: The old resource body. Required to get the old version.
299+ """
300+ old_version = CrateVersion (old ["spec" ]["cluster" ]["version" ])
301+ new_version = CrateVersion (body .spec ["cluster" ]["version" ])
302+
303+ old_major = _get_major_or_error (old_version )
304+ new_major = _get_major_or_error (new_version )
305+
306+ if new_major > old_major :
307+ # Determine required Lucene version based on the target CrateDB version
308+ lucene_min_version = LUCENE_MIN_VERSION_MAP .get (new_major )
309+ if lucene_min_version is None :
310+ raise kopf .PermanentError (
311+ f"No Lucene version mapping found for target CrateDB { new_major } . "
312+ )
313+ host = await get_host (core , namespace , name )
314+ password = await get_system_user_password (core , namespace , name )
315+ conn_factory = connection_factory (host , password )
316+ connection = conn_factory ()
317+
318+ async with connection as conn :
319+ async with conn .cursor () as cursor :
320+ query = f"""
321+ SELECT table_name
322+ FROM (
323+ SELECT table_name,
324+ max(min_lucene_version LIKE '{ lucene_min_version } ')
325+ AS needs_reindex
326+ FROM sys.shards
327+ GROUP BY table_name
328+ ) t
329+ WHERE needs_reindex = TRUE;
330+ """
331+ await cursor .execute (query )
332+ rows = await cursor .fetchall ()
333+ logger .info ("Found tables that need re-indexing %s" , rows )
334+
335+ if rows :
336+ tables = [row [0 ] for row in rows ]
337+ raise kopf .PermanentError (
338+ f"Tables need re-indexing before upgrade: { ', ' .join (tables )} "
339+ )
340+
341+
342+ async def recreate_internal_tables (
343+ core : CoreV1Api ,
344+ namespace : str ,
345+ name : str ,
346+ body : kopf .Body ,
347+ old : kopf .Body ,
348+ logger : logging .Logger ,
349+ ):
350+ """
351+ Re-create internal tables that may have been created with an old
352+ CrateDB major version.
353+
354+ :param core: An instance of the Kubernetes Core V1 API.
355+ :param namespace: The Kubernetes namespace for the CrateDB cluster.
356+ :param name: The name for the ``CrateDB`` custom resource.
357+ :param body: The full body of the ``CrateDB`` custom resource per
358+ :class:`kopf.Body`.
359+ :param old: The old resource body. Required to get the old version.
360+ """
361+ old_version = CrateVersion (old ["spec" ]["cluster" ]["version" ])
362+ new_version = CrateVersion (body .spec ["cluster" ]["version" ])
363+
364+ old_major = _get_major_or_error (old_version )
365+ new_major = _get_major_or_error (new_version )
366+
367+ if new_major > old_major :
368+ host = await get_host (core , namespace , name )
369+ password = await get_system_user_password (core , namespace , name )
370+ conn_factory = connection_factory (host , password )
371+ connection = conn_factory ()
372+
373+ async with connection as conn :
374+ async with conn .cursor () as cursor :
375+ for full_table in INTERNAL_TABLES :
376+ schema , table = full_table .split ("." )
377+
378+ await cursor .execute (
379+ """
380+ SELECT COUNT(*)
381+ FROM information_schema.tables
382+ WHERE table_schema = %s AND table_name = %s
383+ """ ,
384+ (schema , table ),
385+ )
386+ exists = (await cursor .fetchone ())[0 ]
387+
388+ if not exists :
389+ logger .info ("Skipping missing table: %s" , full_table )
390+ continue
391+
392+ try :
393+ tmp_table = f"{ schema } .tmp_{ table } "
394+ logger .info ("Recreating internal table: %s" , full_table )
395+
396+ # Step 1: Fetch original CREATE TABLE statement and replace
397+ # the table name with a temporary one.
398+ await cursor .execute (f"SHOW CREATE TABLE { full_table } " )
399+ ddl = (await cursor .fetchone ())[0 ]
400+ ddl_tmp = re .sub (
401+ r'CREATE TABLE IF NOT EXISTS\s+"([^"]+)"\."([^"]+)"' ,
402+ f'CREATE TABLE IF NOT EXISTS "{ schema } "."tmp_{ table } "' ,
403+ ddl ,
404+ count = 1 ,
405+ )
406+ logger .info ("Original DDL for %s: %s" , full_table , ddl )
407+ logger .info ("Temporary DDL for %s: %s" , tmp_table , ddl_tmp )
408+
409+ # Step 2: Create temporary table
410+ logger .info ("Creating temporary table: %s" , tmp_table )
411+ await cursor .execute (ddl_tmp )
412+
413+ # Step 3: Copy data into temporary table
414+ logger .info ("Copying data to %s" , tmp_table )
415+ await cursor .execute (
416+ f'INSERT INTO "{ schema } "."tmp_{ table } " '
417+ f'SELECT * FROM "{ schema } "."{ table } "'
418+ )
419+
420+ # Step 4: Swap tables atomically
421+ logger .info ("Swapping %s -> %s" , tmp_table , full_table )
422+ await cursor .execute (
423+ f'ALTER CLUSTER SWAP TABLE "{ schema } "."tmp_{ table } " '
424+ f'TO "{ schema } "."{ table } "'
425+ )
426+
427+ # Step 5: Drop temporary table
428+ logger .info ("Dropping obsolete temporary table: %s" , tmp_table )
429+ await cursor .execute (
430+ f'DROP TABLE IF EXISTS "{ schema } "."tmp_{ table } "'
431+ )
432+ except Exception as e :
433+ logger .error (
434+ "Failed to re-create table %s: %s" ,
435+ full_table ,
436+ e ,
437+ exc_info = True ,
438+ )
439+ continue
440+
441+ logger .info ("Successfully re-created all internal tables." )
442+
443+
270444class UpgradeSubHandler (StateBasedSubHandler ):
271445 @crate .on .error (error_handler = crate .send_update_failed_notification )
272446 @crate .timeout (timeout = float (config .CLUSTER_UPDATE_TIMEOUT ))
@@ -296,6 +470,27 @@ async def handle( # type: ignore
296470 )
297471
298472
473+ class BeforeUpgradeSubHandler (StateBasedSubHandler ):
474+ """
475+ A handler which checks if there are any resources created with
476+ an old crateDB version.
477+ """
478+
479+ @crate .on .error (error_handler = crate .send_update_failed_notification )
480+ async def handle ( # type: ignore
481+ self ,
482+ namespace : str ,
483+ name : str ,
484+ body : kopf .Body ,
485+ old : kopf .Body ,
486+ logger : logging .Logger ,
487+ ** kwargs : Any ,
488+ ):
489+ async with GlobalApiClient () as api_client :
490+ core = CoreV1Api (api_client )
491+ await check_reindexing_tables (core , namespace , name , body , old , logger )
492+
493+
299494class AfterUpgradeSubHandler (StateBasedSubHandler ):
300495 """
301496 A handler which depends on ``upgrade`` and ``restart`` having finished
@@ -312,6 +507,10 @@ async def handle( # type: ignore
312507 logger : logging .Logger ,
313508 ** kwargs : Any ,
314509 ):
510+ async with GlobalApiClient () as api_client :
511+ core = CoreV1Api (api_client )
512+ await recreate_internal_tables (core , namespace , name , body , old , logger )
513+
315514 self .schedule_notification (
316515 WebhookEvent .UPGRADE ,
317516 WebhookUpgradePayload (
0 commit comments