Skip to content

Commit 309d21c

Browse files
committed
feat: Implement Event-Driven Orchestration (Track B) with Celery and Security
1 parent 576670c commit 309d21c

15 files changed

Lines changed: 2682 additions & 0 deletions

EVENT_DRIVEN_IMPLEMENTATION.md

Lines changed: 582 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
"""
2+
Event-Driven Orchestration Demo
3+
4+
This script demonstrates the event-driven architecture implemented
5+
for Issue #190: Event-Driven Orchestration with Celery Integration.
6+
7+
Requirements:
8+
1. Redis server running (localhost:6379 or REDIS_URL env var)
9+
2. Celery worker running (celery -A worker worker --loglevel=info)
10+
11+
Usage:
12+
python examples/event_driven_demo.py
13+
"""
14+
15+
import asyncio
16+
import os
17+
import sys
18+
19+
# add src to path
20+
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src"))
21+
22+
from memu.app.service import MemoryService
23+
from memu.events import event_manager
24+
from memu.events.dispatcher import CeleryDispatcher
25+
from memu.events.setup import init_event_system
26+
27+
28+
def print_section(title: str):
    """Display *title* framed by two 60-character rule lines."""
    rule = "=" * 60
    print("\n" + rule)
    print(f" {title}")
    print(rule + "\n")
33+
34+
35+
async def demo_event_driven_flow():
    """
    Demonstrate the complete event-driven flow.

    Flow:
        1. Initialize event system with CeleryDispatcher
        2. Emit memory operation event
        3. CeleryDispatcher catches event
        4. Dispatches to Celery worker
        5. Worker processes memory in background

    Requires a running Redis broker; the Celery worker is only needed to see
    the background tasks actually execute.
    """
    print_section("Event-Driven Orchestration Demo")

    print("Step 1: Initializing event system...")
    init_event_system(celery=True)
    # fix: these literals carried an f-prefix with no placeholders
    print("Event system initialized")
    # NOTE(review): `_dispatchers` is private to EventManager — fine for a
    # demo, but a public accessor would be cleaner if one is added.
    print(f" - Registered dispatchers: {len(event_manager._dispatchers)}")
    print(f" - Supported events: {', '.join(event_manager.list_events())}")

    print("\nStep 2: Creating MemoryService instance...")
    service = MemoryService()
    print("MemoryService created")

    print("\nStep 3: Submitting memory for background processing...")
    print(" (Using event-driven dispatch)")

    result = await service.memorize(
        resource_url="https://example.com/event-driven-demo.pdf",
        modality="document",
        user={"user_id": "demo_user_123"},
        background=True,  # triggers event emission instead of inline processing
    )

    print("Event emitted and dispatched")
    print(f" - Status: {result.get('status')}")
    print(f" - Message: {result.get('message')}")
    print(f" - Event: {result.get('event')}")
    print(f" - Resource: {result.get('resource_url')}")

    print_section("Event Flow Visualization")
    print("1. MemoryService.memorize(background=True)")
    print(" ↓")
    print("2. EventManager.emit('on_memory_saved', data)")
    print(" ↓")
    print("3. CeleryDispatcher.on_memory_saved(data)")
    print(" ↓")
    print("4. process_memory_task.delay(...) → Redis Queue")
    print(" ↓")
    print("5. Celery Worker consumes task")
    print(" ↓")
    print("6. MemoryService.memorize() executes in background")

    print_section("Custom Event Listener Demo")

    custom_events = []

    def custom_listener(data):
        """Custom event listener for demonstration."""
        custom_events.append(data)
        print("Custom listener received event!")
        print(f" - Resource: {data.get('resource_url')}")
        print(f" - Modality: {data.get('modality')}")

    # Register custom listener alongside the CeleryDispatcher
    event_manager.on('on_memory_saved', custom_listener)
    print("Registered custom listener")

    # Emit another event; both the dispatcher and the custom listener fire
    print("\nEmitting event with custom listener...")
    await service.memorize(
        resource_url="https://example.com/custom-listener-test.pdf",
        modality="document",
        user={"user_id": "demo_user_456"},
        background=True,
    )

    print(f"\nCustom listener captured {len(custom_events)} event(s)")

    print_section("Dispatcher Status")
    for dispatcher in event_manager._dispatchers:
        if isinstance(dispatcher, CeleryDispatcher):
            print("CeleryDispatcher:")
            print(f" - Enabled: {dispatcher.enabled}")
            print(f" - Task Options: {dispatcher.task_options}")

    print_section("Demo Complete")
    print("Event-driven orchestration system working correctly!")
    print("\nNext steps:")
    print("1. Start Celery worker: celery -A worker worker --loglevel=info")
    print("2. Check worker logs to see background task execution")
    print("3. Verify tasks are being processed in the background")
