Skip to content

Commit 948f7d1

Browse files
authored
Use Agent API key for auth (#34)
* Update base_acp_server.py * use agent api key * . * check against server hash * update * Update uv.lock * . * add agent registration to temporal worker * Update acp.py * Update acp.py * masking api key for logging * Update base_acp_server.py * x-agent-api-key * Update worker.py
1 parent df8429c commit 948f7d1

File tree

5 files changed

+139
-95
lines changed

5 files changed

+139
-95
lines changed

src/agentex/lib/adk/utils/_modules/client.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,18 @@
88

99

1010
class EnvAuth(httpx.Auth):
11-
def __init__(self, header_name="x-agent-identity"):
11+
def __init__(self, header_name="x-agent-api-key"):
1212
self.header_name = header_name
1313

1414
def auth_flow(self, request):
1515
# This gets called for every request
1616
env_vars = EnvironmentVariables.refresh()
1717
if env_vars:
18-
agent_id = env_vars.AGENT_ID
19-
if agent_id:
20-
request.headers[self.header_name] = agent_id
21-
logger.info(f"Adding header {self.header_name}:{agent_id}")
18+
agent_api_key = env_vars.AGENT_API_KEY
19+
if agent_api_key:
20+
request.headers[self.header_name] = agent_api_key
21+
masked_key = agent_api_key[-4:] if agent_api_key and len(agent_api_key) > 4 else "****"
22+
logger.info(f"Adding header {self.header_name}:{masked_key}")
2223
yield request
2324

2425

src/agentex/lib/core/temporal/workers/worker.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
)
2525

2626
from agentex.lib.utils.logging import make_logger
27+
from agentex.lib.utils.registration import register_agent
28+
from agentex.lib.environment_variables import EnvironmentVariables
2729

2830
logger = make_logger(__name__)
2931

@@ -103,6 +105,7 @@ async def run(
103105
workflow: type,
104106
):
105107
await self.start_health_check_server()
108+
await self._register_agent()
106109
temporal_client = await get_temporal_client(
107110
temporal_address=os.environ.get("TEMPORAL_ADDRESS", "localhost:7233"),
108111
)
@@ -160,3 +163,17 @@ async def start_health_check_server(self):
160163
f"Failed to start health check server on alternative port {alt_port}: {e}"
161164
)
162165
raise
166+
167+
"""
168+
Register the worker with the Agentex server.
169+
170+
Even though the Temporal server will also register the agent with the server,
171+
doing this on the worker side is required to make sure that both share the API key
172+
which is returned on registration and used to authenticate the worker with the Agentex server.
173+
"""
174+
async def _register_agent(self):
175+
env_vars = EnvironmentVariables.refresh()
176+
if env_vars and env_vars.AGENTEX_BASE_URL:
177+
await register_agent(env_vars)
178+
else:
179+
logger.warning("AGENTEX_BASE_URL not set, skipping worker registration")

src/agentex/lib/environment_variables.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ class EnvVarKeys(str, Enum):
2323
AGENT_NAME = "AGENT_NAME"
2424
AGENT_DESCRIPTION = "AGENT_DESCRIPTION"
2525
AGENT_ID = "AGENT_ID"
26+
AGENT_API_KEY = "AGENT_API_KEY"
2627
# ACP Configuration
2728
ACP_URL = "ACP_URL"
2829
ACP_PORT = "ACP_PORT"
@@ -52,6 +53,7 @@ class EnvironmentVariables(BaseModel):
5253
AGENT_NAME: str
5354
AGENT_DESCRIPTION: str | None = None
5455
AGENT_ID: str | None = None
56+
AGENT_API_KEY: str | None = None
5557
ACP_TYPE: str | None = "agentic"
5658
# ACP Configuration
5759
ACP_URL: str

src/agentex/lib/sdk/fastacp/base/base_acp_server.py

Lines changed: 13 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,10 @@
11
import asyncio
2-
import base64
32
import inspect
4-
import json
5-
import os
63
from collections.abc import AsyncGenerator, Awaitable, Callable
74
from contextlib import asynccontextmanager
85
from typing import Any
96

10-
import httpx
117
import uvicorn
12-
from agentex.lib.adk.utils._modules.client import create_async_agentex_client
138
from fastapi import FastAPI, Request
149
from fastapi.responses import StreamingResponse
1510
from pydantic import TypeAdapter, ValidationError
@@ -30,6 +25,7 @@
3025
from agentex.types.task_message_content import TaskMessageContent
3126
from agentex.lib.utils.logging import make_logger
3227
from agentex.lib.utils.model_utils import BaseModel
28+
from agentex.lib.utils.registration import register_agent
3329

