Skip to content

Commit 4ec2b75

Browse files
acogoluegnesansd
authored andcommitted
Use ProtonJ2 in JMS-to-AMQP interop test
1 parent fd35038 commit 4ec2b75

File tree

4 files changed

+84
-48
lines changed

4 files changed

+84
-48
lines changed

deps/rabbit/test/amqp_jms_SUITE.erl

Lines changed: 2 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -129,48 +129,8 @@ message_types_jms_to_jms(Config) ->
129129

130130
%% Send different message types from JMS client to Erlang AMQP 1.0 client.
131131
message_types_jms_to_amqp(Config) ->
132-
TestName = QName = atom_to_binary(?FUNCTION_NAME),
133-
ok = declare_queue(QName, <<"quorum">>, Config),
134-
Address = rabbitmq_amqp_address:queue(QName),
135-
136-
%% The JMS client sends messaegs.
137-
ok = run_jms_test(TestName, [{"-Dqueue=~ts", [Address]}], Config),
138-
139-
%% The Erlang AMQP 1.0 client receives messages.
140-
OpnConf = connection_config(Config),
141-
{ok, Connection} = amqp10_client:open_connection(OpnConf),
142-
{ok, Session} = amqp10_client:begin_session_sync(Connection),
143-
{ok, Receiver} = amqp10_client:attach_receiver_link(Session, <<"receiver">>, Address, settled),
144-
{ok, Msg1} = amqp10_client:get_msg(Receiver),
145-
146-
?assertEqual(
147-
#'v1_0.amqp_value'{content = {utf8, <<"msg1🥕"/utf8>>}},
148-
amqp10_msg:body(Msg1)),
149-
{ok, Msg2} = amqp10_client:get_msg(Receiver),
150-
?assertEqual(
151-
#'v1_0.amqp_value'{
152-
content = {map, [
153-
{{utf8, <<"key1">>}, {utf8, <<"value">>}},
154-
{{utf8, <<"key2">>}, true},
155-
{{utf8, <<"key3">>}, {double, -1.1}},
156-
{{utf8, <<"key4">>}, {long, -1}}
157-
]}},
158-
amqp10_msg:body(Msg2)),
159-
{ok, Msg3} = amqp10_client:get_msg(Receiver),
160-
?assertEqual(
161-
[
162-
#'v1_0.amqp_sequence'{
163-
content = [{utf8, <<"value">>},
164-
true,
165-
{double, -1.1},
166-
{long, -1}]}
167-
],
168-
amqp10_msg:body(Msg3)),
169-
170-
ok = detach_link_sync(Receiver),
171-
ok = end_session_sync(Session),
172-
ok = close_connection_sync(Connection),
173-
ok = delete_queue(QName, Config).
132+
TestName = atom_to_binary(?FUNCTION_NAME),
133+
ok = run_jms_test(TestName, [], Config).
174134

175135
temporary_queue_rpc(Config) ->
176136
TestName = QName = atom_to_binary(?FUNCTION_NAME),

deps/rabbit/test/amqp_jms_SUITE_data/pom.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
<url>https://www.rabbitmq.com</url>
1010
<properties>
1111
<junit.jupiter.version>5.10.2</junit.jupiter.version>
12+
<assertj.version>3.27.3</assertj.version>
1213
<qpid-jms-client.version>2.6.1</qpid-jms-client.version>
1314
<amqp-client.version>[0.5.0-SNAPSHOT,)</amqp-client.version>
1415
<logback.version>1.2.13</logback.version>
@@ -43,6 +44,12 @@
4344
<version>${amqp-client.version}</version>
4445
<scope>test</scope>
4546
</dependency>
47+
<dependency>
48+
<groupId>org.assertj</groupId>
49+
<artifactId>assertj-core</artifactId>
50+
<version>${assertj.version}</version>
51+
<scope>test</scope>
52+
</dependency>
4653

4754
</dependencies>
4855
<build>

deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/JmsTest.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,22 @@
11
package com.rabbitmq.amqp.tests.jms;
22

3+
import static com.rabbitmq.amqp.tests.jms.TestUtils.protonClient;
4+
import static com.rabbitmq.amqp.tests.jms.TestUtils.protonConnection;
5+
import static org.assertj.core.api.Assertions.assertThat;
36
import static org.junit.jupiter.api.Assertions.*;
47

58
import jakarta.jms.*;
69
import java.util.*;
10+
import java.util.concurrent.TimeUnit;
711
import javax.naming.Context;
12+
13+
import com.rabbitmq.qpid.protonj2.client.Client;
14+
import com.rabbitmq.qpid.protonj2.client.Delivery;
15+
import com.rabbitmq.qpid.protonj2.client.Receiver;
16+
import jakarta.jms.Queue;
817
import org.junit.jupiter.api.Test;
918

