@@ -22,13 +22,148 @@ import (
2222)
2323
2424const (
25- defaultCrateNodePrefix = "data-hot"
26- defaultPID = 1
27- defaultProto = "https"
28- defaultMinAvailability = "FULL"
29- defaultTimeout = "7200s"
25+ defaultCrateNodePrefix = "data-hot"
26+ defaultPID = 1
27+ defaultProto = "https"
28+ defaultMinAvailability = "FULL"
29+ defaultTimeout = "7200s"
30+ defaultTerminationGracePeriodSeconds = 30 // Kubernetes default grace period in seconds; a StatefulSet using this value falls back to the flag timeout
31+ gracePeriodBuffer = 120 // seconds subtracted from terminationGracePeriodSeconds to derive the graceful-stop timeout
32+ minimumTimeout = 360 // lower bound, in seconds, applied to a timeout derived from terminationGracePeriodSeconds
33+
34+ // StatefulSet label keys (using dashes instead of underscores).
// They are looked up in the StatefulSet's labels to override the
// min-availability and graceful-stop "force" settings.
35+ labelMinAvailability = "dc-util-min-availability"
36+ labelGracefulStop = "dc-util-graceful-stop"
3037)
3138
// customError is a minimal error implementation that carries a fixed,
// human-readable message.
type customError struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (e *customError) Error() string {
	return e.msg
}

