Skip to content

Commit 4eebbd4

Browse files
zuston and Junfan Zhang authored
[#2241] improvement(server): Introduce storage flush operation timeout cancel to avoid disk hang (#2247)
### What changes were proposed in this pull request? Introduce a timeout cancel for the storage flush operation to avoid disk hangs. ### Why are the changes needed? For #2241 ### Does this PR introduce _any_ user-facing change? Yes. `rss.server.storage.flushOperationTimeoutSec` is introduced; its default value of -1 means the timeout cancel is not activated by default. ### How was this patch tested? Unit tests and existing tests. --------- Co-authored-by: Junfan Zhang <zhangjunfan@qiyi.com>
1 parent 477bc30 commit 4eebbd4

File tree

4 files changed

+181
-1
lines changed

4 files changed

+181
-1
lines changed
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.future;
19+
20+
import java.util.concurrent.CompletableFuture;
21+
import java.util.concurrent.ExecutionException;
22+
import java.util.concurrent.Executors;
23+
import java.util.concurrent.TimeUnit;
24+
import java.util.concurrent.TimeoutException;
25+
import java.util.function.Supplier;
26+
27+
import org.slf4j.Logger;
28+
import org.slf4j.LoggerFactory;
29+
30+
import org.apache.uniffle.common.exception.RssException;
31+
import org.apache.uniffle.common.util.ThreadUtils;
32+
33+
public class CompletableFutureUtils {
34+
private static final Logger LOGGER = LoggerFactory.getLogger(CompletableFutureUtils.class);
35+
36+
public static <T> T withTimeoutCancel(Supplier<T> supplier, long timeoutMills) throws Exception {
37+
return withTimeoutCancel(supplier, timeoutMills, "");
38+
}
39+
40+
public static <T> T withTimeoutCancel(
41+
Supplier<T> supplier, long timeoutMills, String operationName) throws Exception {
42+
CompletableFuture<T> future =
43+
CompletableFuture.supplyAsync(
44+
supplier,
45+
Executors.newSingleThreadExecutor(ThreadUtils.getThreadFactory(operationName)));
46+
future.exceptionally(
47+
throwable -> {
48+
throw new RssException(throwable);
49+
});
50+
51+
CompletableFuture<T> extended =
52+
CompletableFutureExtension.orTimeout(future, timeoutMills, TimeUnit.MILLISECONDS);
53+
try {
54+
return extended.get();
55+
} catch (Exception e) {
56+
if (e instanceof ExecutionException) {
57+
Throwable internalThrowable = e.getCause();
58+
if (internalThrowable instanceof TimeoutException) {
59+
LOGGER.error(
60+
"The operation of [{}] haven't finished in the {}(millis). Drop this execution!",
61+
operationName,
62+
timeoutMills);
63+
throw new TimeoutException();
64+
}
65+
if (internalThrowable instanceof Exception) {
66+
throw (Exception) internalThrowable;
67+
}
68+
}
69+
throw e;
70+
}
71+
}
72+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.uniffle.common.future;
19+
20+
import java.util.concurrent.TimeoutException;
21+
import java.util.function.Supplier;
22+
23+
import org.junit.jupiter.api.Test;
24+
25+
import org.apache.uniffle.common.exception.RssException;
26+
27+
import static org.junit.jupiter.api.Assertions.assertEquals;
28+
import static org.junit.jupiter.api.Assertions.fail;
29+
30+
public class CompletableFutureUtilsTest {
31+
32+
@Test
33+
public void timeoutTest() {
34+
// case1: legal operation
35+
Supplier<Integer> supplier = () -> 1;
36+
try {
37+
int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100);
38+
assertEquals(1, result);
39+
} catch (Exception e) {
40+
fail();
41+
}
42+
43+
// case2: illegal
44+
supplier =
45+
() -> {
46+
try {
47+
Thread.sleep(100000);
48+
} catch (InterruptedException e) {
49+
throw new RuntimeException(e);
50+
}
51+
return 10;
52+
};
53+
try {
54+
int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100);
55+
fail();
56+
} catch (Exception e) {
57+
if (!(e instanceof TimeoutException)) {
58+
fail();
59+
}
60+
}
61+
62+
// case3: fast fail when internal supplier throw exception
63+
supplier =
64+
() -> {
65+
throw new RssException("Hello");
66+
};
67+
try {
68+
int result = CompletableFutureUtils.withTimeoutCancel(supplier, 100);
69+
fail();
70+
} catch (Exception e) {
71+
if (e instanceof RssException) {
72+
// ignore
73+
} else {
74+
fail();
75+
}
76+
}
77+
}
78+
}

