from datetime import datetime, timedelta

import requests
- from langchain_core.messages import HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
+ from langchain_core.messages import HumanMessage, SystemMessage
+ # Add BaseMessage for typed state
+ from langchain_core.messages import BaseMessage

from opentelemetry import _events, _logs, metrics, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
    OTLPLogExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
    OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter,
)
- from opentelemetry.instrumentation.langchain import LangChainInstrumentor
+ from opentelemetry.instrumentation.langchain import LangchainInstrumentor
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
+ # NEW: access telemetry handler to manually flush async evaluations
+ try:  # pragma: no cover - defensive in case util package not installed
+     from opentelemetry.util.genai.handler import get_telemetry_handler
+ except Exception:  # pragma: no cover
+     get_telemetry_handler = lambda **_: None  # type: ignore

# configure tracing
trace.set_tracer_provider(TracerProvider())
@@ -110,11 +117,21 @@ def cleanup_token_cache(self):
            f.write(b"\0" * length)
        os.remove(self.cache_file)

-
- def main():
-     # Set up instrumentation
-     LangChainInstrumentor().instrument()
-
+ def _flush_evaluations():
+     """Force one evaluation processing cycle if async evaluators are enabled.
+
+     The GenAI evaluation system samples and enqueues invocations asynchronously.
+     For demo / test determinism we explicitly trigger one drain so evaluation
+     spans / events / metrics are emitted before the script exits.
+     """
+     try:
+         handler = get_telemetry_handler()
+         if handler and hasattr(handler, "process_evaluations"):
+             handler.process_evaluations()  # type: ignore[attr-defined]
+     except Exception:
+         pass
+
+ def llm_invocation_demo(llm: ChatOpenAI):
    import random

    # List of capital questions to randomly select from
@@ -132,19 +149,155 @@ def main():
        "What is the capital of United States?",
    ]

+
+     messages = [
+         SystemMessage(content="You are a helpful assistant!"),
+         HumanMessage(content="What is the capital of France?"),
+     ]
+
+     result = llm.invoke(messages)
+
+     print("LLM output:\n", result)
+     _flush_evaluations()  # ensure first invocation evaluations processed
+
+     selected_question = random.choice(capital_questions)
+     print(f"Selected question: {selected_question}")
+
+     system_message = "You are a helpful assistant!"
+
+     messages = [
+         SystemMessage(content=system_message),
+         HumanMessage(content=selected_question),
+     ]
+
+     result = llm.invoke(messages)
+     print(f"LLM output: {getattr(result, 'content', result)}")
+     _flush_evaluations()  # flush after second invocation
+
+ def agent_demo(llm: ChatOpenAI):
+     """Demonstrate a LangGraph + LangChain agent with:
+     - A tool (get_capital)
+     - A subagent specialized for capital questions
+     - A simple classifier node routing to subagent or general LLM response
+
+     Tracing & metrics:
+       * Each LLM call is instrumented via LangChainInstrumentor.
+       * Tool invocation will create its own span.
+     """
+     try:
+         from langchain_core.tools import tool
+         from langchain_core.messages import AIMessage
+         from langgraph.graph import StateGraph, END
+         from typing import TypedDict, Annotated
+         from langgraph.graph.message import add_messages
+     except ImportError:  # pragma: no cover - optional dependency
+         print("LangGraph or necessary LangChain core tooling not installed; skipping agent demo.")
+         return
+
+     # Define structured state with additive messages so multiple nodes can append safely.
+     class AgentState(TypedDict, total=False):
+         input: str
+         # messages uses additive channel combining lists across steps
+         messages: Annotated[list[BaseMessage], add_messages]
+         route: str
+         output: str
+
+     # ---- Tool Definition ----
+     capitals_map = {
+         "france": "Paris",
+         "germany": "Berlin",
+         "italy": "Rome",
+         "spain": "Madrid",
+         "japan": "Tokyo",
+         "canada": "Ottawa",
+         "australia": "Canberra",
+         "brazil": "Brasília",
+         "india": "New Delhi",
+         "united states": "Washington, D.C.",
+         "united kingdom": "London",
+     }
+
+     @tool
+     def get_capital(country: str) -> str:  # noqa: D401
+         """Return the capital city for the given country name.
+
+         The lookup is case-insensitive and trims punctuation/whitespace.
+         If the country is unknown, returns the string "Unknown".
+         """
+         return capitals_map.get(country.strip().lower(), "Unknown")
+
+     # ---- Subagent (Capital Specialist) ----
+     def capital_subagent(state: AgentState) -> AgentState:
+         question: str = state["input"]
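+         # Naive heuristic: assume the country is the last word of the question.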
+         country = question.rstrip("?!. ").split(" ")[-1]
+         cap = get_capital.run(country)
+         answer = f"The capital of {country.capitalize()} is {cap}."
+         return {"messages": [AIMessage(content=answer)], "output": answer}
+
+     # ---- General Node (Fallback) ----
+     def general_node(state: AgentState) -> AgentState:
+         question: str = state["input"]
+         response = llm.invoke([
+             SystemMessage(content="You are a helpful, concise assistant."),
+             HumanMessage(content=question),
+         ])
+         # Ensure we wrap response as AIMessage if needed
+         ai_msg = response if isinstance(response, AIMessage) else AIMessage(content=getattr(response, "content", str(response)))
+         return {"messages": [ai_msg], "output": getattr(response, "content", str(response))}
+
+     # ---- Classifier Node ----
+     def classifier(state: AgentState) -> AgentState:
+         q: str = state["input"].lower()
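+         # Simple keyword routing: questions mentioning "capital" or "city" go to the specialist.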
+         return {"route": "capital" if ("capital" in q or "city" in q) else "general"}
+
+     graph = StateGraph(AgentState)
+     graph.add_node("classify", classifier)
+     graph.add_node("capital_agent", capital_subagent)
+     graph.add_node("general_agent", general_node)
+
+     def route_decider(state: AgentState):  # returns which edge to follow
+         return state.get("route", "general")
+
+     graph.add_conditional_edges(
+         "classify",
+         route_decider,
+         {"capital": "capital_agent", "general": "general_agent"},
+     )
+     graph.add_edge("capital_agent", END)
+     graph.add_edge("general_agent", END)
+     graph.set_entry_point("classify")
+     app = graph.compile()
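+     # The compiled graph is a runnable: each invoke() runs classify -> (capital_agent | general_agent) -> END.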
+
+     demo_questions = [
+         "What is the capital of France?",
+         "Explain why the sky is blue in one sentence.",
+         "What is the capital city of Brazil?",
+     ]
+
+     print("\n--- LangGraph Agent Demo ---")
+     for q in demo_questions:
+         print(f"\nUser Question: {q}")
+         # Initialize state with additive messages list.
+         result_state = app.invoke({"input": q, "messages": []})
+         print("Agent Output:", result_state.get("output"))
+         _flush_evaluations()
+     print("--- End Agent Demo ---\n")
+
+
+
+ def main():
+     # Set up instrumentation
+     LangchainInstrumentor().instrument()
+
+     # Set up Cisco CircuIT credentials from environment
    cisco_client_id = os.getenv("CISCO_CLIENT_ID")
    cisco_client_secret = os.getenv("CISCO_CLIENT_SECRET")
    cisco_app_key = os.getenv("CISCO_APP_KEY")
-
    token_manager = TokenManager(
        cisco_client_id, cisco_client_secret, cisco_app_key, "/tmp/.token.json"
    )
-
    api_key = token_manager.get_token()

-     # Set up instrumentation once
-     LangChainInstrumentor().instrument()
-
    # ChatOpenAI setup
    llm = ChatOpenAI(
        model="gpt-4.1",
@@ -161,30 +314,16 @@ def main():
        model_kwargs={"user": '{"appkey": "' + cisco_app_key + '"}'},
    )

-     messages = [
-         SystemMessage(content="You are a helpful assistant!"),
-         HumanMessage(content="What is the capital of France?"),
-     ]
+     # LLM invocation demo (simple)
+     # llm_invocation_demo(llm)

-     result = llm.invoke(messages)
-
-     print("LLM output:\n", result)
-
-     selected_question = random.choice(capital_questions)
-     print(f"Selected question: {selected_question}")
-
-     system_message = "You are a helpful assistant!"
+     # Run agent demo (tool + subagent). Safe if LangGraph unavailable.
+     agent_demo(llm)

-     messages = [
-         SystemMessage(content=system_message),
-         HumanMessage(content=selected_question),
-     ]
-
-     result = llm.invoke(messages)
-     print(f"LLM output: {result.content}")
+     _flush_evaluations()  # final flush before shutdown

    # Un-instrument after use
-     LangChainInstrumentor().uninstrument()
+     LangchainInstrumentor().uninstrument()


if __name__ == "__main__":