Commit 2e164a1

updates with json helper
Signed-off-by: Filinto Duran <[email protected]>
1 parent e837ccb commit 2e164a1

3 files changed: +303 −0 lines changed

dapr/aio/clients/grpc/client.py

Lines changed: 93 additions & 0 deletions

@@ -1853,6 +1853,99 @@ async def converse_stream_alpha1(
         except grpc.aio.AioRpcError as err:
             raise DaprGrpcError(err) from err
 
+    async def converse_stream_json(
+        self,
+        name: str,
+        inputs: List[ConversationInput],
+        *,
+        context_id: Optional[str] = None,
+        parameters: Optional[Dict[str, GrpcAny]] = None,
+        metadata: Optional[Dict[str, str]] = None,
+        scrub_pii: Optional[bool] = None,
+        temperature: Optional[float] = None,
+    ) -> AsyncIterator[Dict[str, Any]]:
+        """Invoke an LLM using the streaming conversation API with JSON response format (Alpha).
+
+        This method provides a JSON-formatted streaming interface that's compatible with
+        common LLM response formats, making it easier to integrate with existing tools
+        and frameworks that expect JSON responses.
+
+        Args:
+            name: Name of the LLM component to invoke
+            inputs: List of conversation inputs
+            context_id: Optional ID for continuing an existing chat
+            parameters: Optional custom parameters for the request
+            metadata: Optional metadata for the component
+            scrub_pii: Optional flag to scrub PII from inputs and outputs
+            temperature: Optional temperature setting for the LLM to optimize for
+                creativity or predictability
+
+        Yields:
+            Dict[str, Any]: JSON-formatted conversation response chunks with structure:
+                {
+                    "choices": [
+                        {
+                            "delta": {
+                                "content": "chunk content",
+                                "role": "assistant"
+                            },
+                            "index": 0,
+                            "finish_reason": None
+                        }
+                    ],
+                    "context_id": "optional context ID",
+                    "usage": {
+                        "prompt_tokens": 0,
+                        "completion_tokens": 0,
+                        "total_tokens": 0
+                    }
+                }
+
+        Raises:
+            DaprGrpcError: If the Dapr runtime returns an error
+        """
+        async for chunk in self.converse_stream_alpha1(
+            name=name,
+            inputs=inputs,
+            context_id=context_id,
+            parameters=parameters,
+            metadata=metadata,
+            scrub_pii=scrub_pii,
+            temperature=temperature,
+        ):
+            # Transform the chunk to JSON format compatible with common LLM APIs
+            chunk_dict = {
+                'choices': [],
+                'context_id': None,
+                'usage': None,
+            }
+
+            # Handle streaming result chunks
+            if chunk.result and chunk.result.result:
+                chunk_dict['choices'] = [
+                    {
+                        'delta': {
+                            'content': chunk.result.result,
+                            'role': 'assistant'
+                        },
+                        'index': 0,
+                        'finish_reason': None
+                    }
+                ]
+
+            # Handle context ID
+            if chunk.context_id:
+                chunk_dict['context_id'] = chunk.context_id
+
+            # Handle usage information (typically in the final chunk)
+            if chunk.usage:
+                chunk_dict['usage'] = {
+                    'prompt_tokens': chunk.usage.prompt_tokens,
+                    'completion_tokens': chunk.usage.completion_tokens,
+                    'total_tokens': chunk.usage.total_tokens,
+                }
+
+            yield chunk_dict
+
     async def wait(self, timeout_s: float):
         """Waits for sidecar to be available within the timeout.
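
For orientation, here is a minimal consumption sketch for the new async method, assuming a running sidecar and a conversation component named 'echo' (the same component name the example file below uses); it accumulates the streamed deltas into one reply, using only the API added in this diff:

import asyncio

from dapr.aio.clients import DaprClient
from dapr.clients.grpc._request import ConversationInput


async def collect_reply() -> str:
    async with DaprClient() as client:
        parts = []
        async for chunk in client.converse_stream_json(
            name='echo',  # assumed component name; adjust to your setup
            inputs=[ConversationInput(content='Hi!', role='user')],
        ):
            # Each chunk is a plain dict; pull content out of the delta, if any.
            for choice in chunk.get('choices', []):
                content = choice.get('delta', {}).get('content')
                if content:
                    parts.append(content)
        return ''.join(parts)


print(asyncio.run(collect_reply()))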

dapr/clients/grpc/client.py

Lines changed: 93 additions & 0 deletions

@@ -1855,6 +1855,99 @@ def converse_stream_alpha1(
         except RpcError as err:
             raise DaprGrpcError(err) from err
 
+    def converse_stream_json(
+        self,
+        name: str,
+        inputs: List[ConversationInput],
+        *,
+        context_id: Optional[str] = None,
+        parameters: Optional[Dict[str, GrpcAny]] = None,
+        metadata: Optional[Dict[str, str]] = None,
+        scrub_pii: Optional[bool] = None,
+        temperature: Optional[float] = None,
+    ) -> Iterator[Dict[str, Any]]:
+        """Invoke an LLM using the streaming conversation API with JSON response format (Alpha).
+
+        This method provides a JSON-formatted streaming interface that's compatible with
+        common LLM response formats, making it easier to integrate with existing tools
+        and frameworks that expect JSON responses.
+
+        Args:
+            name: Name of the LLM component to invoke
+            inputs: List of conversation inputs
+            context_id: Optional ID for continuing an existing chat
+            parameters: Optional custom parameters for the request
+            metadata: Optional metadata for the component
+            scrub_pii: Optional flag to scrub PII from inputs and outputs
+            temperature: Optional temperature setting for the LLM to optimize for
+                creativity or predictability
+
+        Yields:
+            Dict[str, Any]: JSON-formatted conversation response chunks with structure:
+                {
+                    "choices": [
+                        {
+                            "delta": {
+                                "content": "chunk content",
+                                "role": "assistant"
+                            },
+                            "index": 0,
+                            "finish_reason": None
+                        }
+                    ],
+                    "context_id": "optional context ID",
+                    "usage": {
+                        "prompt_tokens": 0,
+                        "completion_tokens": 0,
+                        "total_tokens": 0
+                    }
+                }
+
+        Raises:
+            DaprGrpcError: If the Dapr runtime returns an error
+        """
+        for chunk in self.converse_stream_alpha1(
+            name=name,
+            inputs=inputs,
+            context_id=context_id,
+            parameters=parameters,
+            metadata=metadata,
+            scrub_pii=scrub_pii,
+            temperature=temperature,
+        ):
+            # Transform the chunk to JSON format compatible with common LLM APIs
+            chunk_dict = {
+                'choices': [],
+                'context_id': None,
+                'usage': None,
+            }
+
+            # Handle streaming result chunks
+            if chunk.result and chunk.result.result:
+                chunk_dict['choices'] = [
+                    {
+                        'delta': {
+                            'content': chunk.result.result,
+                            'role': 'assistant'
+                        },
+                        'index': 0,
+                        'finish_reason': None
+                    }
+                ]
+
+            # Handle context ID
+            if chunk.context_id:
+                chunk_dict['context_id'] = chunk.context_id
+
+            # Handle usage information (typically in the final chunk)
+            if chunk.usage:
+                chunk_dict['usage'] = {
+                    'prompt_tokens': chunk.usage.prompt_tokens,
+                    'completion_tokens': chunk.usage.completion_tokens,
+                    'total_tokens': chunk.usage.total_tokens,
+                }
+
+            yield chunk_dict
+
     def wait(self, timeout_s: float):
         """Waits for sidecar to be available within the timeout.
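
Because each yielded chunk is a plain dict built from JSON-friendly types, it serializes directly. A small sketch relaying the sync stream as newline-delimited JSON (the 'echo' component name is again an assumption borrowed from the example below):

import json

from dapr.clients import DaprClient
from dapr.clients.grpc._request import ConversationInput

with DaprClient() as client:
    for chunk in client.converse_stream_json(
        name='echo',  # assumed component name
        inputs=[ConversationInput(content='Hi!', role='user')],
    ):
        # None serializes to null, so every chunk is valid JSON on its own line.
        print(json.dumps(chunk))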
Lines changed: 117 additions & 0 deletions

@@ -0,0 +1,117 @@
+#!/usr/bin/env python3
+
+"""
+Example demonstrating the new converse_stream_json API.
+
+This example shows how to use the new JSON-formatted streaming conversation API
+that provides responses compatible with common LLM response formats, making it
+easier to integrate with existing tools and frameworks.
+
+Prerequisites:
+- Dapr sidecar running with conversation components
+- Use tools/run_dapr_dev.py to start a development sidecar with echo component
+"""
+
+import asyncio
+
+from dapr.aio.clients import DaprClient as AsyncDaprClient
+from dapr.clients import DaprClient
+from dapr.clients.grpc._request import ConversationInput
+
+
+def sync_json_streaming_example():
+    """Demonstrate synchronous JSON streaming conversation."""
+    print('🚀 Testing synchronous JSON streaming conversation...')
+
+    with DaprClient() as d:
+        print('✓ Connected to Dapr sidecar')
+
+        inputs = [ConversationInput(content='Hello from JSON streaming test!', role='user')]
+
+        print('\n📡 Streaming with JSON format...')
+        for chunk in d.converse_stream_json(
+            name='echo', inputs=inputs, context_id='json-test-session'
+        ):
+            print(f'📦 JSON chunk: {chunk}')
+
+            # Extract content from the JSON structure
+            choices = chunk.get('choices', [])
+            if choices and choices[0].get('delta', {}).get('content'):
+                content = choices[0]['delta']['content']
+                print(f'   Content: "{content}"')
+
+            # Check for context ID
+            if chunk.get('context_id'):
+                print(f'   Context ID: {chunk["context_id"]}')
+
+            # Check for usage information
+            if chunk.get('usage'):
+                usage = chunk['usage']
+                prompt_tokens = usage['prompt_tokens']
+                completion_tokens = usage['completion_tokens']
+                total_tokens = usage['total_tokens']
+                print(f'   Usage: {prompt_tokens} + {completion_tokens} = {total_tokens} tokens')
+
+
+async def async_json_streaming_example():
+    """Demonstrate asynchronous JSON streaming conversation."""
+    print('\n🧪 Testing asynchronous JSON streaming conversation...')
+
+    async with AsyncDaprClient() as d:
+        print('✓ Connected to Dapr sidecar (async)')
+
+        inputs = [ConversationInput(content='Hello from async JSON streaming test!', role='user')]
+
+        print('\n📡 Async streaming with JSON format...')
+        async for chunk in d.converse_stream_json(
+            name='echo', inputs=inputs, context_id='async-json-test-session'
+        ):
+            print(f'📦 Async JSON chunk: {chunk}')
+
+            # Extract content from the JSON structure
+            choices = chunk.get('choices', [])
+            if choices and choices[0].get('delta', {}).get('content'):
+                content = choices[0]['delta']['content']
+                print(f'   Async Content: "{content}"')
+
+            # Check for context ID
+            if chunk.get('context_id'):
+                print(f'   Async Context ID: {chunk["context_id"]}')
+
+            # Check for usage information
+            if chunk.get('usage'):
+                usage = chunk['usage']
+                prompt_tokens = usage['prompt_tokens']
+                completion_tokens = usage['completion_tokens']
+                total_tokens = usage['total_tokens']
+                usage_parts = [
+                    f'   Async Usage: {prompt_tokens}',
+                    f'{completion_tokens}',
+                    f'{total_tokens} tokens',
+                ]
+                print(' + '.join(usage_parts[:2]) + ' = ' + usage_parts[2])
+
+
+def main():
+    """Run both sync and async examples."""
+    try:
+        # Run synchronous example
+        sync_json_streaming_example()
+
+        # Run asynchronous example
+        asyncio.run(async_json_streaming_example())
+
+        print('\n✅ JSON streaming examples completed successfully!')
+        json_compat_msg = '\n💡 The JSON format is compatible with common LLM APIs like OpenAI.'
+        integration_msg = '   This makes it easier to integrate with existing tools and frameworks.'
+        print(json_compat_msg)
+        print(integration_msg)
+
+    except Exception as e:
+        print(f'❌ Error: {e}')
+        print('\n💡 Make sure to start the Dapr sidecar with:')
+        print('   python tools/run_dapr_dev.py')
+
+
+if __name__ == '__main__':
+    main()
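
Design note: the per-chunk shape (choices → delta → content/role, plus an optional usage block) mirrors the chunk layout of common chat-completion streaming APIs, which is what lets JSON-oriented tooling consume this stream with little or no adaptation, as the example above demonstrates.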
