Skip to content

Commit 476671f

Browse files
authored
Merge pull request #34 from OpenMined/feat/sync-endpoints
feat: implement asynchronous endpoint synchronization to marketplaces…
2 parents 1bb8dbb + b16c80d commit 476671f

File tree

4 files changed

+201
-37
lines changed

4 files changed

+201
-37
lines changed

backend/syft_space/components/endpoints/handlers.py

Lines changed: 124 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
"""Endpoint handlers for business logic."""
22

3+
from typing import Any
34
from uuid import UUID
45

56
from fastapi import HTTPException
@@ -725,41 +726,7 @@ async def _publish_to_marketplace(
725726
username=marketplace.email, password=marketplace.password
726727
)
727728

728-
endpoint_type = (
729-
"model" if endpoint.model_id is not None else "data_source"
730-
)
731-
policies = [
732-
{
733-
"type": policy.policy_type,
734-
"version": "1.0",
735-
"enabled": True,
736-
"description": policy.name,
737-
"config": policy.configuration,
738-
}
739-
for policy in endpoint.policies
740-
]
741-
connection_config = {
742-
"path": f"/api/v1/endpoints/{endpoint.slug}/query",
743-
}
744-
745-
payload = {
746-
"name": endpoint.name,
747-
"description": endpoint.summary or "",
748-
"type": endpoint_type,
749-
"visibility": "public",
750-
"version": "0.1.0",
751-
"readme": endpoint.description or "",
752-
"slug": endpoint.slug,
753-
"policies": policies,
754-
"connect": [
755-
{
756-
"type": "https",
757-
"enabled": True,
758-
"description": "",
759-
"config": connection_config,
760-
}
761-
],
762-
}
729+
payload = self._build_publish_payload(endpoint)
763730
await client.publish_endpoint(payload, overwrite=True)
764731

765732
except SyftHubError as e:
@@ -920,3 +887,125 @@ async def _check_marketplace_availability(
920887
available=None,
921888
error=str(e),
922889
)
890+
891+
def _build_publish_payload(self, endpoint: Endpoint) -> dict[str, Any]:
892+
"""Build the publish payload for an endpoint.
893+
894+
Args:
895+
endpoint: Endpoint entity
896+
897+
Returns:
898+
Dict payload for publish/sync APIs
899+
"""
900+
endpoint_type = "model" if endpoint.model_id is not None else "data_source"
901+
902+
policies = [
903+
{
904+
"type": policy.policy_type,
905+
"version": "1.0",
906+
"enabled": True,
907+
"description": policy.name,
908+
"config": policy.configuration,
909+
}
910+
for policy in endpoint.policies
911+
]
912+
913+
connection_config = {
914+
"path": f"/api/v1/endpoints/{endpoint.slug}/query",
915+
}
916+
917+
return {
918+
"name": endpoint.name,
919+
"description": endpoint.summary or "",
920+
"type": endpoint_type,
921+
"visibility": "public",
922+
"version": "0.1.0",
923+
"readme": endpoint.description or "",
924+
"slug": endpoint.slug,
925+
"policies": policies,
926+
"connect": [
927+
{
928+
"type": "https",
929+
"enabled": True,
930+
"description": "",
931+
"config": connection_config,
932+
}
933+
],
934+
}
935+
936+
async def sync_endpoints_to_marketplaces(
937+
self, tenant: Tenant
938+
) -> dict[str, list[str]]:
939+
"""Sync all published endpoints to their respective marketplaces.
940+
941+
Groups endpoints by marketplace and calls sync_endpoints API for each.
942+
943+
Args:
944+
tenant: Tenant context
945+
946+
Returns:
947+
Dict mapping marketplace_id -> list of synced endpoint slugs
948+
"""
949+
if not self.marketplace_repository:
950+
logger.warning("Marketplace repository not configured, skipping sync")
951+
return {}
952+
953+
# Get all published endpoints
954+
endpoints = await self.endpoint_repository.get_published_endpoints(tenant.id)
955+
if not endpoints:
956+
logger.debug("No published endpoints to sync")
957+
return {}
958+
959+
# Group endpoints by marketplace
960+
marketplace_endpoints: dict[UUID, list[Endpoint]] = {}
961+
for endpoint in endpoints:
962+
for marketplace_id in endpoint.published_to:
963+
marketplace_endpoints.setdefault(marketplace_id, []).append(endpoint)
964+
965+
results: dict[str, list[str]] = {}
966+
967+
# Sync to each marketplace
968+
for marketplace_id, eps in marketplace_endpoints.items():
969+
try:
970+
marketplace = await self.marketplace_repository.get_by_id(
971+
UUID(marketplace_id), tenant.id
972+
)
973+
if not marketplace:
974+
logger.warning(f"Marketplace {marketplace_id} not found, skipping")
975+
continue
976+
977+
if not marketplace.is_active:
978+
logger.warning(f"Marketplace {marketplace_id} not active, skipping")
979+
continue
980+
981+
if not marketplace.email or not marketplace.password:
982+
logger.warning(
983+
f"Marketplace {marketplace_id} missing credentials, skipping"
984+
)
985+
continue
986+
987+
# Build payloads
988+
payloads = [self._build_publish_payload(ep) for ep in eps]
989+
990+
# Call sync API
991+
async with SyftHubClient(base_url=marketplace.url) as client:
992+
await client.login(
993+
username=marketplace.email, password=marketplace.password
994+
)
995+
await client.sync_endpoints(payloads)
996+
997+
results[marketplace_id] = [ep.slug for ep in eps]
998+
logger.info(
999+
f"Synced {len(eps)} endpoints to marketplace {marketplace.name}"
1000+
)
1001+
1002+
except SyftHubError as e:
1003+
logger.warning(
1004+
f"Failed to sync to marketplace {marketplace_id}: {e.message}"
1005+
)
1006+
except Exception as e:
1007+
logger.error(
1008+
f"Unexpected error syncing to marketplace {marketplace_id}: {e}"
1009+
)
1010+
1011+
return results

