Skip to content

Commit 323be99

Browse files
grahamalamaleplatremalexcottner
authored
Add Dead Letter Queue class, integrate into heartbeat (#918)
This commit adds the classes that allow us to manage our "Dead Letter Queue". See #775 for motivation. We also add a basic heartbeat check in this commit to assert that we are able to read from / write to the queue. In future commits, we'll introduce a method to process items in the queue and fully integrate the queue into the `/bugzilla_webhook` route. --------- Co-authored-by: Mathieu Leplatre <[email protected]> Co-authored-by: Alex Cottner <[email protected]>
1 parent 25014c4 commit 323be99

File tree

13 files changed

+707
-10
lines changed

13 files changed

+707
-10
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ LOG_LEVEL=debug
55
APP_RELOAD=True
66
APP_DEBUG=True
77
JBI_API_KEY="fake_api_key"
8+
DL_QUEUE_DSN=file:///tmp
89

910
# Jira API Secrets
1011
JIRA_USERNAME="fake_jira_username"

.secrets.baseline

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -113,16 +113,16 @@
113113
"filename": ".env.example",
114114
"hashed_secret": "4b9a4ce92b6a01a4cd6ee1672d31c043f2ae79ab",
115115
"is_verified": false,
116-
"line_number": 11
116+
"line_number": 12
117117
},
118118
{
119119
"type": "Secret Keyword",
120120
"filename": ".env.example",
121121
"hashed_secret": "77ea6398f252999314d609a708842a49fc43e055",
122122
"is_verified": false,
123-
"line_number": 14
123+
"line_number": 15
124124
}
125125
]
126126
},
127-
"generated_at": "2024-01-29T20:59:20Z"
127+
"generated_at": "2024-03-20T18:05:17Z"
128128
}

jbi/app.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,8 @@
2121
from fastapi.responses import JSONResponse
2222
from fastapi.staticfiles import StaticFiles
2323

24-
import jbi.jira
24+
import jbi
25+
import jbi.queue
2526
from jbi.configuration import ACTIONS
2627
from jbi.environment import get_settings
2728
from jbi.log import CONFIG
@@ -60,6 +61,7 @@ def traces_sampler(sampling_context: dict[str, Any]) -> float:
6061
async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
6162
jira_service = jbi.jira.get_service()
6263
bugzilla_service = jbi.bugzilla.get_service()
64+
queue = jbi.queue.get_dl_queue()
6365

6466
checks.register(bugzilla_service.check_bugzilla_connection, name="bugzilla.up")
6567
checks.register(
@@ -89,6 +91,7 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
8991
name="jira.all_project_issue_types_exist",
9092
)
9193
checks.register(jira_service.check_jira_pandoc_install, name="jira.pandoc_install")
94+
checks.register(queue.ready, name="queue.ready")
9295

9396
yield
9497

jbi/bugzilla/models.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ class WebhookEvent(BaseModel, frozen=True):
3131
"""Bugzilla Event Object"""
3232

3333
action: str
34-
time: Optional[datetime.datetime] = None
34+
time: datetime.datetime
3535
user: Optional[WebhookUser] = None
3636
changes: Optional[list[WebhookEventChange]] = None
3737
target: Optional[str] = None

jbi/environment.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from functools import lru_cache
88
from typing import Optional
99

10-
from pydantic import AnyUrl
10+
from pydantic import AnyUrl, FileUrl
1111
from pydantic_settings import BaseSettings, SettingsConfigDict
1212

1313

@@ -48,6 +48,8 @@ class Settings(BaseSettings):
4848
sentry_dsn: Optional[AnyUrl] = None
4949
sentry_traces_sample_rate: float = 1.0
5050

51+
dl_queue_dsn: FileUrl
52+
5153
model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8")
5254

5355

jbi/queue.py

