11import asyncio
2- import base64
32import inspect
4- import json
5- import os
63from collections .abc import AsyncGenerator , Awaitable , Callable
74from contextlib import asynccontextmanager
85from typing import Any
96
10- import httpx
117import uvicorn
12- from agentex .lib .adk .utils ._modules .client import create_async_agentex_client
138from fastapi import FastAPI , Request
149from fastapi .responses import StreamingResponse
1510from pydantic import TypeAdapter , ValidationError
3025from agentex .types .task_message_content import TaskMessageContent
3126from agentex .lib .utils .logging import make_logger
3227from agentex .lib .utils .model_utils import BaseModel
28+ from agentex .lib .utils .registration import register_agent
3329
3430logger = make_logger (__name__ )
3531
@@ -74,7 +70,7 @@ def get_lifespan_function(self):
7470 async def lifespan_context (app : FastAPI ):
7571 env_vars = EnvironmentVariables .refresh ()
7672 if env_vars .AGENTEX_BASE_URL :
77- await self . _register_agent (env_vars )
73+ await register_agent (env_vars )
7874 else :
7975 logger .warning ("AGENTEX_BASE_URL not set, skipping agent registration" )
8076
@@ -101,6 +97,16 @@ async def _handle_jsonrpc(self, request: Request):
10197 data = await request .json ()
10298 rpc_request = JSONRPCRequest (** data )
10399
100+ # Check if the request is authenticated
101+ if refreshed_environment_variables and getattr (refreshed_environment_variables , "AGENT_API_KEY" , None ):
102+ authorization_header = request .headers .get ("x-agent-api-key" )
103+ if authorization_header != refreshed_environment_variables .AGENT_API_KEY :
104+ return JSONRPCResponse (
105+ id = rpc_request .id ,
106+ error = JSONRPCError (code = - 32601 , message = "Unauthorized" ),
107+ )
108+
109+
104110 # Check if method is valid first
105111 try :
106112 method = RPCMethod (rpc_request .method )
@@ -345,87 +351,4 @@ def run(self, host: str = "0.0.0.0", port: int = 8000, **kwargs):
345351 """Start the Uvicorn server for async handlers."""
346352 uvicorn .run (self , host = host , port = port , ** kwargs )
347353
348- def _get_auth_principal (self , env_vars : EnvironmentVariables ):
349- if not env_vars .AUTH_PRINCIPAL_B64 :
350- return None
351-
352- try :
353- decoded_str = base64 .b64decode (env_vars .AUTH_PRINCIPAL_B64 ).decode ('utf-8' )
354- return json .loads (decoded_str )
355- except Exception :
356- return None
357-
358- async def _register_agent (self , env_vars : EnvironmentVariables ):
359- """Register this agent with the Agentex server"""
360- # Build the agent's own URL
361- full_acp_url = f"{ env_vars .ACP_URL .rstrip ('/' )} :{ env_vars .ACP_PORT } "
362-
363- description = (
364- env_vars .AGENT_DESCRIPTION
365- or f"Generic description for agent: { env_vars .AGENT_NAME } "
366- )
367-
368- # Prepare registration data
369- registration_data = {
370- "name" : env_vars .AGENT_NAME ,
371- "description" : description ,
372- "acp_url" : full_acp_url ,
373- "acp_type" : env_vars .ACP_TYPE ,
374- "principal_context" : self ._get_auth_principal (env_vars )
375- }
376-
377- if env_vars .AGENT_ID :
378- registration_data ["agent_id" ] = env_vars .AGENT_ID
379-
380- # Make the registration request
381- registration_url = f"{ env_vars .AGENTEX_BASE_URL .rstrip ('/' )} /agents/register"
382- # Retry logic with configurable attempts and delay
383- max_retries = 3
384- base_delay = 5 # seconds
385- last_exception = None
386-
387- attempt = 0
388- while attempt < max_retries :
389- try :
390- async with httpx .AsyncClient () as client :
391- response = await client .post (
392- registration_url , json = registration_data , timeout = 30.0
393- )
394- if response .status_code == 200 :
395- agent = response .json ()
396- agent_id , agent_name = agent ["id" ], agent ["name" ]
397-
398- os .environ ["AGENT_ID" ] = agent_id
399- os .environ ["AGENT_NAME" ] = agent_name
400- env_vars .AGENT_ID = agent_id
401- env_vars .AGENT_NAME = agent_name
402- global refreshed_environment_variables
403- refreshed_environment_variables = env_vars
404- logger .info (
405- f"Successfully registered agent '{ env_vars .AGENT_NAME } ' with Agentex server with acp_url: { full_acp_url } . Registration data: { registration_data } "
406- )
407- return # Success, exit the retry loop
408- else :
409- error_msg = f"Failed to register agent. Status: { response .status_code } , Response: { response .text } "
410- logger .error (error_msg )
411- last_exception = Exception (
412- f"Failed to startup agent: { response .text } "
413- )
414-
415- except Exception as e :
416- logger .error (
417- f"Exception during agent registration attempt { attempt + 1 } : { e } "
418- )
419- last_exception = e
420- attempt += 1
421- if attempt < max_retries :
422- delay = (attempt ) * base_delay # 5, 10, 15 seconds
423- logger .info (
424- f"Retrying in { delay } seconds... (attempt { attempt } /{ max_retries } )"
425- )
426- await asyncio .sleep (delay )
427-
428- # If we get here, all retries failed
429- raise last_exception or Exception (
430- f"Failed to register agent after { max_retries } attempts"
431- )
354+
0 commit comments