server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@
3030
import org.slf4j.LoggerFactory;
3131

3232
import org.apache.uniffle.common.config.RssBaseConf;
33+
import org.apache.uniffle.common.exception.RssException;
3334
import org.apache.uniffle.common.executor.ThreadPoolManager;
3435
import org.apache.uniffle.common.function.ConsumerWithException;
36+
import org.apache.uniffle.common.future.CompletableFutureUtils;
3537
import org.apache.uniffle.common.util.ThreadUtils;
3638
import org.apache.uniffle.server.flush.EventDiscardException;
3739
import org.apache.uniffle.server.flush.EventInvalidException;
@@ -42,6 +44,7 @@
4244
import org.apache.uniffle.storage.common.Storage;
4345
import org.apache.uniffle.storage.util.StorageType;
4446

47+
import static org.apache.uniffle.server.ShuffleServerConf.STORAGE_FLUSH_OPERATION_TIMEOUT_SEC;
4548
import static org.apache.uniffle.server.ShuffleServerMetrics.EVENT_QUEUE_SIZE;
4649

4750
public class DefaultFlushEventHandler implements FlushEventHandler {
@@ -56,6 +59,7 @@ public class DefaultFlushEventHandler implements FlushEventHandler {
5659
protected final BlockingQueue<ShuffleDataFlushEvent> flushQueue = Queues.newLinkedBlockingQueue();
5760
private ConsumerWithException<ShuffleDataFlushEvent> eventConsumer;
5861
private final ShuffleServer shuffleServer;
62+
private final long flushMaxWaitTimeoutSec;
5963

6064
private volatile boolean stopped = false;
6165

@@ -65,6 +69,7 @@ public DefaultFlushEventHandler(
6569
ShuffleServer shuffleServer,
6670
ConsumerWithException<ShuffleDataFlushEvent> eventConsumer) {
6771
this.shuffleServerConf = conf;
72+
this.flushMaxWaitTimeoutSec = conf.getLong(STORAGE_FLUSH_OPERATION_TIMEOUT_SEC);
6873
this.storageType =
6974
StorageType.valueOf(shuffleServerConf.get(RssBaseConf.RSS_STORAGE_TYPE).name());
7075
this.storageManager = storageManager;
@@ -83,6 +88,24 @@ public void handle(ShuffleDataFlushEvent event) {
8388
}
8489
}
8590

91+
private void consumeEvent(ShuffleDataFlushEvent event) throws Exception {
92+
if (flushMaxWaitTimeoutSec <= 0) {
93+
eventConsumer.accept(event);
94+
return;
95+
}
96+
97+
Supplier<Void> supplier =
98+
() -> {
99+
try {
100+
this.eventConsumer.accept(event);
101+
} catch (Exception e) {
102+
throw new RssException(e);
103+
}
104+
return null;
105+
};
106+
CompletableFutureUtils.withTimeoutCancel(supplier, flushMaxWaitTimeoutSec * 1000);
107+
}
108+
86109
/**
87110
* @param event
88111
* @param storage
@@ -95,7 +118,7 @@ private void handleEventAndUpdateMetrics(ShuffleDataFlushEvent event, Storage st
95118
try {
96119
readLock.lock();
97120
try {
98-
eventConsumer.accept(event);
121+
consumeEvent(event);
99122
} finally {
100123
readLock.unlock();
101124
}

server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,13 @@ public class ShuffleServerConf extends RssBaseConf {
676676
.defaultValue(10 * 60L)
677677
.withDescription("The storage remove resource operation timeout.");
678678

679+
// Timeout, in seconds, for a single storage flush operation. The default of -1 leaves the
// timeout disabled: DefaultFlushEventHandler only applies it when the configured value is > 0.
public static final ConfigOption<Long> STORAGE_FLUSH_OPERATION_TIMEOUT_SEC =
680+
ConfigOptions.key("rss.server.storage.flushOperationTimeoutSec")
681+
.longType()
682+
.defaultValue(-1L)
683+
.withDescription(
684+
"The storage flush max timeout second, this will not be activated by default");
685+
679686
public static final ConfigOption<Boolean> SERVER_MERGE_ENABLE =
680687
ConfigOptions.key("rss.server.merge.enable")
681688
.booleanType()

0 commit comments

Comments
 (0)