backend/syft_space/components/endpoints/repository.py

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from uuid import UUID
55

66
from sqlalchemy.orm import selectinload
7-
from sqlmodel import select
7+
from sqlmodel import or_, select
88

99
from syft_space.components.endpoints.entities import Endpoint
1010
from syft_space.components.shared.database import AsyncBaseRepository, AsyncDatabase
@@ -254,3 +254,28 @@ async def remove_publication(
254254
await session.refresh(endpoint)
255255

256256
return endpoint
257+
258+
async def get_published_endpoints(self, tenant_id: UUID) -> list[Endpoint]:
259+
"""Get all endpoints that are published to at least one marketplace.
260+
261+
Args:
262+
tenant_id: Tenant ID
263+
264+
Returns:
265+
List of published endpoints with policies eagerly loaded
266+
"""
267+
async with self.db.get_session() as session:
268+
statement = (
269+
select(Endpoint)
270+
.where(
271+
Endpoint.tenant_id == tenant_id,
272+
or_(Endpoint.published_to.isnot(None), Endpoint.published_to != []),
273+
)
274+
.options(
275+
selectinload(Endpoint.model),
276+
selectinload(Endpoint.dataset),
277+
selectinload(Endpoint.policies),
278+
)
279+
)
280+
result = await session.exec(statement)
281+
return list(result.all())

backend/syft_space/components/shared/syfthub_client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,23 @@ async def verify_satellite_token(self, token: str) -> SatelliteToken:
554554
response = await self._client.post("/api/v1/verify", json={"token": token}) # type: ignore
555555
return _handle_response(response, SatelliteToken)
556556

557+
async def sync_endpoints(self, payload: list[dict[str, Any]]) -> dict[str, Any]:
558+
"""Sync endpoints to SyftHub.
559+
560+
It is used to sync endpoints from the database to SyftHub.
561+
This is a destructive operation that will overwrite the existing endpoints with the new ones.
562+
563+
Args:
564+
payload: List of endpoints to sync
565+
Returns:
566+
dict[str, Any]: Sync endpoints response
567+
"""
568+
self._require_auth()
569+
response = await self._client.post(
570+
"/api/v1/endpoints/sync", json={"endpoints": payload}
571+
) # type: ignore
572+
return _handle_response_raw(response)
573+
557574
def _require_auth(self) -> None:
558575
if self._client is None:
559576
raise NotAuthenticatedError()

backend/syft_space/main.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,35 @@ async def _sync_public_url_safe(
123123
logger.warning(f"Marketplace sync failed: {e} - server will continue starting")
124124

125125

126+
async def _sync_endpoints_safe(handler: EndpointHandler, tenant: Tenant) -> None:
127+
"""Sync published endpoints to marketplaces without blocking startup.
128+
129+
This is a fire-and-forget helper that syncs all published endpoints to their
130+
respective marketplaces. If the sync fails or times out, the server continues
131+
operating - eventual consistency is achieved via frontend publish calls.
132+
133+
Args:
134+
handler: Endpoint handler instance
135+
tenant: Tenant context
136+
"""
137+
try:
138+
results = await asyncio.wait_for(
139+
handler.sync_endpoints_to_marketplaces(tenant),
140+
timeout=60.0, # 60 second timeout for multi-marketplace sync
141+
)
142+
if results:
143+
total = sum(len(slugs) for slugs in results.values())
144+
logger.info(
145+
f"Startup endpoint sync completed: {total} endpoints to {len(results)} marketplaces"
146+
)
147+
else:
148+
logger.debug("No published endpoints to sync on startup")
149+
except asyncio.TimeoutError:
150+
logger.warning("Endpoint sync timed out - server will continue running")
151+
except Exception as e:
152+
logger.warning(f"Endpoint sync failed: {e} - server will continue running")
153+
154+
126155
async def _setup_tenant_and_settings(
127156
tenant_repo: TenantRepository,
128157
settings_hdlr: SettingsHandler,
@@ -202,9 +231,13 @@ async def lifespan(app: FastAPI):
202231
_sync_public_url_safe(settings_handler, default_tenant, public_url)
203232
)
204233

234+
# 7. Fire-and-forget: Sync published endpoints to marketplaces
235+
if default_tenant:
236+
asyncio.create_task(_sync_endpoints_safe(endpoint_handler, default_tenant))
237+
205238
yield # Application runs here
206239

207-
# 7. Shutdown all services (reverse order: ingestion → provisioner → proxy)
240+
# 8. Shutdown all services (reverse order: ingestion → provisioner → proxy)
208241
for name, service in reversed(services):
209242
try:
210243
await service.shutdown()

0 commit comments

Comments
 (0)