Skip to content

Commit aa3bbeb

Browse files
committed
pass first test
1 parent 7a356a9 commit aa3bbeb

File tree

6 files changed

+8
-6
lines changed

6 files changed

+8
-6
lines changed

client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SparkShuffleManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.celeborn.spark.StageDependencyManager;
2525
import org.apache.spark.*;
2626
import org.apache.spark.launcher.SparkLauncher;
27+
import org.apache.spark.listener.ListenerHelper;
2728
import org.apache.spark.rdd.DeterministicLevel;
2829
import org.apache.spark.scheduler.RunningStageManager;
2930
import org.apache.spark.scheduler.RunningStageManagerImpl;
@@ -162,6 +163,7 @@ private void initializeLifecycleManager() {
162163
}
163164
if (lifecycleManager.conf().clientShuffleEarlyDeletion()) {
164165
logger.info("register early deletion callbacks");
166+
ListenerHelper.addShuffleStatsTrackingListener();
165167
lifecycleManager.registerStageToWriteCelebornShuffleCallback(
166168
(celebornShuffleId, appShuffleIdentifier) ->
167169
SparkUtils.addStageToWriteCelebornShuffleIdDep(

client-spark/spark-3/src/main/scala/org/apache/spark/listner/CelebornShuffleEarlyCleanup.scala renamed to client-spark/spark-3/src/main/scala/org/apache/spark/listener/CelebornShuffleEarlyCleanup.scala

File renamed without changes.

client-spark/spark-3/src/main/scala/org/apache/spark/listner/CelebornShuffleEarlyCleanupEvent.scala renamed to client-spark/spark-3/src/main/scala/org/apache/spark/listener/CelebornShuffleEarlyCleanupEvent.scala

File renamed without changes.

client-spark/spark-3/src/main/scala/org/apache/spark/listner/ListenerHelper.scala renamed to client-spark/spark-3/src/main/scala/org/apache/spark/listener/ListenerHelper.scala

File renamed without changes.

client-spark/spark-3/src/main/scala/org/apache/spark/listner/ShuffleStatsTrackingListener.scala renamed to client-spark/spark-3/src/main/scala/org/apache/spark/listener/ShuffleStatsTrackingListener.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ class ShuffleStatsTrackingListener extends SparkListener with Logging {
4343
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = {
4444
val stageIdentifier = s"${stageCompleted.stageInfo.stageId}-" +
4545
s"${stageCompleted.stageInfo.attemptNumber()}"
46-
logInfo(s"stage $stageIdentifier finished with" +
46+
println(s"stage $stageIdentifier finished with" +
4747
s" ${stageCompleted.stageInfo.taskMetrics.shuffleWriteMetrics.bytesWritten} shuffle bytes")
4848
val shuffleMgr = SparkEnv.get.shuffleManager.asInstanceOf[SparkShuffleManager]
4949
if (shuffleMgr.getLifecycleManager.conf.clientShuffleEarlyDeletion) {

tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/StorageCheckUtils.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,11 @@ object StorageCheckUtils {
8989
}
9090

9191
def triggerStorageCheckThread(
92-
workerDirs: Seq[String],
93-
shuffleIdShouldNotExist: Seq[Int],
94-
shuffleIdMustExist: Seq[Int],
95-
sparkSession: SparkSession,
96-
forStableStatusChecking: Boolean): CheckingThread = {
92+
workerDirs: Seq[String],
93+
shuffleIdShouldNotExist: Seq[Int],
94+
shuffleIdMustExist: Seq[Int],
95+
sparkSession: SparkSession,
96+
forStableStatusChecking: Boolean): CheckingThread = {
9797
val checkingThread =
9898
if (!forStableStatusChecking) {
9999
new CheckingThread(workerDirs, shuffleIdShouldNotExist, shuffleIdMustExist, sparkSession)

0 commit comments

Comments
 (0)