Skip to content

Commit ba294e0

Browse files
authored
Add processor_runs objref, delete unused model, add logs for pipeline (#265)
1 parent: db02e56 · commit: ba294e0

File tree

8 files changed

+109
-68
lines changed

8 files changed

+109
-68
lines changed

llmstack/assets/utils.py

Lines changed: 13 additions & 5 deletions
Original file line number · Diff line number · Diff line change
@@ -3,9 +3,9 @@
33
logger = logging.getLogger(__name__)
44

55

6-
def get_asset_by_objref(objref, request_user, request_session):
6+
def get_asset_by_objref_internal(objref):
77
"""
8-
Get asset by objref if one exists and is accessible by the user.
8+
Get asset by objref if one exists.
99
"""
1010
from llmstack.apps.models import AppDataAssets, AppSessionFiles
1111
from llmstack.data.models import DataSourceEntryFiles
@@ -29,10 +29,18 @@ def get_asset_by_objref(objref, request_user, request_session):
2929
return None
3030

3131
asset = model_cls.objects.get(uuid=uuid)
32-
33-
if not asset or not asset.is_accessible(request_user, request_session):
34-
return None
3532
except Exception as e:
3633
logger.error(f"Error retrieving asset: {e}")
3734

3835
return asset
36+
37+
38+
def get_asset_by_objref(objref, request_user, request_session):
39+
"""
40+
Get asset by objref if one exists and is accessible by the user.
41+
"""
42+
asset = get_asset_by_objref_internal(objref)
43+
if not asset or not asset.is_accessible(request_user, request_session):
44+
return None
45+
46+
return asset

llmstack/data/pipeline.py

Lines changed: 22 additions & 9 deletions
Original file line number · Diff line number · Diff line change
@@ -22,11 +22,13 @@ def __init__(self, datasource: DataSource):
2222
self.datasource = datasource
2323
self._source_cls = self.datasource.pipeline_obj.source_cls
2424
self._destination_cls = self.datasource.pipeline_obj.destination_cls
25+
logger.debug("Initializing DataIngestionPipeline")
2526

2627
self._destination = None
2728
self._transformations = self.datasource.pipeline_obj.transformation_objs
2829
embedding_cls = self.datasource.pipeline_obj.embedding_cls
2930
if embedding_cls:
31+
logger.debug("Initializing DataIngestionPipeline Transformation")
3032
embedding_additional_kwargs = {
3133
**self.datasource.pipeline_obj.embedding.data.get("additional_kwargs", {}),
3234
**{"datasource": datasource},
@@ -39,29 +41,29 @@ def __init__(self, datasource: DataSource):
3941
}
4042
)
4143
)
44+
logger.debug("Finished Initializing DataIngestionPipeline Transformation")
4245

4346
if self._destination_cls:
47+
logger.debug("Initializing DataIngestionPipeline Destination")
4448
self._destination = self._destination_cls(**self.datasource.pipeline_obj.destination_data)
4549
self._destination.initialize_client(datasource=self.datasource, create_collection=True)
50+
logger.debug("Finished Initializing DataIngestionPipeline Destination")
4651

4752
def process(self, document: DataDocument) -> DataDocument:
53+
logger.debug(f"Processing document: {document.name}")
4854
document = self._source_cls.process_document(document)
49-
if self.datasource.pipeline_obj.embedding:
50-
embedding_data = self.datasource.pipeline_obj.embedding.data
51-
embedding_data["additional_kwargs"] = {
52-
**embedding_data.get("additional_kwargs", {}),
53-
**{"datasource": self.datasource},
54-
}
55-
embedding_transformer = self.datasource.pipeline_obj.embedding_cls(**embedding_data)
56-
self._transformations.append(embedding_transformer)
57-
55+
logger.debug(f"Creating IngestionPipeline for document: {document.name}")
5856
ingestion_pipeline = IngestionPipeline(transformations=self._transformations)
5957
ldoc = LlamaDocumentShim(**document.model_dump())
6058
ldoc.metadata = {**ldoc.metadata, **document.metadata}
59+
logger.debug(f"Running IngestionPipeline for document: {document.name}")
6160
document.nodes = ingestion_pipeline.run(documents=[ldoc])
61+
logger.debug(f"Finished running IngestionPipeline for document: {document.name}")
6262
document.node_ids = list(map(lambda x: x.id_, document.nodes))
6363
if self._destination:
64+
logger.debug(f"Adding document: {document.name} to destination")
6465
self._destination.add(document=document)
66+
logger.debug(f"Finished adding document: {document.name} to destination")
6567

6668
return document
6769

@@ -83,39 +85,50 @@ def __init__(self, datasource: DataSource):
8385
self._destination_cls = self.datasource.pipeline_obj.destination_cls
8486
self._destination = None
8587
self._embedding_generator = None
88+
logger.debug("Initializing DataQueryPipeline")
8689

8790
if self._destination_cls:
91+
logger.debug("Initializing DataQueryPipeline Destination")
8892
self._destination = self._destination_cls(**self.datasource.pipeline_obj.destination_data)
8993
self._destination.initialize_client(datasource=self.datasource, create_collection=False)
94+
logger.debug("Finished Initializing DataQueryPipeline Destination")
9095

