
Commit a6f9b62
Langchain examples for testing and sending with OTLP, and a full chatbot demo

So... it turns out Langchain uses the v1beta1 prediction service client under the hood directly, so we should probably instrument that after all instead of the main wrapper API. It also has a streaming option that we should try to support as well, and it has ainvoke() for asyncio.
1 parent b2f6b32 commit a6f9b62
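
A minimal sketch (not part of this commit) of the streaming and asyncio entry points the commit message calls out; it assumes langchain-google-vertexai is installed and uses the same gemini-1.5-flash model as the examples below:

import asyncio

from langchain_google_vertexai import ChatVertexAI

model = ChatVertexAI(model="gemini-1.5-flash")

# Streaming: stream() yields incremental chunks, so instrumentation would
# have to handle partial responses rather than one final response.
for chunk in model.stream("Tell me a short joke"):
    print(chunk.content, end="", flush=True)


# Asyncio: ainvoke() is the coroutine counterpart of invoke().
async def demo() -> None:
    response = await model.ainvoke("Hello!")
    print(response.content)


asyncio.run(demo())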

4 files changed: 332 additions, 1 deletion
Lines changed: 242 additions & 0 deletions
@@ -0,0 +1,242 @@
# https://python.langchain.com/docs/tutorials/chatbot/#streaming


from typing import Sequence

from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    HumanMessage,
    SystemMessage,
    trim_messages,
)
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_google_vertexai import ChatVertexAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import START, MessagesState, StateGraph
from langgraph.graph.message import add_messages
from typing_extensions import Annotated, TypedDict

from opentelemetry import trace


def main() -> None:
    model = ChatVertexAI(model="gemini-1.5-flash")

    # Define a new graph
    workflow = StateGraph(state_schema=MessagesState)

    # Define the function that calls the model
    def call_model(state: MessagesState):
        response = model.invoke(state["messages"])
        return {"messages": response}

    # Define the (single) node in the graph
    workflow.add_edge(START, "model")
    workflow.add_node("model", call_model)

    # Add memory
    memory = MemorySaver()
    app = workflow.compile(checkpointer=memory)

    config = {"configurable": {"thread_id": "abc123"}}

    query = "Hi! I'm Bob."

    input_messages = [HumanMessage(query)]
    output = app.invoke({"messages": input_messages}, config)
    output["messages"][-1].pretty_print()  # output contains all messages in state

    query = "What's my name?"

    input_messages = [HumanMessage(query)]
    output = app.invoke({"messages": input_messages}, config)
    output["messages"][-1].pretty_print()

    config = {"configurable": {"thread_id": "abc234"}}

    input_messages = [HumanMessage(query)]
    output = app.invoke({"messages": input_messages}, config)
    output["messages"][-1].pretty_print()

    config = {"configurable": {"thread_id": "abc123"}}

    input_messages = [HumanMessage(query)]
    output = app.invoke({"messages": input_messages}, config)
    output["messages"][-1].pretty_print()

    prompt_template = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You talk like a pirate. Answer all questions to the best of your ability.",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )

    workflow = StateGraph(state_schema=MessagesState)

    def call_model(state: MessagesState):
        # highlight-start
        prompt = prompt_template.invoke(state)
        response = model.invoke(prompt)
        # highlight-end
        return {"messages": response}

    workflow.add_edge(START, "model")
    workflow.add_node("model", call_model)

    memory = MemorySaver()
    app = workflow.compile(checkpointer=memory)

    config = {"configurable": {"thread_id": "abc345"}}
    query = "Hi! I'm Jim."

    input_messages = [HumanMessage(query)]
    output = app.invoke({"messages": input_messages}, config)
    output["messages"][-1].pretty_print()

    query = "What is my name?"

    input_messages = [HumanMessage(query)]
    output = app.invoke({"messages": input_messages}, config)
    output["messages"][-1].pretty_print()

    prompt_template = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful assistant. Answer all questions to the best of your ability in {language}.",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )

    # highlight-next-line
    class State(TypedDict):
        # highlight-next-line
        messages: Annotated[Sequence[BaseMessage], add_messages]
        # highlight-next-line
        language: str

    workflow = StateGraph(state_schema=State)

    def call_model(state: State):
        prompt = prompt_template.invoke(state)
        response = model.invoke(prompt)
        return {"messages": [response]}

    workflow.add_edge(START, "model")
    workflow.add_node("model", call_model)

    memory = MemorySaver()
    app = workflow.compile(checkpointer=memory)

    config = {"configurable": {"thread_id": "abc456"}}
    query = "Hi! I'm Bob."
    language = "Spanish"

    input_messages = [HumanMessage(query)]
    output = app.invoke(
        # highlight-next-line
        {"messages": input_messages, "language": language},
        config,
    )
    output["messages"][-1].pretty_print()

    query = "What is my name?"

    input_messages = [HumanMessage(query)]
    output = app.invoke(
        {"messages": input_messages},
        config,
    )
    output["messages"][-1].pretty_print()

    trimmer = trim_messages(
        max_tokens=65,
        strategy="last",
        token_counter=model,
        include_system=True,
        allow_partial=False,
        start_on="human",
    )

    messages = [
        SystemMessage(content="you're a good assistant"),
        HumanMessage(content="hi! I'm bob"),
        AIMessage(content="hi!"),
        HumanMessage(content="I like vanilla ice cream"),
        AIMessage(content="nice"),
        HumanMessage(content="whats 2 + 2"),
        AIMessage(content="4"),
        HumanMessage(content="thanks"),
        AIMessage(content="no problem!"),
        HumanMessage(content="having fun?"),
        AIMessage(content="yes!"),
    ]

    trimmer.invoke(messages)

    workflow = StateGraph(state_schema=State)

    def call_model(state: State):
        # highlight-start
        trimmed_messages = trimmer.invoke(state["messages"])
        prompt = prompt_template.invoke(
            {"messages": trimmed_messages, "language": state["language"]}
        )
        response = model.invoke(prompt)
        # highlight-end
        return {"messages": [response]}

    workflow.add_edge(START, "model")
    workflow.add_node("model", call_model)

    memory = MemorySaver()
    app = workflow.compile(checkpointer=memory)

    config = {"configurable": {"thread_id": "abc567"}}
    query = "What is my name?"
    language = "English"

    # highlight-next-line
    input_messages = messages + [HumanMessage(query)]
    output = app.invoke(
        {"messages": input_messages, "language": language},
        config,
    )
    output["messages"][-1].pretty_print()

    config = {"configurable": {"thread_id": "abc678"}}
    query = "What math problem did I ask?"
    language = "English"

    input_messages = messages + [HumanMessage(query)]
    output = app.invoke(
        {"messages": input_messages, "language": language},
        config,
    )
    output["messages"][-1].pretty_print()

    config = {"configurable": {"thread_id": "abc789"}}
    query = "Hi I'm Todd, please tell me a joke."
    language = "English"

    input_messages = [HumanMessage(query)]
    # highlight-next-line
    for chunk, metadata in app.stream(
        {"messages": input_messages, "language": language},
        config,
        # highlight-next-line
        stream_mode="messages",
    ):
        if isinstance(chunk, AIMessage):  # Filter to just model responses
            print(chunk.content, end="|")


with trace.get_tracer(__name__).start_as_current_span("demo-root-span"):
    main()
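
The compiled LangGraph app above also exposes async counterparts of invoke() and stream() (ainvoke() and astream()), which is where the asyncio support mentioned in the commit message would surface in this demo. A minimal sketch, assuming the app and config built in main():

import asyncio

from langchain_core.messages import HumanMessage


async def ask(app, config) -> None:
    # ainvoke() mirrors the app.invoke() calls above, but awaits the model.
    output = await app.ainvoke(
        {"messages": [HumanMessage("Hi! I'm Bob.")], "language": "English"},
        config,
    )
    output["messages"][-1].pretty_print()


# Hypothetical usage from inside main(): asyncio.run(ask(app, config))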
Lines changed: 48 additions & 0 deletions
@@ -0,0 +1,48 @@
# pylint: skip-file
from langchain_google_vertexai import ChatVertexAI

# NOTE: OpenTelemetry Python Logs and Events APIs are in beta
from opentelemetry import _events, _logs, trace

# from opentelemetry.exporter.otlp.proto.grpc._log_exporter import (
#     OTLPLogExporter,
# )
# from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
#     OTLPSpanExporter,
# )
from opentelemetry.instrumentation.vertexai import VertexAIInstrumentor
from opentelemetry.sdk._events import EventLoggerProvider
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import (
    BatchSpanProcessor,
    ConsoleSpanExporter,
)

# configure tracing
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(ConsoleSpanExporter())
)

# configure logging and events
_logs.set_logger_provider(LoggerProvider())
# _logs.get_logger_provider().add_log_record_processor(
#     BatchLogRecordProcessor(OTLPLogExporter())
# )
_events.set_event_logger_provider(EventLoggerProvider())

# instrument VertexAI
VertexAIInstrumentor().instrument()


def main():
    model = ChatVertexAI(
        model="gemini-1.5-flash", temperature=0.2, max_output_tokens=20
    )
    res = model.invoke("Hello, world!")
    print(res)


if __name__ == "__main__":
    main()
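
The GenAI events emitted by the instrumentation go through the logs pipeline, so with only ConsoleSpanExporter configured they are not printed. A minimal sketch of attaching a console log exporter as well, assuming the beta opentelemetry.sdk._logs.export API:

from opentelemetry import _logs
from opentelemetry.sdk._logs.export import (
    ConsoleLogExporter,
    SimpleLogRecordProcessor,
)

# Print each emitted log/event record to stdout as it is recorded.
_logs.get_logger_provider().add_log_record_processor(
    SimpleLogRecordProcessor(ConsoleLogExporter())
)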
Lines changed: 42 additions & 0 deletions
@@ -0,0 +1,42 @@
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: "0.0.0.0:4317"

extensions:
  googleclientauth:
    project: ${GOOGLE_CLOUD_PROJECT}
    quota_project: ${GOOGLE_CLOUD_QUOTA_PROJECT}
    scopes:
      - "https://www.googleapis.com/auth/trace.append"
      - "https://www.googleapis.com/auth/cloud-platform"

processors:
  resource:
    attributes:
      - key: gcp.project_id
        value: ${GOOGLE_CLOUD_PROJECT}
        action: insert

exporters:
  googlecloud:
    project: ${GOOGLE_CLOUD_PROJECT}
    log:
      default_log_name: "collector-otlp-logs"
  otlp:
    endpoint: https://telemetry.us-central1.rep.googleapis.com:443
    auth:
      authenticator: googleclientauth

service:
  extensions: [googleclientauth]
  pipelines:
    traces:
      receivers: [otlp]
      processors: [resource]
      exporters: [otlp]
    logs:
      receivers: [otlp]
      processors: [resource]
      exporters: [googlecloud]
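
This config has the collector accept OTLP over gRPC on port 4317, forward traces to Google's OTLP telemetry endpoint, and send logs through the googlecloud exporter. To point the Python example above at it, the commented-out OTLP exporters can be wired in; a minimal sketch, assuming opentelemetry-exporter-otlp-proto-grpc is installed and the collector runs on localhost:

from opentelemetry import _logs, trace
from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk._logs import LoggerProvider
from opentelemetry.sdk._logs.export import BatchLogRecordProcessor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# Export spans and logs to the collector's OTLP gRPC receiver on port 4317.
trace.set_tracer_provider(TracerProvider())
trace.get_tracer_provider().add_span_processor(
    BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))
)

_logs.set_logger_provider(LoggerProvider())
_logs.get_logger_provider().add_log_record_processor(
    BatchLogRecordProcessor(OTLPLogExporter(endpoint="http://localhost:4317"))
)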

instrumentation-genai/opentelemetry-instrumentation-vertexai/src/opentelemetry/instrumentation/vertexai/utils.py

Lines changed: 0 additions & 1 deletion
@@ -16,7 +16,6 @@
 
 import re
 from dataclasses import dataclass
-from enum import Enum
 from os import environ
 from typing import (
     TYPE_CHECKING,