19+
@JmsTestInfrastructure
1020
public class JmsTest {
1121

1222
private javax.naming.Context getContext() throws Exception{
@@ -94,18 +104,20 @@ public void message_types_jms_to_jms() throws Exception {
94104
}
95105
}
96106

107+
String destination;
108+
97109
@Test
98110
public void message_types_jms_to_amqp() throws Exception {
99111
Context context = getContext();
100112
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
101113

114+
Queue queue = TestUtils.queue(destination);
115+
String msg1 = "msg1🥕";
102116
try (Connection connection = factory.createConnection()) {
103117
Session session = connection.createSession();
104-
Destination queue = (Destination) context.lookup("myQueue");
105118
MessageProducer producer = session.createProducer(queue);
106119

107120
// TextMessage
108-
String msg1 = "msg1🥕";
109121
TextMessage textMessage = session.createTextMessage(msg1);
110122
producer.send(textMessage);
111123

@@ -126,12 +138,32 @@ public void message_types_jms_to_amqp() throws Exception {
126138
producer.send(streamMessage);
127139
}
128140

141+
try (Client client = protonClient();
142+
com.rabbitmq.qpid.protonj2.client.Connection amqpConnection = protonConnection(client)) {
143+
Receiver receiver = amqpConnection.openReceiver(queue.getQueueName());
144+
Delivery delivery = receiver.receive(10, TimeUnit.SECONDS);
145+
assertNotNull(delivery);
146+
assertEquals(msg1, delivery.message().body());
147+
148+
delivery = receiver.receive(10, TimeUnit.SECONDS);
149+
assertNotNull(delivery);
150+
com.rabbitmq.qpid.protonj2.client.Message<Map<String, Object>> mapMessage = delivery.message();
151+
assertThat(mapMessage.body()).containsEntry("key1", "value")
152+
.containsEntry("key2", true)
153+
.containsEntry("key3", -1.1)
154+
.containsEntry("key4", -1L);
155+
156+
delivery = receiver.receive(10, TimeUnit.SECONDS);
157+
assertNotNull(delivery);
158+
com.rabbitmq.qpid.protonj2.client.Message<List<Object>> listMessage = delivery.message();
159+
assertThat(listMessage.body()).containsExactly("value", true, -1.1, -1L);
129160
}
161+
}
130162

131-
// Test that Request/reply pattern using a TemporaryQueue works.
132-
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee
133-
@Test
134-
public void temporary_queue_rpc() throws Exception {
163+
// Test that Request/reply pattern using a TemporaryQueue works.
164+
// https://jakarta.ee/specifications/messaging/3.1/jakarta-messaging-spec-3.1#requestreply-pattern-using-a-temporaryqueue-jakarta-ee
165+
@Test
166+
public void temporary_queue_rpc() throws Exception {
135167
Context context = getContext();
136168
ConnectionFactory factory = (ConnectionFactory) context.lookup("myConnection");
137169

deps/rabbit/test/amqp_jms_SUITE_data/src/test/java/com/rabbitmq/amqp/tests/jms/TestUtils.java

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,16 @@
1919

2020
import static java.lang.String.format;
2121

22+
import com.rabbitmq.qpid.protonj2.client.Client;
23+
import com.rabbitmq.qpid.protonj2.client.ConnectionOptions;
24+
import com.rabbitmq.qpid.protonj2.client.exceptions.ClientException;
2225
import jakarta.jms.Connection;
2326
import jakarta.jms.ConnectionFactory;
2427
import jakarta.jms.JMSException;
2528
import jakarta.jms.Queue;
2629
import java.lang.reflect.Method;
30+
import java.net.URI;
31+
import java.net.URISyntaxException;
2732
import java.util.UUID;
2833
import org.apache.qpid.jms.JmsConnectionFactory;
2934
import org.apache.qpid.jms.JmsQueue;
@@ -41,6 +46,24 @@ static String brokerUri() {
4146
return uri == null || uri.isEmpty() ? DEFAULT_BROKER_URI : uri;
4247
}
4348

49+
static String brokerHost() {
50+
try {
51+
URI uri = new URI(brokerUri());
52+
return uri.getHost();
53+
} catch (URISyntaxException e) {
54+
throw new RuntimeException(e);
55+
}
56+
}
57+
58+
static int brokerPort() {
59+
try {
60+
URI uri = new URI(brokerUri());
61+
return uri.getPort();
62+
} catch (URISyntaxException e) {
63+
throw new RuntimeException(e);
64+
}
65+
}
66+
4467
static String adminUsername() {
4568
return "guest";
4669
}
@@ -62,6 +85,20 @@ static Queue queue(String name) {
6285
return new JmsQueue("/queues/" + name);
6386
}
6487

88+
static Client protonClient() {
89+
return Client.create();
90+
}
91+
92+
static com.rabbitmq.qpid.protonj2.client.Connection protonConnection(Client client) {
93+
ConnectionOptions connectionOptions = new ConnectionOptions().virtualHost("vhost:/");
94+
connectionOptions.saslOptions().addAllowedMechanism("ANONYMOUS");
95+
try {
96+
return client.connect(brokerHost(), brokerPort(), connectionOptions);
97+
} catch (ClientException e) {
98+
throw new RuntimeException(e);
99+
}
100+
}
101+
65102
static String name(TestInfo info) {
66103
return name(info.getTestClass().get(), info.getTestMethod().get());
67104
}

0 commit comments

Comments
 (0)