Skip to content

Commit 071adb4

Browse files
committed
fix(#1470): Add message store endpoint
- Adds endpoint to send/receive messages to/from the local message store
1 parent 2c8c3b2 commit 071adb4

File tree

39 files changed

+879
-122
lines changed

39 files changed

+879
-122
lines changed

connectors/citrus-kubernetes/src/test/java/org/citrusframework/kubernetes/endpoint/KubernetesEndpointComponentTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.citrusframework.endpoint.Endpoint;
2323
import org.citrusframework.endpoint.EndpointComponent;
2424
import org.citrusframework.endpoint.direct.DirectEndpointComponent;
25+
import org.citrusframework.endpoint.context.MessageStoreEndpointComponent;
2526
import org.citrusframework.http.client.HttpEndpointComponent;
2627
import org.citrusframework.http.client.HttpsEndpointComponent;
2728
import org.citrusframework.kubernetes.client.KubernetesClient;
@@ -69,9 +70,11 @@ public void testCreateClientEndpointWithParameters() throws Exception {
6970
@Test
7071
public void testLookupAll() {
7172
Map<String, EndpointComponent> validators = EndpointComponent.lookup();
72-
Assert.assertEquals(validators.size(), 4L);
73+
Assert.assertEquals(validators.size(), 5L);
7374
Assert.assertNotNull(validators.get("direct"));
7475
Assert.assertEquals(validators.get("direct").getClass(), DirectEndpointComponent.class);
76+
Assert.assertNotNull(validators.get("message-store"));
77+
Assert.assertEquals(validators.get("message-store").getClass(), MessageStoreEndpointComponent.class);
7578
Assert.assertNotNull(validators.get("http"));
7679
Assert.assertEquals(validators.get("http").getClass(), HttpEndpointComponent.class);
7780
Assert.assertNotNull(validators.get("https"));

connectors/citrus-selenium/src/test/java/org/citrusframework/selenium/endpoint/SeleniumEndpointComponentTest.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.citrusframework.endpoint.Endpoint;
2323
import org.citrusframework.endpoint.EndpointComponent;
2424
import org.citrusframework.endpoint.direct.DirectEndpointComponent;
25+
import org.citrusframework.endpoint.context.MessageStoreEndpointComponent;
2526
import org.citrusframework.http.client.HttpEndpointComponent;
2627
import org.citrusframework.http.client.HttpsEndpointComponent;
2728
import org.citrusframework.spi.ReferenceResolver;
@@ -87,9 +88,11 @@ public void testCreateBrowserEndpointWithParameters() throws Exception {
8788
@Test
8889
public void testLookupAll() {
8990
Map<String, EndpointComponent> validators = EndpointComponent.lookup();
90-
Assert.assertEquals(validators.size(), 4L);
91+
Assert.assertEquals(validators.size(), 5L);
9192
Assert.assertNotNull(validators.get("direct"));
9293
Assert.assertEquals(validators.get("direct").getClass(), DirectEndpointComponent.class);
94+
Assert.assertNotNull(validators.get("message-store"));
95+
Assert.assertEquals(validators.get("message-store").getClass(), MessageStoreEndpointComponent.class);
9396
Assert.assertNotNull(validators.get("http"));
9497
Assert.assertEquals(validators.get("http").getClass(), HttpEndpointComponent.class);
9598
Assert.assertNotNull(validators.get("https"));

core/citrus-api/src/main/java/org/citrusframework/endpoint/EndpointComponent.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public interface EndpointComponent {
3838
Logger logger = LoggerFactory.getLogger(EndpointComponent.class);
3939

4040
String ENDPOINT_NAME = "endpointName";
41+
String ENDPOINT_CACHE = "endpointCache";
4142
String AUTO_CLOSE = "autoClose";
4243
String AUTO_REMOVE = "autoRemove";
4344

@@ -49,29 +50,29 @@ public interface EndpointComponent {
4950

5051
/**
5152
* Creates proper endpoint instance from endpoint uri.
52-
* @param endpointUri
53-
* @param context
54-
* @return
5553
*/
5654
Endpoint createEndpoint(String endpointUri, TestContext context);
5755

5856
/**
5957
* Gets the name of this endpoint component.
60-
* @return
6158
*/
6259
String getName();
6360

6461
/**
6562
* Construct endpoint name from endpoint uri.
66-
* @param endpointUri
67-
* @return
6863
*/
6964
Map<String, String> getParameters(String endpointUri);
7065

66+
/**
67+
* Returns setting defining whether this endpoint component allows endpoint caching.
68+
*/
69+
default boolean supportsEndpointCaching() {
70+
return true;
71+
}
72+
7173
/**
7274
* Resolves all available endpoint components from resource path lookup. Scans classpath for endpoint component meta information
7375
* and instantiates those components.
74-
* @return
7576
*/
7677
static Map<String, EndpointComponent> lookup() {
7778
Map<String, EndpointComponent> components = TYPE_RESOLVER.resolveAll();
@@ -86,8 +87,6 @@ static Map<String, EndpointComponent> lookup() {
8687
* Resolves endpoint component from resource path lookup with given resource name. Scans classpath for endpoint component meta information
8788
* with given name and returns instance of the component. Returns optional instead of throwing exception when no endpoint component
8889
* could be found.
89-
* @param component
90-
* @return
9190
*/
9291
static Optional<EndpointComponent> lookup(String component) {
9392
try {

core/citrus-base/src/main/java/org/citrusframework/endpoint/AbstractEndpoint.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ public abstract class AbstractEndpoint implements Endpoint {
3535

3636
/**
3737
* Default constructor using endpoint configuration.
38-
* @param endpointConfiguration
3938
*/
4039
public AbstractEndpoint(EndpointConfiguration endpointConfiguration) {
4140
this.endpointConfiguration = endpointConfiguration;
@@ -48,15 +47,13 @@ public EndpointConfiguration getEndpointConfiguration() {
4847

4948
/**
5049
* Gets the endpoints consumer name.
51-
* @return
5250
*/
5351
public String getConsumerName() {
5452
return name + ":consumer";
5553
}
5654

5755
/**
5856
* Gets the endpoints producer name.
59-
* @return
6057
*/
6158
public String getProducerName() {
6259
return name + ":producer";

core/citrus-base/src/main/java/org/citrusframework/endpoint/AbstractEndpointComponent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,7 @@ public Endpoint createEndpoint(String endpointUri, TestContext context) {
108108
* Removes other settings from given parameter map and returns just the endpoint parameters.
109109
*/
110110
private Map<String, String> getEndpointParameters(Map<String, String> parameters) {
111-
Set<String> internalSettings = Set.of(ENDPOINT_NAME, AUTO_CLOSE, AUTO_REMOVE);
111+
Set<String> internalSettings = Set.of(ENDPOINT_NAME, ENDPOINT_CACHE, AUTO_CLOSE, AUTO_REMOVE);
112112
HashMap<String, String> result = new HashMap<>();
113113

114114
for (String key : parameters.keySet()) {

core/citrus-base/src/main/java/org/citrusframework/endpoint/DefaultEndpointFactory.java

Lines changed: 52 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -56,60 +56,42 @@ public class DefaultEndpointFactory implements EndpointFactory {
5656
@Override
5757
public Endpoint create(String endpointName, Annotation endpointConfig, TestContext context) {
5858
String qualifier = endpointConfig.annotationType().getAnnotation(CitrusEndpointConfig.class).qualifier();
59-
Optional<AnnotationConfigParser> parser = Optional.ofNullable(context.getReferenceResolver().resolveAll(AnnotationConfigParser.class).get(qualifier));
59+
AnnotationConfigParser parser = Optional.ofNullable(context.getReferenceResolver().resolveAll(AnnotationConfigParser.class).get(qualifier))
60+
.or(() -> AnnotationConfigParser.lookup(qualifier))
61+
.orElseThrow(() -> new CitrusRuntimeException(String.format("Unable to create endpoint annotation parser with name '%s'", qualifier)));
6062

61-
if (!parser.isPresent()) {
62-
// try to get parser from default Citrus modules
63-
parser = AnnotationConfigParser.lookup(qualifier);
64-
}
65-
66-
if (parser.isPresent()) {
67-
Endpoint endpoint = parser.get().parse(endpointConfig, context.getReferenceResolver());
68-
endpoint.setName(endpointName);
69-
70-
if (endpoint instanceof ReferenceResolverAware referenceResolverAware) {
71-
referenceResolverAware.setReferenceResolver(context.getReferenceResolver());
72-
}
63+
Endpoint endpoint = parser.parse(endpointConfig, context.getReferenceResolver());
64+
endpoint.setName(endpointName);
7365

74-
if (endpoint instanceof InitializingPhase initializingBean) {
75-
initializingBean.initialize();
76-
}
66+
if (endpoint instanceof ReferenceResolverAware referenceResolverAware) {
67+
referenceResolverAware.setReferenceResolver(context.getReferenceResolver());
68+
}
7769

78-
PropertyUtils.configure(endpointName, endpoint, context.getReferenceResolver());
79-
return endpoint;
70+
if (endpoint instanceof InitializingPhase initializingBean) {
71+
initializingBean.initialize();
8072
}
8173

82-
throw new CitrusRuntimeException(String.format("Unable to create endpoint annotation parser with name '%s'", qualifier));
74+
PropertyUtils.configure(endpointName, endpoint, context.getReferenceResolver());
75+
return endpoint;
8376
}
8477

8578
@Override
8679
public Endpoint create(String endpointName, CitrusEndpoint endpointConfig, Class<?> endpointType, TestContext context) {
87-
Optional<EndpointBuilder> builder = context.getReferenceResolver().resolveAll(EndpointBuilder.class)
88-
.values()
89-
.stream()
90-
.filter(endpointBuilder -> endpointBuilder.supports(endpointType))
91-
.findFirst();
92-
93-
if (builder.isPresent()) {
94-
Endpoint endpoint = builder.get().build(endpointConfig, context.getReferenceResolver());
95-
endpoint.setName(endpointName);
96-
return endpoint;
97-
}
98-
99-
// try to get builder from default Citrus modules
100-
Optional<EndpointBuilder<?>> lookup = EndpointBuilder.lookup()
80+
EndpointBuilder<?> builder = context.getReferenceResolver().resolveAll(EndpointBuilder.class)
10181
.values()
10282
.stream()
10383
.filter(endpointBuilder -> endpointBuilder.supports(endpointType))
104-
.findFirst();
105-
106-
if (lookup.isPresent()) {
107-
Endpoint endpoint = lookup.get().build(endpointConfig, context.getReferenceResolver());
108-
endpoint.setName(endpointName);
109-
return endpoint;
110-
}
111-
112-
throw new CitrusRuntimeException(String.format("Unable to create endpoint builder for type '%s'", endpointType.getName()));
84+
.findFirst()
85+
.or(() -> EndpointBuilder.lookup()
86+
.values()
87+
.stream()
88+
.filter(endpointBuilder -> endpointBuilder.supports(endpointType))
89+
.findFirst())
90+
.orElseThrow(() -> new CitrusRuntimeException(String.format("Unable to create endpoint builder for type '%s'", endpointType.getName())));
91+
92+
Endpoint endpoint = builder.build(endpointConfig, context.getReferenceResolver());
93+
endpoint.setName(endpointName);
94+
return endpoint;
11395
}
11496

11597
@Override
@@ -131,18 +113,11 @@ public Endpoint create(String uri, TestContext context) {
131113
}
132114

133115
String componentName = getComponentName(endpointUri);
134-
Optional<EndpointComponent> component = Optional.ofNullable(getEndpointComponents(context.getReferenceResolver()).get(componentName));
116+
EndpointComponent component = Optional.ofNullable(getEndpointComponents(context.getReferenceResolver()).get(componentName))
117+
.or(() -> EndpointComponent.lookup(componentName))
118+
.orElseThrow(() -> new CitrusRuntimeException(String.format("Unable to create endpoint component with name '%s'", componentName)));
135119

136-
if (component.isEmpty()) {
137-
// try to get component from default Citrus modules
138-
component = EndpointComponent.lookup(componentName);
139-
}
140-
141-
if (component.isEmpty()) {
142-
throw new CitrusRuntimeException(String.format("Unable to create endpoint component with name '%s'", componentName));
143-
}
144-
145-
Map<String, String> parameters = component.get().getParameters(endpointUri);
120+
Map<String, String> parameters = component.getParameters(endpointUri);
146121
String cachedEndpointName = parameters.getOrDefault(EndpointComponent.ENDPOINT_NAME, endpointUri);
147122

148123
synchronized (endpointCache) {
@@ -152,23 +127,30 @@ public Endpoint create(String uri, TestContext context) {
152127
}
153128
return endpointCache.get(cachedEndpointName);
154129
} else {
155-
Endpoint endpoint = component.get().createEndpoint(endpointUri, context);
156-
endpointCache.put(cachedEndpointName, endpoint);
157-
158-
boolean autoRemove = Optional.ofNullable(parameters.get(EndpointComponent.AUTO_REMOVE))
159-
.map(Boolean::parseBoolean)
160-
.orElseGet(CitrusSettings::isAutoRemoveDynamicEndpoints);
161-
if (autoRemove) {
162-
context.doFinally(() -> ctx -> {
163-
logger.info("Stopping and removing endpoint '{}' due to auto remove setting", endpoint.getName());
164-
if (endpoint instanceof ShutdownPhase destroyable) {
165-
destroyable.destroy();
166-
}
167-
168-
synchronized (endpointCache) {
169-
endpointCache.remove(cachedEndpointName);
170-
}
171-
});
130+
Endpoint endpoint = component.createEndpoint(endpointUri, context);
131+
132+
boolean endpointCacheEnabled = Optional.ofNullable(parameters.get(EndpointComponent.ENDPOINT_CACHE))
133+
.map(Boolean::parseBoolean)
134+
.orElse(component.supportsEndpointCaching());
135+
136+
if (endpointCacheEnabled) {
137+
endpointCache.put(cachedEndpointName, endpoint);
138+
139+
boolean autoRemove = Optional.ofNullable(parameters.get(EndpointComponent.AUTO_REMOVE))
140+
.map(Boolean::parseBoolean)
141+
.orElseGet(CitrusSettings::isAutoRemoveDynamicEndpoints);
142+
if (autoRemove) {
143+
context.doFinally(() -> ctx -> {
144+
logger.info("Stopping and removing endpoint '{}' due to auto remove setting", endpoint.getName());
145+
if (endpoint instanceof ShutdownPhase destroyable) {
146+
destroyable.destroy();
147+
}
148+
149+
synchronized (endpointCache) {
150+
endpointCache.remove(cachedEndpointName);
151+
}
152+
});
153+
}
172154
}
173155
return endpoint;
174156
}

core/citrus-base/src/main/java/org/citrusframework/endpoint/StaticEndpoint.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,22 @@
1717
package org.citrusframework.endpoint;
1818

1919
import org.citrusframework.context.TestContext;
20+
import org.citrusframework.exceptions.MessageTimeoutException;
2021
import org.citrusframework.message.DefaultMessage;
2122
import org.citrusframework.message.Message;
2223
import org.citrusframework.messaging.Consumer;
2324
import org.citrusframework.messaging.Producer;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
2427

2528
/**
2629
* Special endpoint implementation that produces/consumes static messages.
2730
*/
2831
public class StaticEndpoint extends AbstractEndpoint {
2932

33+
/** Logger */
34+
private static final Logger logger = LoggerFactory.getLogger(StaticEndpoint.class);
35+
3036
private Message message;
3137

3238
public StaticEndpoint() {
@@ -65,7 +71,7 @@ public void send(Message message, TestContext context) {
6571

6672
@Override
6773
public String getName() {
68-
return StaticEndpoint.this.getName() + "-producer";
74+
return StaticEndpoint.this.getProducerName();
6975
}
7076
};
7177
}
@@ -75,17 +81,24 @@ public Consumer createConsumer() {
7581
return new Consumer() {
7682
@Override
7783
public Message receive(TestContext context) {
78-
return getMessage();
84+
return receive(context, getEndpointConfiguration().getTimeout());
7985
}
8086

8187
@Override
8288
public Message receive(TestContext context, long timeout) {
83-
return getMessage();
89+
Message received = getMessage();
90+
if (received == null) {
91+
throw new MessageTimeoutException(timeout, StaticEndpoint.this.getName());
92+
}
93+
94+
logger.info("Received message from static endpoint: '{}'", StaticEndpoint.this.getName());
95+
96+
return received;
8497
}
8598

8699
@Override
87100
public String getName() {
88-
return StaticEndpoint.this.getName() + "-consumer";
101+
return StaticEndpoint.this.getConsumerName();
89102
}
90103
};
91104
}
@@ -100,6 +113,10 @@ public void setMessage(Message message) {
100113
}
101114

102115
public Message getMessage() {
116+
if (message == null) {
117+
return null;
118+
}
119+
103120
if (getEndpointConfiguration().isReuseMessage()) {
104121
return message;
105122
} else {
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
/*
2+
* Copyright the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.citrusframework.endpoint.context;
18+
19+
/**
20+
* Combines all test context related endpoint builders.
21+
*/
22+
public class ContextEndpoints {
23+
24+
/**
25+
* Private constructor because static instantiation method should be used.
26+
*/
27+
private ContextEndpoints() {
28+
}
29+
30+
/**
31+
* Static entry method for in memory endpoint builders.
32+
*/
33+
public static ContextEndpoints context() {
34+
return new ContextEndpoints();
35+
}
36+
37+
public MessageStoreEndpointBuilder messageStore() {
38+
return new MessageStoreEndpointBuilder();
39+
}
40+
}

0 commit comments

Comments
 (0)