-
Notifications
You must be signed in to change notification settings - Fork 42
Expand file tree
/
Copy pathshared_cluster.go
More file actions
586 lines (498 loc) · 20.2 KB
/
shared_cluster.go
File metadata and controls
586 lines (498 loc) · 20.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
// /*
// Copyright 2025 The Grove Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// */
package setup
import (
"context"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/ai-dynamo/grove/operator/api/common"
"github.com/ai-dynamo/grove/operator/e2e/utils"
"github.com/ai-dynamo/grove/operator/internal/utils/ioutil"
"github.com/docker/docker/api/types/image"
dockerclient "github.com/docker/docker/client"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
)
const (
	// cleanupTimeout is the maximum time to wait for all resources and pods to be deleted during cleanup.
	// This needs to be long enough to allow for cascade deletion propagation through
	// PodCliqueSet -> PodCliqueScalingGroup -> PodClique -> Pod
	cleanupTimeout = 2 * time.Minute
	// cleanupPollInterval is the interval between checks during cleanup polling.
	cleanupPollInterval = 1 * time.Second
	// defaultRegistryPort is the default container registry port for local
	// development clusters (e.g., k3d with a local registry).
	defaultRegistryPort = "5001"
	// Environment variables for cluster configuration.
	// The cluster must be created beforehand with Grove operator and Kai scheduler deployed.
	// For local development with k3d, use: ./operator/hack/infra-manager.py setup e2e
	// EnvRegistryPort specifies the container registry port for test images (optional).
	EnvRegistryPort = "E2E_REGISTRY_PORT"
)
// resourceType represents a Kubernetes resource type for cleanup operations.
type resourceType struct {
	group    string // API group, e.g. "grove.io"; empty string for the core group
	version  string // API version, e.g. "v1alpha1"
	resource string // lowercase plural resource name used in REST paths, e.g. "podcliques"
	name     string // human-readable name used only in log messages, e.g. "PodCliques"
}
// groveManagedResourceTypes defines all resource types managed by Grove operator that need
// to be tracked for cleanup. The list is used for polling and debug listing, not for
// deletion ordering (deletion relies on cascade from PodCliqueSets).
var groveManagedResourceTypes = []resourceType{
	// Grove CRDs
	{"grove.io", "v1alpha1", "podcliquesets", "PodCliqueSets"},
	{"grove.io", "v1alpha1", "podcliquescalinggroups", "PodCliqueScalingGroups"},
	{"scheduler.grove.io", "v1alpha1", "podgangs", "PodGangs"},
	{"grove.io", "v1alpha1", "podcliques", "PodCliques"},
	// Kubernetes core resources
	{"", "v1", "services", "Services"},
	{"", "v1", "serviceaccounts", "ServiceAccounts"},
	{"", "v1", "secrets", "Secrets"},
	// RBAC resources
	{"rbac.authorization.k8s.io", "v1", "roles", "Roles"},
	{"rbac.authorization.k8s.io", "v1", "rolebindings", "RoleBindings"},
	// Autoscaling resources
	{"autoscaling", "v2", "horizontalpodautoscalers", "HorizontalPodAutoscalers"},
}
// SharedClusterManager manages a shared Kubernetes cluster for E2E tests.
// It connects to an existing cluster via kubeconfig and manages test lifecycle operations
// like workload cleanup and node cordoning.
type SharedClusterManager struct {
	clientset     *kubernetes.Clientset // typed client for core Kubernetes APIs
	restConfig    *rest.Config          // REST config resolved from kubeconfig / in-cluster config
	dynamicClient dynamic.Interface     // dynamic client, used for Grove CRDs during cleanup
	cleanup       func()                // teardown hook; stops node monitoring (does not delete the cluster)
	logger        *utils.Logger
	isSetup       bool     // true once connectToCluster has completed successfully
	workerNodes   []string // names of non-control-plane nodes, used for cordoning management
	registryPort  string   // local registry port for pushing test images
	cleanupFailed bool     // Set to true if CleanupWorkloads fails, causing subsequent tests to fail
	cleanupError  string   // The error message from the failed cleanup
}
// Singleton state: one SharedClusterManager is created per test process and
// reused across all tests (guarded by once).
var (
	sharedCluster *SharedClusterManager
	once          sync.Once
)
// SharedCluster returns the process-wide singleton shared cluster manager,
// creating it on the first call. The logger supplied on the first call is the
// one retained; loggers passed on subsequent calls are ignored.
func SharedCluster(logger *utils.Logger) *SharedClusterManager {
	once.Do(func() {
		mgr := &SharedClusterManager{}
		mgr.logger = logger
		sharedCluster = mgr
	})
	return sharedCluster
}
// Setup initializes the shared cluster by connecting to an existing Kubernetes cluster.
// It is a no-op when the manager is already set up.
//
// The cluster must be created beforehand with:
//   - Grove operator deployed and ready
//   - Kai scheduler deployed and ready
//   - Required node labels and topology configuration
//
// For local development with k3d:
//
//	./operator/hack/infra-manager.py setup e2e
//
// Optional environment variables:
//   - E2E_REGISTRY_PORT (default: 5001, for pushing test images to local registry)
//
// The cluster connection is established via standard kubeconfig resolution:
//   - KUBECONFIG environment variable, or
//   - ~/.kube/config, or
//   - In-cluster config
func (scm *SharedClusterManager) Setup(ctx context.Context, testImages []string) error {
	if !scm.isSetup {
		return scm.connectToCluster(ctx, testImages)
	}
	return nil
}
// connectToCluster connects to an existing Kubernetes cluster using standard kubeconfig resolution.
// It expects the cluster to already have Grove operator, Kai scheduler, etc. installed.
//
// On success it populates the clientset, REST config, dynamic client, worker node list
// and the teardown hook, and marks the manager as set up.
func (scm *SharedClusterManager) connectToCluster(ctx context.Context, testImages []string) error {
	// Get registry port from env or use default.
	registryPort := os.Getenv(EnvRegistryPort)
	if registryPort == "" {
		registryPort = defaultRegistryPort
	}
	scm.registryPort = registryPort
	scm.logger.Info("🔗 Connecting to existing Kubernetes cluster...")
	// Get REST config using standard kubeconfig resolution
	// (KUBECONFIG, ~/.kube/config, or in-cluster config).
	restConfig, err := getRestConfig()
	if err != nil {
		return fmt.Errorf("failed to get cluster config: %w", err)
	}
	scm.restConfig = restConfig
	// Create clientset from restConfig.
	clientset, err := kubernetes.NewForConfig(restConfig)
	if err != nil {
		return fmt.Errorf("failed to create clientset: %w", err)
	}
	scm.clientset = clientset
	// Create dynamic client from restConfig (needed for Grove CRDs).
	dynamicClient, err := dynamic.NewForConfig(restConfig)
	if err != nil {
		return fmt.Errorf("failed to create dynamic client: %w", err)
	}
	scm.dynamicClient = dynamicClient
	// Setup test images in the registry. Note: scm.registryPort can never be empty
	// here because it is defaulted above, so only the image list gates this step
	// (the previous `scm.registryPort != ""` guard was always true).
	if len(testImages) > 0 {
		if err := SetupRegistryTestImages(scm.registryPort, testImages); err != nil {
			return fmt.Errorf("failed to setup registry test images: %w", err)
		}
	}
	// Get list of worker nodes for cordoning management. Any node carrying the
	// control-plane role label is excluded.
	nodes, err := clientset.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list nodes: %w", err)
	}
	scm.workerNodes = make([]string, 0)
	for _, node := range nodes.Items {
		if _, isServer := node.Labels["node-role.kubernetes.io/control-plane"]; !isServer {
			scm.workerNodes = append(scm.workerNodes, node.Name)
		}
	}
	// Start node monitoring to handle unhealthy k3d nodes during test execution.
	// This is critical for k3d clusters where nodes can become NotReady due to resource pressure.
	// The monitoring automatically detects and replaces unhealthy nodes to prevent test flakiness.
	nodeMonitoringCleanup := StartNodeMonitoring(ctx, clientset, scm.logger)
	// Cleanup function stops node monitoring - we don't delete the cluster when tests finish.
	scm.cleanup = func() {
		nodeMonitoringCleanup()
		scm.logger.Info("ℹ️ Test run complete - cluster preserved for inspection or reuse")
	}
	scm.logger.Infof("✅ Connected to cluster with %d worker nodes", len(scm.workerNodes))
	scm.isSetup = true
	return nil
}
// PrepareForTest prepares the cluster for a specific test by cordoning the appropriate nodes.
// It ensures exactly `requiredWorkerNodes` nodes are schedulable by cordoning excess nodes.
// Returns an error if a previous cleanup operation failed, preventing potentially corrupted test state.
func (scm *SharedClusterManager) PrepareForTest(ctx context.Context, requiredWorkerNodes int) error {
	if scm.cleanupFailed {
		return fmt.Errorf("cannot prepare cluster: a previous test cleanup failed - cluster may have orphaned resources. Original error: %s", scm.cleanupError)
	}
	if !scm.isSetup {
		return fmt.Errorf("shared cluster not setup")
	}
	total := len(scm.workerNodes)
	switch {
	case requiredWorkerNodes > total:
		return fmt.Errorf("required worker nodes (%d) is greater than the number of worker nodes in the cluster (%d)", requiredWorkerNodes, total)
	case requiredWorkerNodes == total:
		// Nothing to cordon; every worker stays schedulable.
		scm.logger.Debugf("🔧 Preparing cluster: all %d worker nodes will be schedulable", requiredWorkerNodes)
		return nil
	}
	// Cordon the trailing nodes that are not needed for this test.
	surplus := scm.workerNodes[requiredWorkerNodes:]
	scm.logger.Debugf("🔧 Preparing cluster: keeping %d nodes schedulable, cordoning %d nodes", requiredWorkerNodes, len(surplus))
	for i, nodeName := range surplus {
		scm.logger.Debugf(" Cordoning node %d/%d: %s", i+1, len(surplus), nodeName)
		if err := utils.SetNodeSchedulable(ctx, scm.clientset, nodeName, false); err != nil {
			scm.logger.Errorf("Failed to cordon node %s (attempt to cordon node %d/%d): %v", nodeName, i+1, len(surplus), err)
			return fmt.Errorf("failed to cordon node %s: %w", nodeName, err)
		}
	}
	scm.logger.Debugf("✅ Successfully cordoned %d nodes", len(surplus))
	return nil
}
// CleanupWorkloads removes all test workloads from the cluster.
// It is a no-op when the manager is not set up. Note that node cordoning state
// is only reset after resources and pods have been confirmed gone.
func (scm *SharedClusterManager) CleanupWorkloads(ctx context.Context) error {
	if !scm.isSetup {
		return nil
	}
	scm.logger.Info("🧹 Cleaning up workloads from shared cluster...")
	// Step 1: delete the top-level PodCliqueSets; everything else should be
	// removed by cascade deletion. A listing/deletion failure here is only
	// logged because the polling step below is the real success criterion.
	if err := scm.deleteAllResources(ctx, "grove.io", "v1alpha1", "podcliquesets"); err != nil {
		scm.logger.Warnf("failed to delete PodCliqueSets: %v", err)
	}
	// Step 2: poll until all Grove-managed resources and non-system pods are gone.
	scm.logger.Infof("⏳ Waiting up to %v for cascade deletion to complete...", cleanupTimeout)
	if err := scm.waitForAllGroveManagedResourcesAndPodsDeleted(ctx, cleanupTimeout, cleanupPollInterval); err != nil {
		// Dump whatever is left behind to aid debugging before bailing out.
		scm.listRemainingGroveManagedResources(ctx)
		scm.listRemainingPods(ctx, "default")
		return fmt.Errorf("failed to delete all resources and pods: %w", err)
	}
	// Step 3: uncordon all worker nodes so the next test starts clean.
	if err := scm.resetNodeStates(ctx); err != nil {
		scm.logger.Warnf("failed to reset node states: %v", err)
	}
	return nil
}
// deleteAllResources deletes all resources of a specific type across all namespaces.
// Individual deletion failures are logged and skipped; only a failure to list
// the resource type is returned as an error.
func (scm *SharedClusterManager) deleteAllResources(ctx context.Context, group, version, resource string) error {
	gvr := schema.GroupVersionResource{Group: group, Version: version, Resource: resource}
	// List all resources across all namespaces.
	list, err := scm.dynamicClient.Resource(gvr).List(ctx, metav1.ListOptions{})
	if err != nil {
		return fmt.Errorf("failed to list %s: %w", resource, err)
	}
	if len(list.Items) == 0 {
		scm.logger.Debugf("No %s found to delete", resource)
		return nil
	}
	scm.logger.Infof("🗑️ Deleting %d %s...", len(list.Items), resource)
	// Issue a delete request for each item; best effort.
	for _, item := range list.Items {
		ns, name := item.GetNamespace(), item.GetName()
		scm.logger.Debugf("Deleting %s %s/%s", resource, ns, name)
		if delErr := scm.dynamicClient.Resource(gvr).Namespace(ns).Delete(ctx, name, metav1.DeleteOptions{}); delErr != nil {
			scm.logger.Warnf("failed to delete %s %s/%s: %v", resource, ns, name, delErr)
			continue
		}
		scm.logger.Debugf("✓ Delete request sent for %s %s/%s", resource, ns, name)
	}
	return nil
}
// isSystemPod reports whether a pod belongs to system machinery (system
// namespaces or DaemonSet-owned pods) and should be ignored during cleanup.
func isSystemPod(pod *v1.Pod) bool {
	// Pods in system namespaces are never test workloads.
	if pod.Namespace == "kube-system" {
		return true
	}
	if pod.Namespace == OperatorNamespace {
		return true
	}
	// DaemonSet-owned pods are cluster infrastructure, not test workloads.
	for _, ref := range pod.OwnerReferences {
		if ref.Kind == "DaemonSet" {
			return true
		}
	}
	return false
}
// listRemainingPods logs any non-system pods still present in the given
// namespace, for debugging failed cleanups. Errors are logged, not returned.
func (scm *SharedClusterManager) listRemainingPods(ctx context.Context, namespace string) {
	pods, err := scm.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
	if err != nil {
		scm.logger.Errorf("Failed to list remaining pods: %v", err)
		return
	}
	var leftovers []string
	for i := range pods.Items {
		p := &pods.Items[i]
		if isSystemPod(p) {
			continue
		}
		leftovers = append(leftovers, fmt.Sprintf("%s (Phase: %s)", p.Name, p.Status.Phase))
	}
	if len(leftovers) > 0 {
		scm.logger.Errorf("Remaining non-system pods: %v", leftovers)
	}
}
// resetNodeStates uncordons all worker nodes to reset cluster state for the next test.
// It stops and returns an error on the first node that fails to uncordon.
func (scm *SharedClusterManager) resetNodeStates(ctx context.Context) error {
	total := len(scm.workerNodes)
	scm.logger.Debugf("🔄 Resetting node states: uncordoning %d worker nodes", total)
	for i, nodeName := range scm.workerNodes {
		err := utils.SetNodeSchedulable(ctx, scm.clientset, nodeName, true)
		if err == nil {
			continue
		}
		scm.logger.Errorf("Failed to uncordon node %s (node %d/%d): %v", nodeName, i+1, total, err)
		return fmt.Errorf("failed to uncordon node %s: %w", nodeName, err)
	}
	scm.logger.Debugf("✅ Successfully reset all %d worker nodes to schedulable", total)
	return nil
}
// waitForAllGroveManagedResourcesAndPodsDeleted waits for all Grove resources and pods to be deleted.
// It polls every `interval` up to `timeout`, checking (a) every type in
// groveManagedResourceTypes via the dynamic client and (b) non-system pods in the
// "default" namespace. Progress is logged at most every 30 seconds. Returns nil
// once everything is gone, or the poll helper's timeout error otherwise.
func (scm *SharedClusterManager) waitForAllGroveManagedResourcesAndPodsDeleted(ctx context.Context, timeout time.Duration, interval time.Duration) error {
	startTime := time.Now()
	// lastLogTime is closure state shared across poll iterations so that
	// progress logging is rate-limited rather than emitted on every tick.
	lastLogTime := startTime
	logInterval := 30 * time.Second // Log progress every 30 seconds
	return utils.PollForCondition(ctx, timeout, interval, func() (bool, error) {
		allResourcesDeleted := true
		totalResources := 0
		var resourceDetails []string
		// Create label selector for Grove-managed resources.
		labelSelector := fmt.Sprintf("%s=%s", common.LabelManagedByKey, common.LabelManagedByValue)
		// Check Grove resources
		for _, rt := range groveManagedResourceTypes {
			gvr := schema.GroupVersionResource{
				Group:    rt.group,
				Version:  rt.version,
				Resource: rt.resource,
			}
			// PodCliqueSets are user-created top-level resources and don't have the managed-by label,
			// so we need to check for them without the label selector
			listOptions := metav1.ListOptions{
				LabelSelector: labelSelector,
			}
			if rt.resource == "podcliquesets" {
				listOptions = metav1.ListOptions{}
			}
			resourceList, err := scm.dynamicClient.Resource(gvr).List(ctx, listOptions)
			if err != nil {
				// If we can't list the resource type, assume it doesn't exist or is being deleted
				continue
			}
			if len(resourceList.Items) > 0 {
				allResourcesDeleted = false
				totalResources += len(resourceList.Items)
				// Record per-item detail; an item without a deletion timestamp is
				// suspicious because cascade deletion should have marked it.
				for _, item := range resourceList.Items {
					deletionTS := item.GetDeletionTimestamp()
					if deletionTS != nil {
						resourceDetails = append(resourceDetails, fmt.Sprintf("%s/%s (deleting since %v)", item.GetNamespace(), item.GetName(), time.Since(deletionTS.Time).Round(time.Second)))
					} else {
						resourceDetails = append(resourceDetails, fmt.Sprintf("%s/%s (NOT marked for deletion!)", item.GetNamespace(), item.GetName()))
					}
				}
			}
		}
		// Check pods in the "default" namespace; system pods are ignored.
		// A list error is silently treated as "no pods seen this tick".
		allPodsDeleted := true
		nonSystemPods := 0
		pods, err := scm.clientset.CoreV1().Pods("default").List(ctx, metav1.ListOptions{})
		if err == nil {
			for _, pod := range pods.Items {
				if !isSystemPod(&pod) {
					allPodsDeleted = false
					nonSystemPods++
				}
			}
		}
		if allResourcesDeleted && allPodsDeleted {
			scm.logger.Infof("✅ All resources deleted after %v", time.Since(startTime).Round(time.Second))
			return true, nil
		}
		// Log progress at intervals for visibility
		now := time.Now()
		if now.Sub(lastLogTime) >= logInterval {
			lastLogTime = now
			elapsed := now.Sub(startTime).Round(time.Second)
			scm.logger.Infof("⏳ [%v elapsed] Waiting for %d resources and %d pods. Details: %v", elapsed, totalResources, nonSystemPods, resourceDetails)
		}
		return false, nil
	})
}
// listRemainingGroveManagedResources logs any Grove-managed resources still
// present in the cluster, for debugging failed cleanups. Errors are logged,
// not returned.
func (scm *SharedClusterManager) listRemainingGroveManagedResources(ctx context.Context) {
	// Label selector identifying resources created by the Grove operator.
	managedSelector := fmt.Sprintf("%s=%s", common.LabelManagedByKey, common.LabelManagedByValue)
	for _, rt := range groveManagedResourceTypes {
		gvr := schema.GroupVersionResource{Group: rt.group, Version: rt.version, Resource: rt.resource}
		opts := metav1.ListOptions{LabelSelector: managedSelector}
		// PodCliqueSets are user-created top-level resources and don't carry
		// the managed-by label, so list them without a selector.
		if rt.resource == "podcliquesets" {
			opts = metav1.ListOptions{}
		}
		list, err := scm.dynamicClient.Resource(gvr).List(ctx, opts)
		if err != nil {
			scm.logger.Errorf("Failed to list %s: %v", rt.name, err)
			continue
		}
		if len(list.Items) == 0 {
			continue
		}
		names := make([]string, 0, len(list.Items))
		for _, item := range list.Items {
			names = append(names, fmt.Sprintf("%s/%s", item.GetNamespace(), item.GetName()))
		}
		scm.logger.Errorf("Remaining %s: %v", rt.name, names)
	}
}
// GetClients returns the kubernetes clients for tests to use: the typed
// clientset, the REST config, and the dynamic client, in that order.
func (scm *SharedClusterManager) GetClients() (*kubernetes.Clientset, *rest.Config, dynamic.Interface) {
	cs, cfg, dyn := scm.clientset, scm.restConfig, scm.dynamicClient
	return cs, cfg, dyn
}
// GetRegistryPort returns the local container registry port used for test image setup.
func (scm *SharedClusterManager) GetRegistryPort() (port string) {
	port = scm.registryPort
	return port
}
// GetWorkerNodes returns the list of worker node names.
// A copy is returned so callers cannot mutate the manager's internal
// cordoning bookkeeping (PrepareForTest slices workerNodes by position).
func (scm *SharedClusterManager) GetWorkerNodes() []string {
	nodes := make([]string, len(scm.workerNodes))
	copy(nodes, scm.workerNodes)
	return nodes
}
// IsSetup reports whether the shared cluster has been successfully set up.
func (scm *SharedClusterManager) IsSetup() (ready bool) {
	ready = scm.isSetup
	return ready
}
// MarkCleanupFailed marks that a cleanup operation has failed, recording the
// error message. This causes all subsequent tests to fail immediately when
// they try to prepare the cluster (see PrepareForTest).
func (scm *SharedClusterManager) MarkCleanupFailed(err error) {
	scm.cleanupError = err.Error()
	scm.cleanupFailed = true
}
// HasCleanupFailed reports whether a previous cleanup operation failed.
func (scm *SharedClusterManager) HasCleanupFailed() (failed bool) {
	failed = scm.cleanupFailed
	return failed
}
// GetCleanupError returns the error message from the failed cleanup, or the
// empty string if no cleanup failure has been recorded.
func (scm *SharedClusterManager) GetCleanupError() (msg string) {
	msg = scm.cleanupError
	return msg
}
// Teardown cleans up the shared cluster by running the cleanup hook (which
// stops node monitoring; the cluster itself is preserved).
// It is idempotent: the hook is cleared after it runs, so calling Teardown
// multiple times invokes the hook at most once (previously a second call
// would re-run it).
func (scm *SharedClusterManager) Teardown() {
	if scm.cleanup == nil {
		return
	}
	scm.cleanup()
	scm.cleanup = nil
	scm.isSetup = false
}
// SetupRegistryTestImages pulls images and pushes them to a local container registry.
// This is used for test images that need to be available inside the cluster.
// The registry is expected to be accessible at localhost:<registryPort>.
func SetupRegistryTestImages(registryPort string, images []string) error {
	if len(images) == 0 {
		return nil
	}
	ctx := context.Background()
	// Initialize Docker client from the environment (DOCKER_HOST etc.).
	cli, err := dockerclient.NewClientWithOpts(dockerclient.FromEnv, dockerclient.WithAPIVersionNegotiation())
	if err != nil {
		return fmt.Errorf("failed to create Docker client: %w", err)
	}
	defer ioutil.CloseQuietly(cli)
	// Mirror each image into the local registry; abort on the first failure.
	for _, imageName := range images {
		if err := mirrorImageToLocalRegistry(ctx, cli, registryPort, imageName); err != nil {
			return err
		}
	}
	return nil
}

