Skip to content

Commit 66eac1a

Browse files
authored
fix(DATAGO-120949): Move migrations to __init__ for sequential execution (#741)
Both Platform and WebUI migrations now run in __init__() during component creation, ensuring sequential execution and preventing Alembic/SQLAlchemy race conditions. Changes: - Platform: Migrations in __init__() via _run_database_migrations() - WebUI: Migrations in __init__() via _run_database_migrations() - Removed unused component parameter from _setup_database() - Simplified comments and docstrings - Added Alembic log prefixing for better debugging
1 parent 89ae893 commit 66eac1a

File tree

4 files changed

+70
-125
lines changed

4 files changed

+70
-125
lines changed

src/solace_agent_mesh/gateway/http_sse/component.py

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,8 +308,27 @@ def __init__(self, **kwargs):
308308
"%s Data retention is disabled via configuration.", self.log_identifier
309309
)
310310

311+
if self.database_url:
312+
log.info("%s Running database migrations...", self.log_identifier)
313+
self._run_database_migrations()
314+
log.info("%s Database migrations completed", self.log_identifier)
315+
311316
log.info("%s Web UI Backend Component initialized.", self.log_identifier)
312317

318+
def _run_database_migrations(self):
319+
"""Run database migrations synchronously during __init__."""
320+
try:
321+
from ...gateway.http_sse.main import _setup_database
322+
_setup_database(self.database_url)
323+
except Exception as e:
324+
log.error(
325+
"%s Failed to run database migrations: %s",
326+
self.log_identifier,
327+
e,
328+
exc_info=True
329+
)
330+
raise RuntimeError(f"Database migration failed during component initialization: {e}") from e
331+
313332
def process_event(self, event: Event):
314333
if event.event_type == EventType.TIMER:
315334
timer_id = event.data.get("timer_id")
@@ -1283,7 +1302,7 @@ def _start_fastapi_server(self):
12831302

12841303
self.fastapi_app = fastapi_app_instance
12851304

1286-
setup_dependencies(self, self.database_url)
1305+
setup_dependencies(self)
12871306

12881307
# Instantiate services that depend on the database session factory.
12891308
# This must be done *after* setup_dependencies has run.

src/solace_agent_mesh/gateway/http_sse/main.py

Lines changed: 20 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,13 @@
4848

4949
log = logging.getLogger(__name__)
5050

51+
5152
app = FastAPI(
5253
title="A2A Web UI Backend",
5354
version="1.0.0", # Updated to reflect simplified architecture
5455
description="Backend API and SSE server for the A2A Web UI, hosted by Solace AI Connector.",
5556
)
5657

57-
# Global flag to track if dependencies have been initialized
58-
_dependencies_initialized = False
5958

6059

6160

@@ -77,58 +76,44 @@ def _run_community_migrations(database_url: str) -> None:
7776
try:
7877
from sqlalchemy import create_engine
7978

80-
log.info("Starting community migrations...")
79+
log.info("[WebUI Gateway] Starting community migrations...")
8180
engine = create_engine(database_url)
8281
inspector = sa.inspect(engine)
8382
existing_tables = inspector.get_table_names()
8483

84+
alembic_cfg = _setup_alembic_config(database_url)
8585
if not existing_tables or "sessions" not in existing_tables:
86-
log.info("Running initial community database setup")
87-
alembic_cfg = _setup_alembic_config(database_url)
88-
command.upgrade(alembic_cfg, "head")
89-
log.info("Community database migrations completed")
86+
log.info("[WebUI Gateway] Running initial database setup")
9087
else:
91-
log.info("Checking for community schema updates")
92-
alembic_cfg = _setup_alembic_config(database_url)
93-
command.upgrade(alembic_cfg, "head")
94-
log.info("Community database schema is current")
88+
log.info("[WebUI Gateway] Checking for schema updates")
89+
90+
command.upgrade(alembic_cfg, "head")
91+
log.info("[WebUI Gateway] Community migrations completed")
9592
except Exception as e:
96-
log.warning(
97-
"Community migration check failed: %s - attempting to run migrations",
98-
e,
99-
)
93+
log.warning("[WebUI Gateway] Migration check failed: %s - attempting to run migrations", e)
10094
try:
10195
alembic_cfg = _setup_alembic_config(database_url)
10296
command.upgrade(alembic_cfg, "head")
103-
log.info("Community database migrations completed")
97+
log.info("[WebUI Gateway] Community migrations completed")
10498
except Exception as migration_error:
105-
log.error("Community migration failed: %s", migration_error)
106-
log.error("Check database connectivity and permissions")
99+
log.error("[WebUI Gateway] Migration failed: %s", migration_error)
100+
log.error("[WebUI Gateway] Check database connectivity and permissions")
107101
raise RuntimeError(
108102
f"Community database migration failed: {migration_error}"
109103
) from migration_error
110104

