Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,15 @@ func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req c
return err
}

// Update Service/Ingress resources if needed.
// These functions use create-or-update pattern, so they will update
// existing resources if configuration has changed.
if err := r.updateServiceIngressResources(ctx, app); err != nil {
logger.Error(err, "Failed to update Service/Ingress resources")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put this in events only, or in both events and the controller log? That would make debugging service-related issues easier.

// Don't return error to avoid blocking the reconcile loop.
// Service/Ingress updates are best-effort during running state.
}

if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}
Expand All @@ -483,6 +492,78 @@ func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req c
return ctrl.Result{}, nil
}

// updateServiceIngressResources re-applies the Service and Ingress resources of a
// running application so that configuration changes can take effect without
// restarting it.
//
// It handles, in order:
//   - the web UI Service (only when r.options.EnableUIService is set) and, when
//     r.options.IngressURLFormat is non-empty, the web UI Ingress;
//   - for every entry in app.Spec.DriverIngressOptions, the driver ingress
//     Service and, when that entry's IngressURLFormat is non-empty, the driver
//     Ingress.
//
// Each step records a Normal event on success and a Warning event on failure.
// The function stops at the first failure, so resources after the failing one
// are not processed. The returned error wraps the underlying cause, which lets
// callers inspect it with errors.Is / errors.As. It returns nil when every step
// succeeds.
//
// NOTE(review): the create* helpers are assumed to create-or-update existing
// resources; confirm against their implementations.
func (r *Reconciler) updateServiceIngressResources(ctx context.Context, app *v1beta2.SparkApplication) error {
	logger := log.FromContext(ctx)

	// Update web UI service if enabled.
	if r.options.EnableUIService {
		service, err := r.createWebUIService(ctx, app)
		if err != nil {
			r.recorder.Eventf(app, corev1.EventTypeWarning, common.EventSparkUIServiceUpdateFailed,
				"Failed to update Spark UI service: %v", err)
			return fmt.Errorf("failed to update web UI service: %w", err)
		}
		logger.V(1).Info("Updated web UI service", "name", service.serviceName)
		r.recorder.Eventf(app, corev1.EventTypeNormal, common.EventSparkUIServiceUpdated,
			"Spark UI service %s updated", service.serviceName)

		// The UI ingress is only managed when an ingress URL format is configured.
		if r.options.IngressURLFormat != "" {
			ingressURL, err := getDriverIngressURL(r.options.IngressURLFormat, app)
			if err != nil {
				r.recorder.Eventf(app, corev1.EventTypeWarning, common.EventSparkUIIngressUpdateFailed,
					"Failed to get ingress URL: %v", err)
				return fmt.Errorf("failed to get ingress url: %w", err)
			}
			ingress, err := r.createWebUIIngress(ctx, app, *service, ingressURL, r.options.IngressClassName, r.options.IngressTLS, r.options.IngressAnnotations)
			if err != nil {
				r.recorder.Eventf(app, corev1.EventTypeWarning, common.EventSparkUIIngressUpdateFailed,
					"Failed to update Spark UI ingress: %v", err)
				return fmt.Errorf("failed to update web UI ingress: %w", err)
			}
			logger.V(1).Info("Updated web UI ingress", "name", ingress.ingressName)
			r.recorder.Eventf(app, corev1.EventTypeNormal, common.EventSparkUIIngressUpdated,
				"Spark UI ingress %s updated", ingress.ingressName)
		}
	}

	// Update driver ingress services and ingresses.
	// The range variable is a copy of the spec entry, so the helpers below
	// receive a pointer to that copy rather than a pointer into app.Spec.
	for _, driverIngressConfiguration := range app.Spec.DriverIngressOptions {
		service, err := r.createDriverIngressServiceFromConfiguration(ctx, app, &driverIngressConfiguration)
		if err != nil {
			r.recorder.Eventf(app, corev1.EventTypeWarning, common.EventSparkDriverIngressServiceUpdateFailed,
				"Failed to update driver ingress service: %v", err)
			return fmt.Errorf("failed to update driver ingress service: %w", err)
		}
		logger.V(1).Info("Updated driver ingress service", "name", service.serviceName)
		r.recorder.Eventf(app, corev1.EventTypeNormal, common.EventSparkDriverIngressServiceUpdated,
			"Driver ingress service %s updated", service.serviceName)

		// The driver ingress is only managed when this entry sets an ingress URL format.
		if driverIngressConfiguration.IngressURLFormat != "" {
			ingressURL, err := getDriverIngressURL(driverIngressConfiguration.IngressURLFormat, app)
			if err != nil {
				r.recorder.Eventf(app, corev1.EventTypeWarning, common.EventSparkDriverIngressUpdateFailed,
					"Failed to get driver ingress URL: %v", err)
				return fmt.Errorf("failed to get driver ingress url: %w", err)
			}
			ingress, err := r.createDriverIngress(ctx, app, &driverIngressConfiguration, *service, ingressURL, r.options.IngressClassName)
			if err != nil {
				r.recorder.Eventf(app, corev1.EventTypeWarning, common.EventSparkDriverIngressUpdateFailed,
					"Failed to update driver ingress: %v", err)
				return fmt.Errorf("failed to update driver ingress: %w", err)
			}
			logger.V(1).Info("Updated driver ingress", "name", ingress.ingressName)
			r.recorder.Eventf(app, corev1.EventTypeNormal, common.EventSparkDriverIngressUpdated,
				"Driver ingress %s updated", ingress.ingressName)
		}
	}

	return nil
}

