Skip to content

Commit 24ca271

Browse files
committed
fix: changes after review on publisher-subscriber design pattern (#2898)
1 parent fd0d33b commit 24ca271

File tree

15 files changed

+288
-202
lines changed

15 files changed

+288
-202
lines changed

publish-subscribe/README.md

Lines changed: 80 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,27 @@
11
---
22
title: "Publish-Subscribe Pattern in Java: Decoupling the solution with asynchronous communication"
3-
shortTitle: Proxy
4-
description: "Explore the Proxy design pattern in Java with detailed examples. Learn how it provides controlled access, facilitates lazy initialization, and ensures security. Ideal for developers looking to implement advanced Java techniques."
5-
category: Structural
3+
shortTitle: Publish-Subscribe
4+
description: "Explore the Publish-Subscribe design pattern in Java with detailed examples. Learn how it helps to create loosely coupled, scalable, and flexible systems by allowing components to communicate asynchronously without knowing each other directly."
5+
category: Behavioral
66
language: en
77
tag:
88
- Decoupling
9-
- Encapsulation
9+
- Event-driven
1010
- Gang Of Four
11-
- Lazy initialization
12-
- Proxy
13-
- Security
14-
- Wrapping
11+
- Publish/subscribe
1512
---
1613

1714
## Intent of the Publish-Subscribe Design Pattern
1815

19-
The Publish-Subscriber design pattern is widely used in software architecture to transmit data between various components in a system.
16+
The Publish-Subscribe design pattern is widely used in software architecture to transmit data between various components in a system.
2017
It is a behavioral design pattern aimed at achieving loosely coupled communication between objects.
21-
The primary intent is to allow a one-to-many dependency relationship where one object (the Publisher) notifies multiple other objects (the Subscribers) about changes or events,
22-
without needing to know who or what the subscribers are.
18+
The primary intent is to allow a one-to-many dependency relationship where one object (the Publisher) notifies multiple other objects (the Subscribers)
19+
about changes or events, without needing to know who or what the subscribers are.
2320

2421
## Detailed Explanation of Publish-Subscribe Pattern with Real-World Examples
2522

23+
### Real-world example
24+
2625
- Messaging systems like Kafka, RabbitMQ, AWS SNS, JMS
2726
- **Kafka** : publishes messages to topics and subscribers consumes them in real time for analytics, logs or other purposes.
2827
- **RabbitMQ** : Uses exchanges as publisher and queues as subscribers to route messages
@@ -38,13 +37,32 @@ without needing to know who or what the subscribers are.
3837
- **Publisher** : Writes a new blog post and publish to subscribers
3938
- **Subscribers** : All the subscribers to the newsletter receive the email
4039

40+
### In plain words
41+
42+
The Publish-Subscribe design pattern allows senders (publishers) to broadcast messages to multiple receivers (subscribers) without knowing who they are,
43+
enabling loose coupling and asynchronous communication in a system
44+
45+
### Wikipedia says
46+
47+
In software architecture, publish–subscribe or pub/sub is a messaging pattern where publishers categorize messages into classes that are received by subscribers.
48+
This is contrasted to the typical messaging pattern model where publishers send messages directly to subscribers.
49+
50+
Similarly, subscribers express interest in one or more classes and only receive messages that are of interest, without knowledge of which publishers, if any, there are.
51+
52+
Publish–subscribe is a sibling of the message queue paradigm, and is typically one part of a larger message-oriented middleware system.
53+
Most messaging systems support both the pub/sub and message queue models in their API; e.g., Java Message Service (JMS).
54+
55+
### Architectural Diagram
56+
![pub-sub](./etc/pub-sub.png)
57+
4158
## Programmatic Example of Publish-Subscribe Pattern in Java
4259

4360
First we need to identify the Event on which we need the pub-sub methods to trigger.
4461
For example:
4562

4663
- Sending alerts based on the weather events such as earthquakes, floods and tornadoes
47-
- Sending an email to different customer support accounts when a support ticket is created.
64+
- Sending alerts based on the temperature
65+
- Sending an email to different customer support emails when a support ticket is created.
4866

4967
The Message class below will hold the content of the message we need to pass between the publisher and the subscribers.
5068

@@ -57,8 +75,10 @@ public record Message(Object content) {
5775
The Topic class will have the topic **name** based on the event
5876

5977
- Weather events TopicName WEATHER
78+
- Weather events TopicName TEMPERATURE
6079
- Support ticket created TopicName CUSTOMER_SUPPORT
61-
Also the Topic contains a list of subscribers that will listen to that topic
80+
- Any other custom topic depending on use case
81+
- Also, the Topic contains a list of subscribers that will listen to that topic
6282

6383
We can add or remove subscribers from the subscription to the topic
6484

@@ -101,14 +121,17 @@ public class PublisherImpl implements Publisher {
101121
Finally, we can Subscribers to the Topics we want to listen to.
102122

103123
- For WEATHER topic we will create _WeatherSubscriber_
124+
- _WeatherSubscriber_ can also subscribe to TEMPERATURE topic
104125
- For CUSTOMER_SUPPORT topic we will create _CustomerSupportSubscribe_
126+
- Also to demonstrate the async behavior we will create a _DelayedWeatherSubscriber_ who has a 0.2 sec processing deplay
105127

106-
Both classes will have a _onMessage_ method which will take a Message input.
128+
All classes will have a _onMessage_ method which will take a Message input.
107129

108130
- On message method will verify the content of the message is as expected
109131
- After content is verified it will perform the operation based on the message
110-
- _WeatherSubscriber_ will send a weather alert based on the _Message_
132+
- _WeatherSubscriber_ will send a weather or temperature alert based on the _Message_
111133
- _CustomerSupportSubscribe_will send an email based on the _Message_
134+
- _DelayedWeatherSubscriber_ will send a weather alert based on the _Message_ after a delay
112135

113136
```java
114137
public interface Subscriber {
@@ -119,42 +142,66 @@ public interface Subscriber {
119142
And here is the invocation of the publisher and subscribers.
120143

121144
```java
122-
public static void main(String[] args) {
145+
public static void main(String[] args) throws InterruptedException {
123146

124-
final String weatherSub1Name = "weatherSub1";
125-
final String weatherSub2Name = "weatherSub2";
126-
final String supportSub1Name = "supportSub1";
127-
final String supportSub2Name = "supportSub2";
128-
129-
Topic weatherTopic = new Topic(TopicName.WEATHER);
130-
Topic supportTopic = new Topic(TopicName.CUSTOMER_SUPPORT);
147+
final String topicWeather = "WEATHER";
148+
final String topicTemperature = "TEMPERATURE";
149+
final String topicCustomerSupport = "CUSTOMER_SUPPORT";
131150

151+
// 1. create the publisher.
132152
Publisher publisher = new PublisherImpl();
153+
154+
// 2. define the topics and register on publisher
155+
Topic weatherTopic = new Topic(topicWeather);
133156
publisher.registerTopic(weatherTopic);
157+
158+
Topic temperatureTopic = new Topic(topicTemperature);
159+
publisher.registerTopic(temperatureTopic);
160+
161+
Topic supportTopic = new Topic(topicCustomerSupport);
134162
publisher.registerTopic(supportTopic);
135163

136-
Subscriber weatherSub1 = new WeatherSubscriber(weatherSub1Name);
137-
Subscriber weatherSub2 = new WeatherSubscriber(weatherSub2Name);
164+
// 3. Create the subscribers and subscribe to the relevant topics
165+
// weatherSub1 will subscribe to two topics WEATHER and TEMPERATURE.
166+
Subscriber weatherSub1 = new WeatherSubscriber();
138167
weatherTopic.addSubscriber(weatherSub1);
168+
temperatureTopic.addSubscriber(weatherSub1);
169+
170+
// weatherSub2 will subscribe to WEATHER topic
171+
Subscriber weatherSub2 = new WeatherSubscriber();
139172
weatherTopic.addSubscriber(weatherSub2);
140173

141-
Subscriber supportSub1 = new CustomerSupportSubscriber(supportSub1Name);
142-
Subscriber supportSub2 = new CustomerSupportSubscriber(supportSub2Name);
174+
// delayedWeatherSub will subscribe to WEATHER topic
175+
// NOTE :: DelayedWeatherSubscriber has a 0.2 sec delay of processing message.
176+
Subscriber delayedWeatherSub = new DelayedWeatherSubscriber();
177+
weatherTopic.addSubscriber(delayedWeatherSub);
178+
179+
// subscribe the customer support subscribers to the CUSTOMER_SUPPORT topic.
180+
Subscriber supportSub1 = new CustomerSupportSubscriber();
143181
supportTopic.addSubscriber(supportSub1);
182+
Subscriber supportSub2 = new CustomerSupportSubscriber();
144183
supportTopic.addSubscriber(supportSub2);
145184

146-
publisher.publish(weatherTopic, new Message(WeatherContent.earthquake));
147-
publisher.publish(supportTopic, new Message(CustomerSupportContent.DE));
185+
// 4. publish message from each topic
186+
publisher.publish(weatherTopic, new Message("earthquake"));
187+
publisher.publish(temperatureTopic, new Message("23C"));
188+
publisher.publish(supportTopic, new Message("[email protected]"));
189+
148190
}
149191
```
150192

151193
Program output:
152194

195+
Note that the order of output could change everytime you run the program.
196+
The subscribers could take different time to consume the message.
197+
153198
```
154-
11:46:44.310 [main] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber - Subscriber: weatherSub1 issued message: earthquake tsunami warning
155-
11:46:44.311 [main] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber - Subscriber: weatherSub2 issued message: earthquake tsunami warning
156-
11:46:44.311 [main] INFO com.iluwatar.publish.subscribe.subscriber.CustomerSupportSubscriber - Subscriber: supportSub1 sent the email to: [email protected]
157-
11:46:44.311 [main] INFO com.iluwatar.publish.subscribe.subscriber.CustomerSupportSubscriber - Subscriber: supportSub2 sent the email to: [email protected]
199+
14:01:45.599 [ForkJoinPool.commonPool-worker-6] INFO com.iluwatar.publish.subscribe.subscriber.CustomerSupportSubscriber -- Customer Support Subscriber: 1416331388 sent the email to: [email protected]
200+
14:01:45.599 [ForkJoinPool.commonPool-worker-4] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber -- Weather Subscriber: 1949521124 issued message: 23C
201+
14:01:45.599 [ForkJoinPool.commonPool-worker-2] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber -- Weather Subscriber: 60629172 issued message: earthquake
202+
14:01:45.599 [ForkJoinPool.commonPool-worker-5] INFO com.iluwatar.publish.subscribe.subscriber.CustomerSupportSubscriber -- Customer Support Subscriber: 1807508804 sent the email to: [email protected]
203+
14:01:45.599 [ForkJoinPool.commonPool-worker-1] INFO com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber -- Weather Subscriber: 1949521124 issued message: earthquake
204+
14:01:47.600 [ForkJoinPool.commonPool-worker-3] INFO com.iluwatar.publish.subscribe.subscriber.DelayedWeatherSubscriber -- Delayed Weather Subscriber: 2085808749 issued message: earthquake
158205
```
159206

160207
## When to Use the Publish-Subscribe Pattern

publish-subscribe/etc/pub-sub.png

9.5 KB
Loading
Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,14 @@
11
package com.iluwatar.publish.subscribe;
22

3-
import com.iluwatar.publish.subscribe.model.CustomerSupportContent;
43
import com.iluwatar.publish.subscribe.model.Message;
54
import com.iluwatar.publish.subscribe.model.Topic;
6-
import com.iluwatar.publish.subscribe.model.TopicName;
7-
import com.iluwatar.publish.subscribe.model.WeatherContent;
85
import com.iluwatar.publish.subscribe.publisher.Publisher;
96
import com.iluwatar.publish.subscribe.publisher.PublisherImpl;
107
import com.iluwatar.publish.subscribe.subscriber.CustomerSupportSubscriber;
8+
import com.iluwatar.publish.subscribe.subscriber.DelayedWeatherSubscriber;
119
import com.iluwatar.publish.subscribe.subscriber.Subscriber;
1210
import com.iluwatar.publish.subscribe.subscriber.WeatherSubscriber;
11+
import java.util.concurrent.TimeUnit;
1312

1413
/**
1514
* The Publish and Subscribe pattern is a messaging paradigm used in software architecture with
@@ -29,14 +28,18 @@
2928
* without significant changes to the existing components, making the system highly adaptable to
3029
* evolving requirements.
3130
*
32-
* <p>In this example we will create two {@link TopicName}s WEATHER and CUSTOMER_SUPPORT. Then
33-
* we will register those topics in the {@link Publisher}. After that we will create two {@link
34-
* WeatherSubscriber}s to WEATHER {@link Topic}. Also, we will create two {@link
35-
* CustomerSupportSubscriber}s to CUSTOMER_SUPPORT {@link Topic}. Now we can publish the two
31+
* <p>In this example we will create three topics WEATHER, TEMPERATURE and CUSTOMER_SUPPORT.
32+
* Then we will register those topics in the {@link Publisher}. After that we will create two
33+
* {@link WeatherSubscriber}s, one {@link DelayedWeatherSubscriber} and two {@link
34+
* CustomerSupportSubscriber}.The subscribers will subscribe to the relevant topics. One {@link
35+
* WeatherSubscriber} will subscribe to two topics (WEATHER, TEMPERATURE). {@link
36+
* DelayedWeatherSubscriber} has a delay in message processing. Now we can publish the three
3637
* {@link Topic}s with different content in the {@link Message}s. And we can observe the output
37-
* in the log where, {@link WeatherSubscriber} will output the message with {@link
38-
* WeatherContent}. {@link CustomerSupportSubscriber} will output the message with {@link
39-
* CustomerSupportContent}. Each subscriber is only listening to the subscribed topic.
38+
* in the log where, one {@link WeatherSubscriber} will output the message with weather and the
39+
* other {@link WeatherSubscriber} will output weather and temperature. {@link
40+
* CustomerSupportSubscriber}s will output the message with customer support email. {@link
41+
* DelayedWeatherSubscriber} has a delay in processing and will output the message at last. Each
42+
* subscriber is only listening to the subscribed topics.
4043
*/
4144
public class App {
4245

@@ -45,31 +48,56 @@ public class App {
4548
*
4649
* @param args command line args
4750
*/
48-
public static void main(String[] args) {
51+
public static void main(String[] args) throws InterruptedException {
4952

50-
final String weatherSub1Name = "weatherSub1";
51-
final String weatherSub2Name = "weatherSub2";
52-
final String supportSub1Name = "supportSub1";
53-
final String supportSub2Name = "supportSub2";
54-
55-
Topic weatherTopic = new Topic(TopicName.WEATHER);
56-
Topic supportTopic = new Topic(TopicName.CUSTOMER_SUPPORT);
53+
final String topicWeather = "WEATHER";
54+
final String topicTemperature = "TEMPERATURE";
55+
final String topicCustomerSupport = "CUSTOMER_SUPPORT";
5756

57+
// 1. create the publisher.
5858
Publisher publisher = new PublisherImpl();
59+
60+
// 2. define the topics and register on publisher
61+
Topic weatherTopic = new Topic(topicWeather);
5962
publisher.registerTopic(weatherTopic);
63+
64+
Topic temperatureTopic = new Topic(topicTemperature);
65+
publisher.registerTopic(temperatureTopic);
66+
67+
Topic supportTopic = new Topic(topicCustomerSupport);
6068
publisher.registerTopic(supportTopic);
6169

62-
Subscriber weatherSub1 = new WeatherSubscriber(weatherSub1Name);
63-
Subscriber weatherSub2 = new WeatherSubscriber(weatherSub2Name);
70+
// 3. Create the subscribers and subscribe to the relevant topics
71+
// weatherSub1 will subscribe to two topics WEATHER and TEMPERATURE.
72+
Subscriber weatherSub1 = new WeatherSubscriber();
6473
weatherTopic.addSubscriber(weatherSub1);
74+
temperatureTopic.addSubscriber(weatherSub1);
75+
76+
// weatherSub2 will subscribe to WEATHER topic
77+
Subscriber weatherSub2 = new WeatherSubscriber();
6578
weatherTopic.addSubscriber(weatherSub2);
6679

67-
Subscriber supportSub1 = new CustomerSupportSubscriber(supportSub1Name);
68-
Subscriber supportSub2 = new CustomerSupportSubscriber(supportSub2Name);
80+
// delayedWeatherSub will subscribe to WEATHER topic
81+
// NOTE :: DelayedWeatherSubscriber has a 0.2 sec delay of processing message.
82+
Subscriber delayedWeatherSub = new DelayedWeatherSubscriber();
83+
weatherTopic.addSubscriber(delayedWeatherSub);
84+
85+
// subscribe the customer support subscribers to the CUSTOMER_SUPPORT topic.
86+
Subscriber supportSub1 = new CustomerSupportSubscriber();
6987
supportTopic.addSubscriber(supportSub1);
88+
Subscriber supportSub2 = new CustomerSupportSubscriber();
7089
supportTopic.addSubscriber(supportSub2);
7190

72-
publisher.publish(weatherTopic, new Message(WeatherContent.earthquake));
73-
publisher.publish(supportTopic, new Message(CustomerSupportContent.DE));
91+
// 4. publish message from each topic
92+
publisher.publish(weatherTopic, new Message("earthquake"));
93+
publisher.publish(temperatureTopic, new Message("23C"));
94+
publisher.publish(supportTopic, new Message("[email protected]"));
95+
96+
/*
97+
* Finally, wait for the subscribers to consume messages to check the output.
98+
* this is not necessary in real life.
99+
* the output depends on the time subscribers take to consume the message
100+
*/
101+
TimeUnit.SECONDS.sleep(2);
74102
}
75103
}

publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/CustomerSupportContent.java

Lines changed: 0 additions & 17 deletions
This file was deleted.

publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/Topic.java

Lines changed: 9 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -2,32 +2,21 @@
22

33
import com.iluwatar.publish.subscribe.subscriber.Subscriber;
44
import java.util.Set;
5+
import java.util.concurrent.CompletableFuture;
56
import java.util.concurrent.CopyOnWriteArraySet;
7+
import lombok.Getter;
8+
import lombok.RequiredArgsConstructor;
9+
import lombok.Setter;
610

711
/** This class represents a Topic that topic name and subscribers. */
12+
@Getter
13+
@Setter
14+
@RequiredArgsConstructor
815
public class Topic {
916

10-
private final TopicName name;
17+
private final String topicName;
1118
private final Set<Subscriber> subscribers = new CopyOnWriteArraySet<>();
1219

13-
/**
14-
* Creates a new instance of the Topic class.
15-
*
16-
* @param name The name of the topic.
17-
*/
18-
public Topic(TopicName name) {
19-
this.name = name;
20-
}
21-
22-
/**
23-
* Get the name of the topic.
24-
*
25-
* @return topic name
26-
*/
27-
public TopicName getName() {
28-
return name;
29-
}
30-
3120
/**
3221
* Add a subscriber to the list of subscribers.
3322
*
@@ -53,7 +42,7 @@ public void removeSubscriber(Subscriber subscriber) {
5342
*/
5443
public void publish(Message message) {
5544
for (Subscriber subscriber : subscribers) {
56-
subscriber.onMessage(message);
45+
CompletableFuture.runAsync(() -> subscriber.onMessage(message));
5746
}
5847
}
5948
}

publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/TopicName.java

Lines changed: 0 additions & 7 deletions
This file was deleted.

publish-subscribe/src/main/java/com/iluwatar/publish/subscribe/model/WeatherContent.java

Lines changed: 0 additions & 18 deletions
This file was deleted.

0 commit comments

Comments
 (0)