Skip to content

Commit 5d185ba

Browse files
committed
CAMEL-20919: camel-ftp - Add producer health check
1 parent 6400120 commit 5d185ba

File tree

5 files changed

+194
-7
lines changed

5 files changed

+194
-7
lines changed

components/camel-ftp/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@
4040
<groupId>org.apache.camel</groupId>
4141
<artifactId>camel-file</artifactId>
4242
</dependency>
43+
<dependency>
44+
<groupId>org.apache.camel</groupId>
45+
<artifactId>camel-health</artifactId>
46+
</dependency>
4347
<!-- needed for dynamic to -->
4448
<dependency>
4549
<groupId>org.apache.camel</groupId>

components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileEndpoint.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,11 @@ protected RemoteFileEndpoint(String uri, RemoteFileComponent<T> component, Remot
9393
setPollStrategy(new RemoteFilePollingConsumerPollStrategy());
9494
}
9595

96+
@Override
97+
public RemoteFileComponent getComponent() {
98+
return (RemoteFileComponent) super.getComponent();
99+
}
100+
96101
@Override
97102
public boolean isSingletonProducer() {
98103
// this producer is stateful because the remote file operations is not

components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileProducer.java

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
import org.apache.camel.ExchangePropertyKey;
2121
import org.apache.camel.component.file.GenericFileOperationFailedException;
2222
import org.apache.camel.component.file.GenericFileProducer;
23+
import org.apache.camel.health.HealthCheckHelper;
24+
import org.apache.camel.health.WritableHealthCheckRepository;
2325
import org.apache.camel.util.URISupport;
2426
import org.slf4j.Logger;
2527
import org.slf4j.LoggerFactory;
@@ -31,6 +33,8 @@ public class RemoteFileProducer<T> extends GenericFileProducer<T> {
3133

3234
private static final Logger LOG = LoggerFactory.getLogger(RemoteFileProducer.class);
3335
private boolean loggedIn;
36+
private RemoteFileProducerHealthCheck producerHealthCheck;
37+
private WritableHealthCheckRepository healthCheckRepository;
3438

3539
private transient String remoteFileProducerToString;
3640

@@ -105,11 +109,14 @@ public void disconnect() throws GenericFileOperationFailedException {
105109

106110
@Override
107111
public void preWriteCheck(Exchange exchange) throws Exception {
108-
// before writing send a noop to see if the connection is alive and
109-
// works
112+
doPreWriteCheck(exchange, getEndpoint().getConfiguration().isSendNoop());
113+
}
114+
115+
protected void doPreWriteCheck(Exchange exchange, boolean sendNoop) throws Exception {
116+
// before writing send a noop to see if the connection is alive and works
110117
boolean noop = false;
111118
if (loggedIn) {
112-
if (getEndpoint().getConfiguration().isSendNoop()) {
119+
if (sendNoop) {
113120
try {
114121
noop = getOperations().sendNoop();
115122
} catch (Exception e) {
@@ -120,8 +127,7 @@ public void preWriteCheck(Exchange exchange) throws Exception {
120127
}
121128
LOG.trace("preWriteCheck send noop success: {}", noop);
122129
} else {
123-
// okay send noop is disabled then we would regard the op as
124-
// success
130+
// okay send noop is disabled then we would regard the op as success
125131
noop = true;
126132
LOG.trace("preWriteCheck send noop disabled");
127133
}
@@ -162,14 +168,30 @@ public void postWriteCheck(Exchange exchange) {
162168
@Override
163169
protected void doStart() throws Exception {
164170
LOG.debug("Starting");
171+
172+
// health-check is optional so discover and resolve
173+
healthCheckRepository = HealthCheckHelper.getHealthCheckRepository(
174+
endpoint.getCamelContext(),
175+
"producers",
176+
WritableHealthCheckRepository.class);
177+
178+
if (healthCheckRepository != null) {
179+
producerHealthCheck = new RemoteFileProducerHealthCheck(this);
180+
producerHealthCheck.setEnabled(this.getEndpoint().getComponent().isHealthCheckProducerEnabled());
181+
healthCheckRepository.addHealthCheck(producerHealthCheck);
182+
}
183+
165184
// do not connect when component starts, just wait until we process as
166-
// we will
167-
// connect at that time if needed
185+
// we will connect at that time if needed
168186
super.doStart();
169187
}
170188

171189
@Override
172190
protected void doStop() throws Exception {
191+
if (healthCheckRepository != null && producerHealthCheck != null) {
192+
healthCheckRepository.removeHealthCheck(producerHealthCheck);
193+
producerHealthCheck = null;
194+
}
173195
try {
174196
disconnect();
175197
} catch (Exception e) {
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.file.remote;
18+
19+
import java.util.Map;
20+
21+
import org.apache.camel.Exchange;
22+
import org.apache.camel.component.file.GenericFileOperationFailedException;
23+
import org.apache.camel.health.HealthCheckResultBuilder;
24+
import org.apache.camel.impl.health.AbstractHealthCheck;
25+
26+
/**
27+
* FTP producer readiness health-check
28+
*/
29+
public class RemoteFileProducerHealthCheck extends AbstractHealthCheck {
30+
31+
private final RemoteFileProducer<?> producer;
32+
33+
public RemoteFileProducerHealthCheck(RemoteFileProducer<?> producer) {
34+
super("camel", "producer:ftp-" + producer.getEndpoint().getConfiguration().getHost());
35+
this.producer = producer;
36+
}
37+
38+
@Override
39+
protected void doCall(HealthCheckResultBuilder builder, Map<String, Object> options) {
40+
Exchange dummy = producer.createExchange();
41+
Exception cause = null;
42+
try {
43+
producer.doPreWriteCheck(dummy, true);
44+
} catch (Exception e) {
45+
cause = e;
46+
}
47+
if (cause != null) {
48+
builder.down();
49+
builder.message("FtpProducer is not ready");
50+
builder.detail("serviceUrl", producer.getEndpoint().getServiceUrl());
51+
builder.error(cause);
52+
if (cause instanceof GenericFileOperationFailedException gfe) {
53+
int code = gfe.getCode();
54+
String msg = gfe.getReason();
55+
if (code > 0 && msg != null) {
56+
builder.detail("ftp.code", code);
57+
builder.detail("ftp.reason", msg.trim());
58+
}
59+
}
60+
} else {
61+
builder.up();
62+
}
63+
}
64+
}
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.component.file.remote.integration;
18+
19+
import java.util.Collection;
20+
import java.util.Optional;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.apache.camel.CamelContext;
24+
import org.apache.camel.Exchange;
25+
import org.apache.camel.builder.RouteBuilder;
26+
import org.apache.camel.component.file.remote.FtpConstants;
27+
import org.apache.camel.component.mock.MockEndpoint;
28+
import org.apache.camel.health.HealthCheck;
29+
import org.apache.camel.health.HealthCheckHelper;
30+
import org.junit.jupiter.api.Assertions;
31+
import org.junit.jupiter.api.Test;
32+
33+
import static org.awaitility.Awaitility.await;
34+
35+
public class FtpProducerHealthCheckIT extends FtpServerTestSupport {
36+
37+
private String getFtpUrl() {
38+
return "ftp://admin@localhost:{{ftp.server.port}}/reply?password=admin";
39+
}
40+
41+
@Override
42+
protected CamelContext createCamelContext() throws Exception {
43+
CamelContext context = super.createCamelContext();
44+
HealthCheckHelper.getHealthCheckRepository(context, "producers").setEnabled(true);
45+
return context;
46+
}
47+
48+
@Test
49+
public void testHealthCheck() throws Exception {
50+
MockEndpoint mock = getMockEndpoint("mock:result");
51+
mock.expectedBodiesReceived("Bye World");
52+
mock.expectedHeaderReceived(FtpConstants.FTP_REPLY_CODE, 226);
53+
mock.expectedHeaderReceived(FtpConstants.FTP_REPLY_STRING, "226 Transfer complete.");
54+
55+
template.requestBodyAndHeader("direct:start", "Bye World", Exchange.FILE_NAME, "hello.txt");
56+
57+
MockEndpoint.assertIsSatisfied(context);
58+
59+
await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
60+
Collection<HealthCheck.Result> res = HealthCheckHelper.invokeReadiness(context);
61+
boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
62+
Assertions.assertTrue(up, "readiness check");
63+
});
64+
65+
// stop FTP server
66+
service.shutdown();
67+
68+
// health-check should then become down
69+
await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
70+
Collection<HealthCheck.Result> res = HealthCheckHelper.invokeReadiness(context);
71+
Optional<HealthCheck.Result> hr = res.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN)).findFirst();
72+
Assertions.assertTrue(hr.isPresent());
73+
HealthCheck.Result r = hr.get();
74+
Assertions.assertEquals(HealthCheck.State.DOWN, r.getState());
75+
Assertions.assertEquals("FtpProducer is not ready", r.getMessage().get());
76+
Assertions.assertEquals(200, r.getDetails().get("ftp.code"));
77+
Assertions.assertEquals("Connection refused", r.getDetails().get("ftp.reason"));
78+
});
79+
80+
}
81+
82+
@Override
83+
protected RouteBuilder createRouteBuilder() {
84+
return new RouteBuilder() {
85+
@Override
86+
public void configure() {
87+
from("direct:start").to(getFtpUrl()).to("mock:result");
88+
}
89+
};
90+
}
91+
92+
}

0 commit comments

Comments
 (0)