Skip to content

Commit 9a61aac

Browse files
authored
Fix unsubscribe/subscribe of Pubsub + enhance pubsub websocket (#333)
* Fix Pubsub subscribre/unsubscribe - add missing append to own pool in subscribe - instead of passing a groupby object to websocket.unsubscribe, we pass a list(vals) (doubled iterating over a groupby object results in an empty List) * Add missing RECONNECT response * Update changelog.rst * Update changelog.rst * Update pool.py
1 parent d2b1c56 commit 9a61aac

File tree

3 files changed

+13
-2
lines changed

3 files changed

+13
-2
lines changed

docs/changelog.rst

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,13 @@ Master
1616
- ext.commands
1717
- Bug fixes
1818
- Make sure double-quotes are properly tokenized for bot commands
19+
20+
- ext.pubsub
21+
- Additions
22+
- Websocket automatically handles "RECONNECT" requests by Twitch
23+
- Bug fixes
24+
- Unsubscribing from Pubsubevents works again
25+
1926

2027

2128
- ext.sound

twitchio/ext/pubsub/pool.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ async def subscribe_topics(self, topics: List[Topic]):
6666
if node is None:
6767
node = PubSubWebsocket(self.client, max_topics=self._max_connection_topics)
6868
await node.connect()
69+
self._pool.append(node)
6970

7071
await node.subscribe_topics(topics)
7172
self._topics.update({t: node for t in topics})
@@ -81,7 +82,7 @@ async def unsubscribe_topics(self, topics: List[Topic]):
8182
8283
"""
8384
for node, vals in itertools.groupby(topics, lambda t: self._topics[t]):
84-
await node.unsubscribe_topic(vals)
85+
await node.unsubscribe_topic(list(vals))
8586
if not node.topics:
8687
await node.disconnect()
8788
self._pool.remove(node)
@@ -91,7 +92,7 @@ def _find_node(self, topics: List[Topic]) -> Optional[PubSubWebsocket]:
9192
raise ValueError("group is the only supported mode.")
9293

9394
for p in self._pool:
94-
if len(p.max_topics) + len(topics) <= p.max_topics:
95+
if p.max_topics + len(topics) <= p.max_topics:
9596
return p
9697

9798
if len(self._pool) < self._max_size:

twitchio/ext/pubsub/websocket.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,9 @@ async def handle_response(self, message: dict):
194194
if message["error"]:
195195
logger.error(f"Received errored response for nonce {message['nonce']}: {message['error']}")
196196
self.client.run_event("pubsub_error", message)
197+
elif message["type"] == "RECONNECT":
198+
logger.warning("Received RECONNECT response from pubsub edge. Reconnecting")
199+
await asyncio.shield(self.reconnect())
197200
elif message["nonce"]:
198201
logger.debug(f"Received OK response for nonce {message['nonce']}")
199202
self.client.run_event("pubsub_nonce", message)

0 commit comments

Comments
 (0)