Skip to content

Commit 002e049

Browse files
committed
fix: resolve Ruff linting errors in Redis event handling
1 parent 6f4e83a commit 002e049

File tree

6 files changed

+82
-33
lines changed

6 files changed

+82
-33
lines changed

src/a2a/server/events/redis_event_consumer.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,9 @@
33
import asyncio
44
import logging
55

6-
from typing import Protocol, TYPE_CHECKING
6+
from typing import TYPE_CHECKING, Protocol
7+
8+
79
if TYPE_CHECKING:
810
from collections.abc import AsyncGenerator
911

@@ -24,6 +26,7 @@ def is_closed(self) -> bool:
2426
"""Return True if the underlying queue has been closed."""
2527
...
2628

29+
2730
logger = logging.getLogger(__name__)
2831

2932

@@ -38,6 +41,7 @@ class RedisEventConsumer:
3841
def __init__(self, queue: QueueLike) -> None:
3942
"""Wrap a queue-like object that exposes dequeue_event and is_closed."""
4043
self._queue = queue
44+
4145
async def consume_one(self) -> object:
4246
"""Consume a single event without waiting; raises asyncio.QueueEmpty if none."""
4347
return await self._queue.dequeue_event(no_wait=True)

src/a2a/server/events/redis_event_queue.py

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,36 @@
55
import asyncio
66
import json
77
import logging
8+
89
from typing import Any
910

11+
1012
try:
1113
import redis.asyncio as aioredis # type: ignore
14+
1215
from redis.exceptions import RedisError # type: ignore
1316
except ImportError: # pragma: no cover - optional dependency
1417
aioredis = None # type: ignore
1518
RedisError = Exception # type: ignore
1619

17-
from a2a.server.events.event_queue import EventQueue
1820
from typing import TYPE_CHECKING
21+
22+
from a2a.server.events.event_queue import EventQueue
23+
24+
1925
if TYPE_CHECKING:
2026
from a2a.server.events.event_queue import Event
2127
from pydantic import ValidationError
28+
2229
from a2a.types import (
2330
Message,
2431
Task,
25-
TaskStatusUpdateEvent,
2632
TaskArtifactUpdateEvent,
33+
TaskStatusUpdateEvent,
2734
)
2835
from a2a.utils.telemetry import SpanKind, trace_class
2936

37+
3038
logger = logging.getLogger(__name__)
3139

3240

@@ -98,7 +106,7 @@ async def enqueue_event(self, event: Event) -> None:
98106
except RedisError:
99107
logger.exception('Failed to XADD event to redis stream')
100108