111105

112106

113107

114-
def _setup_database(
115-
component: "WebUIBackendComponent",
116-
database_url: str,
117-
) -> None:
108+
def _setup_database(database_url: str) -> None:
118109
"""
119-
Initialize database and run migrations for WebUI Gateway (chat only).
120-
121-
Platform database is no longer used by WebUI Gateway.
122-
Platform migrations are handled by Platform Service.
110+
Initialize database and run migrations for WebUI Gateway.
123111
124112
Args:
125-
component: WebUIBackendComponent instance
126-
database_url: Chat database URL (sessions, tasks, feedback) - REQUIRED
113+
database_url: Chat database URL (sessions, tasks, feedback)
127114
"""
128115
dependencies.init_database(database_url)
129-
log.info("Persistence enabled - sessions will be stored in database")
130-
log.info("Running database migrations...")
131-
116+
log.info("[WebUI Gateway] Running database migrations...")
132117
_run_community_migrations(database_url)
133118

134119

@@ -164,49 +149,24 @@ def _create_api_config(app_config: dict, database_url: str) -> dict:
164149
}
165150

166151

167-
def setup_dependencies(
168-
component: "WebUIBackendComponent",
169-
database_url: str = None,
170-
):
152+
def setup_dependencies(component: "WebUIBackendComponent"):
171153
"""
172-
Initialize dependencies for WebUI Gateway (chat only).
154+
Initialize FastAPI dependencies (middleware, routers, static files).
155+
Database migrations are handled in component.__init__().
173156
174157
Args:
175158
component: WebUIBackendComponent instance
176-
database_url: Chat database URL (sessions, tasks, feedback).
177-
If None, runs in compatibility mode with in-memory sessions.
178-
179-
This function is idempotent and safe to call multiple times.
180159
"""
181-
global _dependencies_initialized
182-
183-
if _dependencies_initialized:
184-
log.debug("[setup_dependencies] Dependencies already initialized, skipping")
185-
return
186-
187160
dependencies.set_component_instance(component)
188161

189-
if database_url:
190-
_setup_database(component, database_url)
191-
else:
192-
log.warning(
193-
"No database URL provided - using in-memory session storage (data not persisted across restarts)"
194-
)
195-
log.info("This maintains backward compatibility for existing SAM installations")
196-
197162
app_config = _get_app_config(component)
198-
api_config_dict = _create_api_config(app_config, database_url)
199-
163+
api_config_dict = _create_api_config(app_config, component.database_url)
200164
dependencies.set_api_config(api_config_dict)
201-
log.debug("API configuration extracted and stored.")
202165

203166
_setup_middleware(component)
204167
_setup_routers()
205168
_setup_static_files()
206169

207-
_dependencies_initialized = True
208-
log.debug("[setup_dependencies] Dependencies initialization complete")
209-
210170

211171
def _setup_middleware(component: "WebUIBackendComponent") -> None:
212172
allowed_origins = component.get_cors_origins()

src/solace_agent_mesh/services/platform/api/main.py

Lines changed: 12 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
description="Platform configuration management API (agents, connectors, toolsets, deployments)",
2424
)
2525

26-
# Global flag to track initialization (idempotent)
27-
_dependencies_initialized = False
2826

2927

