Skip to content

Commit 7f10ec6

Browse files
committed
Allow for multiple subscriptions in a single stream
1 parent 47b119a commit 7f10ec6

File tree

3 files changed

+29
-12
lines changed

3 files changed

+29
-12
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# 0.7.0
2+
- Allow for multiple subscriptions per stream
13

24
# 0.6.3
35
- Fixed an issue with payload delivery stalling due to client pipe timeouts

README.md

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
- [Introduction](#introduction)
77
- [Installing](#installing)
88
- [Topics and publishing/subscribing](#topics-and-publishingsubscribing)
9+
* [Subscribing to multiple topic streams](#subscribing-to-multiple-topic-streams)
910
- [Pushing an event to PyPubSub](#pushing-an-event-to-pypubsub)
1011
* [Pushing an event via Python](#pushing-an-event-via-python)
1112
- [Listening for events](#listening-for-events)
@@ -52,6 +53,15 @@ The below matrix shows how subscription paths match topics:
5253
| fruits + apples + red ||||||
5354
| fruits + oranges ||||||
5455

56+
### Subscribing to multiple topic streams
57+
As mentioned above, subscription topics are typically AND'ed together, with more topics narrowing the
58+
event stream. Thus, if you wanted to subscribe to `bar` OR `foo`, you would need two streams.
59+
60+
It is possible (from 0.7.0 onwards) to utilize a single stream to subscribe to multiple topic streams at once,
61+
using a comma as a delimiter between topic batches. For instance, to subscribe to both
62+
`apples/red` events AND `oranges/ripe` events, you may subscribe to:
63+
64+
`http://localhost:2069/apples/red,oranges/ripe`.
5565

5666
## Pushing an event to PyPubSub
5767
Event payloads requires that the IP or IP range (Ipv4 or IPv6) is listed in `pypubsub.yaml` under `payloaders` first.

pypubsub.py

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@
3333
import plugins.sqs
3434

3535
# Some consts
36-
PUBSUB_VERSION = '0.6.3'
36+
PUBSUB_VERSION = '0.7.0'
3737
PUBSUB_CONTENT_TYPE = 'application/vnd.pypubsub-stream'
3838
PUBSUB_DEFAULT_PORT = 2069
3939
PUBSUB_DEFAULT_IP = '0.0.0.0'
@@ -302,7 +302,10 @@ def __init__(self, server, connection, request):
302302
self.lock = asyncio.Lock()
303303

304304
# Set topics subscribed to
305-
self.topics = [x for x in request.path.split('/') if x]
305+
self.topics = []
306+
for topic_batch in request.path.split(','):
307+
sub_to = [x for x in topic_batch.split('/') if x]
308+
self.topics.append(sub_to)
306309

307310
# Is the client old and expecting zero-terminators?
308311
self.old_school = False
@@ -393,16 +396,18 @@ async def publish(self, subscribers):
393396
if not can_see:
394397
continue
395398
# If subscribed to all the topics, tell a subscriber about this
396-
if all(el in self.topics for el in sub.topics):
397-
try:
398-
if sub.old_school:
399-
async with sub.lock:
400-
await asyncio.wait_for(sub.connection.write(ojs), timeout=PUBSUB_WRITE_TIMEOUT)
401-
else:
402-
async with sub.lock:
403-
await asyncio.wait_for(sub.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
404-
except Exception:
405-
bad_subs.append(sub)
399+
for topic_batch in sub.topics:
400+
if all(el in self.topics for el in topic_batch):
401+
try:
402+
if sub.old_school:
403+
async with sub.lock:
404+
await asyncio.wait_for(sub.connection.write(ojs), timeout=PUBSUB_WRITE_TIMEOUT)
405+
else:
406+
async with sub.lock:
407+
await asyncio.wait_for(sub.connection.write(js), timeout=PUBSUB_WRITE_TIMEOUT)
408+
except Exception:
409+
bad_subs.append(sub)
410+
break
406411
return bad_subs
407412

408413

0 commit comments

Comments
 (0)