Skip to content

Commit 15c014c

Browse files
authored
Fix quickstarts againn (#224)
* feat: wip on orchestrator state fixing + tracing Signed-off-by: Samantha Coyle <[email protected]> * fix: separate ex/in-ternal triggers + wip fix orchestrators Signed-off-by: Samantha Coyle <[email protected]> * fix: ensure progress on substeps/steps Signed-off-by: Samantha Coyle <[email protected]> * fix: give orchestrators ability to pick up where they left off using same session id Signed-off-by: Samantha Coyle <[email protected]> * style: make linter happy Signed-off-by: Samantha Coyle <[email protected]> * fix: rm extra edge check since captured elsewhere Signed-off-by: Samantha Coyle <[email protected]> * feat: add session context for long term context on durable agents Signed-off-by: Samantha Coyle <[email protected]> * fix: updates for all quickstarts for release Signed-off-by: Samantha Coyle <[email protected]> * docs: update docs to remind users to fill in api key Signed-off-by: Samantha Coyle <[email protected]> --------- Signed-off-by: Samantha Coyle <[email protected]>
1 parent 5a17523 commit 15c014c

File tree

32 files changed

+690
-79
lines changed

32 files changed

+690
-79
lines changed

dapr_agents/llm/dapr/chat.py

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -126,11 +126,42 @@ def translate_response(self, response: dict, model: str) -> dict:
126126
"""
127127
Convert Dapr Alpha2 response into OpenAI-style ChatCompletion dict.
128128
"""
129+
if not isinstance(response, dict):
130+
logger.error(f"Invalid response type: {type(response)}")
131+
raise ValueError(f"Response must be a dictionary, got {type(response)}")
132+
129133
# Flatten all output choices from Alpha2 envelope
130134
choices: List[Dict[str, Any]] = []
131-
for output in response.get("outputs", []) or []:
132-
for choice in output.get("choices", []) or []:
135+
outputs = response.get("outputs", []) or []
136+
if not isinstance(outputs, list):
137+
logger.error(f"Invalid outputs type: {type(outputs)}")
138+
raise ValueError(f"Outputs must be a list, got {type(outputs)}")
139+
140+
for output in outputs:
141+
if not isinstance(output, dict):
142+
logger.error(f"Invalid output type: {type(output)}")
143+
continue
144+
output_choices = output.get("choices", []) or []
145+
if not isinstance(output_choices, list):
146+
logger.error(f"Invalid choices type: {type(output_choices)}")
147+
continue
148+
for choice in output_choices:
149+
if not isinstance(choice, dict):
150+
logger.error(f"Invalid choice type: {type(choice)}")
151+
continue
152+
# Ensure message is present and has required fields
153+
message = choice.get("message", {})
154+
if not isinstance(message, dict):
155+
logger.error(f"Invalid message type: {type(message)}")
156+
continue
157+
# Add required fields if missing
158+
if "content" not in message:
159+
message["content"] = ""
160+
if "role" not in message:
161+
message["role"] = "assistant"
162+
choice["message"] = message
133163
choices.append(choice)
164+
134165
return {
135166
"choices": choices,
136167
"created": int(time.time()),
@@ -336,14 +367,28 @@ def generate(
336367
if not llm_component:
337368
llm_component = _get_llm_component(metadata)
338369

370+
# Extract and serialize response format parameters
371+
api_params = {}
372+
if "response_format" in params:
373+
try:
374+
import json
375+
376+
api_params["response_format"] = json.dumps(
377+
params["response_format"]
378+
)
379+
except Exception as e:
380+
logger.warning(f"Failed to serialize response_format: {e}")
381+
if "structured_mode" in params:
382+
api_params["structured_mode"] = str(params["structured_mode"])
383+
339384
raw = self.client.chat_completion_alpha2(
340385
llm=llm_component or self._llm_component,
341386
inputs=conv_inputs,
342387
scrub_pii=scrubPII,
343388
temperature=temperature,
344389
tools=params.get("tools"),
345390
tool_choice=params.get("tool_choice"),
346-
parameters=params.get("parameters"),
391+
parameters=api_params or None,
347392
)
348393
normalized = self.translate_response(
349394
raw, llm_component or self._llm_component

dapr_agents/llm/dapr/utils.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,17 @@ def process_dapr_chat_response(response: Dict[str, Any]) -> LLMChatResponse:
5151
None # there is no openai "function_call" in dapr only tool calls
5252
)
5353

54+
content = msg.get("content")
55+
if isinstance(content, dict):
56+
try:
57+
import json
58+
59+
content = json.dumps(content)
60+
except Exception as e:
61+
logger.warning(f"Failed to serialize dictionary content: {e}")
62+
5463
assistant_message = AssistantMessage(
55-
content=msg.get("content"),
64+
content=content,
5665
tool_calls=tool_calls,
5766
function_call=function_call,
5867
)

dapr_agents/llm/utils/request.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,18 @@ def process_params(
128128

129129
if response_format:
130130
logger.info(f"Structured Mode Activated! Mode={structured_mode}.")
131+
# Add system message for JSON formatting
132+
# This is necessary for the response formatting of the data to work correctly when a user has a function call response format.
133+
inputs = params.get("inputs", [])
134+
inputs.insert(
135+
0,
136+
{
137+
"role": "system",
138+
"content": "You must format your response as a valid JSON object matching the provided schema. Do not include any explanatory text or markdown formatting.",
139+
},
140+
)
141+
params["inputs"] = inputs
142+
131143
params = StructureHandler.generate_request(
132144
response_format=response_format,
133145
llm_provider=llm_provider,

dapr_agents/llm/utils/structure.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,17 @@ def extract_structured_response(
259259
if not content:
260260
raise StructureError("No content found for JSON mode.")
261261

262-
logger.debug(f"Extracted JSON content: {content}")
262+
# Try to parse content as JSON first
263+
try:
264+
if isinstance(content, str):
265+
parsed = json.loads(content)
266+
logger.debug(f"Successfully parsed JSON content: {parsed}")
267+
return parsed
268+
except json.JSONDecodeError:
269+
pass
270+
271+
# If parsing fails or content is not a string, return as is
272+
logger.debug(f"Returning raw content: {content}")
263273
return content
264274

265275
else:

dapr_agents/observability/instrumentor.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -471,9 +471,19 @@ def _apply_workflow_wrappers(self) -> None:
471471
wrapper=WorkflowMonitorWrapper(self._tracer),
472472
)
473473

474-
# Note: WorkflowRunWrapper removed to prevent double wrapping
475-
# run_and_monitor_workflow_async internally calls run_workflow
476-
# So wrapping both causes duplicate instances
474+
# This is necessary to create the parent workflow span for the 09 quickstart...
475+
wrap_function_wrapper(
476+
module="dapr_agents.workflow.base",
477+
name="WorkflowApp.run_workflow",
478+
wrapper=WorkflowRunWrapper(self._tracer),
479+
)
480+
481+
# Instrument workflow registration to add AGENT spans for orchestrator workflows
482+
wrap_function_wrapper(
483+
module="dapr_agents.workflow.base",
484+
name="WorkflowApp._register_workflows",
485+
wrapper=WorkflowRegistrationWrapper(self._tracer),
486+
)
477487

478488
# Instrument workflow registration to add AGENT spans for orchestrator workflows
479489
wrap_function_wrapper(

dapr_agents/observability/wrappers/workflow.py

Lines changed: 35 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -85,11 +85,23 @@ def __call__(self, wrapped: Any, instance: Any, args: Any, kwargs: Any) -> Any:
8585
workflow = arguments.get("workflow")
8686

8787
# Extract workflow name
88-
workflow_name = (
89-
workflow
90-
if isinstance(workflow, str)
91-
else getattr(instance, "_workflow_name", "unknown_workflow")
92-
)
88+
if isinstance(workflow, str):
89+
workflow_name = workflow
90+
else:
91+
# Try to get the name from the workflow function/class
92+
workflow_name = None
93+
# First try the workflow decorator name
94+
if hasattr(workflow, "_workflow_name"):
95+
workflow_name = workflow._workflow_name
96+
# Then try __name__ for function workflows
97+
if not workflow_name and hasattr(workflow, "__name__"):
98+
workflow_name = workflow.__name__
99+
# Finally try the name attribute
100+
if not workflow_name and hasattr(workflow, "name"):
101+
workflow_name = workflow.name
102+
# Fallback
103+
if not workflow_name:
104+
workflow_name = "AgenticWorkflow"
93105
logger.debug(f"Extracted workflow_name: {workflow_name}")
94106

95107
# Build span attributes
@@ -316,12 +328,24 @@ def _extract_workflow_name(self, args: Any, kwargs: Any) -> str:
316328
else:
317329
workflow = kwargs.get("workflow")
318330

319-
# Extract workflow name
320-
workflow_name = (
321-
workflow
322-
if isinstance(workflow, str)
323-
else getattr(workflow, "__name__", "AgenticWorkflow")
324-
)
331+
# Extract workflow name with better fallback chain
332+
if isinstance(workflow, str):
333+
workflow_name = workflow
334+
else:
335+
# Try to get the name from the workflow function/class
336+
workflow_name = None
337+
# First try the workflow decorator name
338+
if hasattr(workflow, "_workflow_name"):
339+
workflow_name = workflow._workflow_name
340+
# Then try __name__ for function workflows
341+
if not workflow_name and hasattr(workflow, "__name__"):
342+
workflow_name = workflow.__name__
343+
# Finally try the name attribute
344+
if not workflow_name and hasattr(workflow, "name"):
345+
workflow_name = workflow.name
346+
# Fallback
347+
if not workflow_name:
348+
workflow_name = "AgenticWorkflow"
325349
return workflow_name
326350

327351
def _build_workflow_attributes(

dapr_agents/observability/wrappers/workflow_task.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -591,10 +591,16 @@ async def async_wrapper(instance, *wrapper_args, **wrapper_kwargs):
591591
logger.warning(
592592
f"No parent context available for {span_name}, executing without span"
593593
)
594-
bound_method = wrapped.__get__(instance, type(instance))
595-
result = await bound_method(*wrapper_args, **wrapper_kwargs)
596-
597-
return result
594+
try:
595+
bound_method = wrapped.__get__(instance, type(instance))
596+
result = await bound_method(*wrapper_args, **wrapper_kwargs)
597+
return result
598+
except Exception as e:
599+
logger.error(
600+
f"Error in async workflow task execution (no span): {e}",
601+
exc_info=True,
602+
)
603+
raise
598604

599605
return async_wrapper(instance, *args, **kwargs)
600606

dapr_agents/prompt/utils/chat.py

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -80,21 +80,41 @@ def validate_and_create_message(
8080
elif isinstance(item, BaseMessage):
8181
normalized_messages.append(item)
8282
elif isinstance(item, dict):
83+
# For tool messages and assistant messages with tool calls,
84+
# preserve all fields by passing the entire dict as message_data
8385
role = item.get("role", "user")
8486
content = item.get("content", "")
85-
normalized_messages.append(
86-
validate_and_create_message(role, content, item)
87-
)
87+
if role == "tool" or (
88+
role == "assistant" and item.get("tool_calls")
89+
):
90+
normalized_messages.append(
91+
validate_and_create_message(role, content, item)
92+
)
93+
else:
94+
# For other messages, create a new message with just role and content
95+
normalized_messages.append(
96+
validate_and_create_message(role, content, {})
97+
)
8898
else:
8999
raise ValueError(
90100
f"Unsupported type in list for variable: {type(item)}"
91101
)
92102
elif isinstance(variable_value, dict):
93103
role = variable_value.get("role", "user")
94104
content = variable_value.get("content", "")
95-
normalized_messages.append(
96-
validate_and_create_message(role, content, variable_value)
97-
)
105+
if role == "tool" or (
106+
role == "assistant" and variable_value.get("tool_calls")
107+
):
108+
# For tool messages and assistant messages with tool calls,
109+
# preserve all fields by passing the entire dict as message_data
110+
normalized_messages.append(
111+
validate_and_create_message(role, content, variable_value)
112+
)
113+
else:
114+
# For other messages, create a new message with just role and content
115+
normalized_messages.append(
116+
validate_and_create_message(role, content, {})
117+
)
98118
else:
99119
raise ValueError(f"Unsupported type for variable: {type(variable_value)}")
100120

dapr_agents/utils/signal_mixin.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,6 @@ def _handle_shutdown_signal(self, sig: int) -> None:
9999
except Exception as e:
100100
logger.debug(f"Error in graceful shutdown: {e}")
101101

102-
import sys
103-
104-
sys.exit(0)
105-
106102
async def graceful_shutdown(self) -> None:
107103
"""
108104
Perform graceful shutdown operations.

quickstarts/01-hello-world-observability/requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,5 @@ dapr-agents[observability]
22
opentelemetry-sdk>=1.35.0,<2.0.0
33
opentelemetry-exporter-zipkin-json==1.25.0
44
# For local development use local changes by commenting out the dapr-agents line above and uncommenting the line below:
5-
# -e ../../
5+
# -e ../../[observability]
66

0 commit comments

Comments
 (0)