Skip to content

Commit 386e7fa

Browse files
authored
Fix multiple context canceled issues on the Workloads manager (#1530)
Signed-off-by: lujunsan <[email protected]>
1 parent 8a11e1d commit 386e7fa

File tree

1 file changed

+52
-53
lines changed

1 file changed

+52
-53
lines changed

pkg/workloads/manager.go

Lines changed: 52 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -192,7 +192,14 @@ func (d *defaultManager) StopWorkloads(ctx context.Context, names []string) (*er
192192
containers = append(containers, &container)
193193
}
194194

195-
return d.stopWorkloads(ctx, containers), nil
195+
group := &errgroup.Group{}
196+
for _, container := range containers {
197+
group.Go(func() error {
198+
return d.stopSingleWorkload(container)
199+
})
200+
}
201+
202+
return group, nil
196203
}
197204

198205
func (d *defaultManager) RunWorkload(ctx context.Context, runConfig *runner.RunConfig) error {
@@ -341,19 +348,19 @@ func (d *defaultManager) GetLogs(ctx context.Context, workloadName string, follo
341348
}
342349

343350
// deleteWorkload handles deletion of a single workload
344-
func (d *defaultManager) deleteWorkload(ctx context.Context, name string) error {
351+
func (d *defaultManager) deleteWorkload(name string) error {
345352
// Create a child context with a longer timeout
346353
childCtx, cancel := context.WithTimeout(context.Background(), AsyncOperationTimeout)
347354
defer cancel()
348355

349356
// Find and validate the container
350-
container, err := d.getWorkloadContainer(childCtx, ctx, name)
357+
container, err := d.getWorkloadContainer(childCtx, name)
351358
if err != nil {
352359
return err
353360
}
354361

355362
// Set status to removing
356-
if err := d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusRemoving, ""); err != nil {
363+
if err := d.statuses.SetWorkloadStatus(childCtx, name, rt.WorkloadStatusRemoving, ""); err != nil {
357364
logger.Warnf("Failed to set workload %s status to removing: %v", name, err)
358365
}
359366

@@ -367,7 +374,7 @@ func (d *defaultManager) deleteWorkload(ctx context.Context, name string) error
367374
}
368375

369376
// Remove the container
370-
if err := d.removeContainer(childCtx, ctx, name); err != nil {
377+
if err := d.removeContainer(childCtx, name); err != nil {
371378
return err
372379
}
373380

@@ -376,23 +383,23 @@ func (d *defaultManager) deleteWorkload(ctx context.Context, name string) error
376383
}
377384

378385
// Remove the workload status from the status store
379-
if err := d.statuses.DeleteWorkloadStatus(ctx, name); err != nil {
386+
if err := d.statuses.DeleteWorkloadStatus(childCtx, name); err != nil {
380387
logger.Warnf("failed to delete workload status for %s: %v", name, err)
381388
}
382389

383390
return nil
384391
}
385392

386393
// getWorkloadContainer retrieves workload container info with error handling
387-
func (d *defaultManager) getWorkloadContainer(childCtx, ctx context.Context, name string) (*rt.ContainerInfo, error) {
394+
func (d *defaultManager) getWorkloadContainer(childCtx context.Context, name string) (*rt.ContainerInfo, error) {
388395
container, err := d.runtime.GetWorkloadInfo(childCtx, name)
389396
if err != nil {
390397
if errors.Is(err, rt.ErrWorkloadNotFound) {
391398
// Log but don't fail the entire operation for not found containers
392399
logger.Warnf("Warning: Failed to get workload %s: %v", name, err)
393400
return nil, nil
394401
}
395-
if statusErr := d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusError, err.Error()); statusErr != nil {
402+
if statusErr := d.statuses.SetWorkloadStatus(childCtx, name, rt.WorkloadStatusError, err.Error()); statusErr != nil {
396403
logger.Warnf("Failed to set workload %s status to error: %v", name, statusErr)
397404
}
398405
return nil, fmt.Errorf("failed to find workload %s: %v", name, err)
@@ -409,10 +416,10 @@ func (*defaultManager) stopProxyIfNeeded(name, baseName string) {
409416
}
410417

411418
// removeContainer removes the container from the runtime
412-
func (d *defaultManager) removeContainer(childCtx, ctx context.Context, name string) error {
419+
func (d *defaultManager) removeContainer(childCtx context.Context, name string) error {
413420
logger.Infof("Removing container %s...", name)
414421
if err := d.runtime.RemoveWorkload(childCtx, name); err != nil {
415-
if statusErr := d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusError, err.Error()); statusErr != nil {
422+
if statusErr := d.statuses.SetWorkloadStatus(childCtx, name, rt.WorkloadStatusError, err.Error()); statusErr != nil {
416423
logger.Warnf("Failed to set workload %s status to error: %v", name, statusErr)
417424
}
418425
return fmt.Errorf("failed to remove container: %v", err)
@@ -448,7 +455,7 @@ func (d *defaultManager) cleanupWorkloadResources(childCtx context.Context, name
448455
logger.Infof("Container %s removed", name)
449456
}
450457

451-
func (d *defaultManager) DeleteWorkloads(ctx context.Context, names []string) (*errgroup.Group, error) {
458+
func (d *defaultManager) DeleteWorkloads(_ context.Context, names []string) (*errgroup.Group, error) {
452459
// Validate all workload names to prevent path traversal attacks
453460
for _, name := range names {
454461
if err := types.ValidateWorkloadName(name); err != nil {
@@ -460,15 +467,15 @@ func (d *defaultManager) DeleteWorkloads(ctx context.Context, names []string) (*
460467

461468
for _, name := range names {
462469
group.Go(func() error {
463-
return d.deleteWorkload(ctx, name)
470+
return d.deleteWorkload(name)
464471
})
465472
}
466473

467474
return group, nil
468475
}
469476

470477
// RestartWorkloads restarts the specified workloads by name.
471-
func (d *defaultManager) RestartWorkloads(ctx context.Context, names []string, foreground bool) (*errgroup.Group, error) {
478+
func (d *defaultManager) RestartWorkloads(_ context.Context, names []string, foreground bool) (*errgroup.Group, error) {
472479
// Validate all workload names to prevent path traversal attacks
473480
for _, name := range names {
474481
if err := types.ValidateWorkloadName(name); err != nil {
@@ -480,15 +487,15 @@ func (d *defaultManager) RestartWorkloads(ctx context.Context, names []string, f
480487

481488
for _, name := range names {
482489
group.Go(func() error {
483-
return d.restartSingleWorkload(ctx, name, foreground)
490+
return d.restartSingleWorkload(name, foreground)
484491
})
485492
}
486493

487494
return group, nil
488495
}
489496

490497
// restartSingleWorkload handles the restart logic for a single workload
491-
func (d *defaultManager) restartSingleWorkload(ctx context.Context, name string, foreground bool) error {
498+
func (d *defaultManager) restartSingleWorkload(name string, foreground bool) error {
492499
// Create a child context with a longer timeout
493500
childCtx, cancel := context.WithTimeout(context.Background(), AsyncOperationTimeout)
494501
defer cancel()
@@ -511,18 +518,18 @@ func (d *defaultManager) restartSingleWorkload(ctx context.Context, name string,
511518
}
512519

513520
// Set workload status to starting
514-
if err := d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusStarting, ""); err != nil {
521+
if err := d.statuses.SetWorkloadStatus(childCtx, name, rt.WorkloadStatusStarting, ""); err != nil {
515522
logger.Warnf("Failed to set workload %s status to starting: %v", name, err)
516523
}
517524
logger.Infof("Loaded configuration from state for %s", workloadState.BaseName)
518525

519526
// Stop container if running but proxy is not
520-
if err := d.stopContainerIfNeeded(childCtx, ctx, name, workloadState); err != nil {
527+
if err := d.stopContainerIfNeeded(childCtx, name, workloadState); err != nil {
521528
return err
522529
}
523530

524531
// Start the workload
525-
return d.startWorkload(ctx, name, mcpRunner, foreground)
532+
return d.startWorkload(childCtx, name, mcpRunner, foreground)
526533
}
527534

528535
// workloadState holds the current state of a workload for restart operations
@@ -570,14 +577,14 @@ func (*defaultManager) isWorkloadAlreadyRunning(name string, workloadSt *workloa
570577
}
571578

572579
// stopContainerIfNeeded stops the container if it's running but proxy is not
573-
func (d *defaultManager) stopContainerIfNeeded(childCtx, ctx context.Context, name string, workloadSt *workloadState) error {
580+
func (d *defaultManager) stopContainerIfNeeded(childCtx context.Context, name string, workloadSt *workloadState) error {
574581
if !workloadSt.Running {
575582
return nil
576583
}
577584

578585
logger.Infof("Container %s is running but proxy is not. Stopping container...", name)
579586
if err := d.runtime.StopWorkload(childCtx, name); err != nil {
580-
if statusErr := d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusError, ""); statusErr != nil {
587+
if statusErr := d.statuses.SetWorkloadStatus(childCtx, name, rt.WorkloadStatusError, ""); statusErr != nil {
581588
logger.Warnf("Failed to set workload %s status to error: %v", name, statusErr)
582589
}
583590
return fmt.Errorf("failed to stop container %s: %v", name, err)
@@ -679,43 +686,35 @@ func (*defaultManager) cleanupTempPermissionProfile(ctx context.Context, baseNam
679686
return nil
680687
}
681688

682-
// stopWorkloads stops the named workloads concurrently.
683-
// It assumes that the workloads exist in the running state.
684-
func (d *defaultManager) stopWorkloads(ctx context.Context, workloads []*rt.ContainerInfo) *errgroup.Group {
685-
group := errgroup.Group{}
686-
for _, workload := range workloads {
687-
group.Go(func() error {
688-
childCtx, cancel := context.WithTimeout(context.Background(), AsyncOperationTimeout)
689-
defer cancel()
690-
691-
name := labels.GetContainerBaseName(workload.Labels)
692-
// Stop the proxy process
693-
proxy.StopProcess(name)
694-
695-
logger.Infof("Stopping containers for %s...", name)
696-
// Stop the container
697-
if err := d.runtime.StopWorkload(childCtx, workload.Name); err != nil {
698-
if statusErr := d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusError, err.Error()); statusErr != nil {
699-
logger.Warnf("Failed to set workload %s status to error: %v", name, statusErr)
700-
}
701-
return fmt.Errorf("failed to stop container: %w", err)
702-
}
689+
// stopSingleWorkload stops a single workload
690+
func (d *defaultManager) stopSingleWorkload(workload *rt.ContainerInfo) error {
691+
childCtx, cancel := context.WithTimeout(context.Background(), AsyncOperationTimeout)
692+
defer cancel()
703693

704-
if err := removeClientConfigurations(name); err != nil {
705-
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
706-
} else {
707-
logger.Infof("Client configurations for %s removed", name)
708-
}
694+
name := labels.GetContainerBaseName(workload.Labels)
695+
// Stop the proxy process
696+
proxy.StopProcess(name)
709697

710-
if err := d.statuses.SetWorkloadStatus(ctx, name, rt.WorkloadStatusStopped, ""); err != nil {
711-
logger.Warnf("Failed to set workload %s status to stopped: %v", name, err)
712-
}
713-
logger.Infof("Successfully stopped %s...", name)
714-
return nil
715-
})
698+
logger.Infof("Stopping containers for %s...", name)
699+
// Stop the container
700+
if err := d.runtime.StopWorkload(childCtx, workload.Name); err != nil {
701+
if statusErr := d.statuses.SetWorkloadStatus(childCtx, name, rt.WorkloadStatusError, err.Error()); statusErr != nil {
702+
logger.Warnf("Failed to set workload %s status to error: %v", name, statusErr)
703+
}
704+
return fmt.Errorf("failed to stop container: %w", err)
705+
}
706+
707+
if err := removeClientConfigurations(name); err != nil {
708+
logger.Warnf("Warning: Failed to remove client configurations: %v", err)
709+
} else {
710+
logger.Infof("Client configurations for %s removed", name)
716711
}
717712

718-
return &group
713+
if err := d.statuses.SetWorkloadStatus(childCtx, name, rt.WorkloadStatusStopped, ""); err != nil {
714+
logger.Warnf("Failed to set workload %s status to stopped: %v", name, err)
715+
}
716+
logger.Infof("Successfully stopped %s...", name)
717+
return nil
719718
}
720719

721720
// MoveToGroup moves the specified workloads from one group to another by updating their runconfig.

0 commit comments

Comments
 (0)