
Commit 9fa5115

alexcottner, grahamalama, and leplatrem authored
Adding retry process for dead letter queue (#924)
Adding retry process for dead letter queue, and unit tests to verify functionality.

Co-authored-by: Graham Beckley <[email protected]>
Co-authored-by: Mathieu Leplatre <[email protected]>
1 parent a0a7615 commit 9fa5115

File tree: 8 files changed (+257, -6 lines)

.env.example

Lines changed: 2 additions & 0 deletions
@@ -6,6 +6,8 @@ APP_RELOAD=True
 APP_DEBUG=True
 JBI_API_KEY="fake_api_key"
 DL_QUEUE_DSN=file:///tmp
+DL_QUEUE_CONSTANT_RETRY=true
+DL_QUEUE_RETRY_TIMEOUT_DAYS=7
 
 # Jira API Secrets
 JIRA_USERNAME="fake_jira_username"
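The two new settings configure the retry consumer added in this commit. As jbi/retry.py (shown later in this diff) reads them, DL_QUEUE_CONSTANT_RETRY is compared against the literal string "true" and DL_QUEUE_RETRY_TIMEOUT_DAYS is cast with int(). A minimal sketch of that parsing, for reference only:

    from os import getenv

    # Only the exact value "true" enables the constant-retry loop;
    # anything else (including "True" or "1") leaves it disabled.
    CONSTANT_RETRY = getenv("DL_QUEUE_CONSTANT_RETRY", "false") == "true"
    # Events older than this many days are dropped instead of retried.
    RETRY_TIMEOUT_DAYS = int(getenv("DL_QUEUE_RETRY_TIMEOUT_DAYS", 7))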

.secrets.baseline

Lines changed: 3 additions & 3 deletions
@@ -113,16 +113,16 @@
         "filename": ".env.example",
         "hashed_secret": "4b9a4ce92b6a01a4cd6ee1672d31c043f2ae79ab",
         "is_verified": false,
-        "line_number": 12
+        "line_number": 14
       },
       {
         "type": "Secret Keyword",
         "filename": ".env.example",
         "hashed_secret": "77ea6398f252999314d609a708842a49fc43e055",
         "is_verified": false,
-        "line_number": 15
+        "line_number": 17
       }
     ]
   },
-  "generated_at": "2024-03-20T18:05:17Z"
+  "generated_at": "2024-04-10T20:05:01Z"
 }

Makefile

Lines changed: 2 additions & 2 deletions
@@ -65,11 +65,11 @@ $(DOTENV_FILE):
 
 .PHONY: docker-shell
 docker-shell: $(DOTENV_FILE)
-	docker-compose run --rm web /bin/sh
+	docker compose run --rm web /bin/sh
 
 .PHONY: docker-start
 docker-start: $(DOTENV_FILE)
-	docker-compose up
+	docker compose up
 
 .PHONY: test
 test: $(INSTALL_STAMP)
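The switch from docker-compose to docker compose moves these targets from the standalone Compose V1 binary to the Compose V2 plugin that ships with current Docker releases; the arguments and behaviour of docker-shell and docker-start are otherwise unchanged.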

docker-compose.yaml

Lines changed: 7 additions & 0 deletions
@@ -12,3 +12,10 @@ services:
       - ${PORT:-8000}:${PORT:-8000}
     volumes:
       - .:/app
+  retry:
+    build: .
+    command: python -m jbi.retry
+    env_file:
+      - .env
+    volumes:
+      - .:/app
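The new retry service runs the dead letter queue consumer (python -m jbi.retry, added below) in its own container, built from the same image and sharing the same .env file and source mount as the existing web service. Assuming the service names in this file, the consumer can be started on its own with docker compose up retry, or alongside the web service through the existing make docker-start target.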

jbi/environment.py

Lines changed: 4 additions & 1 deletion
@@ -48,9 +48,12 @@ class Settings(BaseSettings):
     sentry_dsn: Optional[AnyUrl] = None
     sentry_traces_sample_rate: float = 1.0
 
+    # Retry queue
     dl_queue_dsn: FileUrl
 
-    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
+    model_config = SettingsConfigDict(
+        env_file=".env", env_file_encoding="utf-8", extra="ignore"
+    )
 
 
 @lru_cache(maxsize=1)
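The extra="ignore" option matters because the new DL_QUEUE_* variables live in .env but are not declared as fields on Settings; by default pydantic-settings treats unknown keys from an env file as forbidden extras and raises a validation error, whereas extra="ignore" makes it skip them. A minimal sketch of that behaviour with a hypothetical standalone model (not the project's Settings class):

    from pydantic_settings import BaseSettings, SettingsConfigDict

    class ExampleSettings(BaseSettings):
        model_config = SettingsConfigDict(
            env_file=".env", env_file_encoding="utf-8", extra="ignore"
        )

        # Declared field: populated from DL_QUEUE_DSN
        # (matching is case-insensitive by default).
        dl_queue_dsn: str

    # DL_QUEUE_CONSTANT_RETRY and DL_QUEUE_RETRY_TIMEOUT_DAYS are not fields,
    # so with extra="ignore" they are skipped instead of raising an error.
    settings = ExampleSettings()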

jbi/queue.py

Lines changed: 3 additions & 0 deletions
@@ -302,6 +302,9 @@ async def list(self, bug_id: int) -> List[str]:
     async def list_all(self) -> dict[int, List[str]]:
         return await self.backend.list_all()
 
+    async def size(self, bug_id=None):
+        return await self.backend.size(bug_id=bug_id)
+
     async def done(self, item: QueueItem) -> None:
         """
         Mark item as done, remove from queue.
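The new size() method is a thin pass-through to the backend's count of queued items for a given bug (bug_id defaults to None, presumably meaning the whole queue); the retry loop below uses it to report how many later events it skips after a failure. An illustrative way to query it, assuming the get_dl_queue() accessor from this repository:

    import asyncio

    from jbi.queue import get_dl_queue

    async def pending_for_bug(bug_id: int) -> int:
        queue = get_dl_queue()
        # Delegates to the backend's size() for this bug's sub-queue.
        return await queue.size(bug_id)

    if __name__ == "__main__":
        print(asyncio.run(pending_for_bug(1)))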

jbi/retry.py

Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
import asyncio
import logging
from datetime import UTC, datetime, timedelta
from os import getenv
from time import sleep

import jbi.runner as runner
from jbi.configuration import ACTIONS
from jbi.queue import get_dl_queue

CONSTANT_RETRY = getenv("DL_QUEUE_CONSTANT_RETRY", "false") == "true"
RETRY_TIMEOUT_DAYS = getenv("DL_QUEUE_RETRY_TIMEOUT_DAYS", 7)
CONSTANT_RETRY_SLEEP = getenv("DL_QUEUE_CONSTANT_RETRY_SLEEP", 5)

logger = logging.getLogger(__name__)


async def retry_failed(item_executor=runner.execute_action, queue=get_dl_queue()):
    min_event_timestamp = datetime.now(UTC) - timedelta(days=int(RETRY_TIMEOUT_DAYS))

    # load all bugs from DLQ
    bugs = await queue.retrieve()

    # metrics to track
    metrics = {
        "bug_count": len(bugs),
        "events_processed": 0,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }

    for bug_id, items in bugs.items():
        try:
            async for item in items:
                # skip and delete item if we have exceeded RETRY_TIMEOUT_DAYS
                if item.timestamp < min_event_timestamp:
                    logger.warning("removing expired event %s", item.identifier)
                    await queue.done(item)
                    metrics["events_skipped"] += 1
                    continue

                try:
                    item_executor(item.payload, ACTIONS)
                    await queue.done(item)
                    metrics["events_processed"] += 1
                except Exception:
                    logger.exception("failed to reprocess event %s.", item.identifier)
                    metrics["events_failed"] += 1

                    # check for other events that will be skipped
                    pending_events = await queue.size(bug_id)
                    if pending_events > 1:  # if this isn't the only event for the bug
                        logger.info(
                            "skipping %d event(s) for bug %d, previous event %s failed",
                            pending_events - 1,
                            bug_id,
                            item.identifier,
                        )
                        metrics["events_skipped"] += pending_events - 1
                        break
        except Exception:
            logger.exception("failed to parse events for bug %d.", bug_id)
            metrics["bugs_failed"] += 1

    return metrics


async def main():
    while True:
        metrics = await retry_failed()
        logger.info("event queue processing complete", extra=metrics)
        if not CONSTANT_RETRY:
            return
        sleep(int(CONSTANT_RETRY_SLEEP))


if __name__ == "__main__":
    asyncio.run(main())
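By default the module makes a single pass over the dead letter queue and exits; with DL_QUEUE_CONSTANT_RETRY=true it keeps looping, sleeping DL_QUEUE_CONSTANT_RETRY_SLEEP seconds (default 5) between passes, which is how the docker compose retry service above runs it via python -m jbi.retry. Because retry_failed() accepts an alternative item_executor and queue (the hook the unit tests below rely on), a single pass can also be driven programmatically; an illustrative sketch, with the logging setup being an assumption:

    import asyncio
    import logging

    from jbi.retry import retry_failed

    async def run_once() -> None:
        logging.basicConfig(level=logging.INFO)
        # Uses the default executor (jbi.runner.execute_action) and the real
        # dead letter queue configured by DL_QUEUE_DSN.
        metrics = await retry_failed()
        # Keys: bug_count, events_processed, events_skipped,
        # events_failed, bugs_failed.
        print(metrics)

    if __name__ == "__main__":
        asyncio.run(run_once())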

tests/unit/test_retry.py

Lines changed: 157 additions & 0 deletions
@@ -0,0 +1,157 @@
from datetime import UTC, datetime, timedelta
from unittest.mock import MagicMock

import pytest

from jbi.queue import DeadLetterQueue
from jbi.retry import RETRY_TIMEOUT_DAYS, retry_failed
from jbi.runner import execute_action


def iter_error():
    mock = MagicMock()
    mock.__aiter__.return_value = None
    mock.__aiter__.side_effect = Exception("Throwing an exception")
    return mock


async def aiter_sync(iterable):
    for i in iterable:
        yield i


@pytest.fixture
def mock_queue():
    return MagicMock(spec=DeadLetterQueue)


@pytest.fixture
def mock_executor():
    return MagicMock(spec=execute_action)


@pytest.mark.asyncio
async def test_retry_empty_list(caplog, mock_queue):
    mock_queue.retrieve.return_value = {}

    metrics = await retry_failed(queue=mock_queue)
    mock_queue.retrieve.assert_called_once()
    assert len(caplog.messages) == 0
    assert metrics == {
        "bug_count": 0,
        "events_processed": 0,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }


@pytest.mark.asyncio
async def test_retry_success(caplog, mock_queue, mock_executor, queue_item_factory):
    mock_queue.retrieve.return_value = {
        1: aiter_sync([queue_item_factory(payload__bug__id=1)])
    }

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
    assert len(caplog.messages) == 0  # no logs should have been generated
    mock_queue.retrieve.assert_called_once()
    mock_queue.done.assert_called_once()  # item should be marked as complete
    mock_executor.assert_called_once()  # item should have been processed
    assert metrics == {
        "bug_count": 1,
        "events_processed": 1,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 0,
    }


@pytest.mark.asyncio
async def test_retry_fail_and_skip(
    caplog, mock_queue, mock_executor, queue_item_factory
):
    mock_queue.retrieve.return_value = {
        1: aiter_sync(
            [
                queue_item_factory(payload__bug__id=1),
                queue_item_factory(payload__bug__id=1),
            ]
        )
    }

    mock_executor.side_effect = Exception("Throwing an exception")
    mock_queue.size.return_value = 3

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
    mock_queue.retrieve.assert_called_once()
    mock_queue.done.assert_not_called()  # no items should have been marked as done
    assert caplog.text.count("failed to reprocess event") == 1
    assert caplog.text.count("skipping 2 event(s)") == 1
    assert caplog.text.count("removing expired event") == 0
    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
    assert metrics == {
        "bug_count": 1,
        "events_processed": 0,
        "events_skipped": 2,
        "events_failed": 1,
        "bugs_failed": 0,
    }


@pytest.mark.asyncio
async def test_retry_remove_expired(
    caplog, mock_queue, mock_executor, queue_item_factory
):
    mock_queue.retrieve.return_value = {
        1: aiter_sync(
            [
                queue_item_factory(
                    payload__bug__id=1,
                    payload__event__time=datetime.now(UTC)
                    - timedelta(days=int(RETRY_TIMEOUT_DAYS), seconds=1),
                ),
                queue_item_factory(payload__bug__id=1),
            ]
        )
    }

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
    mock_queue.retrieve.assert_called_once()
    assert (
        len(mock_queue.done.call_args_list) == 2
    ), "both items should have been marked as done"
    assert caplog.text.count("failed to reprocess event") == 0
    assert caplog.text.count("skipping events") == 0
    assert caplog.text.count("removing expired event") == 1
    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
    assert metrics == {
        "bug_count": 1,
        "events_processed": 1,
        "events_skipped": 1,
        "events_failed": 0,
        "bugs_failed": 0,
    }


@pytest.mark.asyncio
async def test_retry_bug_failed(caplog, mock_queue, mock_executor, queue_item_factory):
    mock_queue.retrieve.return_value = {
        1: aiter_sync([queue_item_factory(payload__bug__id=1)]),
        2: iter_error(),
    }

    metrics = await retry_failed(item_executor=mock_executor, queue=mock_queue)
    mock_queue.retrieve.assert_called_once()
    mock_queue.done.assert_called_once()  # one item should have been marked as done
    assert caplog.text.count("failed to reprocess event") == 0
    assert caplog.text.count("skipping events") == 0
    assert caplog.text.count("removing expired event") == 0
    assert caplog.text.count("failed to parse events for bug") == 1
    mock_executor.assert_called_once()  # only one item should have been attempted to be processed
    assert metrics == {
        "bug_count": 2,
        "events_processed": 1,
        "events_skipped": 0,
        "events_failed": 0,
        "bugs_failed": 1,
    }
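A few notes on the test scaffolding: mock_queue is a MagicMock specced against DeadLetterQueue, so its async methods become awaitable mocks automatically; aiter_sync wraps a plain list in an async generator to imitate the per-bug streams returned by queue.retrieve(); and iter_error builds an object whose __aiter__ raises immediately, exercising the bugs_failed branch of retry_failed. The queue_item_factory fixture is not part of this diff (it presumably comes from the existing test suite's conftest/factories), and the @pytest.mark.asyncio markers imply pytest-asyncio is already a test dependency.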
