Spy incomming message in tests #34768
-
I have read the following posts to test reactive messaging in memory. My configuration is # Configure the AMQP connector to write to the `messages` address
mp.messaging.outgoing.send.connector=smallrye-amqp
mp.messaging.outgoing.send.address=messages
# Configure the AMQP connector to read from the `messages` queue
mp.messaging.incoming.messages.connector=smallrye-amqp
mp.messaging.incoming.messages.durable=true
%test.mp.messaging.outgoing.send.connector=smallrye-in-memory
%test.mp.messaging.incoming.messages.connector=smallrye-in-memory The message is processed by a CDI bean. @ApplicationScoped
@Slf4j
public class MessageHandler {
@Inject
@Channel("send")
Emitter<String> emitter;
public void send(String message) {
emitter.send(message);
}
@Incoming("messages")
@Outgoing("data-stream")
@Broadcast
Message receive(String message) {
log.info("received: {}", message);
return new Message(message, Instant.now());
}
} And I want to test the functionality of @QuarkusTest
class MessageHandlerTest {
@Inject
@Any
InMemoryConnector connector;
@Inject
MessageHandler handler;
@BeforeEach
void setUp() {
}
@Test
void receive() {
MessageHandler handlerSpy = spy(handler);
var inputCaptor = ArgumentCaptor.forClass(String.class);
InMemorySource<String> messages = connector.source("messages");
InMemorySink<String> sink = connector.sink("send");
handler.send("hello");
assertThat(sink.received().get(0).getPayload()).isEqualTo("hello");
messages.send("hello");
var outputResult = handlerSpy.receive(inputCaptor.capture());
assertThat(inputCaptor.getValue()).isEqualTo("hello");
assertThat(outputResult.body()).isEqualTo("hello");
}
} The The complete codes is here, https://github.com/hantsy/quarkus-sandbox/tree/master/amqp, I have tried the functionality by |
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
You cannot invoke the methods annotated with @incoming and @Outgoing like this. It is a protocol violation, and the messages are ignored (2 violations at least: no downstream, and no requests). The test can be rewritten as: @Test
void receive() throws InterruptedException {
InMemorySource<String> messages = connector.source("messages");
InMemorySink<String> sink = connector.sink("send");
InMemorySink<Message> dataStream = connector.sink("data-stream");
handler.send("hello");
assertThat(sink.received().get(0).getPayload()).isEqualTo("hello");
messages.send("hello-123");
assertThat(dataStream.received().get(0).getPayload().body()).isEqualTo("hello-123");
} With:
|
Beta Was this translation helpful? Give feedback.
-
I tried to run tests with kafka, but failed. https://github.com/hantsy/quarkus-sandbox/tree/master/kafka The application.properties is like # Configures the AMQP broker credentials.
%prod.kafka.bootstrap.servers=kafka:9092
kafka.auto.offset.reset=earliest
mp.messaging.outgoing.send.connector=smallrye-kafka
mp.messaging.outgoing.send.topic=messages
mp.messaging.incoming.messages.connector=smallrye-kafka
mp.messaging.incoming.messages.topic=messages
mp.messaging.outgoing.data-stream.connector=smallrye-kafka
#mp.messaging.outgoing.data-stream.topic=data-stream The error is like this. java.lang.RuntimeException: java.lang.RuntimeException: Failed to start quarkus
at io.quarkus.test.junit.QuarkusTestExtension.throwBootFailureException(QuarkusTestExtension.java:640)
at io.quarkus.test.junit.QuarkusTestExtension.interceptTestClassConstructor(QuarkusTestExtension.java:711)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.api.extension.InvocationInterceptor.interceptTestClassConstructor(InvocationInterceptor.java:73)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.lambda$invoke$0(InterceptingExecutableInvoker.java:93)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:92)
at org.junit.jupiter.engine.execution.InterceptingExecutableInvoker.invoke(InterceptingExecutableInvoker.java:62)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeTestClassConstructor(ClassBasedTestDescriptor.java:363)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateTestClass(ClassBasedTestDescriptor.java:310)
at org.junit.jupiter.engine.descriptor.ClassTestDescriptor.instantiateTestClass(ClassTestDescriptor.java:79)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.instantiateAndPostProcessTestInstance(ClassBasedTestDescriptor.java:286)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$4(ClassBasedTestDescriptor.java:278)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$testInstancesProvider$5(ClassBasedTestDescriptor.java:277)
at org.junit.jupiter.engine.execution.TestInstancesProvider.getTestInstances(TestInstancesProvider.java:31)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$prepare$0(TestMethodTestDescriptor.java:105)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:104)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.prepare(TestMethodTestDescriptor.java:68)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$prepare$2(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.prepare(NodeTestTask.java:123)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:90)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:41)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:155)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:141)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$9(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:138)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:95)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:35)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:147)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:127)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:90)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:55)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:102)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:114)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:86)
at org.junit.platform.launcher.core.DefaultLauncherSession$DelegatingLauncher.execute(DefaultLauncherSession.java:86)
at org.junit.platform.launcher.core.SessionPerRequestLauncher.execute(SessionPerRequestLauncher.java:53)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:232)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:55)
Caused by: java.lang.RuntimeException: Failed to start quarkus
at io.quarkus.runner.ApplicationImpl.doStart(Unknown Source)
at io.quarkus.runtime.Application.start(Application.java:101)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:104)
at java.base/java.lang.reflect.Method.invoke(Method.java:578)
at io.quarkus.runner.bootstrap.StartupActionImpl.run(StartupActionImpl.java:273)
at io.quarkus.test.junit.QuarkusTestExtension.doJavaStart(QuarkusTestExtension.java:251)
at io.quarkus.test.junit.QuarkusTestExtension.ensureStarted(QuarkusTestExtension.java:607)
at io.quarkus.test.junit.QuarkusTestExtension.beforeAll(QuarkusTestExtension.java:655)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.lambda$invokeBeforeAllCallbacks$12(ClassBasedTestDescriptor.java:395)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.invokeBeforeAllCallbacks(ClassBasedTestDescriptor.java:395)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:211)
at org.junit.jupiter.engine.descriptor.ClassBasedTestDescriptor.before(ClassBasedTestDescriptor.java:84)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$6(NodeTestTask.java:148)
... 36 more
Caused by: jakarta.enterprise.inject.spi.DeploymentException: java.lang.IllegalArgumentException: SRMSG00071: Invalid channel configuration - the `connector` attribute must be set for channel `data-stream`
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle.onApplicationStart(SmallRyeReactiveMessagingLifecycle.java:57)
at io.quarkus.smallrye.reactivemessaging.runtime.SmallRyeReactiveMessagingLifecycle_Observer_onApplicationStart_68e7b57eb97cb75d597c5b816682366e888d0d9b.notify(Unknown Source)
at io.quarkus.arc.impl.EventImpl$Notifier.notifyObservers(EventImpl.java:346)
at io.quarkus.arc.impl.EventImpl$Notifier.notify(EventImpl.java:328)
at io.quarkus.arc.impl.EventImpl.fire(EventImpl.java:82)
at io.quarkus.arc.runtime.ArcRecorder.fireLifecycleEvent(ArcRecorder.java:155)
at io.quarkus.arc.runtime.ArcRecorder.handleLifecycleEvents(ArcRecorder.java:106)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy_0(Unknown Source)
at io.quarkus.deployment.steps.LifecycleEventsBuildStep$startupEvent1144526294.deploy(Unknown Source)
... 50 more I have moved the in memory properties to a profile public class InMemoryProfile implements QuarkusTestProfile {
public InMemoryProfile() {
}
@Override
public Map<String, String> getConfigOverrides() {
return Map.of(
"mp.messaging.outgoing.send.connector","smallrye-in-memory",
"mp.messaging.incoming.messages.connector","smallrye-in-memory",
"mp.messaging.outgoing.data-stream.connector","smallrye-in-memory"
);
}
@Override
public boolean disableGlobalTestResources() {
return true;
}
@Override
public boolean runMainMethod() {
return false;
}
@Override
public boolean disableApplicationLifecycleObservers() {
return true;
}
} |
Beta Was this translation helpful? Give feedback.
You cannot invoke the methods annotated with @incoming and @Outgoing like this. It is a protocol violation, and the messages are ignored (2 violations at least: no downstream, and no requests).
The test can be rewritten as: