Skip to content

Commit b76813d

Browse files
authored
fix(DATAGO-126491): SAM Gateway error when PostgreSQL SSL connection is unexpectedly closed (#1088)
* fix: filter whitespace-only LLM responses * fix: do parts filtering in-place * fix: Scan for any whitespace-only text parts before creating any objects * fix: SSL errors on postgres * fix: use short-lived db sessions to prevent SSL timeout errors * fix: address review feedback * fix: _get_task_info() was too aggressive in raising 503 errors
1 parent 6216193 commit b76813d

File tree

2 files changed

+310
-93
lines changed

2 files changed

+310
-93
lines changed

src/solace_agent_mesh/gateway/http_sse/dependencies.py

Lines changed: 157 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,117 @@ def get_identity_service(
260260
return component.identity_service
261261

262262

263+
def _is_connection_error(exc: Exception, _depth: int = 0) -> bool:
264+
"""
265+
Check if an exception is a transient database connection error.
266+
267+
Compatible with PostgreSQL (psycopg2), SQLite (sqlite3), and MySQL.
268+
Uses a combination of:
269+
1. SQLAlchemy's connection_invalidated flag (most reliable)
270+
2. Exception type checking
271+
3. Error message pattern matching (fallback)
272+
273+
This multi-layered approach ensures robustness across different
274+
database backends and SQLAlchemy versions.
275+
276+
Args:
277+
exc: The exception to check
278+
_depth: Internal recursion depth counter (max 10 to prevent infinite loops)
279+
"""
280+
# Prevent infinite recursion from circular cause chains
281+
if _depth > 10:
282+
return False
283+
284+
# Method 1: Check SQLAlchemy's connection_invalidated flag (most reliable)
285+
# SQLAlchemy sets this flag on exceptions that indicate a disconnection
286+
if hasattr(exc, 'connection_invalidated') and exc.connection_invalidated:
287+
return True
288+
289+
# Method 2: Check exception class hierarchy
290+
exc_type_name = type(exc).__name__
291+
292+
# Check for SQLAlchemy's DisconnectionError (explicit disconnect indicator)
293+
if exc_type_name == 'DisconnectionError':
294+
return True
295+
296+
# Check for OperationalError or InterfaceError (can indicate connection issues)
297+
is_operational_or_interface = exc_type_name in ('OperationalError', 'InterfaceError')
298+
299+
# Method 3: Error message pattern matching
300+
error_str = str(exc).lower()
301+
connection_error_patterns = [
302+
# PostgreSQL / psycopg2 patterns
303+
"ssl connection has been closed unexpectedly",
304+
"connection reset by peer",
305+
"connection timed out",
306+
"server closed the connection unexpectedly",
307+
"could not connect to server",
308+
"connection refused",
309+
"network is unreachable",
310+
"terminating connection due to administrator command",
311+
"the connection is closed",
312+
# SQLite patterns (note: "database is locked" is a contention error, not connection)
313+
"disk i/o error",
314+
"unable to open database file",
315+
# MySQL patterns
316+
"lost connection to mysql server",
317+
"mysql server has gone away",
318+
# Generic patterns
319+
"connection was closed",
320+
"broken pipe",
321+
"connection unexpectedly closed",
322+
"connection already closed",
323+
]
324+
325+
has_connection_error_message = any(pattern in error_str for pattern in connection_error_patterns)
326+
327+
# Return True if it's an OperationalError/InterfaceError with a connection-related message
328+
if is_operational_or_interface and has_connection_error_message:
329+
return True
330+
331+
# Recursively check the cause chain for wrapped exceptions
332+
if exc.__cause__ is not None:
333+
return _is_connection_error(exc.__cause__, _depth + 1)
334+
335+
return False
336+
337+
338+
@contextmanager
def short_lived_session():
    """
    Context manager for short-lived database sessions.

    Use this for database operations that should not hold a connection
    for extended periods, such as fetching data before a long-lived SSE stream.

    The session is always closed when the context exits. If the body raises,
    a rollback is attempted first and the original exception is re-raised.
    Failures of the rollback or close themselves are logged as warnings and
    otherwise ignored, so they never mask the original exception.

    This context manager does not commit; callers that write must call
    ``commit()`` on the yielded session themselves.

    Yields:
        Session: A database session created from ``SessionLocal``.

    Raises:
        RuntimeError: If ``SessionLocal`` is None (database not configured).
        Exception: The original exception raised inside the ``with`` body.
    """
    if SessionLocal is None:
        raise RuntimeError("Database not configured")

    db = SessionLocal()
    try:
        yield db
    except Exception:
        # Best-effort rollback: the connection may already be dead, in which
        # case the rollback fails too. Log it, then surface the original error.
        try:
            db.rollback()
        except Exception as rollback_error:
            log.warning("Failed to rollback after error: %s", rollback_error)
        raise
    finally:
        # Best-effort close for the same reason; never raise from cleanup.
        try:
            db.close()
        except Exception as close_error:
            log.warning("Failed to close database session: %s", close_error)
372+
373+
263374
def get_db() -> Generator[Session, None, None]:
264375
if SessionLocal is None:
265376
raise HTTPException(
@@ -270,11 +381,30 @@ def get_db() -> Generator[Session, None, None]:
270381
try:
271382
yield db
272383
db.commit()
273-
except Exception:
274-
db.rollback()
384+
except Exception as e:
385+
# Always attempt rollback first
386+
try:
387+
db.rollback()
388+
except Exception as rollback_error:
389+
log.warning("Failed to rollback after error: %s", rollback_error)
390+
391+
# Check if this is a transient connection error
392+
if _is_connection_error(e):
393+
log.warning(
394+
"Database connection error during commit (connection may have been closed by server): %s",
395+
str(e)
396+
)
397+
# Re-raise as a service unavailable error for transient connection issues
398+
raise HTTPException(
399+
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
400+
detail="Database connection temporarily unavailable. Please retry.",
401+
) from e
275402
raise
276403
finally:
277-
db.close()
404+
try:
405+
db.close()
406+
except Exception as close_error:
407+
log.warning("Failed to close database session: %s", close_error)
278408

279409

280410
def get_people_service(
@@ -562,11 +692,31 @@ def get_db_optional() -> Generator[Session | None, None, None]:
562692
try:
563693
yield db
564694
db.commit()
565-
except Exception:
566-
db.rollback()
695+
except Exception as e:
696+
# Always attempt rollback first
697+
try:
698+
db.rollback()
699+
except Exception as rollback_error:
700+
log.warning("Failed to rollback after error: %s", rollback_error)
701+
702+
# Check if this is a transient connection error
703+
if _is_connection_error(e):
704+
log.warning(
705+
"Database connection error during commit (connection may have been closed by server): %s",
706+
str(e)
707+
)
708+
# Re-raise as a service unavailable error for transient connection issues
709+
raise HTTPException(
710+
status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
711+
detail="Database connection temporarily unavailable. Please retry.",
712+
) from e
567713
raise
568714
finally:
569-
db.close()
715+
try:
716+
db.close()
717+
except Exception as close_error:
718+
log.warning("Failed to close database session: %s", close_error)
719+
570720

571721
def get_project_service(
572722
component: "WebUIBackendComponent" = Depends(get_sac_component),
@@ -584,6 +734,7 @@ def get_project_service_optional(
584734
return None
585735
return ProjectService(component=component)
586736

737+
587738
def get_session_business_service_optional(
588739
component: "WebUIBackendComponent" = Depends(get_sac_component),
589740
) -> SessionService | None:

0 commit comments

Comments
 (0)