Skip to content

Commit 5aaf5fd

Browse files
authored
[AI-6470] Add EventBusOrchestrator to ddev (#22394)
* Add EventBusOrchestrator to ddev * Fix formatting * Add changelog * Finalize threadpool executor implementation and property generic typing * Improvements * Small update on tests * Addres comments about naming, queue injection and test setup for clarity * Address comments in message pump finalization and error handling on task cancellations
1 parent 461b69f commit 5aaf5fd

File tree

6 files changed

+946
-0
lines changed

6 files changed

+946
-0
lines changed

ddev/changelog.d/22394.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add EventBusOrchestrator to ddev toolset
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# (C) Datadog, Inc. 2026-present
2+
# All rights reserved
3+
# Licensed under a 3-clause BSD style license (see LICENSE)
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# (C) Datadog, Inc. 2026-present
2+
# All rights reserved
3+
# Licensed under a 3-clause BSD style license (see LICENSE)
4+
from __future__ import annotations
5+
6+
from typing import TYPE_CHECKING
7+
8+
if TYPE_CHECKING:
9+
from .orchestrator import BaseMessage
10+
11+
12+
class ProcessorQueueError(Exception):
13+
"""
14+
Exception raised when a processor queue is not initialized.
15+
"""
16+
17+
pass
18+
19+
20+
class MessageProcessingError(Exception):
21+
"""
22+
Exception raised when a processor fails to process a message.
23+
"""
24+
25+
def __init__(self, processor_name: str, message: BaseMessage, original_exception: Exception):
26+
self.processor_name = processor_name
27+
self.message = message
28+
self.original_exception = original_exception
29+
super().__init__(
30+
f"Error processing message by processor '{processor_name}'. "
31+
f"Message: {message}. Original error: {original_exception}"
32+
)
33+
34+
35+
class ProcessorSuccessHookError(MessageProcessingError):
36+
"""
37+
Exception raised when the on_success hook of a processor fails.
38+
"""
39+
40+
def __init__(self, processor_name: str, message: BaseMessage, original_exception: Exception):
41+
super(Exception, self).__init__(
42+
f"Error in 'on_success' hook for processor '{processor_name}'. "
43+
f"Message: {message}. "
44+
f"Original error: {original_exception}"
45+
)
46+
self.processor_name = processor_name
47+
self.message = message
48+
self.original_exception = original_exception
49+
50+
51+
class FatalProcessingError(Exception):
52+
"""
53+
Raised by hooks or processors to signal that the Orchestrator should
54+
stop processing immediately and shutdown gracefully.
55+
"""
56+
57+
pass

0 commit comments

Comments
 (0)