Periodically send messages with websocket #16007
Answered
by
sosi-deadeye
LaySoft
asked this question in
Core Development
-
I'd like to send messages every second over an already opened websocket connection:
But only the first message goes out, the server responds, the response is written, but no further messages are sent. What am I messing up? |
Beta Was this translation helpful? Give feedback.
Answered by
sosi-deadeye
Oct 14, 2024
Replies: 1 comment 1 reply
-
The program work as expected. The async iterator waits for an unlimited time, if no message were sent from the server to your client. Some example code: import sys
import aiohttp
import asyncio
import json
WS_URL = "wss://echo.websocket.org/"
class AsyncTimeoutIterator:
"""
Simple implementation of AsyncTimeoutIterator.
The class iterates over an async generator and
if it takes longer than the timeout, the iteration is stopped.
"""
def __init__(self, async_gen, timeout):
self.gen = async_gen
self.timeout = timeout
# should be a sync function?
def __aiter__(self):
return self
# async function for the next item
async def __anext__(self):
try:
# use gen.__anext__(), because anext() is not implemented yet
# gen.__anext__() returns a coroutine, which holds the next element
# you must await for this corutine to get the value
# asyncio.wait_for(fs, timeout) is used to raise
# an asyncio.TimeoutError, which then jumps to the Except block
# and raises the StopAsyncIteration, which stops the async for loop
obj = await asyncio.wait_for(self.gen.__anext__(), self.timeout)
except asyncio.TimeoutError:
print("Timeout of AsyncTimeoutIterator")
raise StopAsyncIteration
return obj
async def echo(ws: aiohttp.ClientWebSocketResponse):
print("Echo task were started")
n = 1
try:
while True:
await ws.send_json({"cnt": n})
n += 1
# waiting longer if n == 10
if n == 10:
await asyncio.sleep(3)
else:
await asyncio.sleep(1)
except asyncio.CancelledError:
print("Task was cancelled")
async def ws_test_echo(session: aiohttp.ClientSession):
async with session.ws_connect(WS_URL, ssl=True) as ws:
# ws is aiohttp.ClientWebSocketResponse
print("Hello message:", await ws.receive_str())
print("Starting a task to send messages to the WebsocketServer")
print("Then iterating over the async iterator")
# starting a task to send data to the websocket echo server
task = asyncio.create_task(echo(ws))
# waiting for new messges, this may block for
# unlimited time
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data)
task.cancel()
await asyncio.sleep(0)
async def ws_test_echo2(session: aiohttp.ClientSession):
async with session.ws_connect(WS_URL, ssl=True) as ws:
# ws is aiohttp.ClientWebSocketResponse
print("Hello message:", await ws.receive_str())
print("Sending in a task data to the WebsocketServer")
print("Use of AsyncTimeoutIterator to add a timeout")
# starting a task to send data to the websocket echo server
task = asyncio.create_task(echo(ws))
# using a custom class to raise a timeout
# of the async iterator, if a message were received too late
# this will raise an StopAsyncIteration, which stops async for
async for msg in AsyncTimeoutIterator(ws, 2):
# the example server answers always with TEXT
if msg.type == aiohttp.WSMsgType.TEXT:
print(msg.data)
task.cancel()
await asyncio.sleep(0)
async def ws_test_echo3(session: aiohttp.ClientSession):
async with session.ws_connect(WS_URL, ssl=True) as ws:
print("Hello message:", await ws.receive_str())
print("Sendin a message and receive the echo reply from websocketserver")
for n in range(1, 11):
await ws.send_json({"cnt": n})
# the echo server return TEXT, but this is wrong
# because json were sent
# using the websocket directly, instead
# of the wrapper around the websocket connection
opcode, data = await ws.ws.receive()
print(data)
async def main():
async with aiohttp.ClientSession() as session:
print("ws_test_echo3")
await ws_test_echo3(session)
print()
print("ws_test_echo2")
await ws_test_echo2(session)
print()
print("ws_test_echo")
try:
await asyncio.wait_for(ws_test_echo(session), 10)
except asyncio.TimeoutError:
print("Timeout of ws_test_echo")
else:
print()
if __name__ == "__main__":
asyncio.run(main()) Output on EP32-S3:
|
Beta Was this translation helpful? Give feedback.
1 reply
Answer selected by
LaySoft
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
The program work as expected. The async iterator waits for an unlimited time, if no message were sent from the server to your client.
Some example code: