diff --git a/jbi/queue.py b/jbi/queue.py index 40950e22..7d2aa6a9 100644 --- a/jbi/queue.py +++ b/jbi/queue.py @@ -43,6 +43,19 @@ logger = logging.getLogger(__name__) +ITEM_ID_PATTERN = re.compile( + r"(?P\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\+\d{2}:\d{2})-(?P\d+)-(?P\w*)-(?Perror|postponed)" +) + + +def extract_bug_id_from_item_id(item_id: str) -> str: + if match := re.search(ITEM_ID_PATTERN, item_id): + return match.group("bug_id") + raise ValueError( + "item_id %s did not match expected format: %s", item_id, ITEM_ID_PATTERN.pattern + ) + + class QueueItemRetrievalError(Exception): def __init__(self, message=None, path=None): self.message = message or "Error reading or parsing queue item" @@ -127,6 +140,13 @@ def get(self, bug_id: int) -> AsyncIterator[QueueItem]: """ pass + @abstractmethod + async def exists(self, item_id: str) -> bool: + """ + Report whether an item with id `item_id` exists in the queue + """ + pass + @abstractmethod async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]: """Retrieve all items in the queue, grouped by bug @@ -197,6 +217,19 @@ async def remove(self, bug_id: int, identifier: str): bug_dir.rmdir() logger.debug("Removed directory for bug %s", bug_id) + async def exists(self, item_id: str) -> bool: + try: + bug_id = extract_bug_id_from_item_id(item_id) + except ValueError as e: + logger.warning( + "provided item_id %s did not match expected format", item_id, exc_info=e + ) + return False + + item_path = (self.location / bug_id / item_id).with_suffix(".json") + # even though pathlib.Path.exists() returns a bool, mypy doesn't seem to get it + return bool(item_path.exists()) + async def get(self, bug_id: int) -> AsyncIterator[QueueItem]: folder = self.location / str(bug_id) if not folder.is_dir(): @@ -214,7 +247,7 @@ async def get_all(self) -> dict[int, AsyncIterator[QueueItem]]: all_items: dict[int, AsyncIterator[QueueItem]] = {} for filesystem_object in self.location.iterdir(): if filesystem_object.is_dir() and re.match( - "\d", filesystem_object.name + r"\d", filesystem_object.name ): # filtering out temp files from checks all_items[int(filesystem_object.name)] = self.get(filesystem_object) return all_items @@ -320,3 +353,16 @@ async def done(self, item: QueueItem) -> None: Mark item as done, remove from queue. """ return await self.backend.remove(item.payload.bug.id, item.identifier) + + async def exists(self, item_id) -> bool: + """ + Report whether an item with id `item_id` exists in the queue + """ + return await self.backend.exists(item_id) + + async def delete(self, item_id) -> None: + """ + Remove an item from the queue by item_id + """ + bug_id = extract_bug_id_from_item_id(item_id) + await self.backend.remove(bug_id=int(bug_id), identifier=item_id) diff --git a/jbi/router.py b/jbi/router.py index 7e4b4488..910f8ae9 100644 --- a/jbi/router.py +++ b/jbi/router.py @@ -101,6 +101,19 @@ async def inspect_dl_queue(queue: Annotated[DeadLetterQueue, Depends(get_dl_queu return results +@router.delete("/dl_queue/{item_id}", dependencies=[Depends(api_key_auth)]) +async def delete_queue_item_by_id( + item_id: str, queue: Annotated[DeadLetterQueue, Depends(get_dl_queue)] +): + item_exists = await queue.exists(item_id) + if item_exists: + await queue.delete(item_id) + else: + raise HTTPException( + status_code=404, detail=f"Item {item_id} not found in queue" + ) + + @router.get( "/whiteboard_tags/", dependencies=[Depends(api_key_auth)], diff --git a/tests/fixtures/factories.py b/tests/fixtures/factories.py index 992dbec0..3909d45e 100644 --- a/tests/fixtures/factories.py +++ b/tests/fixtures/factories.py @@ -115,7 +115,7 @@ class Meta: changes = None routing_key = "bug.create" target = "bug" - time = factory.LazyFunction(lambda: datetime.now(UTC)) + time = factory.LazyFunction(lambda: datetime.now(UTC).isoformat(timespec="seconds")) user = factory.SubFactory(WebhookUserFactory) diff --git a/tests/unit/jira/test_queue.py b/tests/unit/test_queue.py similarity index 95% rename from tests/unit/jira/test_queue.py rename to tests/unit/test_queue.py index d10c2151..e3bbbe58 100644 --- a/tests/unit/jira/test_queue.py +++ b/tests/unit/test_queue.py @@ -341,3 +341,22 @@ async def test_done(queue: DeadLetterQueue, queue_item_factory): await queue.done(item) assert await queue.backend.size() == 0 + + +@pytest.mark.asyncio +async def test_delete(queue: DeadLetterQueue, queue_item_factory): + item = queue_item_factory() + + await queue.backend.put(item) + assert await queue.backend.size() == 1 + + await queue.delete(item.identifier) + assert await queue.backend.size() == 0 + + +@pytest.mark.asyncio +async def test_exists(queue: DeadLetterQueue, queue_item_factory): + item = queue_item_factory() + + await queue.backend.put(item) + assert await queue.exists(item.identifier) is True diff --git a/tests/unit/test_router.py b/tests/unit/test_router.py index 633927b8..0b0af2e8 100644 --- a/tests/unit/test_router.py +++ b/tests/unit/test_router.py @@ -110,6 +110,43 @@ async def test_dl_queue_endpoint( } +@pytest.mark.asyncio +async def test_delete_queue_item_by_id( + dl_queue, authenticated_client, webhook_request_factory +): + item = webhook_request_factory(event__time=datetime(1982, 5, 8, 9, 10)) + await dl_queue.track_failed(item, Exception("boom"), rid="rid") + + resp = authenticated_client.delete( + "/dl_queue/1982-05-08%2009:10:00+00:00-654321-create-error" + ) + assert resp.status_code == 200 + + +@pytest.mark.asyncio +async def test_delete_queue_item_by_id_item_doesnt_exist( + dl_queue, authenticated_client, webhook_request_factory +): + item = webhook_request_factory(event__time=datetime(1982, 5, 8, 9, 10)) + await dl_queue.track_failed(item, Exception("boom"), rid="rid") + + resp = authenticated_client.delete("/dl_queue/return-a-404-4-me") + assert resp.status_code == 404 + + +@pytest.mark.asyncio +async def test_delete_queue_item_by_id_requires_authn( + dl_queue, anon_client, webhook_request_factory +): + item = webhook_request_factory(event__time=datetime(1982, 5, 8, 9, 10)) + await dl_queue.track_failed(item, Exception("boom"), rid="rid") + + resp = anon_client.delete( + "/dl_queue/1982-05-08%2009:10:00+00:00-654321-create-error" + ) + assert resp.status_code == 401 + + def test_powered_by_jbi(exclude_middleware, authenticated_client): resp = authenticated_client.get("/powered_by_jbi/") html = resp.text