Skip to content

Commit b828c9f

Browse files
committed
[CELEBORN-2143] Create DiskFile sequentially based on createFileOrder
1 parent 8966c9b commit b828c9f

File tree

6 files changed

+129
-81
lines changed

6 files changed

+129
-81
lines changed

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala

Lines changed: 93 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -238,9 +238,16 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
238238
val activeTypes = conf.availableStorageTypes
239239

240240
lazy val localOrDfsStorageAvailable: Boolean = {
241+
localStorageAvailable || dfsStorageAvailable
242+
}
243+
244+
lazy val localStorageAvailable: Boolean = {
245+
StorageInfo.localDiskAvailable(activeTypes) || !diskInfos.isEmpty
246+
}
247+
248+
lazy val dfsStorageAvailable: Boolean = {
241249
StorageInfo.OSSAvailable(activeTypes) || StorageInfo.S3Available(activeTypes) ||
242-
StorageInfo.HDFSAvailable(activeTypes) || StorageInfo.localDiskAvailable(activeTypes) ||
243-
hdfsDir.nonEmpty || !diskInfos.isEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
250+
StorageInfo.HDFSAvailable(activeTypes) || hdfsDir.nonEmpty || s3Dir.nonEmpty || ossDir.nonEmpty
244251
}
245252

246253
override def notifyError(mountPoint: String, diskStatus: DiskStatus): Unit = this.synchronized {
@@ -1029,7 +1036,17 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
10291036
} else if (location.getStorageInfo.localDiskAvailable() || location.getStorageInfo.HDFSAvailable()
10301037
|| location.getStorageInfo.S3Available() || location.getStorageInfo.OSSAvailable()) {
10311038
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
1032-
val createDiskFileResult = createDiskFile(
1039+
val createDiskFileResult = createLocalDiskFile(
1040+
location,
1041+
partitionDataWriterContext.getAppId,
1042+
partitionDataWriterContext.getShuffleId,
1043+
location.getFileName,
1044+
partitionDataWriterContext.getUserIdentifier,
1045+
partitionDataWriterContext.getPartitionType,
1046+
partitionDataWriterContext.isPartitionSplitEnabled)
1047+
(null, createDiskFileResult._1, createDiskFileResult._2, createDiskFileResult._3)
1048+
} else if (location.getStorageInfo.HDFSAvailable() || location.getStorageInfo.S3Available() || location.getStorageInfo.OSSAvailable()) {
1049+
val createDiskFileResult = createDfsDiskFile(
10331050
location,
10341051
partitionDataWriterContext.getAppId,
10351052
partitionDataWriterContext.getShuffleId,
@@ -1069,10 +1086,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
10691086
memoryFileInfo
10701087
}
10711088

1072-
/**
1073-
* @return (Flusher,DiskFileInfo,workingDir)
1074-
*/
1075-
def createDiskFile(
1089+
def createLocalDiskFile(
10761090
location: PartitionLocation,
10771091
appId: String,
10781092
shuffleId: Int,
@@ -1102,61 +1116,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
11021116
throw new IOException(s"No available disks! suggested mountPoint $suggestedMountPoint")
11031117
}
11041118

1105-
if (dirs.isEmpty && location.getStorageInfo.HDFSAvailable()) {
1106-
val shuffleDir =
1107-
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
1108-
FileSystem.mkdirs(
1109-
StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
1110-
shuffleDir,
1111-
hdfsPermission)
1112-
val hdfsFilePath = new Path(shuffleDir, fileName).toString
1113-
val hdfsFileInfo = new DiskFileInfo(
1114-
userIdentifier,
1115-
partitionSplitEnabled,
1116-
getFileMeta(partitionType, s"hdfs", conf.shuffleChunkSize),
1117-
hdfsFilePath,
1118-
StorageInfo.Type.HDFS)
1119-
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
1120-
fileName,
1121-
hdfsFileInfo)
1122-
return (hdfsFlusher.get, hdfsFileInfo, null)
1123-
} else if (dirs.isEmpty && location.getStorageInfo.S3Available()) {
1124-
val shuffleDir =
1125-
new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
1126-
FileSystem.mkdirs(
1127-
StorageManager.hadoopFs.get(StorageInfo.Type.S3),
1128-
shuffleDir,
1129-
hdfsPermission)
1130-
val s3FilePath = new Path(shuffleDir, fileName).toString
1131-
val s3FileInfo = new DiskFileInfo(
1132-
userIdentifier,
1133-
partitionSplitEnabled,
1134-
new ReduceFileMeta(conf.shuffleChunkSize),
1135-
s3FilePath,
1136-
StorageInfo.Type.S3)
1137-
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
1138-
fileName,
1139-
s3FileInfo)
1140-
return (s3Flusher.get, s3FileInfo, null)
1141-
} else if (dirs.isEmpty && location.getStorageInfo.OSSAvailable()) {
1142-
val shuffleDir =
1143-
new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
1144-
FileSystem.mkdirs(
1145-
StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
1146-
shuffleDir,
1147-
hdfsPermission)
1148-
val ossFilePath = new Path(shuffleDir, fileName).toString
1149-
val ossFileInfo = new DiskFileInfo(
1150-
userIdentifier,
1151-
partitionSplitEnabled,
1152-
new ReduceFileMeta(conf.shuffleChunkSize),
1153-
ossFilePath,
1154-
StorageInfo.Type.OSS)
1155-
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
1156-
fileName,
1157-
ossFileInfo)
1158-
return (ossFlusher.get, ossFileInfo, null)
1159-
} else if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
1119+
if (dirs.nonEmpty && location.getStorageInfo.localDiskAvailable()) {
11601120
val dir = dirs(getNextIndex % dirs.size)
11611121
val mountPoint = DeviceInfo.getMountPoint(dir.getAbsolutePath, mountPoints)
11621122
val shuffleDir = new File(dir, s"$appId/$shuffleId")
@@ -1210,9 +1170,80 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
12101170
}
12111171
retryCount += 1
12121172
}
1173+
if (dfsStorageAvailable) {
1174+
logWarning("Failed to create localFileWriter", exception)
1175+
return (null, null, null)
1176+
}
12131177
throw exception
12141178
}
12151179

