Skip to content

Commit 561b114

Browse files
Kevinz857Kevinmmt
authored and committed
feat: hot update Service/Ingress in Running state
Allow updating Service and Ingress resources without restarting the application when only Service/Ingress-related fields change.

When the application is in the Running state and only the following fields change, the controller updates the Service/Ingress resources directly without transitioning to the Invalidating state:
- SparkUIOptions (ServiceAnnotations, ServiceLabels, IngressAnnotations, etc.)
- DriverIngressOptions (ServiceAnnotations, ServiceLabels, IngressAnnotations, etc.)
- Driver.ServiceAnnotations
- Driver.ServiceLabels

Key changes:
- Add isServiceIngressFieldsOnlyChange() to detect service/ingress-only changes
- Skip setting Invalidating state for service/ingress-only changes
- Add updateServiceIngressResources() to update resources in Running state
- Add comprehensive unit tests for change detection logic

Signed-off-by: Kevinz857 <[email protected]>
1 parent 4af4bc0 commit 561b114

File tree

3 files changed

+404
-0
lines changed

3 files changed

+404
-0
lines changed

internal/controller/sparkapplication/controller.go

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -469,6 +469,15 @@ func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req c
469469
return err
470470
}
471471

472+
// Update Service/Ingress resources if needed.
473+
// These functions use create-or-update pattern, so they will update
474+
// existing resources if configuration has changed.
475+
if err := r.updateServiceIngressResources(ctx, app); err != nil {
476+
logger.Error(err, "Failed to update Service/Ingress resources")
477+
// Don't return error to avoid blocking the reconcile loop.
478+
// Service/Ingress updates are best-effort during running state.
479+
}
480+
472481
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
473482
return err
474483
}
@@ -483,6 +492,58 @@ func (r *Reconciler) reconcileRunningSparkApplication(ctx context.Context, req c
483492
return ctrl.Result{}, nil
484493
}
485494

495+
// updateServiceIngressResources updates Service and Ingress resources for a running application.
496+
// This allows hot-updating Service/Ingress configuration without restarting the application.
497+
func (r *Reconciler) updateServiceIngressResources(ctx context.Context, app *v1beta2.SparkApplication) error {
498+
logger := log.FromContext(ctx)
499+
500+
// Update web UI service if enabled
501+
if r.options.EnableUIService {
502+
service, err := r.createWebUIService(ctx, app)
503+
if err != nil {
504+
return fmt.Errorf("failed to update web UI service: %v", err)
505+
}
506+
logger.V(1).Info("Updated web UI service", "name", service.serviceName)
507+
508+
// Update UI Ingress if ingress-format is set
509+
if r.options.IngressURLFormat != "" {
510+
ingressURL, err := getDriverIngressURL(r.options.IngressURLFormat, app)
511+
if err != nil {
512+
return fmt.Errorf("failed to get ingress url: %v", err)
513+
}
514+
ingress, err := r.createWebUIIngress(ctx, app, *service, ingressURL, r.options.IngressClassName, r.options.IngressTLS, r.options.IngressAnnotations)
515+
if err != nil {
516+
return fmt.Errorf("failed to update web UI ingress: %v", err)
517+
}
518+
logger.V(1).Info("Updated web UI ingress", "name", ingress.ingressName)
519+
}
520+
}
521+
522+
// Update driver ingress services and ingresses
523+
for _, driverIngressConfiguration := range app.Spec.DriverIngressOptions {
524+
service, err := r.createDriverIngressServiceFromConfiguration(ctx, app, &driverIngressConfiguration)
525+
if err != nil {
526+
return fmt.Errorf("failed to update driver ingress service: %v", err)
527+
}
528+
logger.V(1).Info("Updated driver ingress service", "name", service.serviceName)
529+
530+
// Update ingress if ingress-format is set
531+
if driverIngressConfiguration.IngressURLFormat != "" {
532+
ingressURL, err := getDriverIngressURL(driverIngressConfiguration.IngressURLFormat, app)
533+
if err != nil {
534+
return fmt.Errorf("failed to get driver ingress url: %v", err)
535+
}
536+
ingress, err := r.createDriverIngress(ctx, app, &driverIngressConfiguration, *service, ingressURL, r.options.IngressClassName)
537+
if err != nil {
538+
return fmt.Errorf("failed to update driver ingress: %v", err)
539+
}
540+
logger.V(1).Info("Updated driver ingress", "name", ingress.ingressName)
541+
}
542+
}
543+
544+
return nil
545+
}
546+
486547
func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
487548
logger := log.FromContext(ctx)
488549
key := req.NamespacedName

internal/controller/sparkapplication/event_filter.go

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -184,6 +184,24 @@ func (f *EventFilter) Update(e event.UpdateEvent) bool {
184184
return true
185185
}
186186