126+
127+
128+
# Script entry point: print the banner and prerequisites, then drive the
# async demo to completion on a fresh event loop.
if __name__ == "__main__":
    print("""
    MemU Event-Driven Orchestration Demo
    Issue #190: Event-Driven Orchestration with Celery
    """)

    print("Prerequisites:")
    print(" Redis server running (default: localhost:6379)")
    print(" Celery worker running (celery -A worker worker --loglevel=info)")
    print("\nStarting demo...\n")

    # asyncio.run creates and closes the event loop around the coroutine
    asyncio.run(demo_event_driven_flow())

src/memu/app/memorize.py

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,63 @@ async def memorize(
6868
resource_url: str,
6969
modality: str,
7070
user: dict[str, Any] | None = None,
71+
background: bool = False,
7172
) -> dict[str, Any]:
73+
"""
74+
Memorize a resource from a URL.
75+
76+
This method processes the resource either synchronously or asynchronously
77+
using the event-driven orchestration system.
78+
79+
Args:
80+
resource_url: URL of the resource to memorize
81+
modality: Type of content (document, image, video, audio, conversation)
82+
user: Optional user context
83+
background: If True, uses event-driven dispatch to Celery workers
84+
85+
Returns:
86+
Response dictionary with result or task status
87+
88+
Event-Driven Flow (background=True):
89+
1. Emit 'on_memory_saved' event with payload
90+
2. CeleryDispatcher catches event
91+
3. Dispatches to Celery worker (process_memory_task)
92+
4. Returns task_id for tracking
93+
"""
94+
if background:
95+
# Event-driven orchestration: emit event instead of direct task call
96+
from memu.events import event_manager
97+
98+
# prepare event payload
99+
event_data = {
100+
"resource_url": resource_url,
101+
"modality": modality,
102+
"user": user,
103+
}
104+
105+
# Emit event - CeleryDispatcher will handle async dispatch
106+
logger.info(
107+
"Emitting on_memory_saved event for background processing",
108+
extra={
109+
"resource_url": resource_url,
110+
"modality": modality,
111+
"user_id": user.get("user_id") if user else None
112+
}
113+
)
114+
115+
# Emit the event - any registered dispatchers will handle it
116+
event_manager.emit('on_memory_saved', event_data)
117+
118+
# Get task_id from CeleryDispatcher if available
119+
# (For now, return generic response - can enhance to track task_id)
120+
return {
121+
"status": "queued",
122+
"message": "Memory processing event dispatched to background workers",
123+
"event": "on_memory_saved",
124+
"resource_url": resource_url
125+
}
126+
127+
# Synchronous processing
72128
ctx = self._get_context()
73129
store = self._get_database()
74130
user_scope = self.user_model(**user).model_dump() if user is not None else None
@@ -92,6 +148,7 @@ async def memorize(
92148
if response is None:
93149
msg = "Memorize workflow failed to produce a response"
94150
raise RuntimeError(msg)
151+
95152
return response
96153

97154
def _build_memorize_workflow(self) -> list[WorkflowStep]:

src/memu/celery_app.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
import os
from urllib.parse import quote

from celery import Celery
from celery.signals import setup_logging
5+
# Redis configuration with optional authentication, read once at import
# time from the environment.
REDIS_PASSWORD = os.getenv("REDIS_PASSWORD", None)
REDIS_HOST = os.getenv("REDIS_HOST", "localhost")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
REDIS_DB = int(os.getenv("REDIS_DB", "0"))

if REDIS_PASSWORD:
    # Percent-encode the password so reserved characters ('@', ':', '/',
    # '#', ...) cannot corrupt the URL — raw interpolation broke such
    # passwords. Plain alphanumeric passwords are unaffected by quoting.
    REDIS_URL = f"redis://:{quote(REDIS_PASSWORD, safe='')}@{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
else:
    # No auth: plain host/port/db URL.
    REDIS_URL = f"redis://{REDIS_HOST}:{REDIS_PORT}/{REDIS_DB}"
15+
16+
# The shared Celery application. Redis serves as both the message broker
# and the result backend; task modules are auto-imported from memu.tasks
# so workers register them on startup.
celery_app = Celery(
    "memu_worker",
    broker=REDIS_URL,
    backend=REDIS_URL,
    include=['memu.tasks']
)
22+
23+
# Configure Celery logging
@setup_logging.connect
def setup_celery_logging_config(**kwargs):
    """Override Celery's default logging with structured JSON logging."""
    # Imported lazily so merely importing this module never pulls in the
    # logging helpers.
    from memu.task_utils.logging_config import configure_celery_logging

    configure_celery_logging(
        os.getenv("CELERY_LOG_LEVEL", "INFO"),
        os.getenv("CELERY_LOG_JSON", "true").lower() == "true",
        os.getenv("CELERY_LOG_FILE", None),
    )
34+
35+
36+
# Runtime configuration for the Celery app. Every numeric tunable can be
# overridden through an environment variable so deployments can adjust
# limits without code changes.
celery_app.conf.update(
    # Serialization: JSON only — never pickle, so broker payloads stay
    # language-agnostic and are not a deserialization attack surface.
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",

    # Timezone: normalize all scheduling and timestamps to UTC.
    timezone="UTC",
    enable_utc=True,

    # Task acknowledgment: ack only after the task finishes so work is
    # re-queued if a worker dies mid-task (tasks should be idempotent).
    task_acks_late=True,
    task_acks_on_failure_or_timeout=True,
    task_reject_on_worker_lost=True,

    # Timeouts (can be overridden per-task)
    task_soft_time_limit=int(os.getenv("CELERY_TASK_SOFT_TIME_LIMIT", "300")), # 5 minutes
    task_time_limit=int(os.getenv("CELERY_TASK_TIME_LIMIT", "360")), # 6 minutes

    # Result backend settings
    result_expires=int(os.getenv("CELERY_RESULT_EXPIRES", "86400")), # 24 hours
    result_compression='gzip',
    result_extended=True, # Store task args in result

    # Task tracking: emit started/sent events for monitoring tools
    task_track_started=True,
    task_send_sent_event=True,
    task_store_eager_result=True, # For testing

    # Worker settings: low prefetch for fair dispatch of long tasks;
    # recycle workers periodically to bound memory growth.
    worker_prefetch_multiplier=int(os.getenv("CELERY_WORKER_PREFETCH", "1")),
    worker_max_tasks_per_child=int(os.getenv("CELERY_WORKER_MAX_TASKS", "100")),
    worker_max_memory_per_child=int(os.getenv("CELERY_WORKER_MAX_MEMORY", "500000")), # 500MB

    # Connection settings
    broker_connection_retry_on_startup=True,
    broker_pool_limit=int(os.getenv("CELERY_BROKER_POOL_LIMIT", "10")),

    # Global rate limiting
    task_default_rate_limit=os.getenv("CELERY_TASK_RATE_LIMIT", "100/h"),
)

src/memu/events/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""
2+
Event-driven orchestration system for MemU.
3+
4+
This module provides a flexible event hook system that allows external
5+
workers (Celery/Redis) to react to memory operations asynchronously.
6+
7+
Architecture:
8+
EventManager (hub) -> Dispatchers (listeners) -> Workers (Celery tasks)
9+
10+
Usage:
11+
# Simply import - CeleryDispatcher auto-registers
12+
from memu.events import event_manager
13+
14+
# Emit events
15+
event_manager.emit('on_memory_saved', {
16+
'resource_url': 'https://example.com/doc.pdf',
17+
'modality': 'document',
18+
'user': {'user_id': '123'}
19+
})
20+
"""
21+
22+
from .manager import EventManager, event_manager
23+
24+
# Import setup to trigger auto-initialization
25+
# This creates and registers the CeleryDispatcher automatically
26+
from . import setup # noqa: F401
27+
28+
__all__ = ["event_manager", "EventManager"]

0 commit comments

Comments
 (0)