11import os
2- from typing import Any
2+ from typing import Any , Dict , Optional , Union
33
44from codegen_api_client .api .agents_api import AgentsApi
55from codegen_api_client .api_client import ApiClient
66from codegen_api_client .configuration import Configuration
77from codegen_api_client .models .agent_run_response import AgentRunResponse
88from codegen_api_client .models .create_agent_run_input import CreateAgentRunInput
9+ from codegen_api_client .rest import ApiException
910
1011from codegen .agents .constants import CODEGEN_BASE_API_URL
12+ from codegen .exceptions import handle_api_error
1113
1214
1315class AgentTask :
@@ -23,34 +25,69 @@ def __init__(self, task_data: AgentRunResponse, api_client: ApiClient, org_id: i
2325 self ._agents_api = AgentsApi (api_client )
2426
2527 def refresh (self ) -> None :
26- """Refresh the job status from the API."""
28+ """Refresh the job status from the API.
29+
30+ Raises:
31+ CodegenError: If the API request fails
32+ """
2733 if self .id is None :
2834 return
2935
30- job_data = self ._agents_api .get_agent_run_v1_organizations_org_id_agent_run_agent_run_id_get (
31- agent_run_id = int (self .id ), org_id = int (self .org_id ), authorization = f"Bearer { self ._api_client .configuration .access_token } "
32- )
33-
34- # Convert API response to dict for attribute access
35- job_dict = {}
36- if hasattr (job_data , "__dict__" ):
37- job_dict = job_data .__dict__
38- elif isinstance (job_data , dict ):
39- job_dict = job_data
40-
41- self .status = job_dict .get ("status" )
42- self .result = job_dict .get ("result" )
36+ try :
37+ job_data = self ._agents_api .get_agent_run_v1_organizations_org_id_agent_run_agent_run_id_get (
38+ agent_run_id = int (self .id ), org_id = int (self .org_id ), authorization = f"Bearer { self ._api_client .configuration .access_token } "
39+ )
40+
41+ # Convert API response to dict for attribute access
42+ job_dict = {}
43+ if hasattr (job_data , "__dict__" ):
44+ job_dict = job_data .__dict__
45+ elif isinstance (job_data , dict ):
46+ job_dict = job_data
47+
48+ self .status = job_dict .get ("status" )
49+ self .result = job_dict .get ("result" )
50+ except ApiException as e :
51+ error = handle_api_error (e .status , str (e ), getattr (e , 'body' , None ))
52+ raise error from e
53+
54+ def is_completed (self ) -> bool :
55+ """Check if the agent task is completed."""
56+ return self .status in ['completed' , 'failed' , 'cancelled' ]
57+
58+ def is_running (self ) -> bool :
59+ """Check if the agent task is currently running."""
60+ return self .status in ['running' , 'pending' ]
61+
62+ def is_successful (self ) -> bool :
63+ """Check if the agent task completed successfully."""
64+ return self .status == 'completed'
65+
66+ def is_failed (self ) -> bool :
67+ """Check if the agent task failed."""
68+ return self .status == 'failed'
69+
70+ def to_dict (self ) -> Dict [str , Any ]:
71+ """Convert agent task to dictionary."""
72+ return {
73+ 'id' : self .id ,
74+ 'org_id' : self .org_id ,
75+ 'status' : self .status ,
76+ 'result' : self .result ,
77+ 'web_url' : self .web_url
78+ }
4379
4480
4581class Agent :
4682 """API client for interacting with Codegen AI agents."""
4783
48- def __init__ (self , token : str , org_id : int | None = None , base_url : str | None = CODEGEN_BASE_API_URL ):
84+ def __init__ (self , token : str , org_id : Optional [ Union [ int , str ]] = None , base_url : str = CODEGEN_BASE_API_URL ):
4985 """Initialize a new Agent client.
5086
5187 Args:
5288 token: API authentication token
5389 org_id: Optional organization ID. If not provided, default org will be used.
90+ base_url: Base URL for the API (defaults to production)
5491 """
5592 self .token = token
5693 self .org_id = org_id or int (os .environ .get ("CODEGEN_ORG_ID" , "1" )) # Default to org ID 1 if not specified
@@ -61,7 +98,7 @@ def __init__(self, token: str, org_id: int | None = None, base_url: str | None =
6198 self .agents_api = AgentsApi (self .api_client )
6299
63100 # Current job
64- self .current_job : AgentTask | None = None
101+ self .current_job : Optional [ AgentTask ] = None
65102
66103 def run (self , prompt : str ) -> AgentTask :
67104 """Run an agent with the given prompt.
@@ -70,26 +107,58 @@ def run(self, prompt: str) -> AgentTask:
70107 prompt: The instruction for the agent to execute
71108
72109 Returns:
73- Job: A job object representing the agent run
110+ AgentTask: A task object representing the agent run
111+
112+ Raises:
113+ CodegenError: If the API request fails
74114 """
75- run_input = CreateAgentRunInput (prompt = prompt )
76- agent_run_response = self .agents_api .create_agent_run_v1_organizations_org_id_agent_run_post (
77- org_id = int (self .org_id ), create_agent_run_input = run_input , authorization = f"Bearer { self .token } " , _headers = {"Content-Type" : "application/json" }
78- )
79- # Convert API response to dict for Job initialization
80-
81- job = AgentTask (agent_run_response , self .api_client , self .org_id )
82- self .current_job = job
83- return job
84-
85- def get_status (self ) -> dict [str , Any ] | None :
115+ try :
116+ run_input = CreateAgentRunInput (prompt = prompt )
117+ agent_run_response = self .agents_api .create_agent_run_v1_organizations_org_id_agent_run_post (
118+ org_id = int (self .org_id ), create_agent_run_input = run_input , authorization = f"Bearer { self .token } " , _headers = {"Content-Type" : "application/json" }
119+ )
120+
121+ job = AgentTask (agent_run_response , self .api_client , self .org_id )
122+ self .current_job = job
123+ return job
124+ except ApiException as e :
125+ error = handle_api_error (e .status , str (e ), getattr (e , 'body' , None ))
126+ raise error from e
127+
128+ def get_status (self ) -> Optional [Dict [str , Any ]]:
86129 """Get the status of the current job.
87130
88131 Returns:
89132 dict: A dictionary containing job status information,
90133 or None if no job has been run.
134+
135+ Raises:
136+ CodegenError: If the API request fails
91137 """
92138 if self .current_job :
93139 self .current_job .refresh ()
94140 return {"id" : self .current_job .id , "status" : self .current_job .status , "result" : self .current_job .result , "web_url" : self .current_job .web_url }
95141 return None
142+
143+ def get_task (self , task_id : Union [int , str ]) -> AgentTask :
144+ """Get a specific agent task by ID.
145+
146+ Args:
147+ task_id: Agent task ID to retrieve
148+
149+ Returns:
150+ AgentTask: The requested agent task
151+
152+ Raises:
153+ CodegenError: If the API request fails or task is not found
154+ """
155+ try :
156+ response = self .agents_api .get_agent_run_v1_organizations_org_id_agent_run_agent_run_id_get (
157+ org_id = int (self .org_id ),
158+ agent_run_id = int (task_id ),
159+ authorization = f"Bearer { self .token } "
160+ )
161+ return AgentTask (response , self .api_client , self .org_id )
162+ except ApiException as e :
163+ error = handle_api_error (e .status , str (e ), getattr (e , 'body' , None ))
164+ raise error from e
0 commit comments