187+
// Check if only Service/Ingress related fields changed while application is running.
188+
// These resources can be updated without restarting the application.
189+
if newApp.Status.AppState.State == v1beta2.ApplicationStateRunning &&
190+
isServiceIngressFieldsOnlyChange(oldApp, newApp) {
191+
f.logger.Info("Only Service/Ingress fields changed, updating resources without restart",
192+
"name", newApp.Name, "namespace", newApp.Namespace)
193+
f.recorder.Eventf(
194+
newApp,
195+
corev1.EventTypeNormal,
196+
"SparkApplicationServiceUpdate",
197+
"SparkApplication %s Service/Ingress configuration updated",
198+
newApp.Name,
199+
)
200+
// Return true to trigger reconcile without changing state to Invalidating.
201+
// The reconcileRunningSparkApplication will handle the Service/Ingress update.
202+
return true
203+
}
204+
187205
// Check if only webhook-patched fields changed (requires PartialRestart feature gate).
188206
// These fields are applied by the mutating webhook when new pods are created,
189207
// so we don't need to trigger a reconcile - the webhook cache will automatically
@@ -245,6 +263,76 @@ func (f *EventFilter) filter(app *v1beta2.SparkApplication) bool {
245263
return f.namespaces[metav1.NamespaceAll] || f.namespaces[app.Namespace]
246264
}
247265

266+
// isServiceIngressFieldsOnlyChange checks if the spec changes only involve fields
267+
// related to Service and Ingress configuration that can be updated without
268+
// restarting the application.
269+
//
270+
// Service/Ingress related fields:
271+
// - SparkUIOptions (ServiceAnnotations, ServiceLabels, IngressAnnotations, etc.)
272+
// - DriverIngressOptions (ServiceAnnotations, ServiceLabels, IngressAnnotations, etc.)
273+
// - Driver.ServiceAnnotations
274+
// - Driver.ServiceLabels
275+
func isServiceIngressFieldsOnlyChange(oldApp, newApp *v1beta2.SparkApplication) bool {
276+
// Check if any service/ingress field actually changed
277+
if !hasServiceIngressFieldChanges(oldApp, newApp) {
278+
return false
279+
}
280+
281+
// Create copies to compare non-service/ingress fields
282+
oldCopy := oldApp.DeepCopy()
283+
newCopy := newApp.DeepCopy()
284+
285+
// Zero out service/ingress related fields in both copies
286+
clearServiceIngressFields(oldCopy)
287+
clearServiceIngressFields(newCopy)
288+
289+
// Also zero out Suspend field as it's handled separately
290+
oldCopy.Spec.Suspend = nil
291+
newCopy.Spec.Suspend = nil
292+
293+
// If specs are equal after clearing service/ingress fields,
294+
// then only service/ingress fields changed
295+
return equality.Semantic.DeepEqual(oldCopy.Spec, newCopy.Spec)
296+
}
297+
298+
// clearServiceIngressFields zeros out the Service/Ingress related fields.
299+
func clearServiceIngressFields(app *v1beta2.SparkApplication) {
300+
// Clear SparkUIOptions
301+
app.Spec.SparkUIOptions = nil
302+
303+
// Clear DriverIngressOptions
304+
app.Spec.DriverIngressOptions = nil
305+
306+
// Clear Driver service annotations and labels
307+
app.Spec.Driver.ServiceAnnotations = nil
308+
app.Spec.Driver.ServiceLabels = nil
309+
}
310+
311+
// hasServiceIngressFieldChanges checks if any Service/Ingress related fields changed.
312+
func hasServiceIngressFieldChanges(oldApp, newApp *v1beta2.SparkApplication) bool {
313+
// Check SparkUIOptions
314+
if !equality.Semantic.DeepEqual(oldApp.Spec.SparkUIOptions, newApp.Spec.SparkUIOptions) {
315+
return true
316+
}
317+
318+
// Check DriverIngressOptions
319+
if !equality.Semantic.DeepEqual(oldApp.Spec.DriverIngressOptions, newApp.Spec.DriverIngressOptions) {
320+
return true
321+
}
322+
323+
// Check Driver service annotations
324+
if !equality.Semantic.DeepEqual(oldApp.Spec.Driver.ServiceAnnotations, newApp.Spec.Driver.ServiceAnnotations) {
325+
return true
326+
}
327+
328+
// Check Driver service labels
329+
if !equality.Semantic.DeepEqual(oldApp.Spec.Driver.ServiceLabels, newApp.Spec.Driver.ServiceLabels) {
330+
return true
331+
}
332+
333+
return false
334+
}
335+
248336
// isWebhookPatchedFieldsOnlyChange checks if the spec changes only involve fields
249337
// that are patched by the mutating webhook when pods are created.
250338
// These fields don't require a full application restart because:

0 commit comments

Comments
 (0)