Skip to content

Commit d5ae63a

Browse files
committed
[CELEBORN-2257] Fixed remote disks not being reported on registration
1 parent 81d89f3 commit d5ae63a

File tree

8 files changed

+81
-86
lines changed

8 files changed

+81
-86
lines changed

common/src/main/java/org/apache/celeborn/common/protocol/StorageInfo.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,22 +22,34 @@
2222

2323
public class StorageInfo implements Serializable {
2424
public enum Type {
25-
MEMORY(0),
26-
HDD(1),
27-
SSD(2),
28-
HDFS(3),
29-
OSS(4),
30-
S3(5);
25+
MEMORY(0, false, MEMORY_MASK),
26+
HDD(1, false, LOCAL_DISK_MASK),
27+
SSD(2, false, LOCAL_DISK_MASK),
28+
HDFS(3, true, HDFS_MASK),
29+
OSS(4, true, OSS_MASK),
30+
S3(5, true, S3_MASK);
3131

3232
private final int value;
33+
private final boolean isDFS;
34+
private final int mask;
3335

34-
Type(int value) {
36+
Type(int value, boolean isDFS, int mask) {
3537
this.value = value;
38+
this.isDFS = isDFS;
39+
this.mask = mask;
3640
}
3741

3842
public int getValue() {
3943
return value;
4044
}
45+
46+
public boolean isDFS() {
47+
return isDFS;
48+
}
49+
50+
public int getMask() {
51+
return mask;
52+
}
4153
}
4254

4355
public static final Map<Integer, Type> typesMap = new HashMap<>();
@@ -228,6 +240,11 @@ public boolean S3Available() {
228240
return StorageInfo.S3Available(availableStorageTypes);
229241
}
230242

243+
public static boolean isAvailable(Type type, int availableStorageTypes) {
244+
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
245+
|| (availableStorageTypes & type.getMask()) > 0;
246+
}
247+
231248
@Override
232249
public boolean equals(Object o) {
233250
if (this == o) return true;

common/src/main/scala/org/apache/celeborn/common/meta/WorkerInfo.scala

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -214,25 +214,21 @@ class WorkerInfo(
214214
for (newDisk <- newDiskInfos.values().asScala) {
215215
val mountPoint: String = newDisk.mountPoint
216216
val curDisk = diskInfos.get(mountPoint)
217+
if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
218+
newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
219+
newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
220+
}
217221
if (curDisk != null) {
218222
curDisk.actualUsableSpace = newDisk.actualUsableSpace
219223
curDisk.totalSpace = newDisk.totalSpace
220224
// Update master's diskinfo activeslots to worker's value
221225
curDisk.activeSlots = newDisk.activeSlots
222226
curDisk.avgFlushTime = newDisk.avgFlushTime
223227
curDisk.avgFetchTime = newDisk.avgFetchTime
224-
if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS
225-
&& curDisk.storageType != StorageInfo.Type.S3 && curDisk.storageType != StorageInfo.Type.OSS) {
226-
curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
227-
curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
228-
}
228+
curDisk.maxSlots = newDisk.maxSlots
229+
curDisk.availableSlots = newDisk.availableSlots
229230
curDisk.setStatus(newDisk.status)
230231
} else {
231-
if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS
232-
&& newDisk.storageType != StorageInfo.Type.S3 && newDisk.storageType != StorageInfo.Type.OSS) {
233-
newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
234-
newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
235-
}
236232
diskInfos.put(mountPoint, newDisk)
237233
}
238234
}

master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java

Lines changed: 21 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.celeborn.service.deploy.master;
1919

20+
import static org.apache.celeborn.common.protocol.StorageInfo.Type.*;
21+
2022
import java.util.*;
2123
import java.util.function.IntUnaryOperator;
2224
import java.util.stream.Collectors;
@@ -40,6 +42,12 @@ static class UsableDiskInfo {
4042
DiskInfo diskInfo;
4143
long usableSlots;
4244

45+
/** @param diskInfo will be used as source for usableSlots. */
46+
UsableDiskInfo(DiskInfo diskInfo) {
47+
this.diskInfo = diskInfo;
48+
this.usableSlots = diskInfo.getAvailableSlots();
49+
}
50+
4351
UsableDiskInfo(DiskInfo diskInfo, long usableSlots) {
4452
this.diskInfo = diskInfo;
4553
this.usableSlots = usableSlots;
@@ -69,32 +77,11 @@ static class UsableDiskInfo {
6977
Map<WorkerInfo, List<UsableDiskInfo>> slotsRestrictions = new HashMap<>();
7078
for (WorkerInfo worker : workers) {
7179
List<UsableDiskInfo> usableDisks =
72-
slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>());
73-
for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
74-
if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
75-
if (StorageInfo.localDiskAvailable(availableStorageTypes)
76-
&& diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS
77-
&& diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3
78-
&& diskInfoEntry.getValue().storageType() != StorageInfo.Type.OSS) {
79-
usableDisks.add(
80-
new UsableDiskInfo(
81-
diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
82-
} else if (StorageInfo.HDFSAvailable(availableStorageTypes)
83-
&& diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) {
84-
usableDisks.add(
85-
new UsableDiskInfo(
86-
diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
87-
} else if (StorageInfo.S3Available(availableStorageTypes)
88-
&& diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) {
89-
usableDisks.add(
90-
new UsableDiskInfo(
91-
diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
92-
} else if (StorageInfo.OSSAvailable(availableStorageTypes)
93-
&& diskInfoEntry.getValue().storageType() == StorageInfo.Type.OSS) {
94-
usableDisks.add(
95-
new UsableDiskInfo(
96-
diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
97-
}
80+
slotsRestrictions.computeIfAbsent(worker, v -> new LinkedList<>());
81+
for (DiskInfo diskInfo : worker.diskInfos().values()) {
82+
if (diskInfo.status().equals(DiskStatus.HEALTHY)
83+
&& StorageInfo.isAvailable(diskInfo.storageType(), availableStorageTypes)) {
84+
usableDisks.add(new UsableDiskInfo(diskInfo));
9885
}
9986
}
10087
}
@@ -146,7 +133,7 @@ static class UsableDiskInfo {
146133
interruptionAwareThreshold);
147134
}
148135

149-
List<DiskInfo> usableDisks = new ArrayList<>();
136+
List<DiskInfo> usableDisks = new LinkedList<>();
150137
Map<DiskInfo, WorkerInfo> diskToWorkerMap = new HashMap<>();
151138

152139
workers.forEach(
@@ -157,9 +144,7 @@ static class UsableDiskInfo {
157144
diskToWorkerMap.put(diskInfo, i);
158145
if (diskInfo.actualUsableSpace() > 0
159146
&& diskInfo.status().equals(DiskStatus.HEALTHY)
160-
&& diskInfo.storageType() != StorageInfo.Type.HDFS
161-
&& diskInfo.storageType() != StorageInfo.Type.S3
162-
&& diskInfo.storageType() != StorageInfo.Type.OSS) {
147+
&& !diskInfo.storageType().isDFS()) {
163148
usableDisks.add(diskInfo);
164149
}
165150
}));
@@ -225,12 +210,8 @@ private static StorageInfo getStorageInfo(
225210
}
226211
usableDiskInfos.get(diskIndex).usableSlots--;
227212
DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
228-
if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) {
229-
storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
230-
} else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) {
231-
storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
232-
} else if (selectedDiskInfo.storageType() == StorageInfo.Type.OSS) {
233-
storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
213+
if (selectedDiskInfo.storageType().isDFS()) {
214+
storageInfo = new StorageInfo("", selectedDiskInfo.storageType(), availableStorageTypes);
234215
} else {
235216
storageInfo =
236217
new StorageInfo(
@@ -243,9 +224,7 @@ private static StorageInfo getStorageInfo(
243224
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
244225
DiskInfo[] diskInfos =
245226
selectedWorker.diskInfos().values().stream()
246-
.filter(p -> p.storageType() != StorageInfo.Type.HDFS)
247-
.filter(p -> p.storageType() != StorageInfo.Type.S3)
248-
.filter(p -> p.storageType() != StorageInfo.Type.OSS)
227+
.filter(p -> !p.storageType().isDFS())
249228
.collect(Collectors.toList())
250229
.toArray(new DiskInfo[0]);
251230
int diskIndex =
@@ -257,11 +236,11 @@ private static StorageInfo getStorageInfo(
257236
availableStorageTypes);
258237
workerDiskIndex.put(selectedWorker, (diskIndex + 1) % diskInfos.length);
259238
} else if (StorageInfo.S3Available(availableStorageTypes)) {
260-
storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
239+
storageInfo = new StorageInfo("", S3, availableStorageTypes);
261240
} else if (StorageInfo.OSSAvailable(availableStorageTypes)) {
262-
storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
241+
storageInfo = new StorageInfo("", OSS, availableStorageTypes);
263242
} else {
264-
storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
243+
storageInfo = new StorageInfo("", HDFS, availableStorageTypes);
265244
}
266245
}
267246
return storageInfo;

tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/CelebornHashCheckDiskSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
7575
// shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
7676
workers.foreach { worker =>
7777
worker.storageManager.updateDiskInfos()
78-
worker.storageManager.disksSnapshot().foreach { diskInfo =>
78+
worker.storageManager.localDisksSnapshot().foreach { diskInfo =>
7979
assert(diskInfo.actualUsableSpace <= 0)
8080
}
8181
}
@@ -89,7 +89,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
8989
assert(t.size() === 0)
9090
}
9191
// after shuffle key expired, diskInfo.actualUsableSpace will equal capacity=1000
92-
worker.storageManager.disksSnapshot().foreach { diskInfo =>
92+
worker.storageManager.localDisksSnapshot().foreach { diskInfo =>
9393
assert(diskInfo.actualUsableSpace === 1000)
9494
}
9595
}

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Controller.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -187,7 +187,7 @@ private[deploy] class Controller(
187187
return
188188
}
189189

190-
if (storageManager.healthyWorkingDirs().size <= 0 && remoteStorageDirs.isEmpty) {
190+
if (storageManager.healthyLocalWorkingDirs().size <= 0 && remoteStorageDirs.isEmpty) {
191191
val msg = "Local storage has no available dirs!"
192192
logError(s"[handleReserveSlots] $msg")
193193
context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/Worker.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -284,11 +284,10 @@ private[celeborn] class Worker(
284284
storageManager.updateDiskInfos()
285285
storageManager.startDeviceMonitor()
286286

287-
// WorkerInfo's diskInfos is a reference to storageManager.diskInfos
288-
val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
289-
storageManager.disksSnapshot().foreach { diskInfo =>
290-
diskInfos.put(diskInfo.mountPoint, diskInfo)
291-
}
287+
private val diskInfos = storageManager
288+
.allDisksSnapshot()
289+
.map { diskInfo => diskInfo.mountPoint -> diskInfo }
290+
.toMap.asJava
292291

293292
val workerInfo =
294293
new WorkerInfo(
@@ -514,10 +513,10 @@ private[celeborn] class Worker(
514513
activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
515514
activeShuffleKeys.addAll(storageManager.shuffleKeySet())
516515
storageManager.updateDiskInfos()
517-
val diskInfos =
518-
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk =>
519-
disk.mountPoint -> disk
520-
}.toMap.asJava).values().asScala.toSeq ++ storageManager.remoteDiskInfos.getOrElse(Set.empty)
516+
val currentDiskMap = storageManager.localDisksSnapshot().map { disk =>
517+
disk.mountPoint -> disk
518+
}.toMap.asJava
519+
val diskInfos = workerInfo.updateThenGetDiskInfos(currentDiskMap).asScala.values.toSeq
521520
workerStatusManager.checkIfNeedTransitionStatus()
522521
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
523522
HeartbeatFromWorker(

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -102,19 +102,23 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
102102
if (diskInfoSet.nonEmpty) Some(diskInfoSet) else None
103103
}
104104

105-
def disksSnapshot(): List[DiskInfo] = {
105+
def localDisksSnapshot(): List[DiskInfo] = {
106106
diskInfos.synchronized {
107107
val disks = new util.ArrayList[DiskInfo](diskInfos.values())
108108
disks.asScala.toList
109109
}
110110
}
111111

112-
def healthyWorkingDirs(): List[File] =
113-
disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
112+
def allDisksSnapshot(): List[DiskInfo] = {
113+
localDisksSnapshot() ++ remoteDiskInfos.getOrElse(Nil)
114+
}
115+
116+
def healthyLocalWorkingDirs(): List[File] =
117+
localDisksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
114118

115119
private val diskOperators: ConcurrentHashMap[String, ThreadPoolExecutor] = {
116120
val cleaners = JavaUtils.newConcurrentHashMap[String, ThreadPoolExecutor]()
117-
disksSnapshot().foreach {
121+
localDisksSnapshot().foreach {
118122
diskInfo =>
119123
cleaners.put(
120124
diskInfo.mountPoint,
@@ -126,7 +130,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
126130
}
127131

128132
val tmpDiskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
129-
disksSnapshot().foreach { diskInfo =>
133+
localDisksSnapshot().foreach { diskInfo =>
130134
tmpDiskInfos.put(diskInfo.mountPoint, diskInfo)
131135
}
132136
private val deviceMonitor =
@@ -141,7 +145,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
141145
_totalLocalFlusherThread: Int) = {
142146
val flushers = JavaUtils.newConcurrentHashMap[String, LocalFlusher]()
143147
var totalThread = 0
144-
disksSnapshot().foreach { diskInfo =>
148+
localDisksSnapshot().foreach { diskInfo =>
145149
if (!flushers.containsKey(diskInfo.mountPoint)) {
146150
val flusher = new LocalFlusher(
147151
workerSource,
@@ -268,7 +272,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
268272
private val counter = new AtomicInteger()
269273
private val counterOperator = new IntUnaryOperator() {
270274
override def applyAsInt(operand: Int): Int = {
271-
val dirs = healthyWorkingDirs()
275+
val dirs = healthyLocalWorkingDirs()
272276
if (dirs.nonEmpty) {
273277
(operand + 1) % dirs.length
274278
} else 0
@@ -448,7 +452,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
448452
userIdentifier: UserIdentifier,
449453
partitionSplitEnabled: Boolean,
450454
isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
451-
if (healthyWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
455+
if (healthyLocalWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
452456
throw new IOException("No available working dirs!")
453457
}
454458
val partitionDataWriterContext = new PartitionDataWriterContext(
@@ -646,7 +650,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
646650
}
647651
}
648652
val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
649-
disksSnapshot().filter(diskInfo =>
653+
localDisksSnapshot().filter(diskInfo =>
650654
diskInfo.status == DiskStatus.HEALTHY
651655
|| diskInfo.status == DiskStatus.HIGH_DISK_USAGE).foreach { diskInfo =>
652656
diskInfo.dirs.foreach { dir =>
@@ -710,7 +714,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
710714
TimeUnit.MINUTES)
711715

712716
private def cleanupExpiredAppDirs(expireDuration: Long): Unit = {
713-
val diskInfoAndAppDirs = disksSnapshot()
717+
val diskInfoAndAppDirs = localDisksSnapshot()
714718
.filter(diskInfo =>
715719
diskInfo.status == DiskStatus.HEALTHY
716720
|| diskInfo.status == DiskStatus.HIGH_DISK_USAGE)
@@ -760,7 +764,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
760764
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
761765
while (retryTimes < conf.workerCheckFileCleanMaxRetries) {
762766
val localCleaned =
763-
!disksSnapshot().filter(_.status != DiskStatus.IO_HANG).exists { diskInfo =>
767+
!localDisksSnapshot().filter(_.status != DiskStatus.IO_HANG).exists { diskInfo =>
764768
diskInfo.dirs.exists {
765769
case workingDir if workingDir.exists() =>
766770
// Don't check appDirs that store information in the fileInfos
@@ -902,7 +906,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
902906
}
903907

904908
def updateDiskInfos(): Unit = this.synchronized {
905-
disksSnapshot()
909+
localDisksSnapshot()
906910
.filter(diskInfo =>
907911
diskInfo.status != DiskStatus.IO_HANG && diskInfo.status != DiskStatus.READ_OR_WRITE_FAILURE)
908912
.foreach {
@@ -941,7 +945,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
941945
diskInfo.updateFlushTime()
942946
diskInfo.updateFetchTime()
943947
}
944-
logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
948+
logInfo(s"Updated diskInfos:\n${localDisksSnapshot().mkString("\n")}")
945949
}
946950

947951
def getFileSystemReportedSpace(mountPoint: String): (Long, Long) = {
@@ -1098,7 +1102,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
10981102
logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" +
10991103
s" working dirs.")
11001104
}
1101-
healthyWorkingDirs()
1105+
healthyLocalWorkingDirs()
11021106
}
11031107
if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) {
11041108
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")

worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManagerSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
5454
diskInfo.setUsableSpace(-1L)
5555

5656
var diskSetSpace = (0L, 0L)
57-
doReturn(List(diskInfo)).when(spyStorageManager).disksSnapshot()
57+
doReturn(List(diskInfo)).when(spyStorageManager).localDisksSnapshot()
5858
doAnswer(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)
5959

6060
// disk usable 80g, total 80g, worker config 8EB

0 commit comments

Comments (0)