From 964eefa60c98a10610eec3d1b9e8a7cfcc3838e5 Mon Sep 17 00:00:00 2001 From: hungryDev399 Date: Sun, 8 Dec 2024 13:56:42 +0200 Subject: [PATCH 1/6] initial implementation of the pub/sub pattern using JMS --- publish-subsribe/checkstyle.xml | 14 ++ publish-subsribe/pom.xml | Bin 0 -> 6854 bytes .../hungrydev399/publishsubscribe/App.java | 139 ++++++++++++++++++ .../publishsubscribe/jms/JmsUtil.java | 93 ++++++++++++ .../publishsubscribe/model/Message.java | 26 ++++ .../publisher/TopicPublisher.java | 41 ++++++ .../subscriber/SubscriberType.java | 8 + .../subscriber/TopicSubscriber.java | 98 ++++++++++++ .../src/main/resources/log4j2.xml | 17 +++ 9 files changed, 436 insertions(+) create mode 100644 publish-subsribe/checkstyle.xml create mode 100644 publish-subsribe/pom.xml create mode 100644 publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java create mode 100644 publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java create mode 100644 publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/model/Message.java create mode 100644 publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisher.java create mode 100644 publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/SubscriberType.java create mode 100644 publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriber.java create mode 100644 publish-subsribe/src/main/resources/log4j2.xml diff --git a/publish-subsribe/checkstyle.xml b/publish-subsribe/checkstyle.xml new file mode 100644 index 000000000000..ccf7369e4c67 --- /dev/null +++ b/publish-subsribe/checkstyle.xml @@ -0,0 +1,14 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/publish-subsribe/pom.xml b/publish-subsribe/pom.xml new file mode 100644 index 0000000000000000000000000000000000000000..27bde75b4751172b2d18b5c7be679d4928dc16b7 GIT binary patch literal 6854 zcmeHM*>2N76ur+#{6p1uyG>XMl}UxFB9ID26j0w#nz(67>XhtDd%`LL!VvQ!gVQK`HG&I z#Q2uu?x9DDUt^#Q(BrKf$!oL^@oZb&n;zya@TW2?!2Tw@V%!fLyf#p_r3J(j;2z5H zLLxPZfI~fJ_@3c;3Vo?R@ptj5bz{#%=+{F#!<#YsHe=bw+U!ubzQ<$aFcmA^`>>)T z_vIdJ(N_ZA#~7EwFCXzaQ5MpUjx#z@xMlr(yn*uh93FfH+zz-ILq-gH=0N4!D<$tM zeDM?)vBNPwVi#k`?ipBS9BJGfc&E^M2#wEi^B4Ux!IM*1Lfqn!K00I0wYsGuD?&89 zk>`lHV|j_Vd#~`qzCMzU_FUmn-xT`zY`d)6Xx9wo7%@^~V~iWao(Zf;fzXm2CEd-C zRbyPus!C<7Gt*LVTve(+rdX<91!8z-u59gtY1h&Nm1&GJ=H380d;sq+kc&^0{q=EN zy%-m(J(Z{!r7tt}jWuAoN0~!@|Em)3TYrRcuPa?x|8&$3Rdu|SFN%wBq-&wUH*Hdq z2O(~Rej{AfGRL5sHc|Syne+($ZNYajGM$*qvd5xq6)<1MWRaK3`dSGgcZT&TY(GQQ zyJ(DIix?r&hp2l`yctXA6> zcuEVyQOo9VE2}tzQmpT}%ZebutSlqXmU>>da>Lo1J1MP)_zcqec{TMOzG^B^>@>D#wDo2T$ zcQdT3hR~jS8oN_oj|c8lyK1elj|}Z1o2Y}{2bJ$GW8}c{ePWDVUXF`)7D=cU$y>GU zq}=K?r`h`CuL*4BAZxY?P8+XQRub1`Jmu-hXZUl5vjOgbZ@zM|dTlGa>QyerT{BVb zA{_$NJxl7SjOVJDtbFpg;)xDV(-=R?F{(tctz8<6`*O-WLiX^Cj-MUe6MWNa=wnyn`iYkV0fi*LGchUj;w(LY+!iG?!Ja zv(|rg>%2iQZ6bS!uQpiOz-{MP{$}j&tIwjkz6VaLaAqW#Hi2h&V#xWDUg)EpExe-N z)I-hH`iuT8#&+M=lTdeFHkO;#T{+mg$k;Lx>N0B)){m&$UOU{zj99#i_U9$mKc4>I v{A|RX)W0zsnJODrV!uX~^{SFRe^M!{MMl^wjCj7cXV5yM^ceTwD*gNy!YQJ{ literal 0 HcmV?d00001 diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java new file mode 100644 index 000000000000..e03080384ad8 --- /dev/null +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java @@ -0,0 +1,139 @@ +package com.hungrydev399.publishsubscribe; + +import com.hungrydev399.publishsubscribe.jms.JmsUtil; +import com.hungrydev399.publishsubscribe.model.Message; +import com.hungrydev399.publishsubscribe.publisher.TopicPublisher; +import com.hungrydev399.publishsubscribe.subscriber.TopicSubscriber; +import com.hungrydev399.publishsubscribe.subscriber.SubscriberType; + +import java.util.ArrayList; +import java.util.List; + +/** + * Main application demonstrating different aspects of JMS publish-subscribe pattern. + */ +public final class App { + private static TopicPublisher publisher; + + public static void main(String[] args) { + try { + publisher = new TopicPublisher("NEWS"); + + // Run each demonstration independently + demonstrateBasicPubSub(); + demonstrateDurableSubscriptions(); + demonstrateSharedSubscriptions(); + + } catch (Exception e) { + System.err.println("Error in publish-subscribe demo: " + e.getMessage()); + e.printStackTrace(); + } finally { + cleanup(null); + } + } + + /** + * Demonstrates basic publish-subscribe with non-durable subscribers. + * All subscribers receive all messages. + */ + private static void demonstrateBasicPubSub() throws Exception { + System.out.println("\n=== Basic Publish-Subscribe Demonstration ==="); + List subscribers = new ArrayList<>(); + + try { + // Create basic subscribers + subscribers.add(new TopicSubscriber("BasicSub1", "NEWS", SubscriberType.NONDURABLE, null)); + subscribers.add(new TopicSubscriber("BasicSub2", "NEWS", SubscriberType.NONDURABLE, null)); + Thread.sleep(100); // Wait for subscribers to initialize + + // Publish messages - all subscribers should receive all messages + publisher.publish(new Message("Basic message 1", "NEWS")); + publisher.publish(new Message("Basic message 2", "NEWS")); + + Thread.sleep(1000); // Wait for message processing + } finally { + cleanup(subscribers); + System.out.println("=== Basic Demonstration Completed ===\n"); + } + } + + /** + * Demonstrates durable subscriptions that persist messages when subscribers are offline. + */ + private static void demonstrateDurableSubscriptions() throws Exception { + System.out.println("\n=== Durable Subscriptions Demonstration ==="); + List subscribers = new ArrayList<>(); + + try { + // Create durable subscriber + TopicSubscriber durableSub = new TopicSubscriber("DurableSub", "NEWS", + SubscriberType.DURABLE, "durable-client"); + subscribers.add(durableSub); + Thread.sleep(100); + + // First message - subscriber is online + publisher.publish(new Message("Durable message while online", "NEWS")); + Thread.sleep(500); + + // Disconnect subscriber + durableSub.close(); + subscribers.clear(); + + // Send message while subscriber is offline + publisher.publish(new Message("Durable message while offline", "NEWS")); + Thread.sleep(500); + + // Reconnect subscriber - should receive offline message + subscribers.add(new TopicSubscriber("DurableSub", "NEWS", + SubscriberType.DURABLE, "durable-client")); + Thread.sleep(1000); + + } finally { + cleanup(subscribers); + System.out.println("=== Durable Demonstration Completed ===\n"); + } + } + + /** + * Demonstrates shared subscriptions where messages are distributed among subscribers. + */ + private static void demonstrateSharedSubscriptions() throws Exception { + System.out.println("\n=== Shared Subscriptions Demonstration ==="); + List subscribers = new ArrayList<>(); + + try { + // Create shared subscribers + subscribers.add(new TopicSubscriber("SharedSub1", "NEWS", SubscriberType.SHARED, null)); + subscribers.add(new TopicSubscriber("SharedSub2", "NEWS", SubscriberType.SHARED, null)); + Thread.sleep(100); + + // Messages should be distributed between subscribers + for (int i = 1; i <= 4; i++) { + publisher.publish(new Message("Shared message " + i, "NEWS")); + Thread.sleep(100); + } + + Thread.sleep(1000); + } finally { + cleanup(subscribers); + System.out.println("=== Shared Demonstration Completed ===\n"); + } + } + + /** + * Cleanup specified subscribers and optionally the publisher. + */ + private static void cleanup(List subscribers) { + try { + if (subscribers != null) { + for (TopicSubscriber subscriber : subscribers) { + if (subscriber != null) { + subscriber.close(); + } + } + } + } catch (Exception e) { + System.err.println("Error during subscriber cleanup: " + e.getMessage()); + } + } +} diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java new file mode 100644 index 000000000000..f2c0a7d1a254 --- /dev/null +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java @@ -0,0 +1,93 @@ +package com.hungrydev399.publishsubscribe.jms; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.JMSException; +import javax.jms.Session; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * JMS utility class that manages connections and provides an embedded message broker. + * Supports both shared connections and client-specific connections for durable subscriptions. + */ +public final class JmsUtil { + private static final String BROKER_URL = "tcp://localhost:61616"; + private static ConnectionFactory factory; + private static Connection defaultConnection; + private static BrokerService broker; + private static Map clientConnections = new ConcurrentHashMap<>(); + + static { + try { + // Start embedded broker for self-contained messaging + broker = new BrokerService(); + broker.addConnector(BROKER_URL); + broker.setPersistent(false); // Messages won't survive broker restart + broker.start(); + + // Default connection for non-durable subscribers + factory = new ActiveMQConnectionFactory(BROKER_URL); + defaultConnection = factory.createConnection(); + defaultConnection.start(); + } catch (Exception e) { + System.err.println("Failed to initialize JMS: " + e.getMessage()); + throw new RuntimeException(e); + } + } + + private JmsUtil() { + // Utility class, prevent instantiation + } + + /** + * Creates a JMS session, optionally with a client ID for durable subscriptions. + * Each client ID gets its own dedicated connection to support durable subscribers. + */ + public static Session createSession(String clientId) throws JMSException { + if (clientId == null) { + return defaultConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + Connection conn = clientConnections.computeIfAbsent(clientId, id -> { + try { + Connection newConn = factory.createConnection(); + newConn.setClientID(id); + newConn.start(); + return newConn; + } catch (JMSException e) { + throw new RuntimeException(e); + } + }); + + return conn.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + public static Session createSession() throws JMSException { + return createSession(null); + } + + /** + * Closes all JMS resources. + */ + public static void closeConnection() { + try { + if (defaultConnection != null) { + defaultConnection.close(); + } + for (Connection conn : clientConnections.values()) { + if (conn != null) { + conn.close(); + } + } + clientConnections.clear(); + if (broker != null) { + broker.stop(); + } + } catch (Exception e) { + System.err.println("Error closing JMS resources: " + e.getMessage()); + } + } +} diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/model/Message.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/model/Message.java new file mode 100644 index 000000000000..98c522115e79 --- /dev/null +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/model/Message.java @@ -0,0 +1,26 @@ +package com.hungrydev399.publishsubscribe.model; + +import java.io.Serializable; + +public class Message implements Serializable { + private String content; + private String topic; + + public Message(String content, String topic) { + this.content = content; + this.topic = topic; + } + + public String getContent() { + return content; + } + + public String getTopic() { + return topic; + } + + @Override + public String toString() { + return "Message{topic='" + topic + "', content='" + content + "'}"; + } +} diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisher.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisher.java new file mode 100644 index 000000000000..3a3a27e59f53 --- /dev/null +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisher.java @@ -0,0 +1,41 @@ +package com.hungrydev399.publishsubscribe.publisher; + +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import com.hungrydev399.publishsubscribe.jms.JmsUtil; +import com.hungrydev399.publishsubscribe.model.Message; + +/** + * JMS topic publisher that supports persistent messages and message grouping + * for shared subscriptions. + */ +public class TopicPublisher { + private final Session session; + private final MessageProducer producer; + + public TopicPublisher(String topicName) throws JMSException { + session = JmsUtil.createSession(); + Topic topic = session.createTopic(topicName); + producer = session.createProducer(topic); + // Enable persistent delivery for durable subscriptions + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + } + + public void publish(Message message) throws JMSException { + TextMessage textMessage = session.createTextMessage(message.getContent()); + // Group messages to enable load balancing for shared subscribers + textMessage.setStringProperty("JMSXGroupID", "group-" + message.getTopic()); + producer.send(textMessage); + System.out.println("Published to topic " + message.getTopic() + ": " + message.getContent()); + } + + public void close() throws JMSException { + if (session != null) { + session.close(); + } + } +} diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/SubscriberType.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/SubscriberType.java new file mode 100644 index 000000000000..73395221851c --- /dev/null +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/SubscriberType.java @@ -0,0 +1,8 @@ +package com.hungrydev399.publishsubscribe.subscriber; + +public enum SubscriberType { + NONDURABLE, // Regular non-durable subscriber + DURABLE, // Durable subscriber that receives messages even when offline + SHARED, // Shared subscription where multiple subscribers share the load + SHARED_DURABLE // Combination of shared and durable subscription +} diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriber.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriber.java new file mode 100644 index 000000000000..e4f4177dd37e --- /dev/null +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriber.java @@ -0,0 +1,98 @@ +package com.hungrydev399.publishsubscribe.subscriber; + +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageListener; +import javax.jms.Session; +import javax.jms.TextMessage; +import javax.jms.Topic; +import javax.jms.JMSException; +import com.hungrydev399.publishsubscribe.jms.JmsUtil; + +/** + * JMS topic subscriber that supports different subscription types: + * - Non-durable: Regular subscriber that misses messages when offline + * - Durable: Receives missed messages after reconnecting + * - Shared: Multiple subscribers share the message load + */ +public class TopicSubscriber implements MessageListener { + private final Session session; + private final MessageConsumer consumer; + private final String name; + private final SubscriberType type; + private final String subscriptionName; + + public TopicSubscriber(String name, String topicName, SubscriberType type, String clientId) throws JMSException { + this.name = name; + this.type = type; + this.subscriptionName = generateSubscriptionName(name, type); + session = JmsUtil.createSession(clientId); + Topic topic = session.createTopic(topicName); + + switch (type) { + case DURABLE: + // Durable subscribers maintain subscription state even when offline + consumer = session.createDurableSubscriber(topic, subscriptionName); + break; + case SHARED: + // Shared subscribers distribute the message load using message groups + consumer = session.createConsumer(topic, + "JMSXGroupID = '" + subscriptionName + "'"); + break; + case SHARED_DURABLE: + consumer = session.createDurableSubscriber(topic, subscriptionName); + break; + default: + consumer = session.createConsumer(topic); + } + + consumer.setMessageListener(this); + System.out.println("Created " + type + " subscriber: " + name); + } + + private String generateSubscriptionName(String name, SubscriberType type) { + switch (type) { + case DURABLE: + return "durable-" + name; + case SHARED_DURABLE: + return "shared-durable-" + name; + case SHARED: + return "shared-" + name; + default: + return name; + } + } + + @Override + public void onMessage(Message message) { + try { + if (message instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message; + System.out.println(name + " (" + type + ") received: " + textMessage.getText()); + } + } catch (JMSException e) { + e.printStackTrace(); + } + } + + /** + * Cleanup subscriber resources, ensuring proper unsubscribe for durable subscribers. + */ + public void close() throws JMSException { + if (type == SubscriberType.DURABLE || type == SubscriberType.SHARED_DURABLE) { + try { + consumer.close(); + session.unsubscribe(subscriptionName); + } catch (JMSException e) { + System.err.println("Error unsubscribing " + name + ": " + e.getMessage()); + } + } + if (session != null) { + session.close(); + } + } + + public SubscriberType getType() { + return type; + } +} diff --git a/publish-subsribe/src/main/resources/log4j2.xml b/publish-subsribe/src/main/resources/log4j2.xml new file mode 100644 index 000000000000..67980fb33ef6 --- /dev/null +++ b/publish-subsribe/src/main/resources/log4j2.xml @@ -0,0 +1,17 @@ + + + + + + + + + + + + + + + + + From 5ef4363136438b47b30a13abc3a9bf10cd1716d2 Mon Sep 17 00:00:00 2001 From: hungryDev399 Date: Sun, 8 Dec 2024 17:01:36 +0200 Subject: [PATCH 2/6] Closing connection after demonstrations --- .../hungrydev399/publishsubscribe/App.java | 44 ++++++++++--------- 1 file changed, 24 insertions(+), 20 deletions(-) diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java index e03080384ad8..6e80beeea0f0 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java @@ -10,7 +10,8 @@ import java.util.List; /** - * Main application demonstrating different aspects of JMS publish-subscribe pattern. + * Main application demonstrating different aspects of JMS publish-subscribe + * pattern. */ public final class App { private static TopicPublisher publisher; @@ -18,7 +19,7 @@ public final class App { public static void main(String[] args) { try { publisher = new TopicPublisher("NEWS"); - + // Run each demonstration independently demonstrateBasicPubSub(); demonstrateDurableSubscriptions(); @@ -30,6 +31,7 @@ public static void main(String[] args) { } finally { cleanup(null); } + JmsUtil.closeConnection(); } /** @@ -39,17 +41,17 @@ public static void main(String[] args) { private static void demonstrateBasicPubSub() throws Exception { System.out.println("\n=== Basic Publish-Subscribe Demonstration ==="); List subscribers = new ArrayList<>(); - + try { // Create basic subscribers subscribers.add(new TopicSubscriber("BasicSub1", "NEWS", SubscriberType.NONDURABLE, null)); subscribers.add(new TopicSubscriber("BasicSub2", "NEWS", SubscriberType.NONDURABLE, null)); Thread.sleep(100); // Wait for subscribers to initialize - + // Publish messages - all subscribers should receive all messages publisher.publish(new Message("Basic message 1", "NEWS")); publisher.publish(new Message("Basic message 2", "NEWS")); - + Thread.sleep(1000); // Wait for message processing } finally { cleanup(subscribers); @@ -58,36 +60,37 @@ private static void demonstrateBasicPubSub() throws Exception { } /** - * Demonstrates durable subscriptions that persist messages when subscribers are offline. + * Demonstrates durable subscriptions that persist messages when subscribers are + * offline. */ private static void demonstrateDurableSubscriptions() throws Exception { System.out.println("\n=== Durable Subscriptions Demonstration ==="); List subscribers = new ArrayList<>(); - + try { // Create durable subscriber - TopicSubscriber durableSub = new TopicSubscriber("DurableSub", "NEWS", - SubscriberType.DURABLE, "durable-client"); + TopicSubscriber durableSub = new TopicSubscriber("DurableSub", "NEWS", + SubscriberType.DURABLE, "durable-client"); subscribers.add(durableSub); Thread.sleep(100); - + // First message - subscriber is online publisher.publish(new Message("Durable message while online", "NEWS")); Thread.sleep(500); - + // Disconnect subscriber durableSub.close(); subscribers.clear(); - + // Send message while subscriber is offline publisher.publish(new Message("Durable message while offline", "NEWS")); Thread.sleep(500); - + // Reconnect subscriber - should receive offline message - subscribers.add(new TopicSubscriber("DurableSub", "NEWS", - SubscriberType.DURABLE, "durable-client")); + subscribers.add(new TopicSubscriber("DurableSub", "NEWS", + SubscriberType.DURABLE, "durable-client")); Thread.sleep(1000); - + } finally { cleanup(subscribers); System.out.println("=== Durable Demonstration Completed ===\n"); @@ -95,24 +98,25 @@ private static void demonstrateDurableSubscriptions() throws Exception { } /** - * Demonstrates shared subscriptions where messages are distributed among subscribers. + * Demonstrates shared subscriptions where messages are distributed among + * subscribers. */ private static void demonstrateSharedSubscriptions() throws Exception { System.out.println("\n=== Shared Subscriptions Demonstration ==="); List subscribers = new ArrayList<>(); - + try { // Create shared subscribers subscribers.add(new TopicSubscriber("SharedSub1", "NEWS", SubscriberType.SHARED, null)); subscribers.add(new TopicSubscriber("SharedSub2", "NEWS", SubscriberType.SHARED, null)); Thread.sleep(100); - + // Messages should be distributed between subscribers for (int i = 1; i <= 4; i++) { publisher.publish(new Message("Shared message " + i, "NEWS")); Thread.sleep(100); } - + Thread.sleep(1000); } finally { cleanup(subscribers); From 35972f10dec712526a02b57a66e3e28bb3fd3c43 Mon Sep 17 00:00:00 2001 From: hungryDev399 Date: Sun, 8 Dec 2024 18:28:37 +0200 Subject: [PATCH 3/6] Adding unit tests --- publish-subsribe/checkstyle.xml | 14 ---- publish-subsribe/pom.xml | Bin 6854 -> 11268 bytes .../publishsubscribe/jms/JmsUtil.java | 50 +++++++----- .../publishsubscribe/TestBase.java | 30 +++++++ .../publishsubscribe/jms/JmsUtilTest.java | 28 +++++++ .../publishsubscribe/model/MessageTest.java | 31 ++++++++ .../publisher/TopicPublisherTest.java | 65 ++++++++++++++++ .../subscriber/TopicSubscriberTest.java | 73 ++++++++++++++++++ 8 files changed, 258 insertions(+), 33 deletions(-) delete mode 100644 publish-subsribe/checkstyle.xml create mode 100644 publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/TestBase.java create mode 100644 publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/jms/JmsUtilTest.java create mode 100644 publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/model/MessageTest.java create mode 100644 publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisherTest.java create mode 100644 publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriberTest.java diff --git a/publish-subsribe/checkstyle.xml b/publish-subsribe/checkstyle.xml deleted file mode 100644 index ccf7369e4c67..000000000000 --- a/publish-subsribe/checkstyle.xml +++ /dev/null @@ -1,14 +0,0 @@ - - - - - - - - - - - - \ No newline at end of file diff --git a/publish-subsribe/pom.xml b/publish-subsribe/pom.xml index 27bde75b4751172b2d18b5c7be679d4928dc16b7..031ee5ee621fa14d94ffddece785d64b0e9a2a4a 100644 GIT binary patch delta 1729 zcmbVMO=}ZT6ur<%gGtjgN}G%zCZbjsi5*)(X&Nc5U`y0gEVvS<>DcBw%p_uwDuV7@ zh<8)4xYE7Sr5oM2_Ya6aq1`KZ?wd)Oq(US-rg?quew=&mx$paxm-j#IXm7PpxJ+%T zQ;AxX;e$YYe2Ow;P?j!{|0Xs(8Ld%+id0dy4P`y9UxoFQvhKZw$84EQc!(G5=`mSk zlLP+-GUniyA@Mg3^ZBe9{uM`An()d1jfn^*b`s5vWzq&>7O09EHV{{$`g2MZ{~3d? zCsNO|UDoTgg@`8A5V=V6YG1{fRNx?!jofYQEV_%Y2~;*%bQBx@to`CdC`3tF{AVaK z?f3N8l2Q3OSqi|_qD`PUOHX-4J7Jj$$vUvvw2iu3u&@xme+kcSDJ6aT2R?+gqM7` z>7LtIXz4WASEt?t^hHUqrDzm6rl&f1Ym%7uJrcZ6^OJJ($BB1giQ`!z z#(^Rrmw3ppW4loYa*+E+O3oI)h!g{tfpi+!UWHv@=cTEj?%Jxf+EnsMdooD}zOsN3Kug0dcpLoU8q! zR|dn~iS9nLbqH5FA-zDa`{kTZ(<1CFSjtGd!|fk!d3a5a0|2Ge3J?GQ diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java index f2c0a7d1a254..799e8a82dd86 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java +++ b/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java @@ -19,34 +19,39 @@ public final class JmsUtil { private static Connection defaultConnection; private static BrokerService broker; private static Map clientConnections = new ConcurrentHashMap<>(); - - static { - try { - // Start embedded broker for self-contained messaging - broker = new BrokerService(); - broker.addConnector(BROKER_URL); - broker.setPersistent(false); // Messages won't survive broker restart - broker.start(); - - // Default connection for non-durable subscribers - factory = new ActiveMQConnectionFactory(BROKER_URL); - defaultConnection = factory.createConnection(); - defaultConnection.start(); - } catch (Exception e) { - System.err.println("Failed to initialize JMS: " + e.getMessage()); - throw new RuntimeException(e); - } - } + private static boolean isInitialized = false; private JmsUtil() { // Utility class, prevent instantiation } + public static synchronized void initialize() { + if (!isInitialized) { + try { + broker = new BrokerService(); + broker.addConnector(BROKER_URL); + broker.setPersistent(false); + broker.start(); + + factory = new ActiveMQConnectionFactory(BROKER_URL); + defaultConnection = factory.createConnection(); + defaultConnection.start(); + isInitialized = true; + } catch (Exception e) { + System.err.println("Failed to initialize JMS: " + e.getMessage()); + throw new RuntimeException(e); + } + } + } + /** * Creates a JMS session, optionally with a client ID for durable subscriptions. * Each client ID gets its own dedicated connection to support durable subscribers. */ public static Session createSession(String clientId) throws JMSException { + if (!isInitialized) { + initialize(); + } if (clientId == null) { return defaultConnection.createSession(false, Session.AUTO_ACKNOWLEDGE); } @@ -72,8 +77,9 @@ public static Session createSession() throws JMSException { /** * Closes all JMS resources. */ - public static void closeConnection() { + public static synchronized void closeConnection() { try { + isInitialized = false; if (defaultConnection != null) { defaultConnection.close(); } @@ -90,4 +96,10 @@ public static void closeConnection() { System.err.println("Error closing JMS resources: " + e.getMessage()); } } + + public static synchronized void reset() { + closeConnection(); + isInitialized = false; + initialize(); + } } diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/TestBase.java b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/TestBase.java new file mode 100644 index 000000000000..1f823ba73be1 --- /dev/null +++ b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/TestBase.java @@ -0,0 +1,30 @@ +package com.hungrydev399.publishsubscribe; + +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.AfterAll; +import com.hungrydev399.publishsubscribe.jms.JmsUtil; + +/** + * Base class for JMS-related tests. + * + * Provides: + * - Common JMS broker initialization + * - Resource cleanup after tests + * - Shared configuration for all JMS tests + * + * Usage: + * - Extend this class in any test that needs JMS functionality + * - Ensures consistent JMS lifecycle across test classes + * - Prevents connection/broker issues between tests + */ +public abstract class TestBase { + @BeforeAll + static void initializeJms() { + JmsUtil.initialize(); + } + + @AfterAll + static void cleanupJms() { + JmsUtil.closeConnection(); + } +} diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/jms/JmsUtilTest.java b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/jms/JmsUtilTest.java new file mode 100644 index 000000000000..f106e274e4fb --- /dev/null +++ b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/jms/JmsUtilTest.java @@ -0,0 +1,28 @@ +package com.hungrydev399.publishsubscribe.jms; + +import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.Test; +import javax.jms.JMSException; +import javax.jms.Session; + +public class JmsUtilTest { + + @Test + void shouldCreateSessionWithoutClientId() throws JMSException { + Session session = JmsUtil.createSession(); + assertNotNull(session); + } + + @Test + void shouldCreateSessionWithClientId() throws JMSException { + Session session = JmsUtil.createSession("test-client"); + assertNotNull(session); + } + + @Test + void shouldCloseConnectionGracefully() { + JmsUtil.closeConnection(); + // Verify no exceptions thrown + assertTrue(true); + } +} diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/model/MessageTest.java b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/model/MessageTest.java new file mode 100644 index 000000000000..1cdae09b99da --- /dev/null +++ b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/model/MessageTest.java @@ -0,0 +1,31 @@ +package com.hungrydev399.publishsubscribe.model; + +import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.Test; + +/** + * Tests for the Message model class. + * + * Test coverage: + * - Message creation with content and topic + * - String representation (toString) + * - Getters functionality + */ +public class MessageTest { + @Test + void shouldCreateMessageWithContentAndTopic() { + Message message = new Message("Test content", "test-topic"); + + assertEquals("Test content", message.getContent()); + assertEquals("test-topic", message.getTopic()); + } + + @Test + void shouldGenerateCorrectToString() { + Message message = new Message("Test content", "test-topic"); + String toString = message.toString(); + + assertTrue(toString.contains("test-topic")); + assertTrue(toString.contains("Test content")); + } +} diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisherTest.java b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisherTest.java new file mode 100644 index 000000000000..5bedb0f20053 --- /dev/null +++ b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisherTest.java @@ -0,0 +1,65 @@ +package com.hungrydev399.publishsubscribe.publisher; + +import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.*; + +import com.hungrydev399.publishsubscribe.model.Message; +import com.hungrydev399.publishsubscribe.subscriber.TopicSubscriber; +import com.hungrydev399.publishsubscribe.subscriber.SubscriberType; +import com.hungrydev399.publishsubscribe.TestBase; +import com.hungrydev399.publishsubscribe.jms.JmsUtil; + +/** + * Tests for the TopicPublisher class. + * + * Test Strategy: + * - Uses TestBase for JMS lifecycle management + * - Creates both publisher and subscriber to verify message flow + * - Tests null message handling + * - Validates resource cleanup + * + * Test Coverage: + * - Message publishing functionality + * - Error handling (null messages) + * - Resource cleanup (close) + * - JMS connection management + */ +class TopicPublisherTest extends TestBase { + private TopicPublisher publisher; + private TopicSubscriber subscriber; + private static final String TEST_TOPIC = "TEST_TOPIC"; + private static final String TEST_MESSAGE = "Test Message"; + + @BeforeEach + void setUp() throws Exception { + JmsUtil.reset(); // Reset JMS state before each test + publisher = new TopicPublisher(TEST_TOPIC); + subscriber = new TopicSubscriber("TestSub", TEST_TOPIC, SubscriberType.NONDURABLE, null); + Thread.sleep(100); // Allow connection setup + } + + @AfterEach + void tearDown() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + if (publisher != null) { + publisher.close(); + } + } + + @Test + void shouldPublishAndReceiveMessage() throws Exception { + Message message = new Message(TEST_MESSAGE, TEST_TOPIC); + publisher.publish(message); + + Thread.sleep(500); // Allow message delivery + // Verification is done through console output + assertTrue(true); // Test passes if no exceptions thrown + } + + @Test + void shouldHandleNullMessage() { + assertThrows(NullPointerException.class, () -> publisher.publish(null)); + } +} diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriberTest.java b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriberTest.java new file mode 100644 index 000000000000..493f72fa7b32 --- /dev/null +++ b/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriberTest.java @@ -0,0 +1,73 @@ +package com.hungrydev399.publishsubscribe.subscriber; + +import static org.junit.jupiter.api.Assertions.*; +import org.junit.jupiter.api.*; + +import com.hungrydev399.publishsubscribe.model.Message; +import com.hungrydev399.publishsubscribe.publisher.TopicPublisher; +import com.hungrydev399.publishsubscribe.TestBase; +import com.hungrydev399.publishsubscribe.jms.JmsUtil; + +/** + * Tests for the TopicSubscriber class. + * + * Test Strategy: + * - Tests different subscription types (durable, non-durable) + * - Verifies message reception + * - Ensures proper resource cleanup + * - Uses TestBase for JMS infrastructure + * + * Test Coverage: + * - Subscriber creation with different types + * - Message reception functionality + * - Resource cleanup + * - Subscription type verification + * - Error handling + */ +class TopicSubscriberTest extends TestBase { + private TopicPublisher publisher; + private TopicSubscriber subscriber; + private static final String TEST_TOPIC = "TEST_TOPIC"; + + @BeforeEach + void setUp() throws Exception { + JmsUtil.reset(); // Reset JMS state before each test + publisher = new TopicPublisher(TEST_TOPIC); + } + + @AfterEach + void tearDown() throws Exception { + if (subscriber != null) { + subscriber.close(); + } + if (publisher != null) { + publisher.close(); + } + } + + @Test + void shouldCreateNonDurableSubscriber() throws Exception { + subscriber = new TopicSubscriber("test", TEST_TOPIC, SubscriberType.NONDURABLE, null); + assertNotNull(subscriber); + assertEquals(SubscriberType.NONDURABLE, subscriber.getType()); + } + + @Test + void shouldCreateDurableSubscriber() throws Exception { + subscriber = new TopicSubscriber("test", TEST_TOPIC, SubscriberType.DURABLE, "client1"); + assertNotNull(subscriber); + assertEquals(SubscriberType.DURABLE, subscriber.getType()); + } + + @Test + void shouldReceiveMessages() throws Exception { + subscriber = new TopicSubscriber("test", TEST_TOPIC, SubscriberType.NONDURABLE, null); + Thread.sleep(100); // Allow subscriber to initialize + + publisher.publish(new Message("Test message", TEST_TOPIC)); + Thread.sleep(500); // Allow message delivery + + // Verification is done through console output + assertTrue(true); // Test passes if no exceptions thrown + } +} From 3b1ed092ddf53582c1864841aa9defef2af19f23 Mon Sep 17 00:00:00 2001 From: hungryDev399 Date: Sun, 8 Dec 2024 18:53:50 +0200 Subject: [PATCH 4/6] Adding README.md --- publish-subsribe/README.md | 218 +++++++++++++++++++++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 publish-subsribe/README.md diff --git a/publish-subsribe/README.md b/publish-subsribe/README.md new file mode 100644 index 000000000000..7951964bde20 --- /dev/null +++ b/publish-subsribe/README.md @@ -0,0 +1,218 @@ +--- +title: "Publish/Subscribe" +shortTitle: "Pub/Sub Pattern" +description: "A messaging pattern that enables loose coupling between publishers and subscribers through message topics" +category: Behavioral +language: en +tag: + - Messaging + - Event-Driven + - Decoupling +--- + +## Also known as + +- Pub/Sub +- Observer Pattern (similar but not identical) + +## Intent + +Establish a one-to-many, many-to-one, or many-to-many dependency between objects where multiple publishers send messages to topics, and multiple subscribers can receive those messages without direct knowledge of each other, promoting loose coupling and scalability. + +## Core Concepts + +### Topics and Messaging Components + +A topic is a distribution mechanism for publishing messages between message producers and message consumers in a one-to-many relationship. Key components include: + +- **Topics**: Destinations where publishers send messages and from which subscribers receive them. Unlike queues (used in point-to-point messaging), topics can have multiple consumers. + +- **Message Producers**: Lightweight objects created to send messages to a topic. They are typically created per topic and can be created for each message since they don't consume significant resources. + +- **Message Consumers**: Objects that subscribe to topics and receive messages. Consumers can receive messages either: + - Synchronously: By calling a `receive()` method that blocks until a message arrives + - Asynchronously: By implementing a message listener with an `onMessage()` callback (which is discussed in this implementation) + +### Types of Subscriptions + +1. **Non-Durable Subscriptions** + + - Simplest form of subscription + - Exists only while the consumer is active + - Messages sent while the subscriber is inactive are missed + - Best for real-time data where missing messages is acceptable + - Example: Live sports score updates + +2. **Durable Subscriptions** + + - Maintains subscription state even when subscriber is offline + - Messages are stored by JMS provider until consumer reconnects + - Requires a unique client identifier for persistence + - Two subtypes: + - Unshared: Only one consumer can use the subscription + - Shared: Multiple consumers can share the subscription + - Example: Critical business notifications + +3. **Shared Subscriptions** + - Allows multiple consumers to share message load + - Messages are distributed among active consumers + - Can be combined with durability + - Useful for load balancing and high-throughput scenarios + - Example: Distributed processing of bank transactions + +### Messaging Infrastructure + +The JMS (Java Message Service) provider handles: + +- Message persistence for durable subscriptions +- Message distribution to appropriate subscribers +- Connection and session management +- Transaction support when integrated with JTA +- Load balancing for shared subscriptions + +## Detailed Explanation with Real-World Examples + +Real-world example + +> Consider a news distribution system where different types of subscribers receive news updates: +> +> - Regular subscribers who only receive messages while they're online +> - Durable subscribers who receive missed messages when they reconnect +> - Shared subscribers who distribute the message load among multiple instances +> This is exactly what our demonstration implements in the App.java examples. + +In plain words + +> A news publishing system where publishers can send news to topics, and different types of subscribers (basic, durable, shared) can receive these updates in various ways, as demonstrated in our three main scenarios: basic publish-subscribe, durable subscriptions, and shared subscriptions. + +Wikipedia says + +> Publish–subscribe is a messaging pattern where senders of messages, called publishers, do not program the messages to be sent directly to specific receivers, called subscribers, but instead categorize published messages into classes without knowledge of which subscribers, if any, may be interested. + +## Programmatic Example + +### 1. Basic Publish-Subscribe Pattern + +The most straightforward implementation where all subscribers receive all messages: + +```java +// Create basic subscribers that receive messages only while online +TopicSubscriber basicSub1 = new TopicSubscriber( + "BasicSub1", "NEWS", SubscriberType.NONDURABLE, null +); +TopicSubscriber basicSub2 = new TopicSubscriber( + "BasicSub2", "NEWS", SubscriberType.NONDURABLE, null +); + +// Create publisher and send messages +TopicPublisher publisher = new TopicPublisher("NEWS"); +publisher.publish(new Message("Basic message 1", "NEWS")); +publisher.publish(new Message("Basic message 2", "NEWS")); + +// Both BasicSub1 and BasicSub2 will receive all messages +``` + +### 2. Durable Subscriptions Pattern + +Demonstrates how subscribers can receive messages that were published while they were offline: + +```java +// Create durable subscriber with client ID for persistence +TopicSubscriber durableSub = new TopicSubscriber( + "DurableSub", "NEWS", SubscriberType.DURABLE, "durable-client" +); + +// First message - subscriber receives while online +publisher.publish(new Message("Durable message while online", "NEWS")); + +// Subscriber goes offline (close connection) +durableSub.close(); + +// Message sent while subscriber is offline +publisher.publish(new Message("Durable message while offline", "NEWS")); + +// When subscriber reconnects, it receives the missed message +durableSub = new TopicSubscriber( + "DurableSub", "NEWS", SubscriberType.DURABLE, "durable-client" +); +``` + +### 3. Shared Subscriptions Pattern + +Shows how messages can be distributed among multiple subscribers for load balancing: + +```java +// Create shared subscribers that will distribute the message load +TopicSubscriber sharedSub1 = new TopicSubscriber( + "SharedSub1", "NEWS", SubscriberType.SHARED, null +); +TopicSubscriber sharedSub2 = new TopicSubscriber( + "SharedSub2", "NEWS", SubscriberType.SHARED, null +); + +// Send multiple messages that will be distributed +publisher.publish(new Message("Shared message 1", "NEWS")); +publisher.publish(new Message("Shared message 2", "NEWS")); +publisher.publish(new Message("Shared message 3", "NEWS")); +publisher.publish(new Message("Shared message 4", "NEWS")); + +// Messages are distributed between SharedSub1 and SharedSub2 +// Each subscriber receives approximately half of the messages +``` + +## Implementation Details + +- Basic subscribers demonstrate the simplest form of pub/sub where all subscribers receive all messages +- Durable subscribers maintain their subscription state even when offline, ensuring no messages are missed +- Shared subscribers enable load balancing by distributing messages among multiple consumers +- Messages are delivered asynchronously through the `onMessage()` callback +- The JMS provider (ActiveMQ in this implementation) handles message persistence and distribution + +## When to Use + +- When you need different types of message consumption patterns: + - Basic subscribers for simple real-time updates + - Durable subscribers for critical messages that can't be missed + - Shared subscribers for load balancing +- When subscribers need to receive messages even after being offline (demonstrated in durableSubscriptions() example) +- When message load needs to be distributed among multiple consumers (demonstrated in sharedSubscriptions() example) + +## Real-World Applications + +Any event-driven system that requires loose coupling between publishers and subscribers can benefit from the pub/sub pattern. Some common examples include: + +- IOT systems where multiple devices publish data to a central server +- Enterprise messaging systems (JMS) for inter-application communication +- Microservices architectures where services communicate through message brokers +- News distribution systems (as demonstrated in our NEWS topic example) +- Load-balanced message processing systems (using shared subscriptions) +- Message broadcasting systems (using basic pub/sub) + +## Benefits and Trade-offs + +Benefits: + +- Loose coupling between publishers and subscribers +- Scalability through message distribution +- Flexibility to add/remove components +- Support for offline components through durable subscriptions +- Asynchronous communication + +Trade-offs: + +- Additional complexity in message delivery guarantees +- Potential performance overhead from message broker +- Message ordering challenges in distributed systems +- Durable subscriptions which allow for message persistence add a coniderable overhead + +## Related Patterns + +- Observer Pattern +- Mediator Pattern +- Event Sourcing +- Message Queue Pattern + +## References + +- Java EE Tutorial - JMS Messaging +- Enterprise Integration Patterns From d378144b54da6110806ff5cc48d6a4b5b629582a Mon Sep 17 00:00:00 2001 From: hungryDev399 Date: Sun, 8 Dec 2024 19:10:56 +0200 Subject: [PATCH 5/6] Editing package name to match the rest of the project --- publish-subsribe/pom.xml | Bin 11268 -> 11180 bytes .../publishsubscribe/App.java | 12 ++++++------ .../publishsubscribe/jms/JmsUtil.java | 2 +- .../publishsubscribe/model/Message.java | 2 +- .../publisher/TopicPublisher.java | 6 +++--- .../subscriber/SubscriberType.java | 2 +- .../subscriber/TopicSubscriber.java | 4 ++-- .../publishsubscribe/TestBase.java | 4 ++-- .../publishsubscribe/jms/JmsUtilTest.java | 2 +- .../publishsubscribe/model/MessageTest.java | 2 +- .../publisher/TopicPublisherTest.java | 12 ++++++------ .../subscriber/TopicSubscriberTest.java | 10 +++++----- 12 files changed, 29 insertions(+), 29 deletions(-) rename publish-subsribe/src/main/java/com/{hungrydev399 => iluwatar}/publishsubscribe/App.java (93%) rename publish-subsribe/src/main/java/com/{hungrydev399 => iluwatar}/publishsubscribe/jms/JmsUtil.java (98%) rename publish-subsribe/src/main/java/com/{hungrydev399 => iluwatar}/publishsubscribe/model/Message.java (91%) rename publish-subsribe/src/main/java/com/{hungrydev399 => iluwatar}/publishsubscribe/publisher/TopicPublisher.java (89%) rename publish-subsribe/src/main/java/com/{hungrydev399 => iluwatar}/publishsubscribe/subscriber/SubscriberType.java (85%) rename publish-subsribe/src/main/java/com/{hungrydev399 => iluwatar}/publishsubscribe/subscriber/TopicSubscriber.java (96%) rename publish-subsribe/src/test/java/com/{hungrydev399 => iluwatar}/publishsubscribe/TestBase.java (87%) rename publish-subsribe/src/test/java/com/{hungrydev399 => iluwatar}/publishsubscribe/jms/JmsUtilTest.java (93%) rename publish-subsribe/src/test/java/com/{hungrydev399 => iluwatar}/publishsubscribe/model/MessageTest.java (94%) rename publish-subsribe/src/test/java/com/{hungrydev399 => iluwatar}/publishsubscribe/publisher/TopicPublisherTest.java (83%) rename publish-subsribe/src/test/java/com/{hungrydev399 => iluwatar}/publishsubscribe/subscriber/TopicSubscriberTest.java (88%) diff --git a/publish-subsribe/pom.xml b/publish-subsribe/pom.xml index 031ee5ee621fa14d94ffddece785d64b0e9a2a4a..7084a9ece0e0d3e12d7c6691591653843061aad3 100644 GIT binary patch delta 62 zcmZpPSQEZMfQdhoA%~%qp`0O+p#+GFCiikFPTs{Uu{nY1hcLS2=6tdLoSU~P_;CXO DJ75x# delta 112 zcmZ1z-V(7vfJr=qp_CzyA)TR!p^_nmA(f$w!I;64!E$mVui|D0CIJy#3O2`!{pXx4 qAiyE%#gNG0%#aVp3Jd`ZIY1NB88R92Cff_hPM*NSwK-0~j~f8L#~G0T diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java similarity index 93% rename from publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java rename to publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java index 6e80beeea0f0..17be2f7dc8b7 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/App.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java @@ -1,10 +1,10 @@ -package com.hungrydev399.publishsubscribe; +package com.iluwatar.publishsubscribe; -import com.hungrydev399.publishsubscribe.jms.JmsUtil; -import com.hungrydev399.publishsubscribe.model.Message; -import com.hungrydev399.publishsubscribe.publisher.TopicPublisher; -import com.hungrydev399.publishsubscribe.subscriber.TopicSubscriber; -import com.hungrydev399.publishsubscribe.subscriber.SubscriberType; +import com.iluwatar.publishsubscribe.jms.JmsUtil; +import com.iluwatar.publishsubscribe.model.Message; +import com.iluwatar.publishsubscribe.publisher.TopicPublisher; +import com.iluwatar.publishsubscribe.subscriber.TopicSubscriber; +import com.iluwatar.publishsubscribe.subscriber.SubscriberType; import java.util.ArrayList; import java.util.List; diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/jms/JmsUtil.java similarity index 98% rename from publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java rename to publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/jms/JmsUtil.java index 799e8a82dd86..418f3477a4c1 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/jms/JmsUtil.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/jms/JmsUtil.java @@ -1,4 +1,4 @@ -package com.hungrydev399.publishsubscribe.jms; +package com.iluwatar.publishsubscribe.jms; import org.apache.activemq.ActiveMQConnectionFactory; import org.apache.activemq.broker.BrokerService; diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/model/Message.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/model/Message.java similarity index 91% rename from publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/model/Message.java rename to publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/model/Message.java index 98c522115e79..1012b70575c9 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/model/Message.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/model/Message.java @@ -1,4 +1,4 @@ -package com.hungrydev399.publishsubscribe.model; +package com.iluwatar.publishsubscribe.model; import java.io.Serializable; diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisher.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/publisher/TopicPublisher.java similarity index 89% rename from publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisher.java rename to publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/publisher/TopicPublisher.java index 3a3a27e59f53..ac10ff2d37dd 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisher.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/publisher/TopicPublisher.java @@ -1,4 +1,4 @@ -package com.hungrydev399.publishsubscribe.publisher; +package com.iluwatar.publishsubscribe.publisher; import javax.jms.DeliveryMode; import javax.jms.JMSException; @@ -6,8 +6,8 @@ import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import com.hungrydev399.publishsubscribe.jms.JmsUtil; -import com.hungrydev399.publishsubscribe.model.Message; +import com.iluwatar.publishsubscribe.jms.JmsUtil; +import com.iluwatar.publishsubscribe.model.Message; /** * JMS topic publisher that supports persistent messages and message grouping diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/SubscriberType.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/SubscriberType.java similarity index 85% rename from publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/SubscriberType.java rename to publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/SubscriberType.java index 73395221851c..64422d71d409 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/SubscriberType.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/SubscriberType.java @@ -1,4 +1,4 @@ -package com.hungrydev399.publishsubscribe.subscriber; +package com.iluwatar.publishsubscribe.subscriber; public enum SubscriberType { NONDURABLE, // Regular non-durable subscriber diff --git a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriber.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriber.java similarity index 96% rename from publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriber.java rename to publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriber.java index e4f4177dd37e..328314dcdf62 100644 --- a/publish-subsribe/src/main/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriber.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriber.java @@ -1,4 +1,4 @@ -package com.hungrydev399.publishsubscribe.subscriber; +package com.iluwatar.publishsubscribe.subscriber; import javax.jms.Message; import javax.jms.MessageConsumer; @@ -7,7 +7,7 @@ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.JMSException; -import com.hungrydev399.publishsubscribe.jms.JmsUtil; +import com.iluwatar.publishsubscribe.jms.JmsUtil; /** * JMS topic subscriber that supports different subscription types: diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/TestBase.java b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/TestBase.java similarity index 87% rename from publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/TestBase.java rename to publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/TestBase.java index 1f823ba73be1..081b10f65b46 100644 --- a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/TestBase.java +++ b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/TestBase.java @@ -1,8 +1,8 @@ -package com.hungrydev399.publishsubscribe; +package com.iluwatar.publishsubscribe; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.AfterAll; -import com.hungrydev399.publishsubscribe.jms.JmsUtil; +import com.iluwatar.publishsubscribe.jms.JmsUtil; /** * Base class for JMS-related tests. diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/jms/JmsUtilTest.java b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/jms/JmsUtilTest.java similarity index 93% rename from publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/jms/JmsUtilTest.java rename to publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/jms/JmsUtilTest.java index f106e274e4fb..5736b8f21d10 100644 --- a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/jms/JmsUtilTest.java +++ b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/jms/JmsUtilTest.java @@ -1,4 +1,4 @@ -package com.hungrydev399.publishsubscribe.jms; +package com.iluwatar.publishsubscribe.jms; import static org.junit.jupiter.api.Assertions.*; import org.junit.jupiter.api.Test; diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/model/MessageTest.java b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/model/MessageTest.java similarity index 94% rename from publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/model/MessageTest.java rename to publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/model/MessageTest.java index 1cdae09b99da..b9cc6dd2e7db 100644 --- a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/model/MessageTest.java +++ b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/model/MessageTest.java @@ -1,4 +1,4 @@ -package com.hungrydev399.publishsubscribe.model; +package com.iluwatar.publishsubscribe.model; import static org.junit.jupiter.api.Assertions.*; import org.junit.jupiter.api.Test; diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisherTest.java b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/publisher/TopicPublisherTest.java similarity index 83% rename from publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisherTest.java rename to publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/publisher/TopicPublisherTest.java index 5bedb0f20053..759c5869f10f 100644 --- a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/publisher/TopicPublisherTest.java +++ b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/publisher/TopicPublisherTest.java @@ -1,13 +1,13 @@ -package com.hungrydev399.publishsubscribe.publisher; +package com.iluwatar.publishsubscribe.publisher; import static org.junit.jupiter.api.Assertions.*; import org.junit.jupiter.api.*; -import com.hungrydev399.publishsubscribe.model.Message; -import com.hungrydev399.publishsubscribe.subscriber.TopicSubscriber; -import com.hungrydev399.publishsubscribe.subscriber.SubscriberType; -import com.hungrydev399.publishsubscribe.TestBase; -import com.hungrydev399.publishsubscribe.jms.JmsUtil; +import com.iluwatar.publishsubscribe.model.Message; +import com.iluwatar.publishsubscribe.subscriber.TopicSubscriber; +import com.iluwatar.publishsubscribe.subscriber.SubscriberType; +import com.iluwatar.publishsubscribe.TestBase; +import com.iluwatar.publishsubscribe.jms.JmsUtil; /** * Tests for the TopicPublisher class. diff --git a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriberTest.java b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriberTest.java similarity index 88% rename from publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriberTest.java rename to publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriberTest.java index 493f72fa7b32..9b3bd33c721a 100644 --- a/publish-subsribe/src/test/java/com/hungrydev399/publishsubscribe/subscriber/TopicSubscriberTest.java +++ b/publish-subsribe/src/test/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriberTest.java @@ -1,12 +1,12 @@ -package com.hungrydev399.publishsubscribe.subscriber; +package com.iluwatar.publishsubscribe.subscriber; import static org.junit.jupiter.api.Assertions.*; import org.junit.jupiter.api.*; -import com.hungrydev399.publishsubscribe.model.Message; -import com.hungrydev399.publishsubscribe.publisher.TopicPublisher; -import com.hungrydev399.publishsubscribe.TestBase; -import com.hungrydev399.publishsubscribe.jms.JmsUtil; +import com.iluwatar.publishsubscribe.model.Message; +import com.iluwatar.publishsubscribe.publisher.TopicPublisher; +import com.iluwatar.publishsubscribe.TestBase; +import com.iluwatar.publishsubscribe.jms.JmsUtil; /** * Tests for the TopicSubscriber class. From ea90dd4716c40479d434b03af6952120fe3e0e1e Mon Sep 17 00:00:00 2001 From: hungryDev399 Date: Mon, 9 Dec 2024 12:34:16 +0200 Subject: [PATCH 6/6] Fixing checkstyle violations --- .../com/iluwatar/publishsubscribe/App.java | 249 +++++++++--------- .../publishsubscribe/jms/JmsUtil.java | 35 ++- .../publishsubscribe/model/Message.java | 35 +-- .../publisher/TopicPublisher.java | 51 ++-- .../subscriber/SubscriberType.java | 3 + .../subscriber/TopicSubscriber.java | 161 +++++------ 6 files changed, 289 insertions(+), 245 deletions(-) diff --git a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java index 17be2f7dc8b7..c35a74e109e9 100644 --- a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/App.java @@ -3,141 +3,142 @@ import com.iluwatar.publishsubscribe.jms.JmsUtil; import com.iluwatar.publishsubscribe.model.Message; import com.iluwatar.publishsubscribe.publisher.TopicPublisher; -import com.iluwatar.publishsubscribe.subscriber.TopicSubscriber; import com.iluwatar.publishsubscribe.subscriber.SubscriberType; - +import com.iluwatar.publishsubscribe.subscriber.TopicSubscriber; import java.util.ArrayList; import java.util.List; /** - * Main application demonstrating different aspects of JMS publish-subscribe - * pattern. + * Main application demonstrating different aspects of JMS publish-subscribe pattern. */ public final class App { - private static TopicPublisher publisher; - - public static void main(String[] args) { - try { - publisher = new TopicPublisher("NEWS"); - - // Run each demonstration independently - demonstrateBasicPubSub(); - demonstrateDurableSubscriptions(); - demonstrateSharedSubscriptions(); - - } catch (Exception e) { - System.err.println("Error in publish-subscribe demo: " + e.getMessage()); - e.printStackTrace(); - } finally { - cleanup(null); - } - JmsUtil.closeConnection(); + private static TopicPublisher publisher; + + private App() { + } + + /** + * Main method to run the demo. + */ + public static void main(String[] args) { + try { + publisher = new TopicPublisher("NEWS"); + demonstrateBasicPubSub(); + demonstrateDurableSubscriptions(); + demonstrateSharedSubscriptions(); + } catch (Exception e) { + System.err.println("Error in publish-subscribe demo: " + e.getMessage()); + e.printStackTrace(); + } finally { + cleanup(null); } - - /** - * Demonstrates basic publish-subscribe with non-durable subscribers. - * All subscribers receive all messages. - */ - private static void demonstrateBasicPubSub() throws Exception { - System.out.println("\n=== Basic Publish-Subscribe Demonstration ==="); - List subscribers = new ArrayList<>(); - - try { - // Create basic subscribers - subscribers.add(new TopicSubscriber("BasicSub1", "NEWS", SubscriberType.NONDURABLE, null)); - subscribers.add(new TopicSubscriber("BasicSub2", "NEWS", SubscriberType.NONDURABLE, null)); - Thread.sleep(100); // Wait for subscribers to initialize - - // Publish messages - all subscribers should receive all messages - publisher.publish(new Message("Basic message 1", "NEWS")); - publisher.publish(new Message("Basic message 2", "NEWS")); - - Thread.sleep(1000); // Wait for message processing - } finally { - cleanup(subscribers); - System.out.println("=== Basic Demonstration Completed ===\n"); - } + JmsUtil.closeConnection(); + } + + /** + * Demonstrates basic publish-subscribe with non-durable subscribers. + * All subscribers receive all messages. + */ + private static void demonstrateBasicPubSub() throws Exception { + System.out.println("\n=== Basic Publish-Subscribe Demonstration ==="); + List subscribers = new ArrayList<>(); + + try { + // Create basic subscribers + subscribers.add(new TopicSubscriber("BasicSub1", "NEWS", SubscriberType.NONDURABLE, null)); + subscribers.add(new TopicSubscriber("BasicSub2", "NEWS", SubscriberType.NONDURABLE, null)); + Thread.sleep(100); // Wait for subscribers to initialize + + // Publish messages - all subscribers should receive all messages + publisher.publish(new Message("Basic message 1", "NEWS")); + publisher.publish(new Message("Basic message 2", "NEWS")); + + Thread.sleep(1000); // Wait for message processing + } finally { + cleanup(subscribers); + System.out.println("=== Basic Demonstration Completed ===\n"); } - - /** - * Demonstrates durable subscriptions that persist messages when subscribers are - * offline. - */ - private static void demonstrateDurableSubscriptions() throws Exception { - System.out.println("\n=== Durable Subscriptions Demonstration ==="); - List subscribers = new ArrayList<>(); - - try { - // Create durable subscriber - TopicSubscriber durableSub = new TopicSubscriber("DurableSub", "NEWS", - SubscriberType.DURABLE, "durable-client"); - subscribers.add(durableSub); - Thread.sleep(100); - - // First message - subscriber is online - publisher.publish(new Message("Durable message while online", "NEWS")); - Thread.sleep(500); - - // Disconnect subscriber - durableSub.close(); - subscribers.clear(); - - // Send message while subscriber is offline - publisher.publish(new Message("Durable message while offline", "NEWS")); - Thread.sleep(500); - - // Reconnect subscriber - should receive offline message - subscribers.add(new TopicSubscriber("DurableSub", "NEWS", - SubscriberType.DURABLE, "durable-client")); - Thread.sleep(1000); - - } finally { - cleanup(subscribers); - System.out.println("=== Durable Demonstration Completed ===\n"); - } + } + + /** + * Demonstrates durable subscriptions that persist messages when subscribers are + * offline. + */ + private static void demonstrateDurableSubscriptions() throws Exception { + System.out.println("\n=== Durable Subscriptions Demonstration ==="); + List subscribers = new ArrayList<>(); + + try { + // Create durable subscriber + TopicSubscriber durableSub = new TopicSubscriber("DurableSub", "NEWS", + SubscriberType.DURABLE, "durable-client"); + subscribers.add(durableSub); + Thread.sleep(100); + + // First message - subscriber is online + publisher.publish(new Message("Durable message while online", "NEWS")); + Thread.sleep(500); + + // Disconnect subscriber + durableSub.close(); + subscribers.clear(); + + // Send message while subscriber is offline + publisher.publish(new Message("Durable message while offline", "NEWS")); + Thread.sleep(500); + + // Reconnect subscriber - should receive offline message + subscribers.add(new TopicSubscriber("DurableSub", "NEWS", + SubscriberType.DURABLE, "durable-client")); + Thread.sleep(1000); + + } finally { + cleanup(subscribers); + System.out.println("=== Durable Demonstration Completed ===\n"); } - - /** - * Demonstrates shared subscriptions where messages are distributed among - * subscribers. - */ - private static void demonstrateSharedSubscriptions() throws Exception { - System.out.println("\n=== Shared Subscriptions Demonstration ==="); - List subscribers = new ArrayList<>(); - - try { - // Create shared subscribers - subscribers.add(new TopicSubscriber("SharedSub1", "NEWS", SubscriberType.SHARED, null)); - subscribers.add(new TopicSubscriber("SharedSub2", "NEWS", SubscriberType.SHARED, null)); - Thread.sleep(100); - - // Messages should be distributed between subscribers - for (int i = 1; i <= 4; i++) { - publisher.publish(new Message("Shared message " + i, "NEWS")); - Thread.sleep(100); - } - - Thread.sleep(1000); - } finally { - cleanup(subscribers); - System.out.println("=== Shared Demonstration Completed ===\n"); - } + } + + /** + * Demonstrates shared subscriptions where messages are distributed among + * subscribers. + */ + private static void demonstrateSharedSubscriptions() throws Exception { + System.out.println("\n=== Shared Subscriptions Demonstration ==="); + List subscribers = new ArrayList<>(); + + try { + // Create shared subscribers + subscribers.add(new TopicSubscriber("SharedSub1", "NEWS", SubscriberType.SHARED, null)); + subscribers.add(new TopicSubscriber("SharedSub2", "NEWS", SubscriberType.SHARED, null)); + Thread.sleep(100); + + // Messages should be distributed between subscribers + for (int i = 1; i <= 4; i++) { + publisher.publish(new Message("Shared message " + i, "NEWS")); + Thread.sleep(100); + } + + Thread.sleep(1000); + } finally { + cleanup(subscribers); + System.out.println("=== Shared Demonstration Completed ===\n"); } - - /** - * Cleanup specified subscribers and optionally the publisher. - */ - private static void cleanup(List subscribers) { - try { - if (subscribers != null) { - for (TopicSubscriber subscriber : subscribers) { - if (subscriber != null) { - subscriber.close(); - } - } - } - } catch (Exception e) { - System.err.println("Error during subscriber cleanup: " + e.getMessage()); + } + + /** + * Cleanup specified subscribers and optionally the publisher. + */ + private static void cleanup(List subscribers) { + try { + if (subscribers != null) { + for (TopicSubscriber subscriber : subscribers) { + if (subscriber != null) { + subscriber.close(); + } } + } + } catch (Exception e) { + System.err.println("Error during subscriber cleanup: " + e.getMessage()); } + } } diff --git a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/jms/JmsUtil.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/jms/JmsUtil.java index 418f3477a4c1..415dc9bffb41 100644 --- a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/jms/JmsUtil.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/jms/JmsUtil.java @@ -1,17 +1,19 @@ package com.iluwatar.publishsubscribe.jms; -import org.apache.activemq.ActiveMQConnectionFactory; -import org.apache.activemq.broker.BrokerService; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.JMSException; import javax.jms.Session; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; /** - * JMS utility class that manages connections and provides an embedded message broker. - * Supports both shared connections and client-specific connections for durable subscriptions. + * JMS utility class that manages connections and provides an embedded message + * broker. + * Supports both shared connections and client-specific connections for durable + * subscriptions. */ public final class JmsUtil { private static final String BROKER_URL = "tcp://localhost:61616"; @@ -21,10 +23,21 @@ public final class JmsUtil { private static Map clientConnections = new ConcurrentHashMap<>(); private static boolean isInitialized = false; + /** + * Private constructor to prevent instantiation. + */ private JmsUtil() { // Utility class, prevent instantiation } + /** + * Initializes the JMS environment by starting the embedded broker and creating + * the default connection. This method is thread-safe and ensures single + * initialization. + * + * + * @throws RuntimeException if initialization fails + */ public static synchronized void initialize() { if (!isInitialized) { try { @@ -46,7 +59,8 @@ public static synchronized void initialize() { /** * Creates a JMS session, optionally with a client ID for durable subscriptions. - * Each client ID gets its own dedicated connection to support durable subscribers. + * Each client ID gets its own dedicated connection to support durable + * subscribers. */ public static Session createSession(String clientId) throws JMSException { if (!isInitialized) { @@ -70,6 +84,9 @@ public static Session createSession(String clientId) throws JMSException { return conn.createSession(false, Session.AUTO_ACKNOWLEDGE); } + /** + * Creates a default JMS session without client ID. + */ public static Session createSession() throws JMSException { return createSession(null); } @@ -97,6 +114,10 @@ public static synchronized void closeConnection() { } } + /** + * Resets the JMS environment by closing existing connections and + * reinitializing. + */ public static synchronized void reset() { closeConnection(); isInitialized = false; diff --git a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/model/Message.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/model/Message.java index 1012b70575c9..db78d9256d95 100644 --- a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/model/Message.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/model/Message.java @@ -2,25 +2,28 @@ import java.io.Serializable; +/** + * Represents a message in the publish-subscribe system. + */ public class Message implements Serializable { - private String content; - private String topic; + private String content; + private String topic; - public Message(String content, String topic) { - this.content = content; - this.topic = topic; - } + public Message(String content, String topic) { + this.content = content; + this.topic = topic; + } - public String getContent() { - return content; - } + public String getContent() { + return content; + } - public String getTopic() { - return topic; - } + public String getTopic() { + return topic; + } - @Override - public String toString() { - return "Message{topic='" + topic + "', content='" + content + "'}"; - } + @Override + public String toString() { + return "Message{topic='" + topic + "', content='" + content + "'}"; + } } diff --git a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/publisher/TopicPublisher.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/publisher/TopicPublisher.java index ac10ff2d37dd..b925dd325dcc 100644 --- a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/publisher/TopicPublisher.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/publisher/TopicPublisher.java @@ -1,41 +1,48 @@ package com.iluwatar.publishsubscribe.publisher; +import com.iluwatar.publishsubscribe.jms.JmsUtil; +import com.iluwatar.publishsubscribe.model.Message; import javax.jms.DeliveryMode; import javax.jms.JMSException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import com.iluwatar.publishsubscribe.jms.JmsUtil; -import com.iluwatar.publishsubscribe.model.Message; /** * JMS topic publisher that supports persistent messages and message grouping * for shared subscriptions. */ public class TopicPublisher { - private final Session session; - private final MessageProducer producer; + private final Session session; + private final MessageProducer producer; - public TopicPublisher(String topicName) throws JMSException { - session = JmsUtil.createSession(); - Topic topic = session.createTopic(topicName); - producer = session.createProducer(topic); - // Enable persistent delivery for durable subscriptions - producer.setDeliveryMode(DeliveryMode.PERSISTENT); - } + /** + * Creates a new publisher for the specified topic. + */ + public TopicPublisher(String topicName) throws JMSException { + session = JmsUtil.createSession(); + Topic topic = session.createTopic(topicName); + producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + } - public void publish(Message message) throws JMSException { - TextMessage textMessage = session.createTextMessage(message.getContent()); - // Group messages to enable load balancing for shared subscribers - textMessage.setStringProperty("JMSXGroupID", "group-" + message.getTopic()); - producer.send(textMessage); - System.out.println("Published to topic " + message.getTopic() + ": " + message.getContent()); - } + /** + * Publishes a message to the topic. + */ + public void publish(Message message) throws JMSException { + TextMessage textMessage = session.createTextMessage(message.getContent()); + textMessage.setStringProperty("JMSXGroupID", "group-" + message.getTopic()); + producer.send(textMessage); + System.out.println("Published to topic " + message.getTopic() + ": " + message.getContent()); + } - public void close() throws JMSException { - if (session != null) { - session.close(); - } + /** + * Closes the publisher resources. + */ + public void close() throws JMSException { + if (session != null) { + session.close(); } + } } diff --git a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/SubscriberType.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/SubscriberType.java index 64422d71d409..2447d69cdcee 100644 --- a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/SubscriberType.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/SubscriberType.java @@ -1,5 +1,8 @@ package com.iluwatar.publishsubscribe.subscriber; +/** + * Enum defining different types of subscribers supported by the system. + */ public enum SubscriberType { NONDURABLE, // Regular non-durable subscriber DURABLE, // Durable subscriber that receives messages even when offline diff --git a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriber.java b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriber.java index 328314dcdf62..5c537c866140 100644 --- a/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriber.java +++ b/publish-subsribe/src/main/java/com/iluwatar/publishsubscribe/subscriber/TopicSubscriber.java @@ -1,98 +1,107 @@ package com.iluwatar.publishsubscribe.subscriber; +import com.iluwatar.publishsubscribe.jms.JmsUtil; +import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.Session; import javax.jms.TextMessage; import javax.jms.Topic; -import javax.jms.JMSException; -import com.iluwatar.publishsubscribe.jms.JmsUtil; /** - * JMS topic subscriber that supports different subscription types: - * - Non-durable: Regular subscriber that misses messages when offline - * - Durable: Receives missed messages after reconnecting - * - Shared: Multiple subscribers share the message load + * JMS topic subscriber that supports different subscription types. + * Handles durable, non-durable, and shared subscriptions. */ public class TopicSubscriber implements MessageListener { - private final Session session; - private final MessageConsumer consumer; - private final String name; - private final SubscriberType type; - private final String subscriptionName; + private final Session session; + private final MessageConsumer consumer; + private final String name; + private final SubscriberType type; + private final String subscriptionName; - public TopicSubscriber(String name, String topicName, SubscriberType type, String clientId) throws JMSException { - this.name = name; - this.type = type; - this.subscriptionName = generateSubscriptionName(name, type); - session = JmsUtil.createSession(clientId); - Topic topic = session.createTopic(topicName); - - switch (type) { - case DURABLE: - // Durable subscribers maintain subscription state even when offline - consumer = session.createDurableSubscriber(topic, subscriptionName); - break; - case SHARED: - // Shared subscribers distribute the message load using message groups - consumer = session.createConsumer(topic, - "JMSXGroupID = '" + subscriptionName + "'"); - break; - case SHARED_DURABLE: - consumer = session.createDurableSubscriber(topic, subscriptionName); - break; - default: - consumer = session.createConsumer(topic); - } - - consumer.setMessageListener(this); - System.out.println("Created " + type + " subscriber: " + name); - } + /** + * Creates a new topic subscriber with the specified configuration. + * + * @param name The name of the subscriber + * @param topicName The topic to subscribe to + * @param type The type of subscription + * @param clientId Client ID for durable subscriptions + * @throws JMSException if there's an error creating the subscriber + */ + public TopicSubscriber(String name, String topicName, SubscriberType type, String clientId) + throws JMSException { + this.name = name; + this.type = type; + this.subscriptionName = generateSubscriptionName(name, type); + session = JmsUtil.createSession(clientId); + Topic topic = session.createTopic(topicName); - private String generateSubscriptionName(String name, SubscriberType type) { - switch (type) { - case DURABLE: - return "durable-" + name; - case SHARED_DURABLE: - return "shared-durable-" + name; - case SHARED: - return "shared-" + name; - default: - return name; - } + switch (type) { + case DURABLE: + // Durable subscribers maintain subscription state even when offline + consumer = session.createDurableSubscriber(topic, subscriptionName); + break; + case SHARED: + // Shared subscribers distribute the message load using message groups + consumer = session.createConsumer(topic, + "JMSXGroupID = '" + subscriptionName + "'"); + break; + case SHARED_DURABLE: + consumer = session.createDurableSubscriber(topic, subscriptionName); + break; + default: + consumer = session.createConsumer(topic); } - @Override - public void onMessage(Message message) { - try { - if (message instanceof TextMessage) { - TextMessage textMessage = (TextMessage) message; - System.out.println(name + " (" + type + ") received: " + textMessage.getText()); - } - } catch (JMSException e) { - e.printStackTrace(); - } + consumer.setMessageListener(this); + System.out.println("Created " + type + " subscriber: " + name); + } + + private String generateSubscriptionName(String name, SubscriberType type) { + switch (type) { + case DURABLE: + return "durable-" + name; + case SHARED_DURABLE: + return "shared-durable-" + name; + case SHARED: + return "shared-" + name; + default: + return name; } + } - /** - * Cleanup subscriber resources, ensuring proper unsubscribe for durable subscribers. - */ - public void close() throws JMSException { - if (type == SubscriberType.DURABLE || type == SubscriberType.SHARED_DURABLE) { - try { - consumer.close(); - session.unsubscribe(subscriptionName); - } catch (JMSException e) { - System.err.println("Error unsubscribing " + name + ": " + e.getMessage()); - } - } - if (session != null) { - session.close(); - } + @Override + public void onMessage(Message message) { + try { + if (message instanceof TextMessage) { + TextMessage textMessage = (TextMessage) message; + System.out.println(name + " (" + type + ") received: " + textMessage.getText()); + } + } catch (JMSException e) { + e.printStackTrace(); } + } - public SubscriberType getType() { - return type; + /** + * Cleanup subscriber resources, ensuring proper unsubscribe for durable + * subscribers. + */ + public void close() throws JMSException { + if (type == SubscriberType.DURABLE || type == SubscriberType.SHARED_DURABLE) { + try { + consumer.close(); + session.unsubscribe(subscriptionName); + } catch (JMSException e) { + System.err.println("Error unsubscribing " + name + ": " + e.getMessage()); + } } + if (session != null) { + session.close(); + } + } + + public SubscriberType getType() { + return type; + } }