Skip to content

Commit e06ad77

Browse files
authored
Ref #967: Add queue parsing check in heartbeat (#968)
Add queue parsing check in heartbeat
1 parent 7f99b9e commit e06ad77

File tree

3 files changed

+54
-8
lines changed

3 files changed

+54
-8
lines changed

.env.example

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ LOG_LEVEL=debug
55
APP_RELOAD=True
66
APP_DEBUG=True
77
JBI_API_KEY="fake_api_key"
8-
DL_QUEUE_DSN=file:///tmp
8+
DL_QUEUE_DSN=file:///tmp/dlqueue
99
DL_QUEUE_CONSTANT_RETRY=true
1010
DL_QUEUE_RETRY_TIMEOUT_DAYS=7
1111

jbi/queue.py

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
item and parse it as an item
2323
"""
2424

25+
import asyncio
2526
import logging
2627
import tempfile
2728
import traceback
@@ -30,7 +31,7 @@
3031
from functools import lru_cache
3132
from json import JSONDecodeError
3233
from pathlib import Path
33-
from typing import AsyncIterator, List, Optional
34+
from typing import Any, AsyncIterator, List, Optional
3435
from urllib.parse import ParseResult, urlparse
3536

3637
import dockerflow.checks
@@ -42,6 +43,10 @@
4243
logger = logging.getLogger(__name__)
4344

4445

46+
async def async_iter(iter: AsyncIterator[Any]) -> list[Any]:
47+
return [item async for item in iter]
48+
49+
4550
class QueueItemRetrievalError(Exception):
4651
pass
4752

@@ -160,6 +165,9 @@ def __init__(self, location):
160165
self.location = Path(location)
161166
self.location.mkdir(parents=True, exist_ok=True)
162167

168+
def __repr__(self) -> str:
169+
return f"FileBackend({self.location})"
170+
163171
def ping(self):
164172
try:
165173
with tempfile.TemporaryDirectory(dir=self.location) as temp_dir:
@@ -222,7 +230,7 @@ async def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
222230
yield QueueItem.parse_file(path)
223231
except (JSONDecodeError, ValidationError) as e:
224232
raise QueueItemRetrievalError(
225-
"Unable to load item at path %s from queue", str(path)
233+
f"Unable to load item at path {path} from queue"
226234
) from e
227235

228236
async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
@@ -253,15 +261,31 @@ def ready(self) -> list[dockerflow.checks.CheckMessage]:
253261
TODO: Convert to an async method when Dockerflow's FastAPI integration
254262
can run check asynchronously
255263
"""
256-
264+
results = []
257265
ping_result = self.backend.ping()
258266
if ping_result is False:
259-
return [
267+
results.append(
260268
dockerflow.checks.Error(
261-
f"queue with f{str(self.backend)} backend unavailable"
269+
f"queue with {str(self.backend)} unavailable",
270+
hint="with FileBackend, check that folder is writable",
271+
id="queue.backend.ping",
262272
)
263-
]
264-
return []
273+
)
274+
275+
try:
276+
bugs_items = asyncio.run(self.retrieve())
277+
for items in bugs_items.values():
278+
asyncio.run(async_iter(items))
279+
except Exception as exc:
280+
results.append(
281+
dockerflow.checks.Error(
282+
f"queue with {str(self.backend)} cannot be retrieved",
283+
hint=f"invalid data: {exc}",
284+
id="queue.backend.retrieve",
285+
)
286+
)
287+
288+
return results
265289

266290
async def postpone(self, payload: bugzilla.WebhookRequest) -> None:
267291
"""

tests/unit/jira/test_queue.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,28 @@ async def test_get_payload_doesnt_match_schema(
242242
await anext(items)
243243

244244

245+
def test_ready_ok(queue: DeadLetterQueue):
246+
assert queue.ready() == []
247+
248+
249+
def test_ready_not_writable(queue: DeadLetterQueue, tmp_path):
250+
queue.backend = FileBackend(tmp_path)
251+
tmp_path.chmod(0o400) # set to readonly
252+
[failure] = queue.ready()
253+
assert failure.id == "queue.backend.ping"
254+
255+
256+
def test_ready_not_parseable(queue: DeadLetterQueue):
257+
corrupt_file_dir = queue.backend.location / "999"
258+
corrupt_file_dir.mkdir()
259+
corrupt_file_path = corrupt_file_dir / "xxx.json"
260+
corrupt_file_path.write_text("BOOM")
261+
262+
[failure] = queue.ready()
263+
assert failure.id == "queue.backend.retrieve"
264+
assert failure.hint.startswith("invalid data: Unable to load item at path /")
265+
266+
245267
@pytest.mark.asyncio
246268
async def test_postpone(queue: DeadLetterQueue, webhook_request_factory):
247269
webhook_payload = webhook_request_factory()

0 commit comments

Comments
 (0)