Skip to content

Commit 0afde04

Browse files
committed
Change join_channels method
join_channels now chunks into groups of 20 rather than a single list relying on tick rates. Adjust timeouts based on list length.
1 parent e72e8a7 commit 0afde04

File tree

2 files changed

+29
-15
lines changed

2 files changed

+29
-15
lines changed

docs/changelog.rst

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,10 @@ Master
99
- Added repr for :class:`~twitchio.CustomReward`
1010
- Bug fixes
1111
- Added ``self.registered_callbacks = {}`` to :func:`~twitchio.Client.from_client_credentials`
12-
- Allow empty or missing initial_channels to trigger :func:`~twitchio.Client.event_ready`.
12+
- Allow empty or missing initial_channels to trigger :func:`~twitchio.Client.event_ready`
1313
- Corrected :func:`twitchio.CustomRewardRedemption.fulfill` endpoint typo and creation
1414
- Corrected :func:`twitchio.CustomRewardRedemption.refund` endpoint typo and creation
15+
- Changed :func:`~twitchio.Client.join_channels` logic to handle bigger channel lists better
1516

1617
- ext.commands
1718
- Bug fixes

twitchio/websocket.py

Lines changed: 27 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,16 @@ async def part_channels(self, *channels: str):
249249
if self._retain_cache:
250250
self._cache.pop(channel, None)
251251

252+
def _assign_timeout(self, channel_count: int):
253+
if channel_count <= 40:
254+
return 30
255+
elif channel_count <= 60:
256+
return 40
257+
elif channel_count <= 80:
258+
return 50
259+
else:
260+
return 60
261+
252262
async def join_channels(self, *channels: str):
253263
"""|coro|
254264
@@ -259,26 +269,29 @@ async def join_channels(self, *channels: str):
259269
*channels : str
260270
An argument list of channels to attempt joining.
261271
"""
262-
async with self._join_lock: # acquire a lock, allowing only one join_channels at once...
263-
for channel in channels:
264-
if self._join_handle < time.time(): # Handle is less than the current time
265-
self._join_tick = 20 # So lets start a new rate limit bucket..
266-
self._join_handle = time.time() + 11 # Set the handle timeout time
267-
if self._join_tick == 0: # We have exhausted the bucket, wait so we can make a new one...
268-
await asyncio.sleep(self._join_handle - time.time())
269-
asyncio.create_task(self._join_channel(channel))
270-
self._join_tick -= 1
271-
272-
async def _join_channel(self, entry):
272+
async with self._join_lock:
273+
channel_count = len(channels)
274+
if channel_count > 20:
275+
timeout = self._assign_timeout(channel_count)
276+
chunks = [channels[i : i + 20] for i in range(0, len(channels), 20)]
277+
for chunk in chunks:
278+
for channel in chunk:
279+
asyncio.create_task(self._join_channel(channel, timeout))
280+
await asyncio.sleep(11)
281+
else:
282+
for channel in channels:
283+
asyncio.create_task(self._join_channel(channel, 11))
284+
285+
async def _join_channel(self, entry: str, timeout: int):
273286
channel = re.sub("[#]", "", entry).lower()
274287
await self.send(f"JOIN #{channel}\r\n")
275288

276289
self._join_pending[channel] = fut = self._loop.create_future()
277-
asyncio.create_task(self._join_future_handle(fut, channel))
290+
asyncio.create_task(self._join_future_handle(fut, channel, timeout))
278291

279-
async def _join_future_handle(self, fut: asyncio.Future, channel: str):
292+
async def _join_future_handle(self, fut: asyncio.Future, channel: str, timeout: int):
280293
try:
281-
await asyncio.wait_for(fut, timeout=11)
294+
await asyncio.wait_for(fut, timeout=timeout)
282295
except asyncio.TimeoutError:
283296
log.error(f'The channel "{channel}" was unable to be joined. Check the channel is valid.')
284297
self._join_pending.pop(channel)

0 commit comments

Comments
 (0)