Skip to content

Commit af342e6

Browse files
authored
[#2083] improvement: Quickly delete local or HDFS data at the shuffleId level. (#2084)
### What changes were proposed in this pull request?
Data on local disk or HDFS is deleted synchronously at the shuffleId level. In some scenarios this deletion time needs to be shortened, so the folders can be renamed first and then deleted asynchronously.
### Why are the changes needed?
Fix: #2083
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests (UT).
1 parent 12d611b commit af342e6

15 files changed

+360
-11
lines changed

server/src/main/java/org/apache/uniffle/server/ShuffleServerGrpcService.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,9 @@ public void registerShuffle(
272272
taskInfo.refreshLatestStageAttemptNumber(shuffleId, stageAttemptNumber);
273273
try {
274274
long start = System.currentTimeMillis();
275-
shuffleServer.getShuffleTaskManager().removeShuffleDataSync(appId, shuffleId);
275+
shuffleServer
276+
.getShuffleTaskManager()
277+
.removeShuffleDataSyncRenameAndDelete(appId, shuffleId);
276278
LOG.info(
277279
"Deleted the previous stage attempt data due to stage recomputing for app: {}, "
278280
+ "shuffleId: {}, stageAttemptNumber: {}. It costs {} ms",

server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,9 @@ public class ShuffleServerMetrics {
170170
public static final String REPORTED_BLOCK_COUNT = "reported_block_count";
171171
public static final String CACHED_BLOCK_COUNT = "cached_block_count";
172172

173+
private static final String TOTAL_LOCAL_RENAME_AND_DELETION_FAILED =
174+
"total_local_rename_and_deletion_failed";
175+
173176
public static Counter.Child counterTotalAppNum;
174177
public static Counter.Child counterTotalAppWithHugePartitionNum;
175178
public static Counter.Child counterTotalPartitionNum;
@@ -245,6 +248,7 @@ public class ShuffleServerMetrics {
245248
public static Gauge.Child gaugeReadLocalDataFileBufferSize;
246249
public static Gauge.Child gaugeReadLocalIndexFileBufferSize;
247250
public static Gauge.Child gaugeReadMemoryDataBufferSize;
251+
public static Counter.Child counterLocalRenameAndDeletionFaileTd;
248252

249253
public static Gauge gaugeTotalDataSizeUsage;
250254
public static Gauge gaugeInMemoryDataSizeUsage;
@@ -440,6 +444,8 @@ private static void setUpMetrics(ShuffleServerConf serverConf) {
440444
counterTotalHugePartitionNum = metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_NUM);
441445
counterTotalHugePartitionExceedHardLimitNum =
442446
metricsManager.addLabeledCounter(TOTAL_HUGE_PARTITION_EXCEED_HARD_LIMIT_NUM);
447+
counterLocalRenameAndDeletionFaileTd =
448+
metricsManager.addLabeledCounter(TOTAL_LOCAL_RENAME_AND_DELETION_FAILED);
443449

444450
gaugeLocalStorageIsWritable =
445451
metricsManager.addGauge(LOCAL_STORAGE_IS_WRITABLE, LOCAL_DISK_PATH_LABEL);

server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -798,6 +798,11 @@ public boolean isAppExpired(String appId) {
798798
* @param shuffleIds
799799
*/
800800
public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds) {
801+
removeResourcesByShuffleIds(appId, shuffleIds, false);
802+
}
803+
804+
public void removeResourcesByShuffleIds(
805+
String appId, List<Integer> shuffleIds, boolean isRenameAndDelete) {
801806
Lock writeLock = getAppWriteLock(appId);
802807
writeLock.lock();
803808
try {
@@ -830,7 +835,7 @@ public void removeResourcesByShuffleIds(String appId, List<Integer> shuffleIds)
830835
withTimeoutExecution(
831836
() -> {
832837
storageManager.removeResources(
833-
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds));
838+
new ShufflePurgeEvent(appId, getUserByAppId(appId), shuffleIds, isRenameAndDelete));
834839
return null;
835840
},
836841
storageRemoveOperationTimeoutSec,
@@ -1037,6 +1042,10 @@ public void removeShuffleDataSync(String appId, int shuffleId) {
10371042
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId));
10381043
}
10391044

1045+
public void removeShuffleDataSyncRenameAndDelete(String appId, int shuffleId) {
1046+
removeResourcesByShuffleIds(appId, Arrays.asList(shuffleId), true);
1047+
}
1048+
10401049
public ShuffleDataDistributionType getDataDistributionType(String appId) {
10411050
return shuffleTaskInfos.get(appId).getDataDistType();
10421051
}

server/src/main/java/org/apache/uniffle/server/event/PurgeEvent.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,11 +25,19 @@ public abstract class PurgeEvent {
2525
private String appId;
2626
private String user;
2727
private List<Integer> shuffleIds;
28+
// Whether to use the rename-and-delete mode: rename the files first, then delete them asynchronously.
29+
private boolean isRenameAndDelete;
2830

2931
public PurgeEvent(String appId, String user, List<Integer> shuffleIds) {
32+
this(appId, user, shuffleIds, false);
33+
}
34+
35+
public PurgeEvent(
36+
String appId, String user, List<Integer> shuffleIds, boolean isRenameAndDelete) {
3037
this.appId = appId;
3138
this.user = user;
3239
this.shuffleIds = shuffleIds;
40+
this.isRenameAndDelete = isRenameAndDelete;
3341
}
3442

3543
public String getAppId() {
@@ -44,6 +52,10 @@ public List<Integer> getShuffleIds() {
4452
return shuffleIds;
4553
}
4654

55+
public boolean isRenameAndDelete() {
56+
return isRenameAndDelete;
57+
}
58+
4759
@Override
4860
public String toString() {
4961
return this.getClass().getSimpleName()
@@ -56,6 +68,8 @@ public String toString() {
5668
+ '\''
5769
+ ", shuffleIds="
5870
+ shuffleIds
71+
+ ", isRenameAndDelete="
72+
+ isRenameAndDelete
5973
+ '}';
6074
}
6175
}

server/src/main/java/org/apache/uniffle/server/event/ShufflePurgeEvent.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,11 @@
2222
public class ShufflePurgeEvent extends PurgeEvent {
2323

2424
public ShufflePurgeEvent(String appId, String user, List<Integer> shuffleIds) {
25-
super(appId, user, shuffleIds);
25+
this(appId, user, shuffleIds, false);
26+
}
27+
28+
public ShufflePurgeEvent(
29+
String appId, String user, List<Integer> shuffleIds, boolean isRenameAndDelete) {
30+
super(appId, user, shuffleIds, isRenameAndDelete);
2631
}
2732
}

server/src/main/java/org/apache/uniffle/server/storage/HadoopStorageManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,8 @@ public void removeResources(PurgeEvent event) {
114114
new CreateShuffleDeleteHandlerRequest(
115115
StorageType.HDFS.name(),
116116
storage.getConf(),
117-
purgeForExpired ? shuffleServerId : null));
117+
purgeForExpired ? shuffleServerId : null,
118+
event.isRenameAndDelete()));
118119

119120
String basicPath =
120121
ShuffleStorageUtils.getFullShuffleDataFolder(storage.getStoragePath(), appId);

server/src/main/java/org/apache/uniffle/server/storage/LocalStorageManager.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -312,7 +312,7 @@ public void removeResources(PurgeEvent event) {
312312
ShuffleHandlerFactory.getInstance()
313313
.createShuffleDeleteHandler(
314314
new CreateShuffleDeleteHandlerRequest(
315-
StorageType.LOCALFILE.name(), new Configuration()));
315+
StorageType.LOCALFILE.name(), new Configuration(), event.isRenameAndDelete()));
316316

317317
List<String> deletePaths =
318318
localStorages.stream()
@@ -353,7 +353,11 @@ public void removeResources(PurgeEvent event) {
353353
})
354354
.collect(Collectors.toList());
355355

356-
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
356+
boolean isSuccess =
357+
deleteHandler.delete(deletePaths.toArray(new String[deletePaths.size()]), appId, user);
358+
if (!isSuccess && event.isRenameAndDelete()) {
359+
ShuffleServerMetrics.counterLocalRenameAndDeletionFaileTd.inc();
360+
}
357361
removeAppStorageInfo(event);
358362
}
359363

storage/src/main/java/org/apache/uniffle/storage/factory/ShuffleHandlerFactory.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,11 @@
3232
import org.apache.uniffle.common.util.RssUtils;
3333
import org.apache.uniffle.storage.handler.api.ClientReadHandler;
3434
import org.apache.uniffle.storage.handler.api.ShuffleDeleteHandler;
35+
import org.apache.uniffle.storage.handler.impl.AsynDeletionEventManager;
3536
import org.apache.uniffle.storage.handler.impl.ComposedClientReadHandler;
3637
import org.apache.uniffle.storage.handler.impl.HadoopClientReadHandler;
3738
import org.apache.uniffle.storage.handler.impl.HadoopShuffleDeleteHandler;
39+
import org.apache.uniffle.storage.handler.impl.LocalFileAsyncDeleteHandler;
3840
import org.apache.uniffle.storage.handler.impl.LocalFileClientReadHandler;
3941
import org.apache.uniffle.storage.handler.impl.LocalFileDeleteHandler;
4042
import org.apache.uniffle.storage.handler.impl.MemoryClientReadHandler;
@@ -189,7 +191,10 @@ public ShuffleDeleteHandler createShuffleDeleteHandler(
189191
CreateShuffleDeleteHandlerRequest request) {
190192
if (StorageType.HDFS.name().equals(request.getStorageType())) {
191193
return new HadoopShuffleDeleteHandler(request.getConf(), request.getShuffleServerId());
192-
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())) {
194+
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType()) && request.isAsync()) {
195+
return new LocalFileAsyncDeleteHandler(AsynDeletionEventManager.getInstance());
196+
} else if (StorageType.LOCALFILE.name().equals(request.getStorageType())
197+
&& !request.isAsync()) {
193198
return new LocalFileDeleteHandler();
194199
} else {
195200
throw new UnsupportedOperationException(
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.storage.handler;
19+
20+
import java.util.List;
21+
import java.util.Map;
22+
import java.util.function.Function;
23+
import java.util.stream.Collectors;
24+
25+
import org.apache.commons.lang3.StringUtils;
26+
import org.apache.hadoop.conf.Configuration;
27+
28+
public class AsynDeletionEvent {
29+
private static final String TEMPORARYSUFFIX = "_tmp";
30+
private String appId;
31+
private String user;
32+
private String shuffleServerId;
33+
private Configuration conf;
34+
/** Maps each path to be deleted to the "_tmp"-suffixed path it is renamed to. */
35+
private Map<String, String> needDeletePathAndRenamePath;
36+
37+
private String storageType;
38+
39+
public AsynDeletionEvent(
40+
String appId,
41+
String user,
42+
Configuration conf,
43+
String shuffleServerId,
44+
List<String> needDeletePath,
45+
String storageType) {
46+
this.appId = appId;
47+
this.user = user;
48+
this.shuffleServerId = shuffleServerId;
49+
this.conf = conf;
50+
this.needDeletePathAndRenamePath =
51+
needDeletePath.stream()
52+
.collect(
53+
Collectors.toMap(Function.identity(), s -> StringUtils.join(s, TEMPORARYSUFFIX)));
54+
this.storageType = storageType;
55+
}
56+
57+
public String getAppId() {
58+
return appId;
59+
}
60+
61+
public String getUser() {
62+
return user;
63+
}
64+
65+
public Configuration getConf() {
66+
return conf;
67+
}
68+
69+
public Map<String, String> getNeedDeletePathAndRenamePath() {
70+
return needDeletePathAndRenamePath;
71+
}
72+
73+
public String[] getNeedDeleteRenamePaths() {
74+
return needDeletePathAndRenamePath.values().stream().toArray(String[]::new);
75+
}
76+
77+
public String getShuffleServerId() {
78+
return shuffleServerId;
79+
}
80+
81+
public String getStorageType() {
82+
return storageType;
83+
}
84+
}

storage/src/main/java/org/apache/uniffle/storage/handler/api/ShuffleDeleteHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,5 @@ public interface ShuffleDeleteHandler {
2424
*
2525
* @param appId ApplicationId for delete
2626
*/
27-
void delete(String[] storageBasePaths, String appId, String user);
27+
boolean delete(String[] storageBasePaths, String appId, String user);
2828
}

0 commit comments

Comments
 (0)