101-
async def dequeue_event(self, no_wait: bool = False) -> Event | Any:
109+
async def dequeue_event(self, no_wait: bool = False) -> Event | Any: # noqa: PLR0912
102110
"""Read one event from the Redis stream respecting no_wait semantics.
103111
104112
Returns a parsed pydantic model matching the event type.
@@ -128,18 +136,25 @@ async def dequeue_event(self, no_wait: bool = False) -> Event | Any:
128136
norm: dict[str, object] = {}
129137
try:
130138
for k, v in fields.items():
131-
key = k.decode('utf-8') if isinstance(k, (bytes, bytearray)) else k
132-
if isinstance(v, (bytes, bytearray)):
139+
key = (
140+
k.decode('utf-8')
141+
if isinstance(k, bytes | bytearray)
142+
else k
143+
)
144+
if isinstance(v, bytes | bytearray):
133145
try:
134146
val: object = v.decode('utf-8')
135-
except Exception:
147+
except UnicodeDecodeError:
136148
val = v
137149
else:
138150
val = v
139151
norm[str(key)] = val
140-
except Exception:
152+
except Exception: # noqa: BLE001
141153
# Defensive: if normalization fails, skip this entry and continue
142-
logger.debug('RedisEventQueue.dequeue_event: failed to normalize entry fields, skipping %s', entry_id)
154+
logger.debug(
155+
'RedisEventQueue.dequeue_event: failed to normalize entry fields, skipping %s',
156+
entry_id,
157+
)
143158
continue
144159

145160
evt_type = norm.get('type')
@@ -153,7 +168,10 @@ async def dequeue_event(self, no_wait: bool = False) -> Event | Any:
153168
if raw_payload is None:
154169
# Missing payload — likely due to key mismatch or malformed entry.
155170
# Skip and continue to next entry instead of returning None to callers.
156-
logger.debug('RedisEventQueue.dequeue_event: skipping entry %s with missing payload', entry_id)
171+
logger.debug(
172+
'RedisEventQueue.dequeue_event: skipping entry %s with missing payload',
173+
entry_id,
174+
)
157175
# continue loop to read next entry
158176
continue
159177

@@ -171,12 +189,17 @@ async def dequeue_event(self, no_wait: bool = False) -> Event | Any:
171189
try:
172190
return model.parse_obj(data)
173191
except ValidationError as exc:
174-
logger.debug('Failed to parse event payload into model, returning raw data: %s', exc)
192+
logger.debug(
193+
'Failed to parse event payload into model, returning raw data: %s',
194+
exc,
195+
)
175196
# Return raw data for flexibility when parsing fails
176197
return data
177198

178199
# Unknown type — return raw data for flexibility
179-
logger.debug('Unknown event type: %s, returning raw payload', evt_type)
200+
logger.debug(
201+
'Unknown event type: %s, returning raw payload', evt_type
202+
)
180203
return data
181204

182205
def task_done(self) -> None: # streams do not require task_done semantics
@@ -226,4 +249,6 @@ async def clear_events(self, clear_child_queues: bool = True) -> None:
226249
try:
227250
await self._redis.delete(self._stream_key)
228251
except RedisError:
229-
logger.exception('Failed to delete redis stream during clear_events')
252+
logger.exception(
253+
'Failed to delete redis stream during clear_events'
254+
)

src/a2a/server/events/redis_queue_manager.py

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
from __future__ import annotations
22

33
import logging
4+
45
from typing import TYPE_CHECKING, Any
56

67
from a2a.server.events.queue_manager import QueueManager
78

9+
810
if TYPE_CHECKING:
911
from a2a.server.events.event_queue import EventQueue
1012

@@ -25,7 +27,9 @@ class RedisQueueManager(QueueManager):
2527
All coordination happens through Redis streams.
2628
"""
2729

28-
def __init__(self, redis_client: Any, stream_prefix: str = 'a2a:task') -> None:
30+
def __init__(
31+
self, redis_client: Any, stream_prefix: str = 'a2a:task'
32+
) -> None:
2933
self._redis = redis_client
3034
self._stream_prefix = stream_prefix
3135

@@ -92,7 +96,7 @@ async def close(self, task_id: str) -> None:
9296
if result and result[0][1].get('type') == 'CLOSE':
9397
# Stream is already closed, no need to add another CLOSE entry
9498
return
95-
except Exception as exc:
99+
except Exception as exc: # noqa: BLE001
96100
# If we can't check (e.g., stream doesn't exist), proceed with closing
97101
logger.debug('Could not check if stream is already closed: %s', exc)
98102

