[CELEBORN-2257] Fix remote disks not being reported on registration #3597
Dzeri96 wants to merge 2 commits into apache:main from
Conversation
Pull request overview
Fixes worker registration disk reporting so the master can see remote storage (HDFS/S3/OSS) immediately (before the first heartbeat), and refactors disk snapshot APIs / slot-allocation logic to distinguish local vs remote disks more clearly.
Changes:
- Renamed disk snapshot / healthy-dir helpers to explicitly mean “local” and added an “all disks” snapshot.
- Updated worker registration/heartbeat disk reporting to incorporate remote disks.
- Simplified master slot-allocation filtering by embedding disk-type metadata into StorageInfo.Type and using it in allocation logic (see the sketch after this list).
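Since several of these changes revolve around the new Type metadata, here is a minimal Java sketch of the idea, assuming one bit per storage type; the constants mirror Celeborn's storage kinds, but the exact bit values and method shape are illustrative, not the PR's verbatim code:

```java
// Sketch only: disk-type metadata embedded directly in the enum.
enum TypeSketch {
  MEMORY(1 << 0, false),
  HDD(1 << 1, false),
  SSD(1 << 2, false),
  HDFS(1 << 3, true),
  S3(1 << 4, true),
  OSS(1 << 5, true);

  final int mask;      // bit identifying this type in an availability bitset
  final boolean isDFS; // true for remote (DFS-backed) storage

  TypeSketch(int mask, boolean isDFS) {
    this.mask = mask;
    this.isDFS = isDFS;
  }

  // Slot allocation can then filter disks with a single bitwise test.
  boolean isAvailable(int availableTypesBitset) {
    return (availableTypesBitset & mask) != 0;
  }
}
```

With the metadata on the enum itself, the allocator no longer needs separate lookup tables to decide whether a disk is local or remote.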
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 5 comments.
Summary per file:
| File | Description |
|---|---|
| worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala | Updates mocks to the renamed localDisksSnapshot() API. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala | Introduces localDisksSnapshot() / allDisksSnapshot() and renames “healthy working dirs” to local-only. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala | Switches registration to report all disks; refactors heartbeat disk update flow. |
| worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala | Uses local-only healthy working dirs check for slot reservation. |
| tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala | Updates test to use localDisksSnapshot(). |
| master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java | Simplifies disk filtering using StorageInfo.Type metadata; refactors usable-slot bookkeeping. |
| common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala | Refactors slot recomputation / propagation logic and uses isDFS. |
| common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java | Adds isDFS + mask metadata into StorageInfo.Type and introduces isAvailable(...). |
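The snapshot split itself lives in Scala, but its contract can be sketched in Java terms as follows; `DiskInfoSketch` and `remoteDisks()` are hypothetical stand-ins, not the PR's actual types:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Celeborn's DiskInfo.
class DiskInfoSketch {
  final String mountPoint;
  final boolean isDFS; // HDFS/S3/OSS-backed

  DiskInfoSketch(String mountPoint, boolean isDFS) {
    this.mountPoint = mountPoint;
    this.isDFS = isDFS;
  }
}

interface StorageSnapshots {
  List<DiskInfoSketch> localDisksSnapshot(); // local dirs only (the old behaviour)
  List<DiskInfoSketch> remoteDisks();        // hypothetical source of DFS-backed disks

  // allDisksSnapshot() is what registration and heartbeats should now report.
  default List<DiskInfoSketch> allDisksSnapshot() {
    List<DiskInfoSketch> all = new ArrayList<>(localDisksSnapshot());
    all.addAll(remoteDisks());
    return all;
  }
}
```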
Resolved review comment threads:
- worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
- worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala (outdated)
- master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java
- master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java (outdated)
Codecov Report ❌ Patch coverage is
Additional details and impacted files:

```
@@           Coverage Diff            @@
##             main    #3597    +/-  ##
========================================
- Coverage   67.13%   67.07%   -0.06%
========================================
  Files         357      357
  Lines       21860    21935      +75
  Branches     1943     1947       +4
========================================
+ Hits        14674    14711      +37
- Misses       6166     6213      +47
+ Partials     1020     1011       -9
```

View full report in Codecov by Sentry.
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 2 comments.
Resolved review comment thread: worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala
```scala
// Registration now builds the disk map from every disk, including DFS-backed ones.
private val diskInfos = storageManager
  .allDisksSnapshot()
  .map { diskInfo => diskInfo.mountPoint -> diskInfo }
  .toMap.asJava
```
This PR changes worker registration/heartbeat disk reporting to include remote disks (allDisksSnapshot) and introduces new slot-availability semantics (StorageInfo.isAvailable, Type.isDFS). There doesn’t appear to be a test asserting that remote disk infos are (a) included in the initial registration payload and (b) preserved across subsequent heartbeats so the master can allocate slots from them before/without the first heartbeat. Adding a focused unit/integration test around worker->master disk info propagation would help prevent regressions here.
This is what I wrote in the original PR: I need someone from the existing community to guide me on writing an integration test.
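A unit-level sketch of what such a test might assert (every type below is a hypothetical stand-in, not Celeborn's real worker/master API):

```java
import static org.junit.Assert.assertTrue;

import java.util.HashMap;
import java.util.Map;
import org.junit.Test;

public class RemoteDiskPropagationSketch {

  // Minimal fake of the master-side view of a worker's disks.
  static class FakeWorkerMeta {
    private final Map<String, String> diskInfos = new HashMap<>();

    void register(Map<String, String> disks) {
      diskInfos.putAll(disks);
    }

    void heartbeat(Map<String, String> disks) {
      diskInfos.clear();       // heartbeat replaces the snapshot...
      diskInfos.putAll(disks); // ...so remote disks must be re-reported
    }

    Map<String, String> diskInfos() {
      return diskInfos;
    }
  }

  static Map<String, String> disksWithRemote(String remoteMount) {
    Map<String, String> disks = new HashMap<>();
    disks.put("/mnt/disk1", "HDD");
    disks.put(remoteMount, "HDFS");
    return disks;
  }

  @Test
  public void remoteDiskVisibleAtRegistrationAndAfterHeartbeat() {
    FakeWorkerMeta meta = new FakeWorkerMeta();

    // (a) The registration payload already contains the DFS-backed disk.
    meta.register(disksWithRemote("hdfs://namenode/celeborn"));
    assertTrue(meta.diskInfos().containsKey("hdfs://namenode/celeborn"));

    // (b) The remote disk survives a subsequent heartbeat update.
    meta.heartbeat(disksWithRemote("hdfs://namenode/celeborn"));
    assertTrue(meta.diskInfos().containsKey("hdfs://namenode/celeborn"));
  }
}
```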
SteNicholas left a comment:
@Dzeri96, thanks for the contribution. Could you explain which fix is mainly provided in this pull request?
```diff
-  Type(int value) {
+  Type(int value, boolean isDFS, int mask) {
     this.value = value;
+    this.isDFS = isDFS;
```
IMO, it's unnecessary to add an isDFS field; an isDFS method is enough.
So my initial idea for implementing this was a HashMap. I had built a map that was filled in the static block, like the other maps, but then I realized you have to make sure every enum member is in that map, so I wrote a test to enforce it.
In the end I found the constructor-based solution much more elegant: the compiler forces you to assign each enum member an isDFS value, and it's also less code. Let me know if you want me to change it, though.
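For contrast, a sketch of the rejected map-based alternative (names are illustrative, not the code that was actually drafted):

```java
import java.util.EnumMap;
import java.util.Map;

// A side map filled in a static block. Nothing forces a newly added enum
// constant to appear here, hence the need for a test enforcing exhaustiveness.
enum TypeViaMap {
  MEMORY, HDD, SSD, HDFS;

  private static final Map<TypeViaMap, Boolean> IS_DFS = new EnumMap<>(TypeViaMap.class);
  static {
    IS_DFS.put(MEMORY, false);
    IS_DFS.put(HDD, false);
    IS_DFS.put(SSD, false);
    IS_DFS.put(HDFS, true);
  }

  boolean isDFS() {
    return IS_DFS.get(this); // NPE if a constant was forgotten in the static block
  }
}
```

The constructor-field version removes that failure mode: adding a new constant without an isDFS argument simply fails to compile.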
Pull request overview
Copilot reviewed 8 out of 8 changed files in this pull request and generated 1 comment.
```scala
// Only local disks derive slot counts from capacity; DFS-backed disks are left as-is.
if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
  newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
  newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
}
```
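For concreteness, the arithmetic in that branch with illustrative numbers:

```java
public class SlotMathSketch {
  public static void main(String[] args) {
    long totalSpace = 1L << 40;              // 1 TiB local disk
    long estimatedPartitionSize = 64L << 20; // 64 MiB estimated partition size
    long maxSlots = totalSpace / estimatedPartitionSize;
    System.out.println(maxSlots);            // 16384
    // A DFS-backed disk skips this recomputation entirely,
    // since remote capacity is effectively unbounded.
  }
}
```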
This change introduces new slot-update semantics for DFS vs local disks inside updateThenGetDiskInfos, but there are no unit tests covering that remote/DFS disk slot fields are preserved across successive updates (e.g., registration updateDiskSlots(...) followed by heartbeat updateThenGetDiskInfos(...)). Adding a focused test in WorkerInfoSuite for a DFS DiskInfo would help prevent regressions like remote disks becoming unavailable after the first heartbeat.
@SteNicholas My changes are explained fairly well in the PR description, I think. While @eolivelli was running his tests, he noticed that the current faulty behaviour causes problems when auto-scaling spawns new nodes: at that moment the system is under pressure, and yet the newly spawned nodes don't report remote disks, leading to performance degradation and the need to spawn even more nodes. In hindsight, I should have limited the PR to just this fix. It's just that while I was trying to understand the code, I made the other changes to make it more readable for myself. In the end I decided to include them too, since we will be working on this part of the project a lot in the future. Also, don't forget to help me with writing a test!
What changes were proposed in this pull request?
Worker registration now reports remote (HDFS/S3/OSS) disks alongside local ones, the disk snapshot APIs are renamed to distinguish local from remote disks, and master slot allocation filters disks via type metadata embedded in StorageInfo.Type.
Why are the changes needed?
Without this fix, remote disks are only reported with the first heartbeat, so the master cannot allocate slots on them for freshly registered workers.
Does this PR resolve a correctness bug?
Yes
Does this PR introduce any user-facing change?
No
How was this patch tested?
Important: I want help from the community on how to write tests for this.