3028
def _setup_alembic_config(database_url: str) -> Config:
@@ -96,90 +94,45 @@ def _run_enterprise_migrations(database_url: str) -> None:
9694
This is optional and only runs if the enterprise package is available.
9795
9896
Args:
99-
component: PlatformServiceComponent instance.
10097
database_url: Database connection string.
10198
"""
10299
try:
103100
from solace_agent_mesh_enterprise.platform_service.migration_runner import run_migrations
104101

105-
log.info("Starting enterprise platform migrations...")
102+
log.info("[Platform Service] Starting enterprise migrations...")
106103
run_migrations(database_url)
107-
log.info("Enterprise platform migrations completed")
104+
log.info("[Platform Service] Enterprise migrations completed successfully")
108105
except ImportError:
109-
log.debug("Enterprise platform module not found - skipping enterprise migrations")
106+
log.debug("[Platform Service] Enterprise module not found - skipping enterprise migrations")
110107
except Exception as e:
111-
log.error("Enterprise platform migration failed: %s", e)
112-
log.error("Enterprise platform features may be unavailable")
108+
log.error("[Platform Service] Enterprise migration failed: %s", e)
109+
log.error("[Platform Service] Enterprise features may be unavailable")
113110
raise RuntimeError(f"Enterprise platform database migration failed: {e}") from e
114111

115112

116113
def _setup_database(database_url: str) -> None:
117-
"""
118-
Initialize database connection and run all required migrations.
119-
Sets up both community and enterprise platform database schemas.
120-
121-
This follows the same pattern as gateway/http_sse:
122-
1. Initialize database (create engine and SessionLocal)
123-
2. Run community migrations
124-
3. Run enterprise migrations (if available)
125-
126-
Args:
127-
component: PlatformServiceComponent instance.
128-
database_url: Platform database URL (agents, connectors, deployments, toolsets) - REQUIRED
129-
"""
130-
from . import dependencies
131-
132-
# MIGRATION PHASE 1: DB is initialized in enterprise repo. In next phase, platform db will be initialized here
133-
# dependencies.init_database(database_url)
134-
# log.info("Platform database initialized - running migrations...")
135-
136-
# MIGRATION PHASE 1: No community migrations yet - only enterprise migrations
137-
# _run_community_migrations(database_url)
114+
"""Initialize database and run migrations."""
115+
log.info("[Platform Service] Initializing database and running migrations...")
138116
_run_enterprise_migrations(database_url)
117+
log.info("[Platform Service] Database initialization complete")
139118

140119

141-
def setup_dependencies(component: "PlatformServiceComponent", database_url: str):
120+
def setup_dependencies(component: "PlatformServiceComponent"):
142121
"""
143-
Initialize dependencies for the Platform Service.
144-
145-
This function is idempotent and safe to call multiple times.
146-
It sets up:
147-
1. Component instance reference
148-
2. Database connection
149-
3. Middleware (CORS, OAuth2)
150-
4. Routers (community and enterprise)
122+
Initialize FastAPI dependencies (middleware, routers).
123+
Database migrations are handled in component.__init__().
151124
152125
Args:
153-
component: PlatformServiceComponent instance.
154-
database_url: Database connection string.
126+
component: PlatformServiceComponent instance
155127
"""
156-
global _dependencies_initialized
157-
158-
if _dependencies_initialized:
159-
log.debug("Platform service dependencies already initialized, skipping")
160-
return
161-
162128
log.info("Initializing Platform Service dependencies...")
163129

164-
# Store component reference for dependency injection
165130
from . import dependencies
166-
167131
dependencies.set_component_instance(component)
168132

169-
# Initialize database and run migrations
170-
if database_url:
171-
_setup_database(database_url)
172-
log.info("Platform database initialized with migrations")
173-
else:
174-
log.warning("No database URL provided - platform service will not function")
175-
176-
# Setup middleware
177133
_setup_middleware(component)
178-
179-
# Setup routers
180134
_setup_routers()
181135

182-
_dependencies_initialized = True
183136
log.info("Platform Service dependencies initialized successfully")
184137

185138

src/solace_agent_mesh/services/platform/component.py

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,13 +175,27 @@ def __init__(self, **kwargs):
175175
self.background_scheduler = None
176176
self.background_tasks_thread = None
177177

178-
# Direct message publisher for deployer commands
179178
self.direct_publisher = None
180179

180+
log.info("%s Running database migrations...", self.log_identifier)
181+
self._run_database_migrations()
182+
log.info("%s Database migrations completed", self.log_identifier)
183+
181184
log.info("%s Platform Service Component initialized.", self.log_identifier)
182185

183-
# Note: FastAPI server, direct publisher, and background tasks are started
184-
# in _late_init() after SamComponentBase.run() is called and broker is ready
186+
def _run_database_migrations(self):
187+
"""Run database migrations synchronously during __init__."""
188+
try:
189+
from .api.main import _setup_database
190+
_setup_database(self.database_url)
191+
except Exception as e:
192+
log.error(
193+
"%s Failed to run database migrations: %s",
194+
self.log_identifier,
195+
e,
196+
exc_info=True
197+
)
198+
raise RuntimeError(f"Database migration failed during component initialization: {e}") from e
185199

186200
def _late_init(self):
187201
"""
@@ -234,8 +248,7 @@ def _start_fastapi_server(self):
234248

235249
self.fastapi_app = fastapi_app_instance
236250

237-
# Setup dependencies (idempotent - safe to call multiple times)
238-
setup_dependencies(self, self.database_url)
251+
setup_dependencies(self)
239252

240253
# Register startup event for background tasks
241254
@self.fastapi_app.on_event("startup")

0 commit comments

Comments
 (0)