Skip to content

Commit 253113c

Browse files
authored
chore: misc fixes and improvements for connectors (#1484)
Signed-off-by: Tomas Pilar <[email protected]>
1 parent fce3e1f commit 253113c

File tree

9 files changed

+48
-8
lines changed

9 files changed

+48
-8
lines changed

apps/agentstack-sdk-py/src/agentstack_sdk/a2a/extensions/services/mcp.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@
33

44
from __future__ import annotations
55

6+
import re
67
from contextlib import asynccontextmanager
78
from types import NoneType
8-
from typing import Annotated, Any, Literal, Self
9+
from typing import TYPE_CHECKING, Annotated, Any, Literal, Self
910

1011
import a2a.types
1112
import pydantic
@@ -14,6 +15,10 @@
1415

1516
from agentstack_sdk.a2a.extensions.auth.oauth.oauth import OAuthExtensionServer
1617
from agentstack_sdk.a2a.extensions.base import BaseExtensionClient, BaseExtensionServer, BaseExtensionSpec
18+
from agentstack_sdk.util.logging import logger
19+
20+
if TYPE_CHECKING:
21+
from agentstack_sdk.server.context import RunContext
1722

1823
_TRANSPORT_TYPES = Literal["streamable_http", "stdio"]
1924

@@ -95,6 +100,23 @@ class MCPServiceExtensionMetadata(pydantic.BaseModel):
95100

96101

97102
class MCPServiceExtensionServer(BaseExtensionServer[MCPServiceExtensionSpec, MCPServiceExtensionMetadata]):
103+
def handle_incoming_message(self, message: a2a.types.Message, context: RunContext):
104+
from agentstack_sdk.platform import get_platform_client
105+
106+
super().handle_incoming_message(message, context)
107+
if not self.data:
108+
return
109+
110+
platform_url = str(get_platform_client().base_url)
111+
for fullfilment in self.data.mcp_fulfillments.values():
112+
if fullfilment.transport.type == "streamable_http":
113+
try:
114+
fullfilment.transport.url = pydantic.AnyHttpUrl(
115+
re.sub(r"^http[s]?://{platform_url}", platform_url, str(fullfilment.transport.url))
116+
)
117+
except Exception:
118+
logger.warning("Platform URL substitution failed", exc_info=True)
119+
98120
def parse_client_metadata(self, message: a2a.types.Message) -> MCPServiceExtensionMetadata | None:
99121
metadata = super().parse_client_metadata(message)
100122
if metadata:

apps/agentstack-sdk-py/src/agentstack_sdk/server/agent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@
4141
from agentstack_sdk.server.constants import _IMPLICIT_DEPENDENCY_PREFIX
4242
from agentstack_sdk.server.context import RunContext
4343
from agentstack_sdk.server.dependencies import extract_dependencies
44-
from agentstack_sdk.server.logging import logger
4544
from agentstack_sdk.server.store.context_store import ContextStore
4645
from agentstack_sdk.server.utils import cancel_task, close_queue
46+
from agentstack_sdk.util.logging import logger
4747
from agentstack_sdk.util.utils import extract_messages
4848

4949
AgentFunction: TypeAlias = Callable[[], AsyncGenerator[RunYield, RunYieldResume]]

apps/agentstack-sdk-py/src/agentstack_sdk/server/server.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@
3636
from agentstack_sdk.platform.provider import Provider
3737
from agentstack_sdk.server.agent import Agent, AgentFactory
3838
from agentstack_sdk.server.agent import agent as agent_decorator
39-
from agentstack_sdk.server.logging import configure_logger as configure_logger_func
40-
from agentstack_sdk.server.logging import logger
4139
from agentstack_sdk.server.store.context_store import ContextStore
4240
from agentstack_sdk.server.store.memory_context_store import InMemoryContextStore
4341
from agentstack_sdk.server.telemetry import configure_telemetry as configure_telemetry_func
4442
from agentstack_sdk.server.utils import cancel_task
43+
from agentstack_sdk.util.logging import configure_logger as configure_logger_func
44+
from agentstack_sdk.util.logging import logger
4545

4646

4747
class Server:

apps/agentstack-sdk-py/src/agentstack_sdk/server/logging.py renamed to apps/agentstack-sdk-py/src/agentstack_sdk/util/logging.py

File renamed without changes.

apps/agentstack-server/src/agentstack_server/bootstrap.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727

2828

2929
def setup_database_engine(config: Configuration) -> AsyncEngine:
30-
return config.persistence.create_async_engine(isolation_level="READ COMMITTED")
30+
return config.persistence.create_async_engine(isolation_level="READ COMMITTED", hide_parameters=True)
3131

3232

3333
async def setup_kubernetes_client(namespace: str | None = None, kubeconfig: pathlib.Path | str | dict | None = None):

apps/agentstack-server/src/agentstack_server/domain/models/connector.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from typing import Annotated, Literal
66
from uuid import UUID, uuid4
77

8-
from pydantic import AnyUrl, AwareDatetime, BaseModel, Field
8+
from pydantic import AnyUrl, AwareDatetime, BaseModel, ConfigDict, Field
99

1010
from agentstack_server.domain.models.common import Metadata
1111
from agentstack_server.utils.utils import utc_now
@@ -27,12 +27,15 @@ class Token(BaseModel):
2727
refresh_token: str | None = None
2828
token_type: Literal["bearer"]
2929

30+
model_config = ConfigDict(extra="allow")
31+
3032

3133
class Authorization(BaseModel):
3234
client_id: str | None = None
3335
client_secret: str | None = None
3436
flow: AuthFlow | None = None
3537
token: Token | None = None
38+
token_endpoint: AnyUrl | None = None
3639

3740

3841
class ConnectorState(StrEnum):

apps/agentstack-server/src/agentstack_server/jobs/crons/connector.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
blueprint = Blueprint()
1515

1616

17-
@blueprint.periodic(cron="* * * * * */15")
17+
@blueprint.periodic(cron="* * * * * */30")
1818
@blueprint.task(queueing_lock="refresh_connectors", queue=str(Queues.CRON_CONNECTOR))
1919
@inject
2020
async def refresh_connectors(timestamp: int, service: ConnectorService):

apps/agentstack-server/src/agentstack_server/jobs/procrastinate.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
from agentstack_server.configuration import Configuration
1111
from agentstack_server.jobs.crons.cleanup import blueprint as cleanup_crons
12+
from agentstack_server.jobs.crons.connector import blueprint as connector_crons
1213
from agentstack_server.jobs.crons.provider import blueprint as provider_crons
1314
from agentstack_server.jobs.tasks.context import blueprint as context_tasks
1415
from agentstack_server.jobs.tasks.file import blueprint as file_tasks
@@ -52,4 +53,5 @@ def exit_app_on_db_error(*_args, **_kwargs):
5253
app.add_tasks_from(blueprint=provider_build_tasks, namespace="provider_build_tasks")
5354
app.add_tasks_from(blueprint=provider_crons, namespace="cron_provider")
5455
app.add_tasks_from(blueprint=cleanup_crons, namespace="cron_cleanup")
56+
app.add_tasks_from(blueprint=connector_crons, namespace="cron_connector")
5557
return app

apps/agentstack-server/src/agentstack_server/service_layer/services/connector.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,10 @@ async def disconnect_connector(self, *, connector_id: UUID, user: User | None =
141141

142142
await self._revoke_auth_token(connector=connector)
143143

144+
if connector.auth:
145+
connector.auth.flow = None
144146
connector.state = ConnectorState.disconnected
147+
connector.disconnect_reason = "Client request"
145148

146149
async with self._uow() as uow:
147150
await uow.connectors.update(connector=connector)
@@ -176,12 +179,16 @@ async def oauth_callback(self, *, callback_url: str, state: str, error: str | No
176179
auth_metadata = await self._discover_auth_metadata(connector=connector)
177180
if not auth_metadata:
178181
raise RuntimeError("Authorization server no longer contains necessary metadata")
182+
token_endpoint = auth_metadata.get("token_endpoint")
183+
if not token_endpoint:
184+
raise RuntimeError("Authorization server has no token endpoint in metadata")
179185
token = await client.fetch_token(
180-
auth_metadata.get("token_endpoint"),
186+
token_endpoint,
181187
authorization_response=callback_url,
182188
code_verifier=connector.auth.flow.code_verifier,
183189
)
184190
connector.auth.token = Token.model_validate(token)
191+
connector.auth.token_endpoint = AnyUrl(str(token_endpoint))
185192
connector.auth.flow = None
186193
try:
187194
await self.probe_connector(connector=connector)
@@ -238,6 +245,9 @@ async def refresh_connector(self, *, connector_id: UUID, user: User | None = Non
238245
try:
239246
await self.probe_connector(connector=connector)
240247
except Exception as err:
248+
await self._revoke_auth_token(connector=connector)
249+
if connector.auth:
250+
connector.auth.flow = None
241251
connector.state = ConnectorState.disconnected
242252
connector.disconnect_reason = str(err)
243253
finally:
@@ -295,6 +305,7 @@ async def _revoke_auth_token(self, *, connector: Connector) -> None:
295305
logger.warning("Token revocation failed", exc_info=True)
296306

297307
connector.auth.token = None
308+
connector.auth.token_endpoint = None
298309
async with self._uow() as uow:
299310
await uow.connectors.update(connector=connector)
300311
await uow.commit()
@@ -329,6 +340,8 @@ async def update_token(token, refresh_token=None, access_token=None):
329340
code_challenge_method="S256",
330341
headers=headers,
331342
timeout=timeout,
343+
leeway=60, # A job probes connectors every 30 seconds, ensuring the token is valid roughly for at least 30 seconds per request.
344+
token_endpoint=str(connector.auth.token_endpoint),
332345
)
333346

334347
async def _discover_auth_metadata(self, *, connector: Connector) -> AuthorizationServerMetadata | None:

0 commit comments

Comments
 (0)