Skip to content

Commit 06fcd13

Browse files
authored
Employ asyncio locks for pipes to avoid drain errors in aiohttp
aio-libs/aiohttp#2934 (comment)
1 parent 44d8d40 commit 06fcd13

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

pypubsub.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ def __init__(self, server, connection, request):
296296
self.connection = connection
297297
self.acl = {}
298298
self.server = server
299+
self.lock = asyncio.Lock()
299300

300301
# Set topics subscribed to
301302
self.topics = [x for x in request.path.split('/') if x]
@@ -351,7 +352,8 @@ async def ping(self):
351352
js = b"%s\n" % json.dumps({"stillalive": time.time()}).encode('utf-8')
352353
if self.old_school:
353354
js += b"\0"
354-
await self.connection.write(js)
355+
async with self.lock:
356+
await self.connection.write(js)
355357

356358

357359
class Payload:
@@ -391,9 +393,11 @@ async def publish(self, subscribers):
391393
if all(el in self.topics for el in sub.topics):
392394
try:
393395
if sub.old_school:
394-
await sub.connection.write(ojs)
396+
async with sub.lock:
397+
await sub.connection.write(ojs)
395398
else:
396-
await sub.connection.write(js)
399+
async with sub.lock:
400+
await sub.connection.write(js)
397401
except Exception:
398402
bad_subs.append(sub)
399403
return bad_subs

0 commit comments

Comments
 (0)