// ErrMalformedHostname is the sentinel error returned when a hostname cannot
// be parsed into a CrateDB node name. Compare against it with errors.Is or ==.
var ErrMalformedHostname = &customError{msg: "malformed hostname"}
45+
// splitHostname breaks hostname into its "-"-separated segments.
// The result always has at least one element; a hostname without any "-"
// (including the empty string) yields a single-element slice.
func splitHostname(hostname string) []string {
	const separator = "-"
	// A negative count places no limit on the number of segments returned.
	segments := strings.SplitN(hostname, separator, -1)
	return segments
}
50+
// makeDecommissionStmt builds the SQL statement that asks the cluster to
// decommission the node called nodeName.
//
// nodeName is embedded in a single-quoted SQL string literal, so any single
// quote it contains is doubled (the standard SQL escape). Without this, a
// stray quote would terminate the literal early and yield a broken or
// unintended statement.
func makeDecommissionStmt(nodeName string) string {
	escaped := strings.ReplaceAll(nodeName, "'", "''")
	return "alter cluster decommission '" + escaped + "'"
}
55+
56+ // extractNodeName extracts the CrateDB node name from hostname
57+ func extractNodeName (hostname , crateNodePrefix , defaultPrefix string ) (string , error ) {
58+ parts := splitHostname (hostname )
59+
60+ // If custom prefix provided (not default), use it
61+ if crateNodePrefix != defaultPrefix && crateNodePrefix != "" {
62+ if len (parts ) > 0 {
63+ podNumber := parts [len (parts )- 1 ]
64+ return crateNodePrefix + "-" + podNumber , nil
65+ }
66+ return "" , ErrMalformedHostname
67+ }
68+
69+ // Extract from hostname if using default prefix
70+ // Expected format: crate-<prefix-parts>-<uuid-parts>-<pod-number>
71+ // We want: <prefix-parts>-<pod-number>
72+ if len (parts ) >= 4 && parts [0 ] == "crate" {
73+ podNumber := parts [len (parts )- 1 ]
74+
75+ // Look for the node prefix pattern after "crate"
76+ // Use the provided crateNodePrefix (which equals defaultPrefix in this case)
77+ prefixParts := strings .Split (crateNodePrefix , "-" )
78+
79+ // Check if the hostname contains the expected prefix parts after "crate"
80+ if len (parts ) >= len (prefixParts )+ 2 { // crate + prefix parts + pod number (minimum)
81+ // Extract the prefix parts that match our expected pattern
82+ prefixMatches := true
83+ for i , expectedPart := range prefixParts {
84+ if parts [1 + i ] != expectedPart {
85+ prefixMatches = false
86+ break
87+ }
88+ }
89+
90+ if prefixMatches {
91+ return crateNodePrefix + "-" + podNumber , nil
92+ }
93+ }
94+ }
95+
96+ return "" , ErrMalformedHostname
97+ }
98+
99+ // calculateEffectiveTimeout determines the timeout to use based on terminationGracePeriodSeconds
100+ func calculateEffectiveTimeout (flagTimeout string , terminationGracePeriodSeconds * int64 ) (string , error ) {
101+ // Parse the flag timeout value
102+ flagTimeoutDuration , err := time .ParseDuration (flagTimeout )
103+ if err != nil {
104+ return "" , fmt .Errorf ("invalid timeout format: %w" , err )
105+ }
106+ flagTimeoutSeconds := int (flagTimeoutDuration .Seconds ())
107+
108+ // If terminationGracePeriodSeconds is not set or is the default value (30s), use flag timeout
109+ // The default 30s is too small for CrateDB decommissioning, so we rely on the flag timeout
110+ if terminationGracePeriodSeconds == nil || * terminationGracePeriodSeconds == defaultTerminationGracePeriodSeconds {
111+ return flagTimeout , nil
112+ }
113+
114+ // Calculate effective timeout: terminationGracePeriodSeconds - buffer
115+ effectiveTimeoutSeconds := int (* terminationGracePeriodSeconds ) - gracePeriodBuffer
116+
117+ // Ensure minimum timeout
118+ if effectiveTimeoutSeconds < minimumTimeout {
119+ effectiveTimeoutSeconds = minimumTimeout
120+ log .Printf ("Calculated timeout (%ds) is below minimum, using %ds instead" ,
121+ int (* terminationGracePeriodSeconds )- gracePeriodBuffer , minimumTimeout )
122+ }
123+
124+ // Log when using different timeout than flag
125+ if effectiveTimeoutSeconds != flagTimeoutSeconds {
126+ log .Printf ("Using timeout derived from terminationGracePeriodSeconds: %ds (terminationGracePeriodSeconds=%ds, buffer=%ds) instead of flag value: %ds" ,
127+ effectiveTimeoutSeconds , * terminationGracePeriodSeconds , gracePeriodBuffer , flagTimeoutSeconds )
128+ }
129+
130+ return fmt .Sprintf ("%ds" , effectiveTimeoutSeconds ), nil
131+ }
132+
133+ // getMinAvailabilityFromLabels reads min-availability from StatefulSet labels or returns default
134+ func getMinAvailabilityFromLabels (labels map [string ]string , defaultValue string ) string {
135+ if value , exists := labels [labelMinAvailability ]; exists {
136+ // Validate the value
137+ switch value {
138+ case "NONE" , "FULL" , "PRIMARIES" :
139+ log .Printf ("Using min-availability from StatefulSet label: %s" , value )
140+ return value
141+ default :
142+ log .Printf ("Invalid min-availability value in StatefulSet label '%s': %s, using default: %s" ,
143+ labelMinAvailability , value , defaultValue )
144+ }
145+ }
146+ return defaultValue
147+ }
148+
149+ // getGracefulStopForceFromLabels reads graceful stop force setting from StatefulSet labels
150+ func getGracefulStopForceFromLabels (labels map [string ]string ) bool {
151+ if value , exists := labels [labelGracefulStop ]; exists {
152+ switch value {
153+ case "TRUE" , "true" , "True" :
154+ log .Printf ("Using graceful stop force from StatefulSet label: true" )
155+ return true
156+ case "FALSE" , "false" , "False" :
157+ log .Printf ("Using graceful stop force from StatefulSet label: false" )
158+ return false
159+ default :
160+ log .Printf ("Invalid graceful stop value in StatefulSet label '%s': %s, using default: true" ,
161+ labelGracefulStop , value )
162+ }
163+ }
164+ return true // default behavior
165+ }
166+
32167func sendSQLStatement (proto , stmt string ) error {
33168 payload := map [string ]string {"stmt" : stmt }
34169 payloadBytes , err := json .Marshal (payload )
@@ -159,6 +294,14 @@ func run() error {
159294 }
160295 statefulSetName := strings .Join (hostnameParts [:len (hostnameParts )- 1 ], "-" )
161296
297+ // Determine crateNodeName using extracted logic
298+ log .Printf ("Parsing hostname: %s" , hostname )
299+ actualCrateNodeName , err := extractNodeName (hostname , crateNodePrefix , defaultCrateNodePrefix )
300+ if err != nil {
301+ return fmt .Errorf ("failed to extract node name from hostname %s: %w" , hostname , err )
302+ }
303+ log .Printf ("Extracted CrateDB node name: %s" , actualCrateNodeName )
304+
162305 ctx := context .Background ()
163306 statefulSet , err := clientset .AppsV1 ().StatefulSets (namespace ).Get (ctx , statefulSetName , metav1.GetOptions {})
164307 if err != nil {
@@ -168,19 +311,34 @@ func run() error {
168311
169312 log .Printf ("StatefulSet has %d replicas configured" , replicas )
170313
314+ // Get configuration from StatefulSet labels
315+ effectiveMinAvailability := getMinAvailabilityFromLabels (statefulSet .Labels , minAvailability )
316+ gracefulStopForce := getGracefulStopForceFromLabels (statefulSet .Labels )
317+
318+ // Calculate effective timeout based on terminationGracePeriodSeconds
319+ effectiveTimeout , err := calculateEffectiveTimeout (decommissionTimeout , statefulSet .Spec .Template .Spec .TerminationGracePeriodSeconds )
320+ if err != nil {
321+ return fmt .Errorf ("failed to calculate effective timeout: %w" , err )
322+ }
323+
324+ if statefulSet .Spec .Template .Spec .TerminationGracePeriodSeconds != nil {
325+ log .Printf ("StatefulSet terminationGracePeriodSeconds: %ds" , * statefulSet .Spec .Template .Spec .TerminationGracePeriodSeconds )
326+ } else {
327+ log .Printf ("StatefulSet terminationGracePeriodSeconds: not set (using Kubernetes default)" )
328+ }
329+
171330 time .Sleep (2 * time .Second ) // Sleep to catch up with the replica settings
172331
173332 if replicas > 0 {
174- podNumber := hostnameParts [len (hostnameParts )- 1 ]
175-
176333 // Send the SQL statements to decommission the node
177- log .Printf ("Decommissioning node %s with graceful_stop.timeout of %s" , podNumber , decommissionTimeout )
334+ log .Printf ("Decommissioning node %s with graceful_stop.timeout of %s, min_availability=%s, force=%t" ,
335+ actualCrateNodeName , effectiveTimeout , effectiveMinAvailability , gracefulStopForce )
178336
179337 statements := []string {
180- fmt .Sprintf (`set global transient "cluster.graceful_stop.timeout" = '%s';` , decommissionTimeout ),
181- `set global transient "cluster.graceful_stop.force" = True;` ,
182- fmt .Sprintf (`set global transient "cluster.graceful_stop.min_availability"='%s';` , minAvailability ),
183- fmt . Sprintf ( `alter cluster decommission '%s-%s'` , crateNodePrefix , podNumber ),
338+ fmt .Sprintf (`set global transient "cluster.graceful_stop.timeout" = '%s';` , effectiveTimeout ),
339+ fmt . Sprintf ( `set global transient "cluster.graceful_stop.force" = %t;` , gracefulStopForce ) ,
340+ fmt .Sprintf (`set global transient "cluster.graceful_stop.min_availability"='%s';` , effectiveMinAvailability ),
341+ makeDecommissionStmt ( actualCrateNodeName ),
184342 }
185343
186344 for _ , stmt := range statements {
0 commit comments