Lines changed: 309 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,309 @@
1+
"""This `queue` module stores Bugzilla webhook messages that we failed to sync
2+
to Jira.
3+
4+
As Bugzilla sends us webhook messages, we want to eagerly accept them and
5+
return a `200` response so that we don't prevent it from sending new messages.
6+
But if we fail to sync a bug, we want to keep the message so we can retry it
7+
later. We also want to store any messages that might be successfully synced, but
8+
were preceded by a message that wasn't synced.
9+
10+
Classes:
11+
- QueueItem: An entry in the dead letter queue, containing information
12+
about the payload, timestamp, and any associated errors when attempting
13+
to sync the bug.
14+
- PythonException: Information about any exception that occurred when
15+
syncing a bug, stored along with the item.
16+
- DeadLetterQueue: Class representing the dead letter queue system, providing methods
17+
for adding, retrieving, and managing queue items. Supports pluggable backends.
18+
- QueueBackend: Abstract base class defining the interface for a DeadLetterQueue backend.
19+
- FileBackend: Implementation of a QueueBackend that stores messages in files.
20+
- InvalidQueueDSNError: Exception raised when an invalid queue DSN is provided.
21+
- QueueItemRetrievalError: Exception raised when the queue is unable to retrieve a failed
22+
item and parse it as a QueueItem
23+
"""
24+
25+
import logging
26+
import tempfile
27+
import traceback
28+
from abc import ABC, abstractmethod
29+
from datetime import datetime
30+
from functools import lru_cache
31+
from json import JSONDecodeError
32+
from pathlib import Path
33+
from typing import AsyncIterator, List, Optional
34+
from urllib.parse import ParseResult, urlparse
35+
36+
import dockerflow.checks
37+
from pydantic import BaseModel, FileUrl, ValidationError
38+
39+
from jbi import bugzilla
40+
from jbi.environment import get_settings
41+
42+
logger = logging.getLogger(__name__)
43+
44+
45+
class QueueItemRetrievalError(Exception):
    """Raised when a stored queue item cannot be read back and parsed as a QueueItem."""

    pass
47+
48+
49+
class InvalidQueueDSNError(Exception):
    """Raised when the configured queue DSN has an unsupported scheme."""

    pass
51+
52+
53+
class PythonException(BaseModel, frozen=True):
    """Serializable snapshot of an exception raised while syncing a bug."""

    type: str
    description: str
    details: str

    @classmethod
    def from_exc(cls, exc: Exception):
        """Build a record (class name, message, formatted traceback) from a live exception."""
        formatted_traceback = "".join(traceback.format_exception(exc))
        return cls(
            type=type(exc).__name__,
            description=str(exc),
            details=formatted_traceback,
        )
65+
66+
67+
class QueueItem(BaseModel, frozen=True):
    """Dead Letter Queue entry: a webhook payload plus the error that blocked it (if any)."""

    payload: bugzilla.WebhookRequest
    error: Optional[PythonException] = None

    @property
    def timestamp(self) -> datetime:
        """Event time of the wrapped webhook payload."""
        return self.payload.event.time

    @property
    def identifier(self):
        """Unique, lexicographically sortable name: "<time>-<bug id>-<action>-<status>"."""
        status = "error" if self.error else "postponed"
        event = self.payload.event
        return f"{event.time}-{self.payload.bug.id}-{event.action}-{status}"
80+
81+
82+
@lru_cache(maxsize=1)
def get_dl_queue():
    """Return the process-wide DeadLetterQueue built from settings (cached singleton)."""
    settings = get_settings()
    return DeadLetterQueue(settings.dl_queue_dsn)
