
Commit 8061aa6

Instrument tracing with langfuse (#167)

* Add langsmith
* Update requirements and add async support for LLM completion:
  - Upgrade ZenML to version 0.74.0
  - Add Pinecone, nest_asyncio, and asyncio to requirements
  - Implement an async version of get_completion_from_messages
  - Add a LangSmith callback for LLM requests
  - Improve error handling for async completion
* Refactor Pinecone and import statements in populate_index and llm_utils:
  - Reorganize import statements in populate_index.py and llm_utils.py
  - Remove the redundant Pinecone import in populate_index.py
  - Improve code formatting and import order
  - Minor code cleanup and optimization
* Instrument LangSmith tracing
* Add environment-specific tracing tags for the Gradio deployment:
  - Import the os module to handle environment variables
  - Add an APP_ENVIRONMENT variable defaulting to "dev"
  - Include tracing tags in the predict function with environment context

1 parent 6c61fd3 · commit 8061aa6

File tree

7 files changed: +185 -45 lines changed

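Of the seven files, the diff for utils/llm_utils.py is not reproduced on this page. For orientation only, the async completion helper described in the commit message might look something like the sketch below; the function name comes from the commit message, but the litellm dependency, signature, and defaults are assumptions, not the committed code.

    # Sketch only: assumes litellm, whose built-in Langfuse success callback
    # forwards completed LLM calls to Langfuse. Not the committed code.
    import litellm
    from litellm import acompletion

    litellm.success_callback = ["langfuse"]  # route successful calls to Langfuse


    async def get_completion_from_messages(
        messages: list[dict],
        model: str = "gpt-4o",      # assumed default
        temperature: float = 0.4,   # assumed default
    ) -> str:
        """Async LLM completion with basic error handling."""
        try:
            response = await acompletion(
                model=model, messages=messages, temperature=temperature
            )
            return response.choices[0].message.content
        except Exception as e:
            # surface a readable error instead of a raw stack trace
            raise RuntimeError(f"LLM completion failed: {e}") from e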
Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-0.71.0
+0.74.0

llm-complete-guide/constants.py

Lines changed: 1 addition & 1 deletion
@@ -26,7 +26,7 @@
 # ZenML constants
 ZENML_CHATBOT_MODEL = "zenml-docs-qa-chatbot"
 ZENML_CHATBOT_MODEL_NAME = "zenml-docs-qa-chatbot"
-ZENML_CHATBOT_MODEL_VERSION = "0.71.0-dev"
+ZENML_CHATBOT_MODEL_VERSION = "0.74.0-dev"

 # Scraping constants
 RATE_LIMIT = 5  # Maximum number of requests per second
Lines changed: 98 additions & 7 deletions
@@ -1,14 +1,20 @@
 import logging
+import os
+import time

 import gradio as gr
 from constants import SECRET_NAME
+from langfuse import Langfuse
 from utils.llm_utils import process_input_with_retrieval
 from zenml.client import Client

-# Set up logging
+langfuse = Langfuse()
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)

+APP_ENVIRONMENT = os.getenv("GRADIO_ZENML_APP_ENVIRONMENT", "dev")
+
 # Initialize ZenML client and verify secret access
 try:
     client = Client()
@@ -21,6 +27,80 @@
     raise RuntimeError(f"Application startup failed: {e}")


+def get_langfuse_trace_id() -> str | None:
+    """Get the trace from Langfuse.
+
+    This is a very naive implementation. It simply returns the id of the first trace
+    in the last 60 seconds. Will retry up to 3 times if no traces are found or if
+    there's an error.
+
+    Returns:
+        str | None: The trace ID if found, None otherwise
+    """
+    logger.info("Getting trace from Langfuse")
+    retries = 0
+    max_retries = 3
+    while retries < max_retries:
+        try:
+            # Wait 5 seconds before making the API call
+            time.sleep(5)
+            traces = langfuse.fetch_traces(
+                limit=1, order_by="timestamp.desc"
+            ).data
+            if not traces:
+                retries += 1
+                if retries == max_retries:
+                    logger.error(
+                        f"No traces found after {max_retries} attempts"
+                    )
+                    return None
+                logger.warning(
+                    f"No traces found (attempt {retries}/{max_retries})"
+                )
+                time.sleep(10)
+                continue
+            return traces[0].id
+        except Exception as e:
+            retries += 1
+            if retries == max_retries:
+                logger.error(
+                    f"Error fetching traces after {max_retries} attempts: {e}"
+                )
+                return None
+            logger.warning(
+                f"Error fetching traces (attempt {retries}/{max_retries}): {e}"
+            )
+            time.sleep(10)
+    return None
+
+
+def vote(data: gr.LikeData):
+    """Vote on a response.
+
+    Args:
+        data (gr.LikeData): The vote data.
+    """
+
+    trace_id = get_langfuse_trace_id()
+    logger.info(f"Vote data: {data}")
+    if data.liked:
+        logger.info("Vote up")
+        langfuse.score(
+            trace_id=trace_id,
+            name="user-explicit-feedback",
+            value="like",
+            comment="I like this response",
+        )
+    else:
+        logger.info("Vote down")
+        langfuse.score(
+            trace_id=trace_id,
+            name="user-explicit-feedback",
+            value="dislike",
+            comment="I don't like the response",
+        )
+
+
 def predict(message, history):
     try:
         # add the prod flag here
@@ -29,18 +109,29 @@ def predict(message, history):
             n_items_retrieved=20,
             use_reranking=True,
             model_version_stage="production",
+            tracing_tags=["gradio", "web-interface", APP_ENVIRONMENT],
         )
     except Exception as e:
         logger.error(f"Error processing message: {e}")
         return f"Sorry, I encountered an error: {str(e)}"


-# Launch the Gradio interface
-interface = gr.ChatInterface(
-    predict,
-    title="ZenML Documentation Assistant",
-    description="Ask me anything about ZenML!",
-)
+with gr.Blocks() as interface:
+    custom_chatbot = gr.Chatbot(
+        type="messages",
+        editable=True,
+    )
+
+    gr.ChatInterface(
+        predict,
+        type="messages",
+        title="ZenML Documentation Assistant",
+        description="Ask me anything about ZenML!",
+        chatbot=custom_chatbot,
+        theme="shivi/calm_seafoam",
+    )
+
+    custom_chatbot.like(vote, None, None)

 if __name__ == "__main__":
     interface.launch(server_name="0.0.0.0", share=False)
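The predict function above passes tracing_tags into process_input_with_retrieval, whose implementation is not part of this page. One plausible way the tags reach Langfuse, assuming litellm is the completion layer, is via litellm's metadata dict, which its Langfuse callback picks up; the helper name and signature below are illustrative assumptions, not the committed code.

    # Sketch: how tracing_tags could be forwarded to Langfuse through litellm.
    from litellm import completion


    def complete_with_tags(
        messages: list[dict], model: str, tracing_tags: list[str]
    ) -> str:
        response = completion(
            model=model,
            messages=messages,
            # litellm's Langfuse integration reads tags from the metadata dict
            metadata={"tags": tracing_tags},
        )
        return response.choices[0].message.content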

llm-complete-guide/requirements.txt

Lines changed: 4 additions & 2 deletions
@@ -1,4 +1,4 @@
-zenml[server]>=0.73.0
+git+https://github.com/zenml-io/zenml.git@develop#egg=zenml[server] # will work for zenml>=0.75.0
 ratelimit
 pgvector
 psycopg2-binary
@@ -18,10 +18,12 @@ pyarrow
 rerankers[flashrank]
 datasets
 torch
-gradio
+gradio>=5.13.0
 huggingface-hub
 elasticsearch
 tenacity
+langfuse
+pinecone

 # optional requirements for S3 artifact store
 # s3fs>2022.3.0
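The bare Langfuse() constructor used in the Gradio app authenticates via environment variables once the newly added langfuse package is installed. A minimal local setup might look like the following sketch; the key values are placeholders:

    # Placeholder values; the SDK reads these variables at Langfuse() init time.
    import os

    os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."             # placeholder
    os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."             # placeholder
    os.environ["LANGFUSE_HOST"] = "https://cloud.langfuse.com"  # or a self-hosted URL

    from langfuse import Langfuse

    langfuse = Langfuse()  # picks up the credentials above
    langfuse.auth_check()  # optional: fail fast on bad credentials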

llm-complete-guide/run.py

Lines changed: 5 additions & 1 deletion
@@ -234,7 +234,11 @@ def main(
     )
     # add the prod flag here
     response = process_input_with_retrieval(
-        query_text, model=model, use_reranking=use_reranker, model_version_stage="production"
+        query_text,
+        model=model,
+        use_reranking=use_reranker,
+        model_version_stage="production",
+        tracing_tags=["cli", "dev"],
     )
     console = Console()
     md = Markdown(response)

llm-complete-guide/steps/populate_index.py

Lines changed: 19 additions & 9 deletions
@@ -48,12 +48,16 @@
 from PIL import Image, ImageDraw, ImageFont
 from sentence_transformers import SentenceTransformer
 from structures import Document
-from utils.llm_utils import get_db_conn, get_es_client, get_pinecone_client, split_documents
+from utils.llm_utils import (
+    get_db_conn,
+    get_es_client,
+    get_pinecone_client,
+    split_documents,
+)
 from zenml import ArtifactConfig, get_step_context, log_metadata, step
 from zenml.client import Client
 from zenml.metadata.metadata_types import Uri
-import pinecone
-from pinecone import Pinecone, ServerlessSpec
+
 logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)

@@ -642,7 +646,7 @@ def index_generator(
         documents (str): JSON string containing the documents to index.
         index_type (IndexType, optional): Type of index to generate. Defaults to IndexType.POSTGRES.
     """
-    # get model version
+    # get model version
     context = get_step_context()
     model_version_stage = context.model_version.stage
     if index_type == IndexType.ELASTICSEARCH:
@@ -825,7 +829,9 @@ def _index_generator_postgres(documents: str) -> None:
     conn.close()


-def _index_generator_pinecone(documents: str, model_version_stage: str) -> None:
+def _index_generator_pinecone(
+    documents: str, model_version_stage: str
+) -> None:
     """Generates a Pinecone index for the given documents.

     Args:
@@ -856,8 +862,8 @@ def _index_generator_pinecone(documents: str, model_version_stage: str) -> None:
                 "parent_section": doc["parent_section"] or "",
                 "url": doc["url"],
                 "page_content": doc["page_content"],
-                "token_count": doc["token_count"]
-            }
+                "token_count": doc["token_count"],
+            },
         }
         batch.append(vector_record)
@@ -870,7 +876,9 @@ def _index_generator_pinecone(documents: str, model_version_stage: str) -> None:
     if batch:
         index.upsert(vectors=batch)

-    logger.info(f"Successfully indexed {len(docs)} documents to Pinecone index")
+    logger.info(
+        f"Successfully indexed {len(docs)} documents to Pinecone index"
+    )


 def _log_metadata(index_type: IndexType) -> None:
@@ -914,7 +922,9 @@ def _log_metadata(index_type: IndexType) -> None:
         store_name = "pinecone"
         connection_details = {
             "api_key": "**********",
-            "environment": client.get_secret(SECRET_NAME_PINECONE).secret_values["pinecone_env"],
+            "environment": client.get_secret(
+                SECRET_NAME_PINECONE
+            ).secret_values["pinecone_env"],
         }

         log_metadata(
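The newly imported get_pinecone_client helper lives in utils/llm_utils.py, whose diff is not shown here. Given the SECRET_NAME_PINECONE secret referenced above, a minimal sketch of its plausible shape follows; the "api_key" secret key and the constants import location are assumptions.

    # Sketch of the helper's plausible shape; not the committed implementation.
    from pinecone import Pinecone
    from zenml.client import Client

    from constants import SECRET_NAME_PINECONE  # assumed import location


    def get_pinecone_client() -> Pinecone:
        secret = Client().get_secret(SECRET_NAME_PINECONE)
        return Pinecone(api_key=secret.secret_values["api_key"])  # assumed key name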
