201 changes: 177 additions & 24 deletions studio/api.py
@@ -56,12 +56,15 @@
_executions: Dict[str, WorkflowExecution] = {}


def _convert_prompts_to_yaml_format(prompts: List[Dict[str, Any]]) -> List[Dict[str, str]]:
def _convert_prompts_to_yaml_format(prompts: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
Convert prompts from frontend format to SyGra YAML format.

Frontend format: [{"role": "system", "content": "..."}, {"role": "user", "content": "..."}]
SyGra YAML format: [{"system": "..."}, {"user": "..."}]

Multi-modal frontend format: [{"role": "user", "content": [{"type": "text", "text": "..."}, {"type": "audio_url", "audio_url": "..."}]}]
Multi-modal YAML format: [{"user": [{"type": "text", "text": "..."}, {"type": "audio_url", "audio_url": "..."}]}]
"""
if not prompts:
return []
@@ -71,10 +74,20 @@ def _convert_prompts_to_yaml_format(prompts: List[Dict[str, Any]]) -> List[Dict[
if isinstance(msg, dict):
role = msg.get('role', 'user')
content = msg.get('content', '')
# Multi-modal content (a list of content parts) and plain text
# strings are both preserved as-is under the role key
yaml_prompts.append({role: content})
elif hasattr(msg, 'role') and hasattr(msg, 'content'):
# Handle PromptMessage objects
# List (multi-modal) and string content are both preserved as-is
yaml_prompts.append({msg.role: msg.content})
return yaml_prompts
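# Worked example (hypothetical values): a mixed text/audio prompt
# [{"role": "user", "content": [{"type": "text", "text": "Transcribe:"},
# {"type": "audio_url", "audio_url": "{audio}"}]}]
# converts to the SyGra YAML shape
# [{"user": [{"type": "text", "text": "Transcribe:"},
# {"type": "audio_url", "audio_url": "{audio}"}]}]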

# Scalable execution storage instance (lazy initialized)
@@ -1021,12 +1034,79 @@ async def get_workflow_sample_data(workflow_id: str, limit: int = 3, source_inde
}

elif source_type in ("servicenow", "snow"):
# ServiceNow - would need actual connection
return {
"records": [],
"total": 0,
"message": "ServiceNow preview requires authentication. Configure SNOW credentials to preview data."
}
# ServiceNow - use PySNC to fetch actual data
table = source.get("table")
fields = source.get("fields", [])
alias = source.get("alias")
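# A (hypothetical) source entry this branch consumes, using only the
# keys read here and below:
# {"type": "servicenow", "table": "incident",
# "fields": ["number", "short_description"], "alias": "incidents",
# "order_by": "sys_created_on", "order_desc": True}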

if not table:
return {
"records": [],
"total": 0,
"source_type": "servicenow",
"message": "No table specified for ServiceNow source"
}

try:
from sygra.core.dataset.servicenow_handler import ServiceNowHandler
from sygra.core.dataset.dataset_config import DataSourceConfig as SygraDataSourceConfig

# Build source config for ServiceNowHandler
snow_config = SygraDataSourceConfig(
type="servicenow",
table=table,
fields=fields if fields else None,
filters=source.get("filters"),
limit=limit,
order_by=source.get("order_by"),
order_desc=source.get("order_desc", False),
)

handler = ServiceNowHandler(source_config=snow_config)
raw_records = handler.read()

# Flatten records (ServiceNow returns value/display_value dicts)
records = []
for record in raw_records[:limit]:
flat_record = {}
for key, value in record.items():
if isinstance(value, dict) and "display_value" in value:
flat_record[key] = value.get("display_value") or value.get("value")
else:
flat_record[key] = value
records.append(flat_record)
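# e.g. a (hypothetical) raw record {"number": {"value": "1", "display_value": "INC0000001"}}
# flattens to {"number": "INC0000001"}; display_value is preferred,
# falling back to value when it is empty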

total_count = len(raw_records)
return {
"records": records,
"total": total_count,
"source_type": "servicenow",
"table": table,
"alias": alias,
"message": None
}

except Exception as snow_err:
# Fall back to showing configured fields if connection fails
error_msg = str(snow_err)
if fields:
placeholder_record = {field: f"<{field}>" for field in fields}
return {
"records": [placeholder_record],
"total": "unknown",
"source_type": "servicenow",
"table": table,
"alias": alias,
"message": f"Could not connect to ServiceNow: {error_msg[:100]}. Showing configured fields."
}
else:
return {
"records": [],
"total": 0,
"source_type": "servicenow",
"table": table,
"message": f"ServiceNow connection failed: {error_msg[:150]}"
}
else:
return {
"records": [],
@@ -1923,18 +2003,19 @@ async def get_workflow_code(workflow_id: str):
@app.get("/api/workflows/{workflow_id}/node/{node_id}/code/{code_type}")
async def get_node_code(workflow_id: str, node_id: str, code_type: str):
"""
Get the code for a specific node from task_executor.py.
Get the code for a specific node from task_executor.py or from the class path specified in YAML.

Uses AST-based detection to find code blocks by checking base class inheritance.
This is the single source of truth - no markers or metadata copies.
When a class path is specified in the node config (e.g., pre_process or post_process),
it extracts the class name and searches for that specific class.

Args:
workflow_id: The workflow ID
node_id: The node ID
code_type: Type of code ('pre_process', 'post_process', 'lambda', 'branch_condition', 'output_generator', 'data_transform')

Returns:
{ "code": "...", "found": true/false }
{ "code": "...", "found": true/false, "class_path": "..." }
"""
import ast

@@ -1946,28 +2027,82 @@ async def get_node_code(workflow_id: str, node_id: str, code_type: str):

workflow = _workflows[workflow_id]
workflow_dir = Path(workflow.source_path).parent
task_executor_path = workflow_dir / "task_executor.py"

valid_types = {'pre_process', 'post_process', 'lambda', 'branch_condition', 'output_generator', 'data_transform'}
if code_type not in valid_types:
raise HTTPException(status_code=400, detail=f"Invalid code_type: {code_type}")

if not task_executor_path.exists():
return {"code": "", "found": False, "path": None}
# Get the class path from node config if available
class_path = None
target_class_name = None
target_file_path = None

# Find the node in the workflow to get its config
node_config = None
for n in workflow.nodes:
if n.id == node_id:
node_config = n
break

if node_config:
# Check for class path in node metadata (original_config from YAML)
original_config = node_config.metadata.get("original_config", {}) if node_config.metadata else {}

if code_type == 'pre_process':
class_path = original_config.get("pre_process") or getattr(node_config, "pre_process", None)
elif code_type == 'post_process':
class_path = original_config.get("post_process") or getattr(node_config, "post_process", None)
elif code_type == 'lambda':
class_path = original_config.get("lambda") or original_config.get("function") or getattr(node_config, "function_path", None)

# If we have a class path, extract the class name and determine the file
if class_path:
# Extract class name from path (e.g., "tasks.examples.foo.task_executor.MyClass" -> "MyClass")
parts = class_path.split(".")
target_class_name = parts[-1] if parts else None

# Determine the file path from the module path
# e.g., "tasks.examples.foo.task_executor.MyClass" -> "tasks/examples/foo/task_executor.py"
if len(parts) > 1:
module_parts = parts[:-1] # Everything except the class name
relative_path = "/".join(module_parts) + ".py"

# Try to find the file relative to the project root
# First try: relative to workflow directory's parent (tasks folder level)
potential_paths = [
workflow_dir / relative_path,
workflow_dir.parent / relative_path,
workflow_dir.parent.parent / relative_path,
Path.cwd() / relative_path,
]

for potential_path in potential_paths:
if potential_path.exists():
target_file_path = potential_path
break

# Default to task_executor.py in workflow directory
if not target_file_path:
target_file_path = workflow_dir / "task_executor.py"

if not target_file_path.exists():
return {"code": "", "found": False, "path": None, "class_path": class_path}

try:
with open(task_executor_path, 'r') as f:
with open(target_file_path, 'r') as f:
content = f.read()
except Exception as e:
return {"code": "", "found": False, "error": str(e)}
return {"code": "", "found": False, "error": str(e), "class_path": class_path}

# Find the code block using AST
code = _get_node_code_from_file(content, node_id, code_type)
# If we have a specific class name from the config, search for it directly
code = _get_node_code_from_file(content, node_id, code_type, target_class_name)

return {
"code": code if code else "",
"found": code is not None,
"path": str(task_executor_path.resolve())
"path": str(target_file_path.resolve()),
"class_path": class_path
}

@app.put("/api/workflows/{workflow_id}/yaml")
@@ -6116,10 +6251,16 @@ def _remove_code_block_from_file(content: str, node_id: str, code_type: str) ->
return result


def _get_node_code_from_file(content: str, node_id: str, code_type: str) -> Optional[str]:
def _get_node_code_from_file(content: str, node_id: str, code_type: str, target_class_name: Optional[str] = None) -> Optional[str]:
"""
Extract the code for a specific node from file content using AST.

Args:
content: The file content to parse
node_id: The node ID (used for fallback matching)
code_type: Type of code ('pre_process', 'post_process', etc.)
target_class_name: Optional specific class name to search for (from YAML config)

Returns the code string if found, None otherwise.
"""
import ast
@@ -6145,7 +6286,7 @@ def _get_node_code_from_file(content: str, node_id: str, code_type: str) -> Opti
'branch_condition': 'Condition',
}

# Normalize node_id for comparison
# Normalize node_id for comparison (fallback matching)
safe_node_id = re.sub(r'[^a-zA-Z0-9_]', '', node_id.replace('-', '_').replace(' ', '_'))
expected_suffix = SUFFIX_MAP.get(code_type, '')
expected_name = f"{safe_node_id}{expected_suffix}"
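# e.g. node_id "classify-intent" with code_type "branch_condition" gives
# safe_node_id "classify_intent" and expected_name "classify_intentCondition"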
@@ -6175,10 +6316,22 @@ def _get_node_code_from_file(content: str, node_id: str, code_type: str) -> Opti
detected_type = BASE_CLASS_TO_TYPE[base_name]
break

# Match if type and node_id match
# Match logic:
# 1. If target_class_name is provided, match exact class name with correct base class
# 2. Otherwise, fall back to node_id-based matching
if detected_type == code_type:
normalized_class_id = re.sub(r'[^a-zA-Z0-9_]', '', class_safe_id.replace('-', '_'))
if normalized_class_id == safe_node_id or class_name == expected_name:
match_found = False

# Priority 1: Match by specific class name from YAML config
if target_class_name and class_name == target_class_name:
match_found = True
# Priority 2: Fall back to node_id-based matching
elif not target_class_name:
normalized_class_id = re.sub(r'[^a-zA-Z0-9_]', '', class_safe_id.replace('-', '_'))
if normalized_class_id == safe_node_id or class_name == expected_name:
match_found = True

if match_found:
start_line = node.lineno - 1 # 0-indexed
end_line = node.end_lineno if hasattr(node, 'end_lineno') else start_line + 1

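For context, a minimal client-side sketch of the extended endpoint. The workflow ID, node ID, and base URL are hypothetical; the diff does not pin where the Studio API is served.

import requests  # assumes the requests package; not part of this PR

BASE = "http://localhost:8000"  # hypothetical Studio API address

resp = requests.get(
    f"{BASE}/api/workflows/wf_demo/node/classify-intent/code/pre_process"
)
payload = resp.json()
# The response now also carries the resolved class path (may be None):
print(payload["found"], payload.get("class_path"))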
35 changes: 28 additions & 7 deletions studio/converter.py
@@ -292,7 +292,14 @@ def _generate_llm_script(self, node: Any) -> str:
prompts = []
if node.prompt:
for msg in node.prompt:
prompts.append(f'{{"role": "{msg.role}", "content": """{msg.content}"""}}')
# Handle both simple string and multi-modal content
if isinstance(msg.content, str):
prompts.append(f'{{"role": "{msg.role}", "content": """{msg.content}"""}}')
elif isinstance(msg.content, list):
# Multi-modal content - serialize as JSON
import json
content_json = json.dumps(msg.content)
prompts.append(f'{{"role": "{msg.role}", "content": {content_json}}}')

prompt_list = ",\n ".join(prompts)

@@ -342,12 +349,26 @@ def _build_input_transforms(self, node: Any) -> Dict[str, Any]:
pattern = r"(?<!\{)\{([^{}]+)\}(?!\})"

for msg in node.prompt:
matches = re.findall(pattern, msg.content)
for var in matches:
transforms[var] = {
"type": "javascript",
"expr": f"flow_input.{var}",
}
# Handle both simple string and multi-modal content
if isinstance(msg.content, str):
matches = re.findall(pattern, msg.content)
for var in matches:
transforms[var] = {
"type": "javascript",
"expr": f"flow_input.{var}",
}
elif isinstance(msg.content, list):
# Multi-modal content - extract variables from each part
for part in msg.content:
if isinstance(part, dict):
for key, value in part.items():
if isinstance(value, str):
matches = re.findall(pattern, value)
for var in matches:
transforms[var] = {
"type": "javascript",
"expr": f"flow_input.{var}",
}

return transforms
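# e.g. a prompt part "Summarize {ticket} for {user}" (hypothetical) yields
# {"ticket": {"type": "javascript", "expr": "flow_input.ticket"},
# "user": {"type": "javascript", "expr": "flow_input.user"}}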