86+
87+
88+
class QueueBackend(ABC):
    """An interface for dead letter queues.

    Implementations persist `QueueItem`s grouped by bug id; items for a bug
    are always retrieved in ascending order of the payload event timestamp.
    """

    @abstractmethod
    def ping(self) -> bool:
        """Report if the queue backend is available and ready to be written to"""
        pass

    @abstractmethod
    async def clear(self) -> None:
        """Remove all bugs and their items from the queue"""
        pass

    @abstractmethod
    async def put(self, item: QueueItem) -> None:
        """Insert item into queued items for a bug, maintaining sorted order by
        payload event time ascending
        """
        pass

    @abstractmethod
    async def remove(self, bug_id: int, identifier: str) -> None:
        """Remove an item from the target bug's queue. If the item is the last
        one for the bug, remove the bug from the queue entirely.
        """
        pass

    @abstractmethod
    def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
        """Retrieve all of the queue items for a specific bug, sorted in
        ascending order by the timestamp of the payload event.
        """
        pass

    @abstractmethod
    async def list(self, bug_id: int) -> List[str]:
        """Report a summary of all of the items in the queue for a bug

        Returns:
            a list of item identifiers, sorted ascending by payload event time
        """
        pass

    @abstractmethod
    async def list_all(self) -> dict[int, List[str]]:
        """Report a summary of all of the items in the queue

        Returns:
            a dict of bug id -> list of item identifiers
        """
        pass

    @abstractmethod
    async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
        """Retrieve all items in the queue, grouped by bug

        Returns:
            dict[int, AsyncIterator[QueueItem]]: a dict of
            {bug_id: iterator of items}, each iterator yielding items in
            ascending order by the timestamp of the payload event.
        """
        pass

    @abstractmethod
    async def size(self, bug_id: Optional[int] = None) -> int:
        """Report the number of items in the queue, optionally filtered by bug id"""
        pass
155+
156+
157+
class FileBackend(QueueBackend):
    """Queue backend that persists items as JSON files.

    Layout under `location`: one directory per bug id, containing one
    `<item identifier>.json` file per queued item. Identifiers start with
    the event timestamp, so lexicographic sort == chronological sort.
    """

    def __init__(self, location):
        self.location = Path(location)
        self.location.mkdir(parents=True, exist_ok=True)

    def ping(self):
        """Check writability by creating and writing a throwaway temp file."""
        try:
            with tempfile.TemporaryDirectory(dir=self.location) as temp_dir:
                with tempfile.TemporaryFile(dir=temp_dir) as f:
                    f.write(b"")
                    return True
        except Exception:
            logger.exception("Could not write to file backed queue")
            return False

    async def clear(self):
        """Delete every item file and bug directory under `location`."""
        # Walk bottom-up so files are removed before their parent directory.
        for root, dirs, files in self.location.walk(top_down=False):
            for name in files:
                (root / name).unlink()
            for name in dirs:
                (root / name).rmdir()

    async def put(self, item: QueueItem):
        """Write `item` to `<bug id>/<identifier>.json`, creating the dir if needed."""
        folder = self.location / f"{item.payload.bug.id}"
        folder.mkdir(exist_ok=True)
        path = folder / (item.identifier + ".json")
        path.write_text(item.model_dump_json())
        logger.debug(
            "Wrote item %s for bug %s to path %s",
            item.identifier,
            item.payload.bug.id,
            path,
        )
        logger.debug("%d items in dead letter queue", await self.size())

    async def remove(self, bug_id: int, identifier: str):
        """Delete one item file; drop the bug's directory once it is empty."""
        bug_dir = self.location / f"{bug_id}"
        item_path = bug_dir / (identifier + ".json")
        item_path.unlink(missing_ok=True)
        logger.debug("Removed %s from queue for bug %s", identifier, bug_id)
        # Fix: guard on is_dir() — iterdir() on a directory that never
        # existed (or was already removed) raises FileNotFoundError.
        if bug_dir.is_dir() and not any(bug_dir.iterdir()):
            bug_dir.rmdir()
            logger.debug("Removed directory for bug %s", bug_id)

    async def list(self, bug_id: int) -> List[str]:
        """Item identifiers for `bug_id`, sorted ascending (timestamp prefix).

        Returns an empty list when the bug has no directory (glob on a
        missing directory yields nothing).
        """
        bug_dir = self.location / str(bug_id)
        return [path.stem for path in sorted(bug_dir.glob("*.json"))]

    async def list_all(self) -> dict[int, List[str]]:
        """Map every queued bug id to its sorted list of item identifiers."""
        item_data: dict[int, List[str]] = {}
        for filesystem_object in self.location.iterdir():
            if filesystem_object.is_dir():
                bug_id = int(filesystem_object.name)
                item_ids = await self.list(bug_id=bug_id)
                item_data[bug_id] = item_ids
        return item_data

    async def get(self, bug_id: int) -> AsyncIterator[QueueItem]:
        """Yield the bug's items in ascending event-time order.

        Raises:
            QueueItemRetrievalError: if a stored file cannot be parsed.
        """
        folder = self.location / str(bug_id)
        if not folder.is_dir():
            return  # function stays a generator: there is a `yield` below
        for path in sorted(folder.iterdir()):
            try:
                # Fix: use pydantic v2 API (the file is written with
                # model_dump_json) instead of deprecated v1 parse_file.
                yield QueueItem.model_validate_json(path.read_text())
            except (JSONDecodeError, ValidationError) as e:
                # Fix: format the message — the original passed printf-style
                # args to the exception constructor, leaving "%s" unexpanded.
                raise QueueItemRetrievalError(
                    f"Unable to load item at path {path} from queue"
                ) from e

    async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]:
        """Map every queued bug id to an async iterator over its items."""
        all_items: dict[int, AsyncIterator[QueueItem]] = {}
        for filesystem_object in self.location.iterdir():
            if filesystem_object.is_dir():
                bug_id = int(filesystem_object.name)
                # Fix: pass the bug id, not the Path object, to get().
                all_items[bug_id] = self.get(bug_id)
        return all_items

    async def size(self, bug_id=None) -> int:
        """Count item files, either for one bug or across the whole queue."""
        location = self.location / str(bug_id) if bug_id else self.location
        return sum(1 for _ in location.rglob("*.json"))
