Skip to content

Commit c8f3e26

Browse files
committed
add pubsub reconnect hook and auth failure hook
1 parent eecafda commit c8f3e26

File tree

5 files changed

+129
-9
lines changed

5 files changed

+129
-9
lines changed

docs/changelog.rst

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,11 @@ Master
4444
- Bug fixes
4545
- Fix forced RECONNECT messages
4646

47+
- Additions
48+
- Added proper message when wrong type is passed to a topic argument
49+
- Added auth failure hook: :func:`~twitchio.ext.pubsub.PubSubPool.auth_fail_hook`
50+
- Added reconnect hook: :func:`~twitchio.ext.pubsub.PubSubPool.reconnect_hook`
51+
4752
2.5.0
4853
======
4954
- TwitchIO

docs/exts/pubsub.rst

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,32 @@ If the topic requires multiple channel ids, they should be passed as such:
212212
listen_to_id = 12345
213213
topic = pubsub.whispers(user_token)[listen_to_id]
214214
215+
Hooks
216+
------
217+
218+
There are two hooks available in the PubSubPool class. To access these hooks, subclass the PubSubPool.
219+
After subclassing, use the subclass like normal.
220+
221+
The ``auth_fail_hook`` is called whenever you attempt to subscribe to a topic and the auth token is invalid.
222+
From the hook, you are able to fix your token (maybe you need to prompt the user for a new token), and then subscribe again.
223+
224+
The ``reconnect_hook`` is called whenevever a node has to reconnect to twitch, for any reason. The node will wait for you to
225+
return a list of topics before reconnecting. Any modifications to the topics will be applied to the node.
226+
227+
.. code-block:: python3
228+
229+
from typing import List
230+
from twitchio.ext import pubsub
231+
232+
class MyPool(pubsub.PubSubPool):
233+
async def auth_fail_hook(self, topics: List[pubsub.Topic]) -> None:
234+
fixed_topics = fix_my_auth_tokens(topics) # somehow fix your auth tokens
235+
await self.subscribe_topics(topics)
236+
237+
async def reconnect_hook(self, node: pubsub.PubSubWebsocket, topics: List[pubsub.Topic]) -> List[pubsub.Topic]:
238+
return topics
239+
240+
215241
Api Reference
216242
--------------
217243

twitchio/ext/pubsub/pool.py

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2222
DEALINGS IN THE SOFTWARE.
2323
"""
24-
24+
import copy
2525
import itertools
26+
import logging
2627
from typing import List, Optional
2728

2829
from twitchio import Client
@@ -33,6 +34,7 @@
3334

3435
__all__ = ("PubSubPool",)
3536

37+
logger = logging.getLogger("twitchio.ext.eventsub.pool")
3638

3739
class PubSubPool:
3840
"""
@@ -64,7 +66,7 @@ async def subscribe_topics(self, topics: List[Topic]):
6466
"""
6567
node = self._find_node(topics)
6668
if node is None:
67-
node = PubSubWebsocket(self.client, max_topics=self._max_connection_topics)
69+
node = PubSubWebsocket(self.client, pool=self, max_topics=self._max_connection_topics)
6870
await node.connect()
6971
self._pool.append(node)
7072

@@ -87,6 +89,73 @@ async def unsubscribe_topics(self, topics: List[Topic]):
8789
await node.disconnect()
8890
self._pool.remove(node)
8991

92+
async def _process_auth_fail(self, nonce: str, node: PubSubWebsocket) -> None:
93+
topics = [topic for topic in self._topics if topic._nonce == nonce]
94+
95+
for topic in topics:
96+
topic._nonce = None
97+
del self._topics[topic]
98+
node.topics.remove(topic)
99+
100+
try:
101+
await self.auth_fail_hook(topics)
102+
except Exception as e:
103+
logger.error("Error occurred while calling auth_fail_hook.", exc_info=e)
104+
105+
106+
async def auth_fail_hook(self, topics: list[Topic]):
107+
"""
108+
This is a hook that can be overridden in a subclass.
109+
110+
111+
Parameters
112+
----------
113+
node
114+
topics: List[:class:`Topic`]
115+
The topcs that this node has.
116+
117+
Returns
118+
-------
119+
List[:class:`Topic`]
120+
The list of topics this node should have. Any additions, modifications, or removals will be respected.
121+
"""
122+
123+
async def _process_reconnect_hook(self, node: PubSubWebsocket) -> None:
124+
topics = copy.copy(node.topics)
125+
126+
for topic in topics:
127+
self._topics.pop(topic, None)
128+
129+
try:
130+
new_topics = await self.reconnect_hook(node, topics)
131+
except Exception as e:
132+
new_topics = node.topics
133+
logger.error("Error occurred while calling reconnect_hook.", exc_info=e)
134+
135+
for topic in new_topics:
136+
self._topics[topic] = node
137+
138+
node.topics = new_topics
139+
140+
async def reconnect_hook(self, node: PubSubWebsocket, topics: List[Topic]) -> List[Topic]:
141+
"""
142+
This is a low-level hook that can be overridden in a subclass.
143+
it is called whenever a node has to reconnect for any reason, from the twitch edge lagging out to being told to by twitch.
144+
This hook allows you to modify the topics, potentially updating tokens or removing topics altogether.
145+
146+
Parameters
147+
----------
148+
node
149+
topics: List[:class:`Topic`]
150+
The topcs that this node has.
151+
152+
Returns
153+
-------
154+
List[:class:`Topic`]
155+
The list of topics this node should have. Any additions, modifications, or removals will be respected.
156+
"""
157+
return topics
158+
90159
def _find_node(self, topics: List[Topic]) -> Optional[PubSubWebsocket]:
91160
if self._mode != "group":
92161
raise ValueError("group is the only supported mode.")

twitchio/ext/pubsub/topics.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2222
DEALINGS IN THE SOFTWARE.
2323
"""
24-
24+
import uuid
2525
from typing import Optional, List, Type
2626

