Skip to content

Commit 964eefa

Browse files
committed
initial implementation of the pub/sub pattern using JMS
1 parent f9945c9 commit 964eefa

File tree

9 files changed

+436
-0
lines changed

9 files changed

+436
-0
lines changed

publish-subsribe/checkstyle.xml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
2+
<?xml version="1.0"?>
3+
<!DOCTYPE module PUBLIC
4+
"-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
5+
"https://checkstyle.org/dtds/configuration_1_3.dtd">
6+
<module name="Checker">
7+
<property name="severity" value="warning"/>
8+
<module name="TreeWalker">
9+
<module name="JavadocMethod"/>
10+
<module name="JavadocType"/>
11+
<module name="JavadocVariable"/>
12+
<module name="JavadocStyle"/>
13+
</module>
14+
</module>

publish-subsribe/pom.xml

6.69 KB
Binary file not shown.
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
package com.hungrydev399.publishsubscribe;
2+
3+
import com.hungrydev399.publishsubscribe.jms.JmsUtil;
4+
import com.hungrydev399.publishsubscribe.model.Message;
5+
import com.hungrydev399.publishsubscribe.publisher.TopicPublisher;
6+
import com.hungrydev399.publishsubscribe.subscriber.TopicSubscriber;
7+
import com.hungrydev399.publishsubscribe.subscriber.SubscriberType;
8+
9+
import java.util.ArrayList;
10+
import java.util.List;
11+
12+
/**
13+
* Main application demonstrating different aspects of JMS publish-subscribe pattern.
14+
*/
15+
public final class App {
16+
private static TopicPublisher publisher;
17+
18+
public static void main(String[] args) {
19+
try {
20+
publisher = new TopicPublisher("NEWS");
21+
22+
// Run each demonstration independently
23+
demonstrateBasicPubSub();
24+
demonstrateDurableSubscriptions();
25+
demonstrateSharedSubscriptions();
26+
27+
} catch (Exception e) {
28+
System.err.println("Error in publish-subscribe demo: " + e.getMessage());
29+
e.printStackTrace();
30+
} finally {
31+
cleanup(null);
32+
}
33+
}
34+
35+
/**
36+
* Demonstrates basic publish-subscribe with non-durable subscribers.
37+
* All subscribers receive all messages.
38+
*/
39+
private static void demonstrateBasicPubSub() throws Exception {
40+
System.out.println("\n=== Basic Publish-Subscribe Demonstration ===");
41+
List<TopicSubscriber> subscribers = new ArrayList<>();
42+
43+
try {
44+
// Create basic subscribers
45+
subscribers.add(new TopicSubscriber("BasicSub1", "NEWS", SubscriberType.NONDURABLE, null));
46+
subscribers.add(new TopicSubscriber("BasicSub2", "NEWS", SubscriberType.NONDURABLE, null));
47+
Thread.sleep(100); // Wait for subscribers to initialize
48+
49+
// Publish messages - all subscribers should receive all messages
50+
publisher.publish(new Message("Basic message 1", "NEWS"));
51+
publisher.publish(new Message("Basic message 2", "NEWS"));
52+
53+
Thread.sleep(1000); // Wait for message processing
54+
} finally {
55+
cleanup(subscribers);
56+
System.out.println("=== Basic Demonstration Completed ===\n");
57+
}
58+
}
59+
60+
/**
61+
* Demonstrates durable subscriptions that persist messages when subscribers are offline.
62+
*/
63+
private static void demonstrateDurableSubscriptions() throws Exception {
64+
System.out.println("\n=== Durable Subscriptions Demonstration ===");
65+
List<TopicSubscriber> subscribers = new ArrayList<>();
66+
67+
try {
68+
// Create durable subscriber
69+
TopicSubscriber durableSub = new TopicSubscriber("DurableSub", "NEWS",
70+
SubscriberType.DURABLE, "durable-client");
71+
subscribers.add(durableSub);
72+
Thread.sleep(100);
73+
74+
// First message - subscriber is online
75+
publisher.publish(new Message("Durable message while online", "NEWS"));
76+
Thread.sleep(500);
77+
78+
// Disconnect subscriber
79+
durableSub.close();
80+
subscribers.clear();
81+
82+
// Send message while subscriber is offline
83+
publisher.publish(new Message("Durable message while offline", "NEWS"));
84+
Thread.sleep(500);
85+
86+
// Reconnect subscriber - should receive offline message
87+
subscribers.add(new TopicSubscriber("DurableSub", "NEWS",
88+
SubscriberType.DURABLE, "durable-client"));
89+
Thread.sleep(1000);
90+
91+
} finally {
92+
cleanup(subscribers);
93+
System.out.println("=== Durable Demonstration Completed ===\n");
94+
}
95+
}
96+
97+
/**
98+
* Demonstrates shared subscriptions where messages are distributed among subscribers.
99+
*/
100+
private static void demonstrateSharedSubscriptions() throws Exception {
101+
System.out.println("\n=== Shared Subscriptions Demonstration ===");
102+
List<TopicSubscriber> subscribers = new ArrayList<>();
103+
104+
try {
105+
// Create shared subscribers
106+
subscribers.add(new TopicSubscriber("SharedSub1", "NEWS", SubscriberType.SHARED, null));
107+
subscribers.add(new TopicSubscriber("SharedSub2", "NEWS", SubscriberType.SHARED, null));
108+
Thread.sleep(100);
109+
110+
// Messages should be distributed between subscribers
111+
for (int i = 1; i <= 4; i++) {
112+
publisher.publish(new Message("Shared message " + i, "NEWS"));
113+
Thread.sleep(100);
114+
}
115+
116+
Thread.sleep(1000);
117+
} finally {
118+
cleanup(subscribers);
119+
System.out.println("=== Shared Demonstration Completed ===\n");
120+
}
121+
}
122+
123+
/**
124+
* Cleanup specified subscribers and optionally the publisher.
125+
*/
126+
private static void cleanup(List<TopicSubscriber> subscribers) {
127+
try {
128+
if (subscribers != null) {
129+
for (TopicSubscriber subscriber : subscribers) {
130+
if (subscriber != null) {
131+
subscriber.close();
132+
}
133+
}
134+
}
135+
} catch (Exception e) {
136+
System.err.println("Error during subscriber cleanup: " + e.getMessage());
137+
}
138+
}
139+
}
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package com.hungrydev399.publishsubscribe.jms;
2+
3+
import org.apache.activemq.ActiveMQConnectionFactory;
4+
import org.apache.activemq.broker.BrokerService;
5+
import javax.jms.Connection;
6+
import javax.jms.ConnectionFactory;
7+
import javax.jms.JMSException;
8+
import javax.jms.Session;
9+
import java.util.Map;
10+
import java.util.concurrent.ConcurrentHashMap;
11+
12+
/**
13+
* JMS utility class that manages connections and provides an embedded message broker.
14+
* Supports both shared connections and client-specific connections for durable subscriptions.
15+
*/
16+
public final class JmsUtil {
17+
private static final String BROKER_URL = "tcp://localhost:61616";
18+
private static ConnectionFactory factory;
19+
private static Connection defaultConnection;
20+
private static BrokerService broker;
21+
private static Map<String, Connection> clientConnections = new ConcurrentHashMap<>();
22+
23+
static {
24+
try {
25+
// Start embedded broker for self-contained messaging
26+
broker = new BrokerService();
27+
broker.addConnector(BROKER_URL);
28+
broker.setPersistent(false); // Messages won't survive broker restart
29+
broker.start();
30+
31+
// Default connection for non-durable subscribers
32+
factory = new ActiveMQConnectionFactory(BROKER_URL);
33+
defaultConnection = factory.createConnection();
34+
defaultConnection.start();
35+
} catch (Exception e) {
36+
System.err.println("Failed to initialize JMS: " + e.getMessage());
37+
throw new RuntimeException(e);
38+
}
39+
}
40+
41+
private JmsUtil() {
42+
// Utility class, prevent instantiation
43+
}
44+
45+
/**
46+
* Creates a JMS session, optionally with a client ID for durable subscriptions.
47+
* Each client ID gets its own dedicated connection to support durable subscribers.
48+
*/
49+
public static Session createSession(String clientId) throws JMSException {
50+
if (clientId == null) {
51+
return defaultConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
52+
}
53+
54+
Connection conn = clientConnections.computeIfAbsent(clientId, id -> {
55+
try {
56+
Connection newConn = factory.createConnection();
57+
newConn.setClientID(id);
58+
newConn.start();
59+
return newConn;
60+
} catch (JMSException e) {
61+
throw new RuntimeException(e);
62+
}
63+
});
64+
65+
return conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
66+
}
67+
68+
public static Session createSession() throws JMSException {
69+
return createSession(null);
70+
}
71+
72+
/**
73+
* Closes all JMS resources.
74+
*/
75+
public static void closeConnection() {
76+
try {
77+
if (defaultConnection != null) {
78+
defaultConnection.close();
79+
}
80+
for (Connection conn : clientConnections.values()) {
81+
if (conn != null) {
82+
conn.close();
83+
}
84+
}
85+
clientConnections.clear();
86+
if (broker != null) {
87+
broker.stop();
88+
}
89+
} catch (Exception e) {
90+
System.err.println("Error closing JMS resources: " + e.getMessage());
91+
}
92+
}
93+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package com.hungrydev399.publishsubscribe.model;
2+
3+
import java.io.Serializable;
4+
5+
public class Message implements Serializable {
6+
private String content;
7+
private String topic;
8+
9+
public Message(String content, String topic) {
10+
this.content = content;
11+
this.topic = topic;
12+
}
13+
14+
public String getContent() {
15+
return content;
16+
}
17+
18+
public String getTopic() {
19+
return topic;
20+
}
21+
22+
@Override
23+
public String toString() {
24+
return "Message{topic='" + topic + "', content='" + content + "'}";
25+
}
26+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package com.hungrydev399.publishsubscribe.publisher;
2+
3+
import javax.jms.DeliveryMode;
4+
import javax.jms.JMSException;
5+
import javax.jms.MessageProducer;
6+
import javax.jms.Session;
7+
import javax.jms.TextMessage;
8+
import javax.jms.Topic;
9+
import com.hungrydev399.publishsubscribe.jms.JmsUtil;
10+
import com.hungrydev399.publishsubscribe.model.Message;
11+
12+
/**
13+
* JMS topic publisher that supports persistent messages and message grouping
14+
* for shared subscriptions.
15+
*/
16+
public class TopicPublisher {
17+
private final Session session;
18+
private final MessageProducer producer;
19+
20+
public TopicPublisher(String topicName) throws JMSException {
21+
session = JmsUtil.createSession();
22+
Topic topic = session.createTopic(topicName);
23+
producer = session.createProducer(topic);
24+
// Enable persistent delivery for durable subscriptions
25+
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
26+
}
27+
28+
public void publish(Message message) throws JMSException {
29+
TextMessage textMessage = session.createTextMessage(message.getContent());
30+
// Group messages to enable load balancing for shared subscribers
31+
textMessage.setStringProperty("JMSXGroupID", "group-" + message.getTopic());
32+
producer.send(textMessage);
33+
System.out.println("Published to topic " + message.getTopic() + ": " + message.getContent());
34+
}
35+
36+
public void close() throws JMSException {
37+
if (session != null) {
38+
session.close();
39+
}
40+
}
41+
}
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package com.hungrydev399.publishsubscribe.subscriber;
2+
3+
public enum SubscriberType {
4+
NONDURABLE, // Regular non-durable subscriber
5+
DURABLE, // Durable subscriber that receives messages even when offline
6+
SHARED, // Shared subscription where multiple subscribers share the load
7+
SHARED_DURABLE // Combination of shared and durable subscription
8+
}

0 commit comments

Comments
 (0)