237+
238+
239+
class DeadLetterQueue:
    """Stores Bugzilla webhook payloads that could not (yet) be synced to Jira.

    Delegates persistence to a pluggable `QueueBackend`, chosen from the DSN
    scheme (currently only `file://` is supported).
    """

    backend: QueueBackend

    def __init__(self, dsn: FileUrl | str | ParseResult):
        """Parse `dsn` and construct the matching backend.

        Raises:
            InvalidQueueDSNError: if the DSN scheme is not `file`.
        """
        dsn = urlparse(url=dsn) if isinstance(dsn, str) else dsn

        if dsn.scheme != "file":
            raise InvalidQueueDSNError(f"{dsn.scheme} is not supported")
        self.backend = FileBackend(dsn.path)

    def ready(self) -> list[dockerflow.checks.CheckMessage]:
        """Heartbeat check to assert we can write items to queue

        TODO: Convert to an async method when Dockerflow's FastAPI integration
        can run check asynchronously
        """
        ping_result = self.backend.ping()
        if ping_result is False:
            return [
                dockerflow.checks.Error(
                    # Fix: message previously rendered "with f<backend>"
                    # because of a stray "f" left inside the f-string.
                    f"queue with {self.backend} backend unavailable"
                )
            ]
        return []

    async def postpone(self, payload: bugzilla.WebhookRequest) -> None:
        """
        Postpone the specified request for later.
        """
        item = QueueItem(payload=payload)
        await self.backend.put(item)

    async def track_failed(
        self, payload: bugzilla.WebhookRequest, exc: Exception
    ) -> None:
        """
        Store the specified payload and exception information into the queue.
        """
        item = QueueItem(
            payload=payload,
            error=PythonException.from_exc(exc),
        )
        await self.backend.put(item)

    async def is_blocked(self, payload: bugzilla.WebhookRequest) -> bool:
        """
        Return `True` if the specified `payload` is blocked and should be
        queued instead of being processed.
        """
        # Any queued item for this bug blocks new events, preserving order.
        existing = await self.backend.size(payload.bug.id)
        return existing > 0

    async def retrieve(self) -> dict[int, AsyncIterator[QueueItem]]:
        """
        Returns the whole queue -- a dict of bug_id and a generator for the
        items for that bug
        """
        return await self.backend.get_all()

    async def list(self, bug_id: int) -> List[str]:
        """Return the item identifiers queued for one bug."""
        return await self.backend.list(bug_id=bug_id)

    async def list_all(self) -> dict[int, List[str]]:
        """Return every bug id with its queued item identifiers."""
        return await self.backend.list_all()

    async def done(self, item: QueueItem) -> None:
        """
        Mark item as done, remove from queue.
        """
        return await self.backend.remove(item.payload.bug.id, item.identifier)

0 commit comments

Comments
 (0)