3430
logger = make_logger(__name__)
3531

@@ -74,7 +70,7 @@ def get_lifespan_function(self):
7470
async def lifespan_context(app: FastAPI):
7571
env_vars = EnvironmentVariables.refresh()
7672
if env_vars.AGENTEX_BASE_URL:
77-
await self._register_agent(env_vars)
73+
await register_agent(env_vars)
7874
else:
7975
logger.warning("AGENTEX_BASE_URL not set, skipping agent registration")
8076

@@ -101,6 +97,16 @@ async def _handle_jsonrpc(self, request: Request):
10197
data = await request.json()
10298
rpc_request = JSONRPCRequest(**data)
10399

100+
# Check if the request is authenticated
101+
if refreshed_environment_variables and getattr(refreshed_environment_variables, "AGENT_API_KEY", None):
102+
authorization_header = request.headers.get("x-agent-api-key")
103+
if authorization_header != refreshed_environment_variables.AGENT_API_KEY:
104+
return JSONRPCResponse(
105+
id=rpc_request.id,
106+
error=JSONRPCError(code=-32601, message="Unauthorized"),
107+
)
108+
109+
104110
# Check if method is valid first
105111
try:
106112
method = RPCMethod(rpc_request.method)
@@ -345,87 +351,4 @@ def run(self, host: str = "0.0.0.0", port: int = 8000, **kwargs):
345351
"""Start the Uvicorn server for async handlers."""
346352
uvicorn.run(self, host=host, port=port, **kwargs)
347353

