Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,34 @@

public class StorageInfo implements Serializable {
public enum Type {
MEMORY(0),
HDD(1),
SSD(2),
HDFS(3),
OSS(4),
S3(5);
MEMORY(0, false, MEMORY_MASK),
HDD(1, false, LOCAL_DISK_MASK),
SSD(2, false, LOCAL_DISK_MASK),
HDFS(3, true, HDFS_MASK),
OSS(4, true, OSS_MASK),
S3(5, true, S3_MASK);

private final int value;
private final boolean isDFS;
private final int mask;

Type(int value) {
Type(int value, boolean isDFS, int mask) {
this.value = value;
this.isDFS = isDFS;
this.mask = mask;
}

public int getValue() {
return value;
}

public boolean isDFS() {
return isDFS;
}

public int getMask() {
return mask;
}
}

public static final Map<Integer, Type> typesMap = new HashMap<>();
Expand Down Expand Up @@ -228,6 +240,11 @@ public boolean S3Available() {
return StorageInfo.S3Available(availableStorageTypes);
}

/**
 * Returns whether the given storage {@code Type} is usable under the supplied availability
 * bitmask.
 *
 * @param type the storage type to test; its per-type bit is obtained via {@code getMask()}
 * @param availableStorageTypes bitmask of enabled storage types; the special value
 *     {@code ALL_TYPES_AVAILABLE_MASK} enables every type regardless of individual bits
 * @return {@code true} if all types are available or the type's mask bit is set in the bitmask
 */
public static boolean isAvailable(Type type, int availableStorageTypes) {
return availableStorageTypes == ALL_TYPES_AVAILABLE_MASK
|| (availableStorageTypes & type.getMask()) > 0;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,25 +214,21 @@ class WorkerInfo(
for (newDisk <- newDiskInfos.values().asScala) {
val mountPoint: String = newDisk.mountPoint
val curDisk = diskInfos.get(mountPoint)
if (estimatedPartitionSize.nonEmpty && !newDisk.storageType.isDFS) {
newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
}
if (curDisk != null) {
curDisk.actualUsableSpace = newDisk.actualUsableSpace
curDisk.totalSpace = newDisk.totalSpace
// Update master's diskinfo activeslots to worker's value
curDisk.activeSlots = newDisk.activeSlots
curDisk.avgFlushTime = newDisk.avgFlushTime
curDisk.avgFetchTime = newDisk.avgFetchTime
if (estimatedPartitionSize.nonEmpty && curDisk.storageType != StorageInfo.Type.HDFS
&& curDisk.storageType != StorageInfo.Type.S3 && curDisk.storageType != StorageInfo.Type.OSS) {
curDisk.maxSlots = curDisk.totalSpace / estimatedPartitionSize.get
curDisk.availableSlots = curDisk.actualUsableSpace / estimatedPartitionSize.get
}
curDisk.maxSlots = newDisk.maxSlots
curDisk.availableSlots = newDisk.availableSlots
curDisk.setStatus(newDisk.status)
} else {
if (estimatedPartitionSize.nonEmpty && newDisk.storageType != StorageInfo.Type.HDFS
&& newDisk.storageType != StorageInfo.Type.S3 && newDisk.storageType != StorageInfo.Type.OSS) {
newDisk.maxSlots = newDisk.totalSpace / estimatedPartitionSize.get
newDisk.availableSlots = newDisk.actualUsableSpace / estimatedPartitionSize.get
}
diskInfos.put(mountPoint, newDisk)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.celeborn.service.deploy.master;

import static org.apache.celeborn.common.protocol.StorageInfo.Type.*;

import java.util.*;
import java.util.function.IntUnaryOperator;
import java.util.stream.Collectors;
Expand All @@ -40,6 +42,12 @@ static class UsableDiskInfo {
DiskInfo diskInfo;
long usableSlots;

/**
 * Convenience constructor: {@code usableSlots} is initialized from the disk's current
 * available-slot count, avoiding the repeated {@code getAvailableSlots()} call at every
 * call site.
 *
 * @param diskInfo will be used as source for usableSlots.
 */
UsableDiskInfo(DiskInfo diskInfo) {
this.diskInfo = diskInfo;
this.usableSlots = diskInfo.getAvailableSlots();
}

UsableDiskInfo(DiskInfo diskInfo, long usableSlots) {
this.diskInfo = diskInfo;
this.usableSlots = usableSlots;
Expand Down Expand Up @@ -70,31 +78,10 @@ static class UsableDiskInfo {
for (WorkerInfo worker : workers) {
List<UsableDiskInfo> usableDisks =
slotsRestrictions.computeIfAbsent(worker, v -> new ArrayList<>());
for (Map.Entry<String, DiskInfo> diskInfoEntry : worker.diskInfos().entrySet()) {
if (diskInfoEntry.getValue().status().equals(DiskStatus.HEALTHY)) {
if (StorageInfo.localDiskAvailable(availableStorageTypes)
&& diskInfoEntry.getValue().storageType() != StorageInfo.Type.HDFS
&& diskInfoEntry.getValue().storageType() != StorageInfo.Type.S3
&& diskInfoEntry.getValue().storageType() != StorageInfo.Type.OSS) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
} else if (StorageInfo.HDFSAvailable(availableStorageTypes)
&& diskInfoEntry.getValue().storageType() == StorageInfo.Type.HDFS) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
} else if (StorageInfo.S3Available(availableStorageTypes)
&& diskInfoEntry.getValue().storageType() == StorageInfo.Type.S3) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(), diskInfoEntry.getValue().getAvailableSlots()));
} else if (StorageInfo.OSSAvailable(availableStorageTypes)
&& diskInfoEntry.getValue().storageType() == StorageInfo.Type.OSS) {
usableDisks.add(
new UsableDiskInfo(
diskInfoEntry.getValue(), diskInfoEntry.getValue().availableSlots()));
}
for (DiskInfo diskInfo : worker.diskInfos().values()) {
if (diskInfo.status().equals(DiskStatus.HEALTHY)
&& StorageInfo.isAvailable(diskInfo.storageType(), availableStorageTypes)) {
usableDisks.add(new UsableDiskInfo(diskInfo));
}
}
}
Expand Down Expand Up @@ -157,9 +144,7 @@ static class UsableDiskInfo {
diskToWorkerMap.put(diskInfo, i);
if (diskInfo.actualUsableSpace() > 0
&& diskInfo.status().equals(DiskStatus.HEALTHY)
&& diskInfo.storageType() != StorageInfo.Type.HDFS
&& diskInfo.storageType() != StorageInfo.Type.S3
&& diskInfo.storageType() != StorageInfo.Type.OSS) {
&& !diskInfo.storageType().isDFS()) {
usableDisks.add(diskInfo);
}
}));
Expand Down Expand Up @@ -225,12 +210,8 @@ private static StorageInfo getStorageInfo(
}
usableDiskInfos.get(diskIndex).usableSlots--;
DiskInfo selectedDiskInfo = usableDiskInfos.get(diskIndex).diskInfo;
if (selectedDiskInfo.storageType() == StorageInfo.Type.HDFS) {
storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
} else if (selectedDiskInfo.storageType() == StorageInfo.Type.S3) {
storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
} else if (selectedDiskInfo.storageType() == StorageInfo.Type.OSS) {
storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
if (selectedDiskInfo.storageType().isDFS()) {
storageInfo = new StorageInfo("", selectedDiskInfo.storageType(), availableStorageTypes);
} else {
storageInfo =
new StorageInfo(
Expand All @@ -243,9 +224,7 @@ private static StorageInfo getStorageInfo(
if (StorageInfo.localDiskAvailable(availableStorageTypes)) {
DiskInfo[] diskInfos =
selectedWorker.diskInfos().values().stream()
.filter(p -> p.storageType() != StorageInfo.Type.HDFS)
.filter(p -> p.storageType() != StorageInfo.Type.S3)
.filter(p -> p.storageType() != StorageInfo.Type.OSS)
.filter(p -> !p.storageType().isDFS())
.collect(Collectors.toList())
.toArray(new DiskInfo[0]);
int diskIndex =
Expand All @@ -257,11 +236,11 @@ private static StorageInfo getStorageInfo(
availableStorageTypes);
workerDiskIndex.put(selectedWorker, (diskIndex + 1) % diskInfos.length);
} else if (StorageInfo.S3Available(availableStorageTypes)) {
storageInfo = new StorageInfo("", StorageInfo.Type.S3, availableStorageTypes);
storageInfo = new StorageInfo("", S3, availableStorageTypes);
} else if (StorageInfo.OSSAvailable(availableStorageTypes)) {
storageInfo = new StorageInfo("", StorageInfo.Type.OSS, availableStorageTypes);
storageInfo = new StorageInfo("", OSS, availableStorageTypes);
} else {
storageInfo = new StorageInfo("", StorageInfo.Type.HDFS, availableStorageTypes);
storageInfo = new StorageInfo("", HDFS, availableStorageTypes);
}
}
return storageInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
// shuffle key not expired, diskInfo.actualUsableSpace <= 0, no space
workers.foreach { worker =>
worker.storageManager.updateDiskInfos()
worker.storageManager.disksSnapshot().foreach { diskInfo =>
worker.storageManager.localDisksSnapshot().foreach { diskInfo =>
assert(diskInfo.actualUsableSpace <= 0)
}
}
Expand All @@ -89,7 +89,7 @@ class CelebornHashCheckDiskSuite extends SparkTestBase {
assert(t.size() === 0)
}
// after shuffle key expired, diskInfo.actualUsableSpace will equal capacity=1000
worker.storageManager.disksSnapshot().foreach { diskInfo =>
worker.storageManager.localDisksSnapshot().foreach { diskInfo =>
assert(diskInfo.actualUsableSpace === 1000)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ private[deploy] class Controller(
return
}

if (storageManager.healthyWorkingDirs().size <= 0 && remoteStorageDirs.isEmpty) {
if (storageManager.healthyLocalWorkingDirs().size <= 0 && remoteStorageDirs.isEmpty) {
val msg = "Local storage has no available dirs!"
logError(s"[handleReserveSlots] $msg")
context.reply(ReserveSlotsResponse(StatusCode.NO_AVAILABLE_WORKING_DIR, msg))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,11 +284,10 @@ private[celeborn] class Worker(
storageManager.updateDiskInfos()
storageManager.startDeviceMonitor()

// WorkerInfo's diskInfos is a reference to storageManager.diskInfos
val diskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
storageManager.disksSnapshot().foreach { diskInfo =>
diskInfos.put(diskInfo.mountPoint, diskInfo)
}
private val diskInfos = storageManager
.allDisksSnapshot()
.map { diskInfo => diskInfo.mountPoint -> diskInfo }
.toMap.asJava

val workerInfo =
new WorkerInfo(
Expand Down Expand Up @@ -514,10 +513,10 @@ private[celeborn] class Worker(
activeShuffleKeys.addAll(partitionLocationInfo.shuffleKeySet)
activeShuffleKeys.addAll(storageManager.shuffleKeySet())
storageManager.updateDiskInfos()
val diskInfos =
workerInfo.updateThenGetDiskInfos(storageManager.disksSnapshot().map { disk =>
disk.mountPoint -> disk
}.toMap.asJava).values().asScala.toSeq ++ storageManager.remoteDiskInfos.getOrElse(Set.empty)
val currentDiskMap = storageManager.allDisksSnapshot().map { disk =>
disk.mountPoint -> disk
}.toMap.asJava
val diskInfos = workerInfo.updateThenGetDiskInfos(currentDiskMap).asScala.values.toSeq
workerStatusManager.checkIfNeedTransitionStatus()
val response = masterClient.askSync[HeartbeatFromWorkerResponse](
HeartbeatFromWorker(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,19 +102,23 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
if (diskInfoSet.nonEmpty) Some(diskInfoSet) else None
}

def disksSnapshot(): List[DiskInfo] = {
/**
 * Returns a point-in-time copy of the locally registered disks (entries of `diskInfos`;
 * remote/DFS disks tracked in `remoteDiskInfos` are NOT included — see `allDisksSnapshot`).
 * The copy is taken while synchronized on `diskInfos`, so callers may iterate the returned
 * list without further locking.
 */
def localDisksSnapshot(): List[DiskInfo] = {
diskInfos.synchronized {
val disks = new util.ArrayList[DiskInfo](diskInfos.values())
disks.asScala.toList
}
}

def healthyWorkingDirs(): List[File] =
disksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)
/**
 * Snapshot of every known disk: the local disks plus the remote (DFS-backed) disk infos,
 * when any are configured (`remoteDiskInfos` is an Option; `Nil` when absent).
 */
def allDisksSnapshot(): List[DiskInfo] = {
localDisksSnapshot() ++ remoteDiskInfos.getOrElse(Nil)
}

/** Working directories that reside on local disks whose status is HEALTHY. */
def healthyLocalWorkingDirs(): List[File] =
localDisksSnapshot().filter(_.status == DiskStatus.HEALTHY).flatMap(_.dirs)

private val diskOperators: ConcurrentHashMap[String, ThreadPoolExecutor] = {
val cleaners = JavaUtils.newConcurrentHashMap[String, ThreadPoolExecutor]()
disksSnapshot().foreach {
localDisksSnapshot().foreach {
diskInfo =>
cleaners.put(
diskInfo.mountPoint,
Expand All @@ -126,7 +130,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}

val tmpDiskInfos = JavaUtils.newConcurrentHashMap[String, DiskInfo]()
disksSnapshot().foreach { diskInfo =>
localDisksSnapshot().foreach { diskInfo =>
tmpDiskInfos.put(diskInfo.mountPoint, diskInfo)
}
private val deviceMonitor =
Expand All @@ -141,7 +145,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
_totalLocalFlusherThread: Int) = {
val flushers = JavaUtils.newConcurrentHashMap[String, LocalFlusher]()
var totalThread = 0
disksSnapshot().foreach { diskInfo =>
localDisksSnapshot().foreach { diskInfo =>
if (!flushers.containsKey(diskInfo.mountPoint)) {
val flusher = new LocalFlusher(
workerSource,
Expand Down Expand Up @@ -268,7 +272,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
private val counter = new AtomicInteger()
private val counterOperator = new IntUnaryOperator() {
override def applyAsInt(operand: Int): Int = {
val dirs = healthyWorkingDirs()
val dirs = healthyLocalWorkingDirs()
if (dirs.nonEmpty) {
(operand + 1) % dirs.length
} else 0
Expand Down Expand Up @@ -448,7 +452,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
userIdentifier: UserIdentifier,
partitionSplitEnabled: Boolean,
isSegmentGranularityVisible: Boolean): PartitionDataWriter = {
if (healthyWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
if (healthyLocalWorkingDirs().isEmpty && remoteStorageDirs.isEmpty) {
throw new IOException("No available working dirs!")
}
val partitionDataWriterContext = new PartitionDataWriterContext(
Expand Down Expand Up @@ -646,7 +650,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}
}
val (appId, shuffleId) = Utils.splitShuffleKey(shuffleKey)
disksSnapshot().filter(diskInfo =>
localDisksSnapshot().filter(diskInfo =>
diskInfo.status == DiskStatus.HEALTHY
|| diskInfo.status == DiskStatus.HIGH_DISK_USAGE).foreach { diskInfo =>
diskInfo.dirs.foreach { dir =>
Expand Down Expand Up @@ -710,7 +714,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
TimeUnit.MINUTES)

private def cleanupExpiredAppDirs(expireDuration: Long): Unit = {
val diskInfoAndAppDirs = disksSnapshot()
val diskInfoAndAppDirs = localDisksSnapshot()
.filter(diskInfo =>
diskInfo.status == DiskStatus.HEALTHY
|| diskInfo.status == DiskStatus.HIGH_DISK_USAGE)
Expand Down Expand Up @@ -760,7 +764,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
while (retryTimes < conf.workerCheckFileCleanMaxRetries) {
val localCleaned =
!disksSnapshot().filter(_.status != DiskStatus.IO_HANG).exists { diskInfo =>
!localDisksSnapshot().filter(_.status != DiskStatus.IO_HANG).exists { diskInfo =>
diskInfo.dirs.exists {
case workingDir if workingDir.exists() =>
// Don't check appDirs that store information in the fileInfos
Expand Down Expand Up @@ -902,7 +906,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
}

def updateDiskInfos(): Unit = this.synchronized {
disksSnapshot()
localDisksSnapshot()
.filter(diskInfo =>
diskInfo.status != DiskStatus.IO_HANG && diskInfo.status != DiskStatus.READ_OR_WRITE_FAILURE)
.foreach {
Expand Down Expand Up @@ -941,7 +945,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
diskInfo.updateFlushTime()
diskInfo.updateFetchTime()
}
logInfo(s"Updated diskInfos:\n${disksSnapshot().mkString("\n")}")
logInfo(s"Updated diskInfos:\n${localDisksSnapshot().mkString("\n")}")
}

def getFileSystemReportedSpace(mountPoint: String): (Long, Long) = {
Expand Down Expand Up @@ -1098,7 +1102,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
logInfo(s"Disk(${diskInfo.mountPoint}) unavailable for $suggestedMountPoint, return all healthy" +
s" working dirs.")
}
healthyWorkingDirs()
healthyLocalWorkingDirs()
}
if (dirs.isEmpty && hdfsFlusher.isEmpty && s3Flusher.isEmpty && ossFlusher.isEmpty) {
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class StorageManagerSuite extends CelebornFunSuite with MockitoHelper {
diskInfo.setUsableSpace(-1L)

var diskSetSpace = (0L, 0L)
doReturn(List(diskInfo)).when(spyStorageManager).disksSnapshot()
doReturn(List(diskInfo)).when(spyStorageManager).localDisksSnapshot()
doAnswer(diskSetSpace).when(spyStorageManager).getFileSystemReportedSpace(any)

// disk usable 80g, total 80g, worker config 8EB
Expand Down
Loading