feat: Add Celeborn rolling restart validation and production best practices #1026

vara-bonthu wants to merge 5 commits into `main` from
Conversation
…ns guide

- Fixed Celeborn configuration with all 4 fixed worker ports for graceful shutdown
- Added production rolling restart scripts (decommission-based and simple approaches)
- Comprehensive Day 2 operations documentation for 200+ node clusters
- TPC-DS 10TB benchmark configuration with detailed header documentation
- Architecture diagrams and critical configuration warnings
- Validated zero-downtime rolling restarts with 15+ hour benchmark test
...ks/spark-on-eks/benchmarks/celeborn-benchmarks/rolling-restart-celeborn-with-decommission.sh
```
- Node is terminated (EC2 retirement, spot interruption)
- Node is replaced (Karpenter consolidation, AMI updates)
- Instance is stopped or hibernated
- Hardware failure occurs
```
We should mention that instance stores do not scale like EBS. This is especially important when you have a lot of jobs writing shuffle data.
```
To prevent job failures with NVMe storage, you MUST:
1. ✅ Enable replication: `spark.celeborn.client.push.replicate.enabled: "true"` (factor=2 minimum)
2. ✅ Configure aggressive retries: `maxRetriesForEachReplica: 10`, `retryWait: 20s`
```
The full property names are `celeborn.client.fetch.maxRetriesForEachReplica`, `celeborn.client.registerShuffle.retryWait`, and `celeborn.client.reserveSlots.retryWait`.
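For reference, a sketch of how those fully-qualified keys could appear in the Celeborn client config. The values are illustrative only, not tuned recommendations:

```yaml
# Illustrative values only -- tune per workload. Key names are the
# fully-qualified Celeborn client properties named in this review.
celeborn.client.fetch.maxRetriesForEachReplica: 10
celeborn.client.registerShuffle.retryWait: 20s
celeborn.client.reserveSlots.retryWait: 20s
```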
```yaml
# Graceful Shutdown (Enables Zero-Downtime Rolling Updates)
# ============================================================================
celeborn.worker.graceful.shutdown.enabled: "true"
celeborn.worker.graceful.shutdown.timeout: "600s"
```
We really have to be careful about this. 600s is probably too short when multiple jobs are using the worker.
```
:::danger CRITICAL: Always Decommission Before Draining Nodes
When replacing nodes (EKS upgrades, AMI updates), you **MUST** decommission Celeborn workers BEFORE draining the node.

**Correct Order:**
```
Even with replication and following this order, you can still lose data if two pods are replaced at the same time.
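One way to guard against two workers going down at once is a PodDisruptionBudget. A minimal sketch, assuming the worker pods carry an `app=celeborn-worker` label and run in a `celeborn` namespace (both assumptions -- match your chart's labels):

```yaml
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
  name: celeborn-worker-pdb
  namespace: celeborn          # assumed namespace
spec:
  maxUnavailable: 1            # evictions proceed one worker at a time
  selector:
    matchLabels:
      app: celeborn-worker     # assumed label
```

Note this only covers voluntary evictions (node drains); simultaneous node failures or direct pod deletions bypass it.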
```
**Option 2: Blue-Green Node Pool**

For maximum safety, deploy a new node pool alongside the existing one:
```
Honestly, this is the only sure and safe way to upgrade.
```
### Pre-Deployment Checklist

**Infrastructure:**
- [ ] EKS cluster version is supported (1.28+)
```
```suggestion
- [ ] EKS cluster version is supported
```

1.28 is already out of support. I would not mention a version here.
```
**Infrastructure:**
- [ ] EKS cluster version is supported (1.28+)
- [ ] Karpenter installed and configured for Celeborn NodePool
- [ ] Storage provisioned (EBS gp3 or instance stores)
```
```suggestion
- [ ] Storage strategy established (EBS gp3 or instance stores)
```
```
kubernetes.namespace:"spark-team-a" AND message:"FetchFailed"

## Production Readiness Checklist
```
I'd remove this, to be honest. It mostly just repeats things we have said above, and it's more to maintain going forward.
```yaml
# FIX (from original): celeborn.storage.availableTypes defaults to HDD which covers SSD too
# when using local disks. Only required when using HDFS/S3. Setting SSD explicitly
# is documented as valid and ensures the master slot assignment treats these as SSD class.
celeborn.storage.availableTypes: SSD
```
I'd add memory. Otherwise, we waste a lot of resources.

We should set
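If we do add the memory tier, the change might look like this. A sketch only -- the exact memory-tier tuning keys vary by Celeborn version, so verify against the docs for the version in use:

```yaml
# Sketch: include the in-memory tier ahead of SSD so hot shuffle data
# can be served from worker memory before spilling to disk.
celeborn.storage.availableTypes: MEMORY,SSD
```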
…1028)

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Signed-off-by: Vara Bonthu <vara.bonthu@gmail.com>
```
| 2 | **Workers** | Keep all workers in a single AZ, co-located with executors | Cross-AZ shuffle costs $0.01/GB. When an AZ fails, its executors die too, so spreading workers buys nothing. |
| 3 | **Instance Type Selection** | r8g.8xlarge for small/medium, r8g.12xlarge or r8g.16xlarge for large, r8gd.16xlarge or i4i.16xlarge for I/O-bound | Start with 6 workers and scale out before scaling up. r8g.8xlarge ran 10 TB TPC-DS at under 20% disk and under 1% CPU. More workers distribute both network and disk load better than fewer larger instances. |
| 4 | **Storage** | Use EBS for most teams. NVMe only after profiling confirms I/O bottleneck | EBS volumes follow pods across nodes, resize online, and survive node failures without replication. |
| 5 | **Replication** | `spark.celeborn.client.push.replicate.enabled: "true"` on every job | Without it, any worker restart causes job failure. Non-negotiable. |
```
Some jobs are OK with no replication even when using Celeborn, IMO. It comes down to the trade-off between replication cost, re-computation cost, dynamic allocation benefits, and more durable storage in Celeborn.
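That trade-off could be left to individual jobs. For example, a job whose stages are cheap to recompute might opt out via its own Spark conf -- a sketch, assuming replication is enabled cluster-wide by default:

```yaml
# Hypothetical per-job override in a SparkApplication spec:
sparkConf:
  # This job's shuffle data is cheap to recompute, so skip the 2x write cost.
  spark.celeborn.client.push.replicate.enabled: "false"
```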
```yaml
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 3600  # 600s is enough for EBS; use 3600s for NVMe
```
We should clarify why the longer duration is recommended for NVMe. This value also needs to be slightly more than `celeborn.worker.graceful.shutdown.timeout`.
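Concretely, the two values need to be set together so the kubelet never SIGKILLs a worker mid-flush. A sketch with illustrative numbers (the 60s buffer is an assumption, not a measured value):

```yaml
# Celeborn worker config: upper bound on the worker's own shutdown work.
# NVMe data must flush locally (it cannot follow the pod like EBS), so allow longer.
celeborn.worker.graceful.shutdown.timeout: "3600s"
---
# Pod spec: must exceed the graceful shutdown timeout above, otherwise
# the kubelet kills the pod while Celeborn is still flushing in-flight data.
spec:
  template:
    spec:
      terminationGracePeriodSeconds: 3660   # graceful timeout + ~60s buffer
```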
```
## TL;DR

Celeborn solves this by externalizing shuffle operations to dedicated worker nodes that persist shuffle data independently of executor lifecycles. This enables true elastic scaling where Spark can safely add and remove executors without losing intermediate computation results, significantly improving resource utilization and cost efficiency. The service provides high availability through data replication and asynchronous processing, making dynamic allocation more reliable compared to traditional local shuffle mechanisms.

| # | Area | What to Do | Why |
```
```suggestion
| # | Area | Recommendations | Reasons |
```
```
| 8 | **Local shuffle reader** | `spark.sql.adaptive.localShuffleReader.enabled: "false"` on every job | If true, Spark reads from executor local disks where Celeborn data does not exist. Jobs fail with `FileNotFoundException`. |
| 9 | **terminationGracePeriodSeconds** | Set to at least 600s for EBS workers, 3600s for NVMe | Kubernetes default is 30s. At 30s, SIGKILL fires before graceful shutdown can flush in-flight writes, which corrupts data and causes job failures. |
| 10 | **DNS registration** | `celeborn.network.bind.preferIpAddress: "false"` | Workers register with pod IPs by default. Pod IPs change on restart, so the master ends up with stale mappings and clients can't reconnect. DNS names are stable. |
| 11 | **Rolling restarts** | `kubectl delete pod` with 120s delay between workers | SIGTERM triggers graceful shutdown (requires rows 7 and 9 above). Replication covers the ~70s restart window. Zero job failures validated on TPC-DS 10 TB. |
```
I think the seconds suggested here are very much job- and query-dependent. We also shouldn't encourage using `kubectl delete pod`.
```
| 9 | **terminationGracePeriodSeconds** | Set to at least 600s for EBS workers, 3600s for NVMe | Kubernetes default is 30s. At 30s, SIGKILL fires before graceful shutdown can flush in-flight writes, which corrupts data and causes job failures. |
| 10 | **DNS registration** | `celeborn.network.bind.preferIpAddress: "false"` | Workers register with pod IPs by default. Pod IPs change on restart, so the master ends up with stale mappings and clients can't reconnect. DNS names are stable. |
| 11 | **Rolling restarts** | `kubectl delete pod` with 120s delay between workers | SIGTERM triggers graceful shutdown (requires rows 7 and 9 above). Replication covers the ~70s restart window. Zero job failures validated on TPC-DS 10 TB. |
| 12 | **Decommission API** | Optional. Use for 100+ worker clusters, not required for correctness | It stops new writes to a worker but does not migrate existing shuffle data. Fetch errors still happen (20-30 per worker) and are handled by Spark retries. Simple pod delete with replication achieves the same data safety outcome. |
```
I'd argue it's not optional. We should recommend people use it, because that's what the Celeborn docs suggest. We can mention that it didn't matter during our testing, but we should not call it optional.
```
Why decommission drained in 0 seconds: the API only waits for in-flight *writes* to finish, not for any kind of data migration. Worker-5 had 66 active slots holding 261.2 GiB of data and still drained instantly.
```
```
#### Recommended Approach: Simple Restart with Replication
```
I really do not recommend this; the restart script itself says so too. We should encourage using the API, though for smaller-scale workloads a simple restart is OK.
**For EBS-backed workers:**
```bash
# 1. Verify replication is enabled in all running jobs
kubectl logs <driver-pod> -n spark-operator | grep "push.replicate.enabled"
```
```suggestion
kubectl logs <driver-pod> | grep "push.replicate.enabled"
```

Do you see this in the logs to begin with? Better to check the ConfigMap instead?
```bash
kubectl logs <driver-pod> -n spark-operator | grep "push.replicate.enabled"

# 2. Check disk usage (keep below 70%)
bash benchmarks/celeborn-benchmarks/check-celeborn-disk-usage.sh
```
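If the helper script is not to hand, the 70% check can be sketched directly as a small bash function that parses `df -P` output. The mount path in the example is an assumption -- point it at your Celeborn storage dirs:

```shell
# Sketch: flag any mount above a usage threshold (default 70%).
# Pipe `df -P <celeborn-dirs>` into it; exits non-zero if any mount is over.
check_disk_usage() {
  local threshold="${1:-70}"
  awk -v t="$threshold" 'NR > 1 {
    gsub(/%/, "", $5)                       # strip "%" from the Use% column
    if ($5 + 0 > t) { print $6 " is at " $5 "% (over " t "%)"; bad = 1 }
  } END { exit bad }'
}

# Example (path is an assumption):
# df -P /mnt/k8s-disks/* | check_disk_usage 70
```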
```bash
done

# 6. Remove the old pool
kubectl delete statefulset celeborn-worker -n celeborn
```
We should scale it down instead, to allow for rollback and for the next blue-green deployment.
```yaml
spec:
  disruption:
    consolidationPolicy: WhenEmpty  # only consolidate nodes that have no pods at all
    expireAfter: 720h               # force rotation every 30 days to pick up AMI updates
```
Is this a good idea for Celeborn? Don't we want a controlled rotation, as stated above?
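If node replacement should only ever happen through the controlled decommission flow, one option is to disable automatic expiry and gate disruption with a budget. A sketch against the NodePool schema shown above -- verify the field names for your Karpenter version:

```yaml
spec:
  disruption:
    consolidationPolicy: WhenEmpty
    expireAfter: Never       # no automatic 30-day rotation
    budgets:
      - nodes: "0"           # block voluntary disruption entirely;
                             # rotate nodes via blue-green / decommission instead
```

Note that with `nodes: "0"` Karpenter performs no voluntary disruption at all, including WhenEmpty consolidation, so empty nodes would also need manual cleanup.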
What does this PR do?
🛑 Please open an issue first to discuss any significant work and flesh out details/direction. When we triage the issues, we will add labels to the issue like "Enhancement", "Bug" which should indicate to you that this issue can be worked on and we are looking forward to your PR. We would hate for your time to be wasted.
Consult the CONTRIBUTING guide for submitting pull-requests.
Motivation
More
- [ ] `website/docs` or `website/blog` section for this feature
- [ ] `pre-commit run -a` with this PR. Link for installing pre-commit locally

For Moderators
Additional Notes