348-
def _get_auth_principal(self, env_vars: EnvironmentVariables):
349-
if not env_vars.AUTH_PRINCIPAL_B64:
350-
return None
351-
352-
try:
353-
decoded_str = base64.b64decode(env_vars.AUTH_PRINCIPAL_B64).decode('utf-8')
354-
return json.loads(decoded_str)
355-
except Exception:
356-
return None
357-
358-
async def _register_agent(self, env_vars: EnvironmentVariables):
359-
"""Register this agent with the Agentex server"""
360-
# Build the agent's own URL
361-
full_acp_url = f"{env_vars.ACP_URL.rstrip('/')}:{env_vars.ACP_PORT}"
362-
363-
description = (
364-
env_vars.AGENT_DESCRIPTION
365-
or f"Generic description for agent: {env_vars.AGENT_NAME}"
366-
)
367-
368-
# Prepare registration data
369-
registration_data = {
370-
"name": env_vars.AGENT_NAME,
371-
"description": description,
372-
"acp_url": full_acp_url,
373-
"acp_type": env_vars.ACP_TYPE,
374-
"principal_context": self._get_auth_principal(env_vars)
375-
}
376-
377-
if env_vars.AGENT_ID:
378-
registration_data["agent_id"] = env_vars.AGENT_ID
379-
380-
# Make the registration request
381-
registration_url = f"{env_vars.AGENTEX_BASE_URL.rstrip('/')}/agents/register"
382-
# Retry logic with configurable attempts and delay
383-
max_retries = 3
384-
base_delay = 5 # seconds
385-
last_exception = None
386-
387-
attempt = 0
388-
while attempt < max_retries:
389-
try:
390-
async with httpx.AsyncClient() as client:
391-
response = await client.post(
392-
registration_url, json=registration_data, timeout=30.0
393-
)
394-
if response.status_code == 200:
395-
agent = response.json()
396-
agent_id, agent_name = agent["id"], agent["name"]
397-
398-
os.environ["AGENT_ID"] = agent_id
399-
os.environ["AGENT_NAME"] = agent_name
400-
env_vars.AGENT_ID = agent_id
401-
env_vars.AGENT_NAME = agent_name
402-
global refreshed_environment_variables
403-
refreshed_environment_variables = env_vars
404-
logger.info(
405-
f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}"
406-
)
407-
return # Success, exit the retry loop
408-
else:
409-
error_msg = f"Failed to register agent. Status: {response.status_code}, Response: {response.text}"
410-
logger.error(error_msg)
411-
last_exception = Exception(
412-
f"Failed to startup agent: {response.text}"
413-
)
414-
415-
except Exception as e:
416-
logger.error(
417-
f"Exception during agent registration attempt {attempt + 1}: {e}"
418-
)
419-
last_exception = e
420-
attempt += 1
421-
if attempt < max_retries:
422-
delay = (attempt) * base_delay # 5, 10, 15 seconds
423-
logger.info(
424-
f"Retrying in {delay} seconds... (attempt {attempt}/{max_retries})"
425-
)
426-
await asyncio.sleep(delay)
427-
428-
# If we get here, all retries failed
429-
raise last_exception or Exception(
430-
f"Failed to register agent after {max_retries} attempts"
431-
)
354+
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import base64
2+
import json
3+
import os
4+
import httpx
5+
import asyncio
6+
7+
from agentex.lib.environment_variables import EnvironmentVariables, refreshed_environment_variables
8+
from agentex.lib.utils.logging import make_logger
9+
10+
logger = make_logger(__name__)
11+
12+
def get_auth_principal(env_vars: EnvironmentVariables):
13+
if not env_vars.AUTH_PRINCIPAL_B64:
14+
return None
15+
16+
try:
17+
decoded_str = base64.b64decode(env_vars.AUTH_PRINCIPAL_B64).decode('utf-8')
18+
return json.loads(decoded_str)
19+
except Exception:
20+
return None
21+
22+
async def register_agent(env_vars: EnvironmentVariables):
23+
"""Register this agent with the Agentex server"""
24+
if not env_vars.AGENTEX_BASE_URL:
25+
logger.warning("AGENTEX_BASE_URL is not set, skipping registration")
26+
return
27+
# Build the agent's own URL
28+
full_acp_url = f"{env_vars.ACP_URL.rstrip('/')}:{env_vars.ACP_PORT}"
29+
30+
description = (
31+
env_vars.AGENT_DESCRIPTION
32+
or f"Generic description for agent: {env_vars.AGENT_NAME}"
33+
)
34+
35+
# Prepare registration data
36+
registration_data = {
37+
"name": env_vars.AGENT_NAME,
38+
"description": description,
39+
"acp_url": full_acp_url,
40+
"acp_type": env_vars.ACP_TYPE,
41+
"principal_context": get_auth_principal(env_vars)
42+
}
43+
44+
if env_vars.AGENT_ID:
45+
registration_data["agent_id"] = env_vars.AGENT_ID
46+
47+
# Make the registration request
48+
registration_url = f"{env_vars.AGENTEX_BASE_URL.rstrip('/')}/agents/register"
49+
# Retry logic with configurable attempts and delay
50+
max_retries = 3
51+
base_delay = 5 # seconds
52+
last_exception = None
53+
54+
attempt = 0
55+
while attempt < max_retries:
56+
try:
57+
async with httpx.AsyncClient() as client:
58+
response = await client.post(
59+
registration_url, json=registration_data, timeout=30.0
60+
)
61+
if response.status_code == 200:
62+
agent = response.json()
63+
agent_id, agent_name = agent["id"], agent["name"]
64+
agent_api_key = agent["agent_api_key"]
65+
66+
os.environ["AGENT_ID"] = agent_id
67+
os.environ["AGENT_NAME"] = agent_name
68+
os.environ["AGENT_API_KEY"] = agent_api_key
69+
env_vars.AGENT_ID = agent_id
70+
env_vars.AGENT_NAME = agent_name
71+
env_vars.AGENT_API_KEY = agent_api_key
72+
global refreshed_environment_variables
73+
refreshed_environment_variables = env_vars
74+
logger.info(
75+
f"Successfully registered agent '{env_vars.AGENT_NAME}' with Agentex server with acp_url: {full_acp_url}. Registration data: {registration_data}"
76+
)
77+
return # Success, exit the retry loop
78+
else:
79+
error_msg = f"Failed to register agent. Status: {response.status_code}, Response: {response.text}"
80+
logger.error(error_msg)
81+
last_exception = Exception(
82+
f"Failed to startup agent: {response.text}"
83+
)
84+
85+
except Exception as e:
86+
logger.error(
87+
f"Exception during agent registration attempt {attempt + 1}: {e}"
88+
)
89+
last_exception = e
90+
attempt += 1
91+
if attempt < max_retries:
92+
delay = (attempt) * base_delay # 5, 10, 15 seconds
93+
logger.info(
94+
f"Retrying in {delay} seconds... (attempt {attempt}/{max_retries})"
95+
)
96+
await asyncio.sleep(delay)
97+
98+
# If we get here, all retries failed
99+
raise last_exception or Exception(
100+
f"Failed to register agent after {max_retries} attempts"
101+
)

0 commit comments

Comments
 (0)