Monitoring and observability #30078
I couldn't find specific information on monitoring and observing components within the
For more detailed and specific observability features, you might need to refer to the LangChain documentation or explore the source code further to identify any built-in capabilities that can be leveraged.
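As an illustration of what such a hook looks like (this is not LangChain's callback API, just a plain-Python sketch; `observed` and `retrieve` are hypothetical names): any pipeline step can be wrapped with a decorator that logs start, end, latency, and failures, which is the core of what observability tooling records per component.

```python
import functools
import logging
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("pipeline.observability")

def observed(step_name):
    """Decorator that logs start, end, latency, and failures of a pipeline step."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            logger.info("step=%s event=start", step_name)
            try:
                result = fn(*args, **kwargs)
                elapsed_ms = (time.perf_counter() - start) * 1000
                logger.info("step=%s event=end latency_ms=%.1f", step_name, elapsed_ms)
                return result
            except Exception:
                logger.exception("step=%s event=error", step_name)
                raise
        return wrapper
    return decorator

@observed("retrieve")
def retrieve(question):
    # Stand-in for a real retrieval call.
    return [f"doc for {question}"]
```

Calling `retrieve("red shoes")` returns the result unchanged and emits one `start` and one `end` log line with the measured latency; real tools (e.g. LangSmith tracing or LangChain callbacks) expose the same kind of per-step hooks.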
---
@dosu ,
```python
import os
import re
from typing import List, TypedDict

from langchain_core.prompts import PromptTemplate
from langchain_core.documents import Document
from langgraph.graph import START, StateGraph
from langchain_core.messages import HumanMessage
from langchain_ollama import ChatOllama

from utils.utils import load_config, initialize_embeddings, load_faiss_store
from logger.logger import get_logger

logger = get_logger(__file__)

# -------------------------------------------------------------------
# 1) Prompt for LLM Re-ranking (for products)
# -------------------------------------------------------------------
template = """You are an intelligent assistant that reorders product IDs based on any constraints in the user query.

Instructions:

Context:
{context}

Question: {question}

Helpful Answer:
"""

custom_rag_prompt = PromptTemplate(
    template=template,
    # NOTE: "doc_count" is declared but never appears as a {doc_count}
    # placeholder in the template above.
    input_variables=["context", "question", "doc_count"],
)
```
```python
# -------------------------------------------------------------------
# 2) Define Pipeline State (new, simplified)
# -------------------------------------------------------------------
class ProductState(TypedDict):
    question: str
    k: int                    # Dynamic number of documents to retrieve.
    context: List[Document]   # Holds up to k retrieved product documents.
    final_ids_output: str     # The final combined output after re-ranking.
    loop_step: int

# -------------------------------------------------------------------
# 3) Helper: Regex Parsing for Product IDs
# -------------------------------------------------------------------
def parse_product_ids_from_llm_text(llm_text: str) -> List[str]:
    """
    Extracts all IDs from lines containing 'Product ID: ...' via regex.
    e.g. 'Product ID: 1234' => '1234'
    """
    pattern = r"Product ID:\s*(\S+)"
    matches = re.findall(pattern, llm_text)
    # Remove trailing punctuation (if any)
    cleaned = [m.rstrip(",.") for m in matches]
    return cleaned
```
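For a quick sanity check, the regex helper in section 3 behaves like this (the function is reproduced here so the snippet is self-contained; the sample LLM text is made up):

```python
import re
from typing import List

def parse_product_ids_from_llm_text(llm_text: str) -> List[str]:
    # Capture the non-whitespace token after each 'Product ID:' label,
    # then strip trailing commas/periods left over from prose.
    pattern = r"Product ID:\s*(\S+)"
    matches = re.findall(pattern, llm_text)
    return [m.rstrip(",.") for m in matches]

sample = """1. Product ID: 1234, in stock
2. Product ID: 5678."""
print(parse_product_ids_from_llm_text(sample))  # ['1234', '5678']
```

Note that `\S+` also swallows punctuation glued to the ID, which is why the `rstrip(",.")` cleanup step matters.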
```python
# -------------------------------------------------------------------
# 4) Retrieval: Retrieve k Products in One Call
# -------------------------------------------------------------------
def retrieve_products(state: ProductState, faiss_store) -> ProductState:
    """
    Uses the FAISS store retriever to fetch up to k product documents.
    """
    k = state.get("k", 200)  # Use the dynamic k value (default to 200).
    try:
        retriever = faiss_store.as_retriever(search_kwargs={"k": k})
        docs = retriever.invoke(state["question"])
        logger.info(f"Retrieved {len(docs)} product documents with k={k}.")
        return {"context": docs}
    except Exception as e:
        logger.error(f"Error retrieving {k} product docs: {e}", exc_info=True)
        return {"context": []}

# -------------------------------------------------------------------
# 5) Rerank Top 25 and Append Remainder
# -------------------------------------------------------------------
def rerank_and_append(state: ProductState, llm) -> ProductState:
    """
    From the retrieved product documents, take the top 25 and send them to the LLM
    for re-ranking, then append the remaining product IDs (docs 26 onward) after the LLM result.
    """
    docs = state.get("context", [])
    if not docs:
        logger.warning("No product documents retrieved for re-ranking.")
        return {"final_ids_output": "No relevant product IDs found."}
    # The original post cuts off here; a minimal completion consistent with
    # the docstring and the helpers above:
    top_docs, rest_docs = docs[:25], docs[25:]
    context_text = "\n".join(d.page_content for d in top_docs)
    prompt = custom_rag_prompt.format(context=context_text, question=state["question"])
    response = llm.invoke([HumanMessage(content=prompt)])
    reranked_ids = parse_product_ids_from_llm_text(response.content)
    remaining_ids = parse_product_ids_from_llm_text(
        "\n".join(d.page_content for d in rest_docs)
    )
    return {"final_ids_output": ", ".join(reranked_ids + remaining_ids)}
```
```python
# -------------------------------------------------------------------
# 6) Build LangGraph Workflow
# -------------------------------------------------------------------
def build_product_graph(faiss_store, llm):
    """
    Build a LangGraph pipeline with two nodes:
      1. Retrieve k product documents.
      2. Rerank top 25 and append the remaining product IDs.
    """
    graph_builder = StateGraph(ProductState)
    graph_builder.add_node("retrieve", lambda s: retrieve_products(s, faiss_store))
    graph_builder.add_node("rerank_and_append", lambda s: rerank_and_append(s, llm))
    graph_builder.add_edge(START, "retrieve")
    graph_builder.add_edge("retrieve", "rerank_and_append")
    compiled_graph = graph_builder.compile()
    logger.info("LangGraph product pipeline compiled successfully.")
    return compiled_graph
```
```python
# -------------------------------------------------------------------
# 7) Main Execution (for local testing)
# -------------------------------------------------------------------
def main():
    try:
        this_dir = os.path.dirname(os.path.abspath(__file__))
        config_path = os.path.join(this_dir, "..", "config.yaml")
        config = load_config(config_path)
        logger.info(f"Configuration loaded from {config_path}")
        # The original post cuts off here; presumably the embeddings, FAISS
        # store, and LLM are initialized next and the compiled graph invoked.
    except Exception as e:
        logger.error(f"Pipeline failed: {e}", exc_info=True)

if __name__ == "__main__":
    main()
```
I have a retrieval pipeline that does FAISS retrieval, then uses an LLM to rerank, all under the LangGraph framework. What components can I monitor and observe, and how can I do it?
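One pattern worth considering (a sketch, not an official LangGraph/LangChain API; `instrument` and `fake_retrieve` are hypothetical names): since each node here is a plain callable returning a state update, you can wrap the callables before passing them to `add_node` and accumulate per-node metrics yourself, e.g. retrieval latency and document count for `retrieve`, and latency and parsed-ID count for `rerank_and_append`.

```python
import time
from collections import defaultdict

metrics = defaultdict(list)  # node name -> list of per-invocation records

def instrument(node_name, fn):
    """Wrap a node callable so each invocation records its latency
    and the size of each field in the state update it returns."""
    def wrapped(state):
        start = time.perf_counter()
        update = fn(state)
        metrics[node_name].append({
            "latency_s": time.perf_counter() - start,
            # Count docs/IDs in whatever the node returned.
            "update_sizes": {k: (len(v) if hasattr(v, "__len__") else 1)
                             for k, v in update.items()},
        })
        return update
    return wrapped

# Usage with a hypothetical stand-in node:
def fake_retrieve(state):
    return {"context": ["doc1", "doc2", "doc3"]}

retrieve_node = instrument("retrieve", fake_retrieve)
retrieve_node({"question": "red shoes"})
print(metrics["retrieve"][0]["update_sizes"])  # {'context': 3}
```

In the pipeline above, this would mean registering `instrument("retrieve", lambda s: retrieve_products(s, faiss_store))` instead of the bare lambda; for deeper LLM-level telemetry (tokens, prompts), LangSmith tracing or LangChain callbacks are the built-in route.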