Skip to content

Commit d719ba6

Browse files
Support for RabbitMQ topic wildcards
1 parent d796357 commit d719ba6

File tree

2 files changed

+87
-2
lines changed

2 files changed

+87
-2
lines changed

README.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,18 @@ These are the possible changes you need to make to your code for this branch, if
225225
- Connection Provider is selected by an enum
226226
- It used to be done by passing it a WebSocket class from either JWS or OkHttp
227227
- New way of using it can be seen in the examples above
228+
- Topic subscription now supports wildcard parsing
229+
- Example: RabbitMQ allows subscribing to topics with wildcards ([more info](https://www.rabbitmq.com/tutorials/tutorial-five-java.html))
230+
- Usage:
231+
``` java
232+
StompClient client = Stomp.over(...);
233+
client.setParser(StompClient.Parser.RABBITMQ);
234+
client.connect();
235+
236+
client.topic("/topic/*").subscribe(message -> {
237+
Log.i(TAG, "Received message: " + message.getPayload());
238+
});
239+
```
228240
229241
## Additional Reading
230242

lib/src/main/java/ua/naiksoftware/stomp/client/StompClient.java

Lines changed: 75 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import java.util.List;
1111
import java.util.UUID;
1212

13+
import java8.util.StringJoiner;
1314
import java8.util.concurrent.CompletableFuture;
1415
import rx.Completable;
1516
import rx.Observable;
@@ -39,19 +40,41 @@ public class StompClient {
3940
private CompletableFuture<Boolean> mConnectionFuture;
4041
private Completable mConnectionComplete;
4142
private HashMap<String, Observable<StompMessage>> mStreamMap;
43+
private Parser parser;
4244

4345
public StompClient(ConnectionProvider connectionProvider) {
4446
mConnectionProvider = connectionProvider;
4547
mMessageStream = PublishSubject.create();
4648
mStreamMap = new HashMap<>();
4749
resetStatus();
50+
parser = Parser.NONE;
4851
}
4952

5053
private void resetStatus() {
5154
mConnectionFuture = new CompletableFuture<>();
5255
mConnectionComplete = Completable.fromFuture(mConnectionFuture).subscribeOn(Schedulers.newThread());
5356
}
5457

58+
public enum Parser {
59+
NONE,
60+
RABBITMQ
61+
}
62+
63+
/**
64+
* Set the wildcard parser for Topic subscription.
65+
* <p>
66+
* Right now, the only options are NONE and RABBITMQ.
67+
* <p>
68+
* When set to RABBITMQ, topic subscription allows for RMQ-style wildcards.
69+
* <p>
70+
* See more info <a href="https://www.rabbitmq.com/tutorials/tutorial-five-java.html">here</a>.
71+
*
72+
* @param parser Set to NONE by default
73+
*/
74+
public void setParser(Parser parser) {
75+
this.parser = parser;
76+
}
77+
5578
/**
5679
* Connect without reconnect if connected
5780
*/
@@ -148,20 +171,70 @@ public Observable<StompMessage> topic(String destinationPath) {
148171
return topic(destinationPath, null);
149172
}
150173

151-
public Observable<StompMessage> topic(@Nullable String destPath, List<StompHeader> headerList) {
174+
public Observable<StompMessage> topic(@NonNull String destPath, List<StompHeader> headerList) {
152175
if (destPath == null)
153176
return Observable.error(new IllegalArgumentException("Topic path cannot be null"));
154177
else if (!mStreamMap.containsKey(destPath))
155178
mStreamMap.put(destPath,
156179
mMessageStream
157-
.filter(msg -> destPath.equals(msg.findHeader(StompHeader.DESTINATION)))
180+
.filter(msg -> matches(destPath, msg))
158181
.doOnSubscribe(() -> subscribePath(destPath, headerList).subscribe())
159182
.doOnUnsubscribe(() -> unsubscribePath(destPath).subscribe())
160183
.share()
161184
);
162185
return mStreamMap.get(destPath);
163186
}
164187

188+
private boolean matches(String path, StompMessage msg) {
189+
String dest = msg.findHeader(StompHeader.DESTINATION);
190+
if (dest == null) return false;
191+
boolean ret;
192+
193+
switch (parser) {
194+
case NONE:
195+
ret = path.equals(dest);
196+
break;
197+
198+
case RABBITMQ:
199+
// for example string "lorem.ipsum.*.sit":
200+
201+
// split it up into ["lorem", "ipsum", "*", "sit"]
202+
String[] split = path.split("\\.");
203+
ArrayList<String> transformed = new ArrayList<>();
204+
// check for wildcards and replace with corresponding regex
205+
for (String s : split) {
206+
switch (s) {
207+
case "*":
208+
transformed.add("[^.]+");
209+
break;
210+
case "#":
211+
// TODO: make this work with zero-word
212+
// e.g. "lorem.#.dolor" should ideally match "lorem.dolor"
213+
transformed.add(".*");
214+
break;
215+
default:
216+
transformed.add(s);
217+
break;
218+
}
219+
}
220+
// at this point, 'transformed' looks like ["lorem", "ipsum", "[^.]+", "sit"]
221+
StringJoiner sj = new StringJoiner("\\.");
222+
for (String s : transformed)
223+
sj.add(s);
224+
String join = sj.toString();
225+
// join = "lorem\.ipsum\.[^.]+\.sit"
226+
227+
ret = dest.matches(join);
228+
break;
229+
230+
default:
231+
ret = false;
232+
break;
233+
}
234+
235+
return ret;
236+
}
237+
165238
private Completable subscribePath(String destinationPath, @Nullable List<StompHeader> headerList) {
166239
String topicId = UUID.randomUUID().toString();
167240

0 commit comments

Comments
 (0)