Skip to content

Commit 6fc5565

Browse files
kaybhutani authored and 子懿 committed
[CELEBORN-2291] Support fsync on commit to ensure shuffle data durability
### What changes were proposed in this pull request?

Add a new configuration `celeborn.worker.commitFiles.fsync` (default `false`) that calls `FileChannel.force(false)` (fdatasync) before closing the channel in `LocalTierWriter.closeStreams()`.

### Why are the changes needed?

Without this, committed shuffle data can sit in the OS page cache before the kernel flushes it to disk. A hard crash in that window loses data even though Celeborn considers it committed. This option lets operators opt into stronger durability guarantees.

### Does this PR resolve a correctness bug?

No. It adds an optional durability enhancement.

### Does this PR introduce _any_ user-facing change?

Yes. New configuration key `celeborn.worker.commitFiles.fsync` (boolean, default `false`).

### How was this patch tested?

Existing unit tests. The configuration was verified via `ConfigurationSuite`; for `LocalTierWriter`, a new test with fsync enabled was added and `TierWriterSuite` was run.

Additional context: [slack](https://apachecelebor-kw08030.slack.com/archives/C04B1FYS6SY/p1774259245973229)

Closes #3635 from kaybhutani/kartikay/fsync-on-commit.

Authored-by: Kartikay Bhutani <kbhutani0001@gmail.com>
Signed-off-by: 子懿 <ziyi.jxf@antgroup.com>
1 parent 3773c65 commit 6fc5565

File tree

4 files changed

+43
-3
lines changed

4 files changed

+43
-3
lines changed

common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -859,6 +859,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable with Logging with Se
859859
def workerCommitThreads: Int =
860860
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else get(WORKER_COMMIT_THREADS)
861861
def workerCommitFilesCheckInterval: Long = get(WORKER_COMMIT_FILES_CHECK_INTERVAL)
862+
// Accessor for the WORKER_COMMIT_FILES_FSYNC entry (`celeborn.worker.commitFiles.fsync`).
def workerCommitFilesFsync: Boolean = get(WORKER_COMMIT_FILES_FSYNC)
862863
def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
863864
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
864865
def maxPartitionSizeToEstimate: Long =
@@ -3770,6 +3771,16 @@ object CelebornConf extends Logging {
37703771
.doc("Time length for a window about checking whether commit shuffle data files finished.")
37713772
.timeConf(TimeUnit.MILLISECONDS)
37723773
.createWithDefaultString("100")
3774+
// Config entry for `celeborn.worker.commitFiles.fsync`: a boolean in the "worker" category,
// introduced in version 0.7.0. It defaults to false, so fsync-on-commit is strictly opt-in.
val WORKER_COMMIT_FILES_FSYNC: ConfigEntry[Boolean] =
3775+
buildConf("celeborn.worker.commitFiles.fsync")
3776+
.categories("worker")
3777+
.version("0.7.0")
3778+
.doc("Whether to fsync (fdatasync) shuffle data when committing. " +
3779+
"When enabled, each partition file is fsynced to disk before the commit completes " +
3780+
"ensuring committed data survives OS crashes, hard reboots etc. " +
3781+
"Enabling ensures durability but can add some latency to commit times.")
3782+
.booleanConf
3783+
.createWithDefault(false)
37733784

37743785
val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
37753786
buildConf("celeborn.worker.clean.threads")

docs/configuration/worker.md

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -63,6 +63,7 @@ license: |
6363
| celeborn.worker.clean.threads | 64 | false | Thread number of worker to clean up expired shuffle keys. | 0.3.2 | |
6464
| celeborn.worker.closeIdleConnections | false | false | Whether worker will close idle connections. | 0.2.0 | |
6565
| celeborn.worker.commitFiles.check.interval | 100 | false | Time length for a window about checking whether commit shuffle data files finished. | 0.6.0 | |
66+
| celeborn.worker.commitFiles.fsync | false | false | Whether to fsync (fdatasync) shuffle data when committing. When enabled, each partition file is fsynced to disk before the commit completes ensuring committed data survives OS crashes, hard reboots etc. Enabling ensures durability but can add some latency to commit times. | 0.7.0 | |
6667
| celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker to commit shuffle data files asynchronously. It's recommended to set at least `128` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.commit.threads |
6768
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn worker to commit files of a shuffle. It's recommended to set at least `240s` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 | celeborn.worker.shuffle.commit.timeout |
6869
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval of worker checks congestion if celeborn.worker.congestionControl.enabled is true. | 0.3.2 | |

worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala

Lines changed: 10 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -418,6 +418,7 @@ class LocalTierWriter(
418418
FileChannelUtils.createWritableFileChannel(diskFileInfo.getFilePath)
419419

420420
val gatherApiEnabled: Boolean = conf.workerFlusherLocalGatherAPIEnabled
421+
// Read once from conf at construction; closeStreams() checks it to decide whether to force
// the channel before closing.
val commitFilesFsync: Boolean = conf.workerCommitFilesFsync
421422

422423
override def needEvict(): Boolean = {
423424
false
@@ -458,7 +459,15 @@ class LocalTierWriter(
458459
}
459460

460461
// Closes the local file channel, optionally forcing written data to disk first.
// Diff view: the `-` line is the previous unconditional close; the `+` lines replace it.
// New behavior: nothing happens when `channel` is null; otherwise, if `commitFilesFsync` is
// set, `channel.force(false)` runs first, and `channel.close()` always runs afterwards.
override def closeStreams(): Unit = {
461-
channel.close()
462+
if (channel != null) {
463+
try {
464+
if (commitFilesFsync) {
465+
// force(false): per the commit description this is the fdatasync path.
466+
channel.force(false)
467+
}
468+
// finally: the channel is still closed if force() throws.
469+
} finally {
468+
channel.close()
469+
}
470+
}
462471
}
463472

464473
override def notifyFileCommitted(): Unit =

worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala

Lines changed: 21 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -175,8 +175,9 @@ class TierWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
175175

176176
}
177177

178-
private def prepareLocalTierWriter(rangeFilter: Boolean): LocalTierWriter = {
179-
val celebornConf = new CelebornConf()
178+
private def prepareLocalTierWriter(
179+
rangeFilter: Boolean,
180+
celebornConf: CelebornConf = new CelebornConf()): LocalTierWriter = {
180181
celebornConf.set("celeborn.worker.memoryFileStorage.maxFileSize", "80k")
181182
celebornConf.set("celeborn.client.shuffle.rangeReadFilter.enabled", rangeFilter.toString)
182183
val reduceFileMeta = new ReduceFileMeta(celebornConf.shuffleChunkSize)
@@ -314,4 +315,22 @@ class TierWriterSuite extends AnyFunSuite with BeforeAndAfterEach {
314315
localTierWriter.fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getLastChunkOffset == 10240)
315316

316317
}
318+
319+
// Exercises the LocalTierWriter write/close path with `celeborn.worker.commitFiles.fsync`
// set to "true". It checks that the flag reaches the writer and that close() still reports
// the expected length and marks the writer closed.
test("test local tier writer with fsync enabled") {
320+
val conf = new CelebornConf()
321+
conf.set("celeborn.worker.commitFiles.fsync", "true")
322+
val localTierWriter = prepareLocalTierWriter(false, conf)
323+
324+
// The writer must have picked up the fsync setting from the supplied conf.
assert(localTierWriter.commitFilesFsync === true)
325+
for (i <- 1 to 10) {
326+
// Bump the pending-write counter before each write.
localTierWriter.numPendingWrites.incrementAndGet()
327+
localTierWriter.write(WriterUtils.generateSparkFormatData(
328+
UnpooledByteBufAllocator.DEFAULT,
329+
0))
330+
}
331+
332+
val fileLen = localTierWriter.close()
333+
// 10 writes totalling 10240 bytes; presumably 1024 bytes per generated buffer (TODO confirm).
assert(fileLen == 10240)
334+
assert(localTierWriter.closed === true)
335+
}
317336
}

0 commit comments

Comments
 (0)