Commit 423bfc8

CAMEL-22784 - Improve FileLockClusterService resilience to long blocking network based file I/O
1 parent 479f5b2 commit 423bfc8

File tree

12 files changed, +480 -140 lines changed

components/camel-file/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -53,6 +53,11 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-core-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
 
     </dependencies>
 </project>

components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterLeaderInfo.java

Lines changed: 9 additions & 0 deletions
@@ -68,4 +68,13 @@ public boolean equals(Object o) {
     public int hashCode() {
         return Objects.hashCode(id);
     }
+
+    @Override
+    public String toString() {
+        return "FileLockClusterLeaderInfo{" +
+               "id='" + id + '\'' +
+               ", heartbeatUpdateIntervalMilliseconds=" + heartbeatUpdateIntervalMilliseconds +
+               ", heartbeatMilliseconds=" + heartbeatMilliseconds +
+               '}';
+    }
 }

components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterService.java

Lines changed: 92 additions & 0 deletions
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.component.file.cluster;
 
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -32,13 +33,20 @@ public class FileLockClusterService extends AbstractCamelClusterService<FileLock
     private TimeUnit acquireLockIntervalUnit;
     private ScheduledExecutorService executor;
     private int heartbeatTimeoutMultiplier;
+    private int clusterDataTaskMaxAttempts;
+    private long clusterDataTaskTimeout;
+    private TimeUnit clusterDataTaskTimeoutUnit;
+    private ExecutorService clusterDataTaskExecutor;
 
     public FileLockClusterService() {
         this.acquireLockDelay = 1;
         this.acquireLockDelayUnit = TimeUnit.SECONDS;
         this.acquireLockInterval = 10;
         this.acquireLockIntervalUnit = TimeUnit.SECONDS;
         this.heartbeatTimeoutMultiplier = 5;
+        this.clusterDataTaskMaxAttempts = 5;
+        this.clusterDataTaskTimeout = 10;
+        this.clusterDataTaskTimeoutUnit = TimeUnit.SECONDS;
     }
 
     @Override
@@ -120,13 +128,74 @@ public void setAcquireLockIntervalUnit(TimeUnit acquireLockIntervalUnit) {
      * <p>
      */
     public void setHeartbeatTimeoutMultiplier(int heartbeatTimeoutMultiplier) {
+        if (heartbeatTimeoutMultiplier <= 0) {
+            throw new IllegalArgumentException("HeartbeatTimeoutMultiplier must be greater than 0");
+        }
         this.heartbeatTimeoutMultiplier = heartbeatTimeoutMultiplier;
     }
 
     public int getHeartbeatTimeoutMultiplier() {
         return heartbeatTimeoutMultiplier;
     }
 
+    /**
+     * Sets how many times a cluster data task will run, counting both the first execution and subsequent retries in
+     * case of failure or timeout. The default is 5 attempts.
+     * <p>
+     * This can be useful when the cluster data root is on network based file storage, where I/O operations may
+     * occasionally block for long or unpredictable periods.
+     */
+    public void setClusterDataTaskMaxAttempts(int clusterDataTaskMaxAttempts) {
+        if (clusterDataTaskMaxAttempts <= 0) {
+            throw new IllegalArgumentException("clusterDataTaskMaxAttempts must be greater than 0");
+        }
+        this.clusterDataTaskMaxAttempts = clusterDataTaskMaxAttempts;
+    }
+
+    public int getClusterDataTaskMaxAttempts() {
+        return clusterDataTaskMaxAttempts;
+    }
+
+    /**
+     * Sets the timeout for a cluster data task (reading or writing cluster data). The default is 10 seconds.
+     * <p>
+     * Timeouts are useful when the cluster data root is on network storage, where I/O operations may occasionally block
+     * for long or unpredictable periods.
+     */
+    public void setClusterDataTaskTimeout(long clusterDataTaskTimeout) {
+        if (clusterDataTaskTimeout <= 0) {
+            throw new IllegalArgumentException("clusterDataTaskTimeout must be greater than 0");
+        }
+        this.clusterDataTaskTimeout = clusterDataTaskTimeout;
+    }
+
+    public long getClusterDataTaskTimeout() {
+        return clusterDataTaskTimeout;
+    }
+
+    /**
+     * The time unit for clusterDataTaskTimeout, defaults to TimeUnit.SECONDS.
+     */
+    public void setClusterDataTaskTimeoutUnit(TimeUnit clusterDataTaskTimeoutUnit) {
+        this.clusterDataTaskTimeoutUnit = clusterDataTaskTimeoutUnit;
+    }
+
+    public TimeUnit getClusterDataTaskTimeoutUnit() {
+        return clusterDataTaskTimeoutUnit;
+    }
+
+    /**
+     * Sets the timeout and time unit for a cluster data task (reading or writing cluster data). The default is 10 seconds.
+     * <p>
+     * Timeouts are useful when the cluster data root is on network storage, where I/O operations may occasionally block
+     * for long or unpredictable periods.
+     * <p>
+     */
+    public void setClusterDataTaskTimeout(long clusterDataTaskTimeout, TimeUnit clusterDataTaskTimeoutUnit) {
+        setClusterDataTaskTimeout(clusterDataTaskTimeout);
+        setClusterDataTaskTimeoutUnit(clusterDataTaskTimeoutUnit);
+    }
+
     @Override
     protected void doStop() throws Exception {
         super.doStop();
@@ -142,6 +211,14 @@ protected void doStop() throws Exception {
 
             executor = null;
         }
+
+        if (clusterDataTaskExecutor != null) {
+            if (context != null) {
+                context.getExecutorServiceManager().shutdown(clusterDataTaskExecutor);
+            } else {
+                clusterDataTaskExecutor.shutdown();
+            }
+        }
     }
 
     ScheduledExecutorService getExecutor() {
@@ -161,4 +238,19 @@
             internalLock.unlock();
         }
     }
+
+    ExecutorService getClusterDataTaskExecutor() {
+        Lock internalLock = getInternalLock();
+        internalLock.lock();
+        try {
+            if (clusterDataTaskExecutor == null) {
+                final CamelContext context = ObjectHelper.notNull(getCamelContext(), "CamelContext");
+                clusterDataTaskExecutor = context.getExecutorServiceManager().newFixedThreadPool(this,
+                        "FileLockClusterDataTask-" + getId(), 5);
+            }
+            return clusterDataTaskExecutor;
+        } finally {
+            internalLock.unlock();
+        }
+    }
 }
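
The new setters bound cluster data I/O when the lock root lives on network storage. Below is a minimal configuration sketch, assuming the pre-existing root option and registration via CamelContext.addService; the class name, path and values are illustrative, not defaults from this commit:

import java.util.concurrent.TimeUnit;

import org.apache.camel.CamelContext;
import org.apache.camel.component.file.cluster.FileLockClusterService;

// Hypothetical setup class; only the setClusterDataTask* setters are introduced by this commit.
public class FileLockClusterConfig {

    public static void configure(CamelContext camelContext) throws Exception {
        FileLockClusterService service = new FileLockClusterService();
        service.setRoot("/mnt/shared/camel-cluster");            // cluster data root on network storage (pre-existing option)
        service.setClusterDataTaskMaxAttempts(3);                // first execution plus up to two retries
        service.setClusterDataTaskTimeout(5, TimeUnit.SECONDS);  // per-attempt timeout for cluster data reads/writes
        camelContext.addService(service);                        // register the cluster service with the CamelContext
    }
}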
components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterTaskExecutor.java

Lines changed: 75 additions & 0 deletions
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.cluster;
+
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Executes cluster data read / write tasks asynchronously, with timeouts to guard against potential unpredictable
+ * blocking I/O periods.
+ */
+class FileLockClusterTaskExecutor {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileLockClusterTaskExecutor.class);
+    private final FileLockClusterService service;
+
+    FileLockClusterTaskExecutor(FileLockClusterService service) {
+        Objects.requireNonNull(service, "FileLockClusterService cannot be null");
+        this.service = service;
+    }
+
+    /**
+     * If the cluster data root is network based, like an NFS mount, this avoids potentially long blocking I/O so that
+     * callers can fail fast and reliably reason about the cluster state.
+     *
+     * @param task Supplier representing a task to run
+     */
+    <T> T run(Supplier<T> task) throws ExecutionException, TimeoutException {
+        Objects.requireNonNull(task, "Task cannot be null");
+
+        int maxAttempts = service.getClusterDataTaskMaxAttempts();
+        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+            LOGGER.debug("Running cluster task attempt {} of {}", attempt, maxAttempts);
+
+            CompletableFuture<T> future = CompletableFuture.supplyAsync(task, service.getClusterDataTaskExecutor());
+            try {
+                return future.get(service.getClusterDataTaskTimeout(), service.getClusterDataTaskTimeoutUnit());
+            } catch (InterruptedException e) {
+                LOGGER.trace("Cluster task interrupted on attempt {} of {}", attempt, maxAttempts);
+                future.cancel(true);
+                Thread.currentThread().interrupt();
+                return null;
+            } catch (ExecutionException | TimeoutException e) {
+                LOGGER.debug("Cluster task encountered an exception on attempt {} of {}", attempt, maxAttempts, e);
+                future.cancel(true);
+                if (attempt == maxAttempts) {
+                    LOGGER.debug("Cluster task retry limit ({}) reached", maxAttempts, e);
+                    throw e;
+                }
+            } finally {
+                LOGGER.debug("Cluster task attempt {} ended", attempt);
+            }
+        }
+        return null;
+    }
+}
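
The run method retries on ExecutionException or TimeoutException up to clusterDataTaskMaxAttempts, returns null if the calling thread is interrupted, and rethrows the last exception once the attempt limit is reached. A hedged sketch of how same-package caller code might wrap a cluster data read with it; the actual call sites are not part of this excerpt, and imports of Path, ExecutionException and TimeoutException are implied:

// Hypothetical helper in org.apache.camel.component.file.cluster; the service and leaderDataPath
// parameters are assumed to be supplied by the surrounding cluster view code.
FileLockClusterLeaderInfo readLeaderInfoGuarded(FileLockClusterService service, Path leaderDataPath)
        throws ExecutionException, TimeoutException {
    FileLockClusterTaskExecutor taskExecutor = new FileLockClusterTaskExecutor(service);
    return taskExecutor.run(() -> {
        try {
            // readClusterLeaderInfo declares "throws Exception", so wrap it for the Supplier
            return FileLockClusterUtils.readClusterLeaderInfo(leaderDataPath);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    });
}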

components/camel-file/src/main/java/org/apache/camel/component/file/cluster/FileLockClusterUtils.java

Lines changed: 3 additions & 3 deletions
@@ -62,7 +62,7 @@ static void writeClusterLeaderInfo(
             FileChannel channel,
             FileLockClusterLeaderInfo clusterLeaderInfo,
             boolean forceMetaData)
-            throws IOException {
+            throws Exception {
 
         Objects.requireNonNull(channel, "channel cannot be null");
         Objects.requireNonNull(clusterLeaderInfo, "clusterLeaderInfo cannot be null");
@@ -100,7 +100,7 @@ static void writeClusterLeaderInfo(
      * inconsistent state
      * @throws IOException If reading the lock file failed
      */
-    static FileLockClusterLeaderInfo readClusterLeaderInfo(Path leaderDataPath) throws IOException {
+    static FileLockClusterLeaderInfo readClusterLeaderInfo(Path leaderDataPath) throws Exception {
         try {
             byte[] bytes = Files.readAllBytes(leaderDataPath);
 
@@ -119,7 +119,7 @@ static FileLockClusterLeaderInfo readClusterLeaderInfo(Path leaderDataPath) thro
             long lastHeartbeat = buf.getLong();
 
             return new FileLockClusterLeaderInfo(uuidStr, intervalMillis, lastHeartbeat);
-        } catch (NoSuchFileException e) {
+        } catch (FileNotFoundException | NoSuchFileException e) {
             // Handle NoSuchFileException to give the ClusterView a chance to recreate the leadership data
             return null;
        }
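
The two exception types in the widened catch come from different Java file APIs: NIO calls such as Files.readAllBytes signal a missing file with NoSuchFileException, while stream-based I/O such as FileInputStream throws FileNotFoundException; catching both maps either case to the null "no leader data yet" result. A small self-contained illustration of that distinction (not part of the commit; the class and file name are made up):

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;

public class MissingFileExceptions {
    public static void main(String[] args) throws IOException {
        File missing = new File("does-not-exist.bin");      // hypothetical path
        try {
            Files.readAllBytes(missing.toPath());            // NIO read of a missing file
        } catch (NoSuchFileException e) {
            System.out.println("NIO reads throw NoSuchFileException");
        }
        try (FileInputStream in = new FileInputStream(missing)) { // stream-based read
            in.read();
        } catch (FileNotFoundException e) {
            System.out.println("Stream-based reads throw FileNotFoundException");
        }
    }
}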