9196
if self.datasource.pipeline_obj.embedding:
97+
logger.debug("Initializing DataQueryPipeline Embedding")
9298
embedding_data = self.datasource.pipeline_obj.embedding.data
9399
embedding_data["additional_kwargs"] = {
94100
**embedding_data.get("additional_kwargs", {}),
95101
**{"datasource": self.datasource},
96102
}
97103
self._embedding_generator = self.datasource.pipeline_obj.embedding_cls(**embedding_data)
104+
logger.debug("Finished Initializing DataQueryPipeline Embedding")
98105

99106
def search(self, query: str, use_hybrid_search=True, **kwargs) -> List[dict]:
100107
content_key = self.datasource.destination_text_content_key
101108
query_embedding = None
102109

110+
logger.debug(f"Initializing Search for query: {query}")
111+
103112
if kwargs.get("search_filters", None):
104113
raise NotImplementedError("Search filters are not supported for this data source.")
105114

106115
documents = []
107116

108117
if self._embedding_generator:
118+
logger.debug("Generating embedding for query")
109119
query_embedding = self._embedding_generator.get_embedding(query)
120+
logger.debug("Finished generating embedding for query")
110121

111122
if self._destination:
123+
logger.debug(f"Searching for query: {query} in destination")
112124
query_result = self._destination.search(
113125
query=query,
114126
use_hybrid_search=use_hybrid_search,
115127
query_embedding=query_embedding,
116128
datasource_uuid=str(self.datasource.uuid),
117129
**kwargs,
118130
)
131+
logger.debug(f"Received results for query: {query} from destination")
119132
documents = list(
120133
map(
121134
lambda x: Document(page_content_key=content_key, page_content=x.text, metadata=x.metadata),

llmstack/data/sources/website/url.py

Lines changed: 5 additions & 3 deletions
Original file line number · Diff line number · Diff line change
@@ -111,9 +111,11 @@ def process_document(cls, document: DataDocument) -> DataDocument:
111111
connection_context = (
112112
get_connection_context(connection_id, document.metadata["datasource_uuid"]) if connection_id else None
113113
)
114-
html_page = get_page_html(
115-
document.request_data.get("url"), connection=connection_id, storage_state=connection_context
116-
)
114+
url = document.name
115+
if document.request_data.get("url"):
116+
url = document.request_data.get("url")
117+
118+
html_page = get_page_html(url, connection=connection_id, storage_state=connection_context)
117119
page_text = extract_text(html_page)
118120

119121
text_data_uri = (

llmstack/events/consumers/app_run_finished.py

Lines changed: 1 addition & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -217,7 +217,6 @@ def persist_app_run_history(event_data: AppRunFinishedEventData):
217217
response_content_type=event_data.response_content_type,
218218
response_headers=event_data.response_headers,
219219
response_time=event_data.response_time,
220-
processor_runs=event_data.processor_runs,
221220
platform_data=event_data.platform_data,
222221
)
223-
run_entry.save()
222+
run_entry.save(processor_runs=event_data.processor_runs)

llmstack/processors/admin.py

Lines changed: 0 additions & 2 deletions
Original file line number · Diff line number · Diff line change
@@ -4,7 +4,6 @@
44
ApiBackend,
55
ApiProvider,
66
Endpoint,
7-
EndpointInvocationCount,
87
Feedback,
98
Request,
109
Response,
@@ -16,7 +15,6 @@
1615
admin.site.register(ApiProvider)
1716
admin.site.register(ApiBackend)
1817
admin.site.register(Endpoint)
19-
admin.site.register(EndpointInvocationCount)
2018
admin.site.register(VersionedEndpoint)
2119
admin.site.register(Feedback)
2220
admin.site.register(Request)
Lines changed: 21 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -0,0 +1,21 @@
1+
# Generated by Django 4.2.14 on 2024-08-07 21:57
2+
3+
from django.db import migrations, models
4+
5+
6+
class Migration(migrations.Migration):
7+
8+
dependencies = [
9+
('apiabstractor', '0007_runentry_apiabstract_request_fb04e3_idx_and_more'),
10+
]
11+
12+
operations = [
13+
migrations.AddField(
14+
model_name='runentry',
15+
name='processor_runs_objref',
16+
field=models.CharField(blank=True, default=None, help_text='Processor runs objref', null=True),
17+
),
18+
migrations.DeleteModel(
19+
name='EndpointInvocationCount',
20+
),
21+
]

llmstack/processors/models.py

Lines changed: 34 additions & 47 deletions
Original file line number · Diff line number · Diff line change
@@ -1,3 +1,4 @@
1+
import json
12
import logging
23
import uuid
34

@@ -7,6 +8,7 @@
78
from django.db.models.signals import pre_save
89
from django.dispatch import receiver
910

11+
from llmstack.assets.utils import get_asset_by_objref_internal
1012
from llmstack.common.utils.db_models import ArrayField
1113

1214
logger = logging.getLogger(__name__)
@@ -416,6 +418,12 @@ class RunEntry(models.Model):
416418
help_text="Array of processor data for each endpoint including input and output data",
417419
)
418420
)
421+
processor_runs_objref = models.CharField(
422+
default=None,
423+
blank=True,
424+
null=True,
425+
help_text="Processor runs objref",
426+
)
419427
platform_data = models.JSONField(
420428
default=dict,
421429
blank=True,
@@ -442,66 +450,45 @@ def clean_dict(self, data):
442450
return data.replace("\u0000", "")
443451
return data
444452

445-
def clean_processor_runs(self):
446-
if self.processor_runs:
447-
self.processor_runs = [self.clean_dict(item) for item in self.processor_runs]
453+
def clean_processor_runs(self, processor_runs=[]):
454+
return [self.clean_dict(item) for item in processor_runs]
448455

449456
def save(self, *args, **kwargs):
450457
# Clean the processor_runs field
451-
self.clean_processor_runs()
458+
processor_runs = kwargs.pop("processor_runs", [])
459+
processor_runs_objref = self.create_processor_runs_objref(processor_runs)
460+
self.processor_runs = []
461+
self.processor_runs_objref = processor_runs_objref
452462
super(RunEntry, self).save(*args, **kwargs)
453463

454464
@property
455465
def is_store_request(self):
456466
return self.app_store_uuid is not None
457467

458-
@staticmethod
459-
def from_pinot_dict(row):
460-
owner = User.objects.get(id=row["owner_id"])
461-
462-
return RunEntry(
463-
request_uuid=row["request_uuid"],
464-
app_uuid=row["app_uuid"],
465-
endpoint_uuid=row["endpoint_uuid"],
466-
owner=owner,
467-
session_key=row["session_key"],
468-
request_user_email=row["request_user_email"],
469-
request_ip=row["request_ip"],
470-
request_location=row["request_location"],
471-
request_user_agent=row["request_user_agent"],
472-
request_content_type=row["request_content_type"],
473-
request_body=row["request_body"],
474-
response_status=row["response_status"],
475-
response_body=row["response_body"],
476-
response_content_type=row["response_content_type"],
477-
response_headers=row["response_headers"],
478-
response_time=row["response_time"],
479-
processor_runs=row["processor_runs"],
480-
)
468+
def create_processor_runs_objref(self, processor_runs=[]):
469+
import base64
470+
import json
481471

472+
from llmstack.apps.models import AppSessionFiles
482473

483-
class EndpointInvocationCount(models.Model):
484-
"""
485-
Model to track the usage of endpoints by users
486-
"""
474+
processor_runs = self.clean_processor_runs(processor_runs)
475+
processor_runs = {"processor_runs": processor_runs}
487476

488-
user = models.ForeignKey(
489-
User,
490-
on_delete=models.DO_NOTHING,
491-
help_text="User this count is for",
492-
)
493-
month = models.CharField(
494-
max_length=5,
495-
help_text="Month for the count as MM-YY",
496-
default="",
497-
)
498-
count = models.IntegerField(
499-
help_text="Count for the month",
500-
default=0,
501-
)
477+
request_uuid = str(self.request_uuid)
478+
processor_runs_datauri = f"data:application/json;name={request_uuid}_processor_runs.json;base64,{base64.b64encode(json.dumps(processor_runs).encode()).decode()}"
502479

503-
def __str__(self):
504-
return self.user.__str__() + ":" + self.month
480+
session_id = self.session_key
481+
processor_runs_objrefs = AppSessionFiles.create_from_data_uri(
482+
data_uri=processor_runs_datauri,
483+
ref_id=session_id,
484+
metadata={"session_id": session_id, "request_uuid": request_uuid},
485+
)
486+
return processor_runs_objrefs.objref
487+
488+
def get_processor_runs_from_objref(self):
489+
file_asset = get_asset_by_objref_internal(self.processor_runs_objref)
490+
content = file_asset.file.read().decode("utf-8")
491+
return json.loads(content).get("processor_runs", [])
505492

506493

507494
class Feedback(models.Model):

llmstack/processors/serializers.py

Lines changed: 13 additions & 0 deletions
Original file line number · Diff line number · Diff line change
@@ -161,6 +161,7 @@ class Meta:
161161

162162
class HistorySerializer(serializers.ModelSerializer):
163163
app_detail = serializers.SerializerMethodField()
164+
processor_runs = serializers.SerializerMethodField()
164165

165166
def to_representation(self, instance):
166167
representation = super().to_representation(instance)
@@ -205,6 +206,18 @@ def get_app_store_app(uuid):
205206
pass
206207
return {"name": "Deleted App", "path": "/"}
207208

209+
def get_processor_runs(self, obj):
210+
if obj.processor_runs_objref:
211+
try:
212+
return obj.get_processor_runs_from_objref()
213+
except Exception:
214+
pass
215+
216+
if obj.processor_runs:
217+
return obj.processor_runs
218+
219+
return []
220+
208221
class Meta:
209222
model = RunEntry
210223
fields = [

0 commit comments

Comments (0)