Skip to content

Commit e075d2a

Browse files
authored
Add a new events app (#199)
1 parent 07f0515 commit e075d2a

File tree

8 files changed

+88
-54
lines changed

8 files changed

+88
-54
lines changed

llmstack/events/__init__.py

Whitespace-only changes.

llmstack/events/apis.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import importlib
2+
import logging
3+
import uuid
4+
5+
from django.conf import settings
6+
from rest_framework import viewsets
7+
8+
from llmstack.jobs.adhoc import ProcessingJob
9+
10+
logger = logging.getLogger(__name__)
11+
12+
13+
class EventProcessingJob(ProcessingJob):
14+
@classmethod
15+
def generate_job_id(cls):
16+
return "{}".format(str(uuid.uuid4()))
17+
18+
19+
class EventsViewSet(viewsets.ViewSet):
20+
def create(self, topic, event_data):
21+
if topic in settings.EVENT_TOPIC_MAPPING:
22+
for processor_fn_name in settings.EVENT_TOPIC_MAPPING[topic]:
23+
module_name = ".".join(processor_fn_name.split(".")[:-1])
24+
fn_name = processor_fn_name.split(".")[-1]
25+
module = importlib.import_module(module_name)
26+
fn = getattr(module, fn_name)
27+
28+
EventProcessingJob.create(func=fn, args=[event_data]).add_to_queue()

llmstack/events/apps.py

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
import importlib
2+
3+
from django.apps import AppConfig
4+
5+
6+
class EventsConfig(AppConfig):
7+
name = "llmstack.events"
8+
label = "events"
9+
10+
def ready(self) -> None:
11+
from django.conf import settings
12+
13+
# Ensure Event topic mapping is correctly configured in settings, if not make it empty
14+
if not hasattr(settings, "EVENT_TOPIC_MAPPING"):
15+
setattr(settings, "EVENT_TOPIC_MAPPING", {})
16+
17+
for topic in settings.EVENT_TOPIC_MAPPING:
18+
if not isinstance(settings.EVENT_TOPIC_MAPPING[topic], list):
19+
raise ValueError(f"EVENT_TOPIC_MAPPING[{topic}] must be a list of processor functions")
20+
for processor_fn_name in settings.EVENT_TOPIC_MAPPING[topic]:
21+
try:
22+
module_name = ".".join(processor_fn_name.split(".")[:-1])
23+
fn_name = processor_fn_name.split(".")[-1]
24+
module = importlib.import_module(module_name)
25+
if not callable(getattr(module, fn_name)):
26+
raise ValueError(
27+
f"EVENT_TOPIC_MAPPING[{topic}] contains an invalid processor function: {processor_fn_name}"
28+
)
29+
except ImportError:
30+
raise ValueError(
31+
f"EVENT_TOPIC_MAPPING[{topic}] contains an invalid processor function: {processor_fn_name}"
32+
)
33+
34+
return super().ready()

llmstack/events/consumers/__init__.py

Whitespace-only changes.

llmstack/processors/tasks.py renamed to llmstack/events/consumers/app_run_finished.py

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,10 @@
22

33
from llmstack.processors.models import RunEntry
44

5-
from .history import HistoryStore
6-
75
logger = logging.getLogger(__name__)
86

97

10-
def persist_history_task_internal(processors, bookkeeping_data_map):
11-
if "input" not in bookkeeping_data_map or (
12-
"output" not in bookkeeping_data_map and "agent" not in bookkeeping_data_map
13-
):
14-
logger.error(
15-
f"Could not persist history {bookkeeping_data_map} for {processors} because input or output is missing",
16-
)
17-
return
18-
8+
def _process_app_run_data(processors, bookkeeping_data_map):
199
input = bookkeeping_data_map["input"]["run_data"]
2010
output = (
2111
bookkeeping_data_map["agent"]["run_data"]
@@ -118,17 +108,19 @@ def persist_history_task_internal(processors, bookkeeping_data_map):
118108
processor_runs=processor_runs,
119109
platform_data=platform_data,
120110
)
111+
return input, processor_runs, run_entry
121112

122-
# Persist history
123-
HistoryStore.persist(
124-
run_entry,
125-
processor_runs=processor_runs,
126-
input=input,
127-
)
128113

114+
def persist_app_run_history(event_data):
115+
processors = event_data.get("processors", [])
116+
bookkeeping_data_map = event_data.get("bookkeeping_data_map", {})
117+
if "input" not in bookkeeping_data_map or (
118+
"output" not in bookkeeping_data_map and "agent" not in bookkeeping_data_map
119+
):
120+
logger.error(
121+
f"Could not persist history {bookkeeping_data_map} for {processors} because input or output is missing",
122+
)
123+
return
129124

130-
def persist_history_task(processors, bookkeeping_data_map):
131-
try:
132-
persist_history_task_internal(processors, bookkeeping_data_map)
133-
except Exception as e:
134-
logger.error(f"Error persisting history: {e}")
125+
input, processor_runs, run_entry = _process_app_run_data(processors, bookkeeping_data_map)
126+
run_entry.save()

llmstack/play/actors/bookkeeping.py

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@
22
from typing import Any
33

44
from llmstack.apps.app_session_utils import save_app_session_data
5-
from llmstack.jobs.adhoc import HistoryPersistenceJob
5+
from llmstack.events.apis import EventsViewSet
66
from llmstack.play.actor import Actor
77
from llmstack.play.output_stream import Message, MessageType
8-
from llmstack.processors.tasks import persist_history_task
98

109
logger = logging.getLogger(__name__)
1110

@@ -81,16 +80,13 @@ def on_stop(self) -> None:
8180
):
8281
logger.info("Not persisting history since disable_history is set to True")
8382
return super().on_stop()
84-
85-
HistoryPersistenceJob.create(
86-
func=persist_history_task,
87-
args=[
88-
list(
89-
self._processor_configs.keys(),
90-
),
91-
self._bookkeeping_data_map,
92-
],
93-
).add_to_queue()
83+
EventsViewSet().create(
84+
"app.run.finished",
85+
{
86+
"processors": list(self._processor_configs.keys()),
87+
"bookkeeping_data_map": self._bookkeeping_data_map,
88+
},
89+
)
9490
except Exception as e:
9591
logger.error(f"Error adding history persistence job: {e}")
9692
return super().on_stop()

llmstack/processors/history.py

Lines changed: 0 additions & 19 deletions
This file was deleted.

llmstack/server/settings.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,6 @@
378378
INDEX_VIEW_MODULE = "llmstack.base.views"
379379
EMAIL_SENDER_CLASS = "llmstack.emails.sender.DefaultEmailSender"
380380
EMAIL_TEMPLATE_FACTORY_CLASS = "llmstack.emails.templates.factory.DefaultEmailTemplateFactory"
381-
HISTORY_STORE_CLASS = "llmstack.processors.history.DefaultHistoryStore"
382381
FLAG_SOURCES = ["llmstack.base.flags.FlagSource"]
383382

384383
# Make sure name and slug are unique
@@ -573,6 +572,10 @@
573572
},
574573
}
575574

575+
EVENT_TOPIC_MAPPING = {
576+
"app.run.finished": ["llmstack.events.consumers.app_run_finished.persist_app_run_history"],
577+
}
578+
576579
ANONYMOUS_USER_RATELIMIT = os.getenv("ANONYMOUS_USER_RATELIMIT", None)
577580

578581
LIMITER_MODULE = "llmstack.server.limiter"

0 commit comments

Comments
 (0)