1180+
def createDfsDiskFile(
1181+
location: PartitionLocation,
1182+
appId: String,
1183+
shuffleId: Int,
1184+
fileName: String,
1185+
userIdentifier: UserIdentifier,
1186+
partitionType: PartitionType,
1187+
partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = {
1188+
val shuffleKey = Utils.makeShuffleKey(appId, shuffleId)
1189+
if (location.getStorageInfo.HDFSAvailable()) {
1190+
val shuffleDir =
1191+
new Path(new Path(hdfsDir, conf.workerWorkingDir), s"$appId/$shuffleId")
1192+
FileSystem.mkdirs(
1193+
StorageManager.hadoopFs.get(StorageInfo.Type.HDFS),
1194+
shuffleDir,
1195+
hdfsPermission)
1196+
val hdfsFilePath = new Path(shuffleDir, fileName).toString
1197+
val hdfsFileInfo = new DiskFileInfo(
1198+
userIdentifier,
1199+
partitionSplitEnabled,
1200+
getFileMeta(partitionType, s"hdfs", conf.shuffleChunkSize),
1201+
hdfsFilePath,
1202+
StorageInfo.Type.HDFS)
1203+
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
1204+
fileName,
1205+
hdfsFileInfo)
1206+
return (hdfsFlusher.get, hdfsFileInfo, null)
1207+
} else if (location.getStorageInfo.S3Available()) {
1208+
val shuffleDir =
1209+
new Path(new Path(s3Dir, conf.workerWorkingDir), s"$appId/$shuffleId")
1210+
FileSystem.mkdirs(
1211+
StorageManager.hadoopFs.get(StorageInfo.Type.S3),
1212+
shuffleDir,
1213+
hdfsPermission)
1214+
val s3FilePath = new Path(shuffleDir, fileName).toString
1215+
val s3FileInfo = new DiskFileInfo(
1216+
userIdentifier,
1217+
partitionSplitEnabled,
1218+
new ReduceFileMeta(conf.shuffleChunkSize),
1219+
s3FilePath,
1220+
StorageInfo.Type.S3)
1221+
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
1222+
fileName,
1223+
s3FileInfo)
1224+
return (s3Flusher.get, s3FileInfo, null)
1225+
} else if (location.getStorageInfo.OSSAvailable()) {
1226+
val shuffleDir =
1227+
new Path(new Path(ossDir, conf.workerWorkingDir), s"$appId/$shuffleId")
1228+
FileSystem.mkdirs(
1229+
StorageManager.hadoopFs.get(StorageInfo.Type.OSS),
1230+
shuffleDir,
1231+
hdfsPermission)
1232+
val ossFilePath = new Path(shuffleDir, fileName).toString
1233+
val ossFileInfo = new DiskFileInfo(
1234+
userIdentifier,
1235+
partitionSplitEnabled,
1236+
new ReduceFileMeta(conf.shuffleChunkSize),
1237+
ossFilePath,
1238+
StorageInfo.Type.OSS)
1239+
diskFileInfos.computeIfAbsent(shuffleKey, diskFileInfoMapFunc).put(
1240+
fileName,
1241+
ossFileInfo)
1242+
return (ossFlusher.get, ossFileInfo, null)
1243+
}
1244+
(null, null, null)
1245+
}
1246+
12161247
def startDeviceMonitor(): Unit = {
12171248
deviceMonitor.startCheck()
12181249
}

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -120,21 +120,20 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
120120
} else {
121121
null
122122
}
123-
case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
124-
if (storageManager.localOrDfsStorageAvailable) {
125-
logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
126-
val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile(
123+
case StorageInfo.Type.HDD | StorageInfo.Type.SSD =>
124+
if (storageManager.localStorageAvailable) {
125+
logDebug(s"create local disk file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
126+
val (flusher, diskFileInfo, workingDir) = storageManager.createLocalDiskFile(
127127
location,
128128
partitionDataWriterContext.getAppId,
129129
partitionDataWriterContext.getShuffleId,
130130
location.getFileName,
131131
partitionDataWriterContext.getUserIdentifier,
132132
partitionDataWriterContext.getPartitionType,
133133
partitionDataWriterContext.isPartitionSplitEnabled)
134-
partitionDataWriterContext.setWorkingDir(workingDir)
135-
val metaHandler = getPartitionMetaHandler(diskFileInfo)
136-
if (flusher.isInstanceOf[LocalFlusher]
137-
&& location.getStorageInfo.localDiskAvailable()) {
134+
if (diskFileInfo != null) {
135+
partitionDataWriterContext.setWorkingDir(workingDir)
136+
val metaHandler = getPartitionMetaHandler(diskFileInfo)
138137
new LocalTierWriter(
139138
conf,
140139
metaHandler,
@@ -147,6 +146,25 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
147146
partitionDataWriterContext,
148147
storageManager)
149148
} else {
149+
null
150+
}
151+
} else {
152+
null
153+
}
154+
case StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 =>
155+
if (storageManager.dfsStorageAvailable) {
156+
logDebug(s"create dfs disk file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}")
157+
val (flusher, diskFileInfo, workingDir) = storageManager.createDfsDiskFile(
158+
location,
159+
partitionDataWriterContext.getAppId,
160+
partitionDataWriterContext.getShuffleId,
161+
location.getFileName,
162+
partitionDataWriterContext.getUserIdentifier,
163+
partitionDataWriterContext.getPartitionType,
164+
partitionDataWriterContext.isPartitionSplitEnabled)
165+
if (diskFileInfo != null) {
166+
partitionDataWriterContext.setWorkingDir(workingDir)
167+
val metaHandler = getPartitionMetaHandler(diskFileInfo)
150168
new DfsTierWriter(
151169
conf,
152170
metaHandler,
@@ -158,6 +176,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source:
158176
diskFileInfo.getStorageType,
159177
partitionDataWriterContext,
160178
storageManager)
179+
} else {
180+
null
161181
}
162182
} else {
163183
null

worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class StoragePolicyCase1 extends CelebornFunSuite {
5959
val mockedFlusher = mock[Flusher]
6060
val mockedFile = mock[File]
6161
when(
62-
mockedStorageManager.createDiskFile(
62+
mockedStorageManager.createLocalDiskFile(
6363
any(),
6464
any(),
6565
any(),
@@ -101,7 +101,6 @@ class StoragePolicyCase1 extends CelebornFunSuite {
101101
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
102102
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
103103
val conf = new CelebornConf()
104-
val flushLock = new AnyRef
105104
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3")
106105
val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
107106
val pendingWriters = new AtomicInteger()

worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
5959
val mockedFlusher = mock[LocalFlusher]
6060
val mockedFile = mock[File]
6161
when(
62-
mockedStorageManager.createDiskFile(
62+
mockedStorageManager.createLocalDiskFile(
6363
any(),
6464
any(),
6565
any(),
@@ -100,7 +100,7 @@ class StoragePolicyCase2 extends CelebornFunSuite {
100100
test("test create file order case2") {
101101
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
102102
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
103-
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
103+
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
104104
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
105105
val conf = new CelebornConf()
106106
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "SSD,HDD,HDFS,OSS,S3")

worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class StoragePolicyCase3 extends CelebornFunSuite {
5959
val mockedFlusher = mock[LocalFlusher]
6060
val mockedFile = mock[File]
6161
when(
62-
mockedStorageManager.createDiskFile(
62+
mockedStorageManager.createLocalDiskFile(
6363
any(),
6464
any(),
6565
any(),
@@ -100,11 +100,10 @@ class StoragePolicyCase3 extends CelebornFunSuite {
100100
test("test getEvicted file case1") {
101101
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(localHintPartitionLocatioin)
102102
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
103-
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
103+
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
104104
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
105105
val mockedMemoryFile = mock[LocalTierWriter]
106106
val conf = new CelebornConf()
107-
val flushLock = new AnyRef
108107
val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
109108
val pendingWriters = new AtomicInteger()
110109
val notifier = new FlushNotifier
@@ -120,11 +119,10 @@ class StoragePolicyCase3 extends CelebornFunSuite {
120119
test("test evict file case2") {
121120
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(memoryHintPartitionLocation)
122121
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
123-
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
122+
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
124123
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.HDD)
125124
val mockedMemoryFile = mock[LocalTierWriter]
126125
val conf = new CelebornConf()
127-
val flushLock = new AnyRef
128126
val storagePolicy = new StoragePolicy(conf, mockedStorageManager, mockedSource)
129127
val pendingWriters = new AtomicInteger()
130128
val notifier = new FlushNotifier

worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
5959
val mockedFlusher = mock[LocalFlusher]
6060
val mockedFile = mock[File]
6161
when(
62-
mockedStorageManager.createDiskFile(
62+
mockedStorageManager.createLocalDiskFile(
6363
any(),
6464
any(),
6565
any(),
@@ -101,7 +101,7 @@ class StoragePolicyCase4 extends CelebornFunSuite {
101101
when(mockedPartitionWriterContext.getPartitionLocation).thenAnswer(
102102
memoryDisabledHintPartitionLocation)
103103
when(mockedPartitionWriterContext.getPartitionType).thenAnswer(PartitionType.REDUCE)
104-
when(mockedStorageManager.localOrDfsStorageAvailable).thenAnswer(true)
104+
when(mockedStorageManager.localStorageAvailable).thenAnswer(true)
105105
when(mockedDiskFile.getStorageType).thenAnswer(StorageInfo.Type.SSD)
106106
val conf = new CelebornConf()
107107
conf.set("celeborn.worker.storage.storagePolicy.createFilePolicy", "MEMORY,SSD,HDD,HDFS,OSS,S3")

0 commit comments

Comments
 (0)