forked from microsoft/agent-framework
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathazure_responses_client.py
More file actions
55 lines (41 loc) · 1.79 KB
/
azure_responses_client.py
File metadata and controls
55 lines (41 loc) · 1.79 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
# Copyright (c) Microsoft. All rights reserved.
import asyncio
from random import randint
from typing import Annotated
from agent_framework import ChatResponse
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential
from pydantic import BaseModel, Field
"""
Azure Responses Client Direct Usage Example
Demonstrates direct AzureOpenAIResponsesClient usage for structured response generation with Azure OpenAI models.
Shows function calling capabilities with custom business logic.
"""
def get_weather(
    location: Annotated[str, Field(description="The location to get the weather for.")],
) -> str:
    """Get the weather for a given location."""
    # NOTE(review): the docstring and the Field description above are kept
    # verbatim; the tool-calling layer presumably forwards them to the model
    # as the tool/parameter descriptions — confirm before rewording.
    conditions = ("sunny", "cloudy", "rainy", "stormy")

    # Draw the condition first and the temperature second so the sequence of
    # random numbers consumed stays the same as a single inline f-string.
    condition = conditions[randint(0, len(conditions) - 1)]
    high_celsius = randint(10, 30)

    return f"The weather in {location} is {condition} with a high of {high_celsius}°C."
class OutputStruct(BaseModel):
    """Structured output for weather information."""

    # NOTE(review): field names, order and the docstring are left untouched;
    # pydantic presumably derives the response schema from them — confirm
    # before renaming anything here.

    # Free-form text naming the place the report is about.
    location: str

    # Free-form text describing the weather at that place.
    weather: str
async def main() -> None:
    """Send one weather question to the Azure OpenAI Responses client and print the reply.

    The request passes ``get_weather`` as a tool and ``OutputStruct`` as the
    response format. Depending on the local ``use_streaming`` switch, the
    reply is either assembled from the streaming generator or fetched with a
    single call; in both cases ``response.value`` is printed.
    """
    # For authentication, run `az login` command in terminal or replace AzureCliCredential with preferred
    # authentication option.
    client = AzureOpenAIResponsesClient(credential=AzureCliCredential())

    prompt = "What's the weather in Amsterdam and in Paris?"
    use_streaming = True  # set to False to exercise the non-streaming call below
    print(f"User: {prompt}")

    if not use_streaming:
        result = await client.get_response(prompt, tools=get_weather, response_format=OutputStruct)
    else:
        # Collect the streamed updates into a single ChatResponse.
        updates = client.get_streaming_response(prompt, tools=get_weather, response_format=OutputStruct)
        result = await ChatResponse.from_chat_response_generator(
            updates,
            output_format_type=OutputStruct,
        )

    # Both paths print the same attribute; presumably it holds the reply
    # parsed into OutputStruct — verify against the framework docs.
    print(f"Assistant: {result.value}")
# Run the example only when this file is executed as a script, so importing
# the module does not start an event loop.
if __name__ == "__main__":
    asyncio.run(main())