func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
logger := log.FromContext(ctx)
key := req.NamespacedName
Expand Down
76 changes: 76 additions & 0 deletions internal/controller/sparkapplication/event_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,17 @@ func (f *EventFilter) Update(e event.UpdateEvent) bool {
return true
}

// Check if only Service/Ingress related fields changed while application is running.
// These resources can be updated without restarting the application.
if newApp.Status.AppState.State == v1beta2.ApplicationStateRunning &&
isServiceIngressFieldsOnlyChange(oldApp, newApp) {
f.logger.V(1).Info("Only Service/Ingress fields changed, updating resources without restart",
"name", newApp.Name, "namespace", newApp.Namespace)
// Return true to trigger reconcile without changing state to Invalidating.
// The reconcileRunningSparkApplication will handle the Service/Ingress update.
return true
}

// Check if only webhook-patched fields changed (requires PartialRestart feature gate).
// These fields are applied by the mutating webhook when new pods are created,
// so we don't need to trigger a reconcile - the webhook cache will automatically
Expand Down Expand Up @@ -245,6 +256,71 @@ func (f *EventFilter) filter(app *v1beta2.SparkApplication) bool {
return f.namespaces[metav1.NamespaceAll] || f.namespaces[app.Namespace]
}

// isServiceIngressFieldsOnlyChange reports whether the difference between the
// specs of oldApp and newApp is confined to Service/Ingress configuration.
//
// The fields treated as Service/Ingress configuration are:
//   - Spec.SparkUIOptions
//   - Spec.DriverIngressOptions
//   - Spec.Driver.ServiceAnnotations
//   - Spec.Driver.ServiceLabels
//
// It returns false when none of those fields differ. Otherwise it compares
// copies of both specs with those fields (and Suspend) cleared, and returns
// true only if the remainders are semantically equal.
func isServiceIngressFieldsOnlyChange(oldApp, newApp *v1beta2.SparkApplication) bool {
	if hasServiceIngressFieldChanges(oldApp, newApp) {
		// Work on copies of the specs only, so the applications passed in
		// are never modified.
		before, after := oldApp.Spec.DeepCopy(), newApp.Spec.DeepCopy()

		for _, spec := range []*v1beta2.SparkApplicationSpec{before, after} {
			clearServiceIngressFieldsFromSpec(spec)
			// Suspend is excluded from the comparison as well.
			spec.Suspend = nil
		}

		// Whatever is left must match for the change to count as
		// Service/Ingress-only.
		return equality.Semantic.DeepEqual(before, after)
	}

	// No Service/Ingress field differs at all.
	return false
}

// clearServiceIngressFieldsFromSpec resets, in place, every Service/Ingress
// related field of spec to nil: the driver's service annotations and labels,
// SparkUIOptions and DriverIngressOptions.
func clearServiceIngressFieldsFromSpec(spec *v1beta2.SparkApplicationSpec) {
	spec.Driver.ServiceAnnotations, spec.Driver.ServiceLabels = nil, nil
	spec.SparkUIOptions, spec.DriverIngressOptions = nil, nil
}

// hasServiceIngressFieldChanges reports whether at least one Service/Ingress
// related spec field differs semantically between oldApp and newApp. The
// fields are checked in this order: SparkUIOptions, DriverIngressOptions,
// Driver.ServiceAnnotations, Driver.ServiceLabels.
func hasServiceIngressFieldChanges(oldApp, newApp *v1beta2.SparkApplication) bool {
	before, after := &oldApp.Spec, &newApp.Spec

	switch {
	case !equality.Semantic.DeepEqual(before.SparkUIOptions, after.SparkUIOptions):
		// Spark UI options differ.
		return true
	case !equality.Semantic.DeepEqual(before.DriverIngressOptions, after.DriverIngressOptions):
		// Driver ingress options differ.
		return true
	case !equality.Semantic.DeepEqual(before.Driver.ServiceAnnotations, after.Driver.ServiceAnnotations):
		// Driver service annotations differ.
		return true
	case !equality.Semantic.DeepEqual(before.Driver.ServiceLabels, after.Driver.ServiceLabels):
		// Driver service labels differ.
		return true
	default:
		return false
	}
}

// isWebhookPatchedFieldsOnlyChange checks if the spec changes only involve fields
// that are patched by the mutating webhook when pods are created.
// These fields don't require a full application restart because:
Expand Down
Loading
Loading