2727

@@ -65,16 +65,17 @@ class Topic(_topic):
6565
The arguments to substitute in to the topic string
6666
"""
6767

68-
__slots__ = "token", "args"
68+
__slots__ = "token", "args", "_nonce"
6969

7070
def __init__(self, topic, args):
7171
super().__init__(topic, args)
7272
self.token = None
73+
self._nonce = None
7374
self.args = []
7475

7576
def __getitem__(self, item):
7677
assert len(self.args) < len(self.__args__), ValueError("Too many arguments")
77-
assert isinstance(item, self.__args__[len(self.args)]) # noqa
78+
assert isinstance(item, self.__args__[len(self.args)]), ValueError(f"Got {item!r}, excepted {self.__args__[len(self.args)]}") # noqa
7879
self.args.append(item)
7980
return self
8081

@@ -89,12 +90,19 @@ def present(self) -> Optional[str]:
8990
except:
9091
return None
9192

93+
def _present_set_nonce(self, nonce: str) -> Optional[str]:
94+
self._nonce = nonce
95+
return self.present
96+
9297
def __eq__(self, other):
9398
return other is self or (isinstance(other, Topic) and other.present == self.present)
9499

95100
def __hash__(self):
96101
return hash(self.present)
97102

103+
def __repr__(self):
104+
return f"<Topic {self.__topic__} args={self.args}>"
105+
98106

99107
bits = _topic("channel-bits-events-v2.{0}", [int])
100108
bits_badge = _topic("channel-bits-badge-unlocks.{0}", [int])

twitchio/ext/pubsub/websocket.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,20 +21,24 @@
2121
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
2222
DEALINGS IN THE SOFTWARE.
2323
"""
24+
from __future__ import annotations
2425

2526
import asyncio
2627
import logging
2728
import time
2829
import uuid
2930
from itertools import groupby
30-
from typing import Optional, List
31+
from typing import Optional, List, TYPE_CHECKING
3132

3233
import aiohttp
3334

3435
from twitchio import Client
3536
from .topics import Topic
3637
from . import models
3738

39+
if TYPE_CHECKING:
40+
from .pool import PubSubPool
41+
3842
try:
3943
import ujson as json
4044
except:
@@ -51,6 +55,7 @@ class PubSubWebsocket:
5155
__slots__ = (
5256
"session",
5357
"topics",
58+
"pool",
5459
"client",
5560
"connection",
5661
"_latency",
@@ -63,11 +68,12 @@ class PubSubWebsocket:
6368

6469
ENDPOINT = "wss://pubsub-edge.twitch.tv"
6570

66-
def __init__(self, client: Client, *, max_topics=50):
71+
def __init__(self, client: Client, pool: PubSubPool, *, max_topics=50):
6772
self.max_topics = max_topics
6873
self.session = None
6974
self.connection: Optional[aiohttp.ClientWebSocketResponse] = None
7075
self.topics: List[Topic] = []
76+
self.pool = pool
7177
self.client = client
7278
self._latency = None
7379
self._closing = False
@@ -110,6 +116,7 @@ async def disconnect(self):
110116

111117
async def reconnect(self):
112118
await self.disconnect()
119+
await self.pool._process_reconnect_hook(self)
113120
await self.connect()
114121

115122
async def _send_initial_topics(self):
@@ -118,10 +125,11 @@ async def _send_initial_topics(self):
118125
async def _send_topics(self, topics: List[Topic], type="LISTEN"):
119126
for tok, _topics in groupby(topics, key=lambda val: val.token):
120127
nonce = ("%032x" % uuid.uuid4().int)[:8]
128+
121129
payload = {
122130
"type": type,
123131
"nonce": nonce,
124-
"data": {"topics": [x.present for x in _topics], "auth_token": tok},
132+
"data": {"topics": [x._present_set_nonce(nonce) for x in _topics], "auth_token": tok},
125133
}
126134
logger.debug(f"Sending {type} payload with nonce '{nonce}': {payload}")
127135
await self.send(payload)
@@ -150,7 +158,7 @@ async def poll(self):
150158

151159
handle = getattr(self, "handle_" + data["type"].lower().replace("-", "_"), None)
152160
if handle:
153-
self.client.loop.create_task(handle(data))
161+
self.client.loop.create_task(handle(data), name=f"pubsub-handle-event: {data['type']}")
154162
else:
155163
print(data)
156164
logger.debug(f"Pubsub event referencing unknown event '{data['type']}'. Discarding")
@@ -197,6 +205,10 @@ async def handle_response(self, message: dict):
197205
if message["error"]:
198206
logger.error(f"Received errored response for nonce {message['nonce']}: {message['error']}")
199207
self.client.run_event("pubsub_error", message)
208+
if message["error"] == "ERR_BADAUTH":
209+
nonce = message["nonce"]
210+
await self.pool._process_auth_fail(nonce, self)
211+
200212
elif message["type"] == "RECONNECT":
201213
logger.warning("Received RECONNECT response from pubsub edge. Reconnecting")
202214
await asyncio.shield(self.reconnect())

0 commit comments

Comments
 (0)