diff --git a/aiohttp_sse_client/client.py b/aiohttp_sse_client/client.py index f3edbc5..8d6610f 100644 --- a/aiohttp_sse_client/client.py +++ b/aiohttp_sse_client/client.py @@ -6,7 +6,7 @@ from typing import Optional, Dict, Any import attr -from aiohttp import hdrs, ClientSession, ClientConnectionError +from aiohttp import hdrs, ClientSession, ClientPayloadError, ClientConnectionError, ClientConnectorError from multidict import MultiDict from yarl import URL @@ -56,6 +56,7 @@ class EventSource: .. seealso:: https://www.w3.org/TR/eventsource/#eventsource """ + def __init__(self, url: str, option: Optional[Dict[str, Any]] = None, reconnection_time: timedelta = DEFAULT_RECONNECTION_TIME, @@ -154,38 +155,42 @@ async def __anext__(self) -> MessageEvent: # async for ... in StreamReader only split line by \n while self._response.status != 204: - async for line_in_bytes in self._response.content: - line = line_in_bytes.decode('utf8') # type: str - line = line.rstrip('\n').rstrip('\r') - - if line == '': - # empty line - event = self._dispatch_event() - if event is not None: - return event - continue - - if line[0] == ':': - # comment line, ignore - continue - - if ':' in line: - # contains ':' - fields = line.split(':', 1) - field_name = fields[0] - field_value = fields[1].lstrip(' ') - self._process_field(field_name, field_value) - else: - self._process_field(line, '') + try: + async for line_in_bytes in self._response.content: + line = line_in_bytes.decode('utf8') # type: str + line = line.rstrip('\n').rstrip('\r') + + if line == '': + # empty line + event = self._dispatch_event() + if event is not None: + return event + continue + + if line[0] == ':': + # comment line, ignore + continue + + if ':' in line: + # contains ':' + fields = line.split(':', 1) + field_name = fields[0] + field_value = fields[1].lstrip(' ') + self._process_field(field_name, field_value) + else: + self._process_field(line, '') + except ClientPayloadError: + pass + self._ready_state = READY_STATE_CONNECTING if self._on_error: self._on_error() - self._reconnection_time *= 2 _LOGGER.debug('wait %s seconds for retry', self._reconnection_time.total_seconds()) await asyncio.sleep( self._reconnection_time.total_seconds()) - await self.connect() + self._reconnection_time *= 2 + await self.connect(self._max_connect_retry) raise StopAsyncIteration async def connect(self, retry=0): @@ -216,7 +221,7 @@ async def connect(self, retry=0): self._url, **self._kwargs ) - except ClientConnectionError: + except (ClientConnectionError, ClientConnectorError): if retry <= 0 or self._ready_state == READY_STATE_CLOSED: await self._fail_connect() raise @@ -224,11 +229,11 @@ async def connect(self, retry=0): self._ready_state = READY_STATE_CONNECTING if self._on_error: self._on_error() - self._reconnection_time *= 2 _LOGGER.debug('wait %s seconds for retry', self._reconnection_time.total_seconds()) await asyncio.sleep( self._reconnection_time.total_seconds()) + self._reconnection_time *= 2 await self.connect(retry - 1) return @@ -252,8 +257,8 @@ async def connect(self, retry=0): if response.content_type != CONTENT_TYPE_EVENT_STREAM: error_message = \ - 'fetch {} failed with wrong Content-Type: {}'.format( - self._url, response.headers.get(hdrs.CONTENT_TYPE)) + 'fetch {} failed with wrong Content-Type: {}'.format( + self._url, response.headers.get(hdrs.CONTENT_TYPE)) _LOGGER.error(error_message) await self._fail_connect() @@ -336,3 +341,18 @@ def _process_field(self, field_name, field_value): pass pass + +# if __name__ == '__main__': +# import asyncio +# +# +# async def main(): +# async with EventSource("http://10.10.10.103:30106/api/v1/stream-events/", max_connect_retry=10) as event_source: +# try: +# async for event in event_source: +# print(event) +# except ConnectionError: +# pass +# +# +# asyncio.run(main()) diff --git a/setup.py b/setup.py index 062bb08..42fb5bf 100644 --- a/setup.py +++ b/setup.py @@ -45,6 +45,6 @@ test_suite='tests', tests_require=test_requirements, url='https://github.com/rtfol/aiohttp-sse-client', - version='0.2.1', + version='0.2.2', zip_safe=False, )