@@ -114,7 +118,7 @@ async def create_or_tap(self, task_id: str) -> EventQueue:
114118
logger.info('create_or_tap called with task_id: %s', task_id)
115119
logger.info('RedisEventQueue value: %s', RedisEventQueue)
116120
logger.info('RedisEventQueue type: %s', type(RedisEventQueue))
117-
121+
118122
if RedisEventQueue is None:
119123
logger.error('RedisEventQueue is None - import failed!')
120124
raise RuntimeError(

src/a2a/server/request_handlers/default_request_handler.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -94,11 +94,13 @@ def __init__( # noqa: PLR0913
9494
"""
9595
self.agent_executor = agent_executor
9696
self.task_store = task_store
97-
97+
9898
# Handle queue_manager with deprecation warning for backward compatibility
9999
if queue_manager is None:
100100
# Allow disabling fallback via environment variable for strict production deployments
101-
disable_fallback = os.getenv('A2A_DISABLE_QUEUE_MANAGER_FALLBACK', '').lower() in ('true', '1', 'yes')
101+
disable_fallback = os.getenv(
102+
'A2A_DISABLE_QUEUE_MANAGER_FALLBACK', ''
103+
).lower() in ('true', '1', 'yes')
102104

103105
if disable_fallback:
104106
raise ValueError(
@@ -110,12 +112,12 @@ def __init__( # noqa: PLR0913
110112
'Using default InMemoryQueueManager. This will be removed in a future version. '
111113
'Please explicitly pass a QueueManager instance to ensure proper production deployment.',
112114
DeprecationWarning,
113-
stacklevel=2
115+
stacklevel=2,
114116
)
115117
self._queue_manager = InMemoryQueueManager()
116118
else:
117119
self._queue_manager = queue_manager
118-
120+
119121
self._push_config_store = push_config_store
120122
self._push_sender = push_sender
121123
self._request_context_builder = (

src/a2a/server/request_handlers/redis_request_handler.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,10 @@
22

33
from typing import Any
44

5-
from a2a.server.request_handlers.default_request_handler import DefaultRequestHandler
6-
from a2a.server.events.redis_queue_manager import RedisQueueManager
5+
from a2a.server.events.redis_queue_manager import RedisQueueManager
6+
from a2a.server.request_handlers.default_request_handler import (
7+
DefaultRequestHandler,
8+
)
79

810

911
def create_redis_request_handler(
@@ -19,5 +21,12 @@ def create_redis_request_handler(
1921
provided `redis_client` and passes it into `DefaultRequestHandler` so the
2022
rest of the application can remain unchanged.
2123
"""
22-
queue_manager = RedisQueueManager(redis_client=redis_client, stream_prefix=stream_prefix)
23-
return DefaultRequestHandler(agent_executor=agent_executor, task_store=task_store, queue_manager=queue_manager, **kwargs)
24+
queue_manager = RedisQueueManager(
25+
redis_client=redis_client, stream_prefix=stream_prefix
26+
)
27+
return DefaultRequestHandler(
28+
agent_executor=agent_executor,
29+
task_store=task_store,
30+
queue_manager=queue_manager,
31+
**kwargs,
32+
)

src/a2a/utils/stream_write/redis_stream_writer.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import logging
99

1010
from datetime import datetime, timezone
11+
from types import TracebackType
1112
from typing import Any
1213

1314

@@ -58,19 +59,26 @@ async def disconnect(self) -> None:
5859
self._connected = False
5960
logger.info('Disconnected from Redis')
6061

61-
async def __aenter__(self):
62+
async def __aenter__(self) -> 'RedisStreamInjector':
63+
"""Enter the async context manager."""
6264
await self.connect()
6365
return self
6466

65-
async def __aexit__(self, exc_type, exc_val, exc_tb):
67+
async def __aexit__(
68+
self,
69+
exc_type: type[BaseException] | None,
70+
exc_val: BaseException | None,
71+
exc_tb: TracebackType | None,
72+
) -> None:
73+
"""Exit the async context manager."""
6674
await self.disconnect()
6775

6876
def _get_stream_key(self, task_id: str) -> str:
6977
"""Get the Redis stream key for a task."""
7078
if not task_id:
7179
raise ValueError('task_id cannot be empty')
7280
stream_key = f'a2a:task:{task_id}'
73-
logger.debug(f'Generated stream key: {stream_key}')
81+
logger.debug('Generated stream key: %s', stream_key)
7482
return stream_key
7583

7684
def _serialize_event(
@@ -105,10 +113,7 @@ async def stream_message(
105113
if not context_id:
106114
raise ValueError('context_id cannot be empty')
107115

108-
if isinstance(message, dict):
109-
data = message
110-
else:
111-
data = message.model_dump()
116+
data = message if isinstance(message, dict) else message.model_dump()
112117

113118
event_data = self._serialize_event('Message', data)
114119
return await self._append_to_stream(task_id, event_data)
@@ -227,7 +232,7 @@ async def get_latest_event(self, task_id: str) -> dict[str, Any] | None:
227232
if result:
228233
entry_id, fields = result[0]
229234
return {'id': entry_id, **fields}
230-
except Exception as e:
235+
except Exception as e: # noqa: BLE001
231236
logger.warning(
232237
'Failed to get latest event',
233238
extra={'task_id': task_id, 'error': str(e)},
@@ -247,7 +252,7 @@ async def get_events_since(self, task_id: str, start_id: str = '0') -> list:
247252
try:
248253
result = await self._client.xrange(stream_key, start_id, '+')
249254
return [{'id': entry_id, **fields} for entry_id, fields in result]
250-
except Exception as e:
255+
except Exception as e: # noqa: BLE001
251256
logger.warning(
252257
'Failed to get events',
253258
extra={'task_id': task_id, 'error': str(e)},

0 commit comments

Comments
 (0)