// mirrorImageToLocalRegistry pulls one image, retags it for localhost:<registryPort>,
// and pushes the retagged image to the local registry.
func mirrorImageToLocalRegistry(ctx context.Context, cli *dockerclient.Client, registryPort, imageName string) error {
	registryImage := fmt.Sprintf("localhost:%s/%s", registryPort, imageName)
	// Step 1: Pull the image.
	pullReader, err := cli.ImagePull(ctx, imageName, image.PullOptions{})
	if err != nil {
		return fmt.Errorf("failed to pull %s: %w", imageName, err)
	}
	// Consume the pull output to avoid blocking the daemon.
	_, err = io.Copy(io.Discard, pullReader)
	ioutil.CloseQuietly(pullReader)
	if err != nil {
		return fmt.Errorf("failed to read pull output for %s: %w", imageName, err)
	}
	// Step 2: Tag the image for the local registry.
	if err := cli.ImageTag(ctx, imageName, registryImage); err != nil {
		return fmt.Errorf("failed to tag image %s as %s: %w", imageName, registryImage, err)
	}
	// Step 3: Push the image to the local registry.
	pushReader, err := cli.ImagePush(ctx, registryImage, image.PushOptions{})
	if err != nil {
		return fmt.Errorf("failed to push %s: %w", registryImage, err)
	}
	// Consume the push output to avoid blocking the daemon.
	_, err = io.Copy(io.Discard, pushReader)
	ioutil.CloseQuietly(pushReader)
	if err != nil {
		return fmt.Errorf("failed to read push output for %s: %w", registryImage, err)
	}
	return nil
}