Skip to content

Commit f84233a

Browse files
fix(07.1): use savepoints for KG entity/relationship inserts and rollback on failure
- Wrap each entity/relationship insert in db.begin_nested() savepoint to prevent PendingRollbackError cascade when a single insert fails - Add db.rollback() in pipeline KG Builder except block so db.commit() doesn't cascade-fail and kill the entire pipeline - Guard compute_entity_degrees behind entities_written > 0 check
1 parent 01b0428 commit f84233a

File tree

2 files changed

+54
-41
lines changed

2 files changed

+54
-41
lines changed

backend/app/agents/kg_builder.py

Lines changed: 51 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -248,26 +248,29 @@ async def write_kg_from_llm_output(
248248
# Use first domain for backward-compat 'domain' column
249249
primary_domain = entity.domains[0] if entity.domains else "unknown"
250250

251-
kg_entity = KgEntity(
252-
case_id=case_uuid,
253-
name=entity.name,
254-
name_normalized=normalize_entity_name(entity.name),
255-
entity_type=entity.entity_type,
256-
domain=primary_domain,
257-
confidence=entity.confidence,
258-
properties=properties_dict,
259-
context=entity.description_detailed,
260-
aliases=entity.aliases if entity.aliases else None,
261-
description_brief=entity.description_brief,
262-
description_detailed=entity.description_detailed,
263-
domains=entity.domains if entity.domains else None,
264-
source_finding_ids=(
265-
entity.source_finding_ids if entity.source_finding_ids else None
266-
),
267-
source_execution_id=execution_id,
268-
)
269-
db.add(kg_entity)
270-
await db.flush()
251+
# Use a savepoint so a single malformed entity doesn't poison
252+
# the entire session transaction (PendingRollbackError cascade).
253+
async with db.begin_nested():
254+
kg_entity = KgEntity(
255+
case_id=case_uuid,
256+
name=entity.name,
257+
name_normalized=normalize_entity_name(entity.name),
258+
entity_type=entity.entity_type,
259+
domain=primary_domain,
260+
confidence=entity.confidence,
261+
properties=properties_dict,
262+
context=entity.description_detailed,
263+
aliases=entity.aliases if entity.aliases else None,
264+
description_brief=entity.description_brief,
265+
description_detailed=entity.description_detailed,
266+
domains=entity.domains if entity.domains else None,
267+
source_finding_ids=(
268+
entity.source_finding_ids if entity.source_finding_ids else None
269+
),
270+
source_execution_id=execution_id,
271+
)
272+
db.add(kg_entity)
273+
await db.flush()
271274

272275
llm_id_to_db_id[entity.id] = kg_entity.id
273276
entities_written += 1
@@ -302,23 +305,31 @@ async def write_kg_from_llm_output(
302305

303306
corroboration = len(rel.source_finding_ids) if rel.source_finding_ids else 1
304307

305-
kg_rel = KgRelationship(
306-
case_id=case_uuid,
307-
source_entity_id=source_db_id,
308-
target_entity_id=target_db_id,
309-
relationship_type=rel.relationship_type,
310-
label=rel.label,
311-
strength=rel.strength,
312-
evidence_excerpt=rel.evidence_excerpt if rel.evidence_excerpt else None,
313-
source_finding_ids=(
314-
rel.source_finding_ids if rel.source_finding_ids else None
315-
),
316-
temporal_context=rel.temporal_context if rel.temporal_context else None,
317-
confidence=rel.confidence,
318-
corroboration_count=corroboration,
319-
source_execution_id=execution_id,
320-
)
321-
db.add(kg_rel)
308+
# Savepoint per relationship for the same reason as entities
309+
async with db.begin_nested():
310+
kg_rel = KgRelationship(
311+
case_id=case_uuid,
312+
source_entity_id=source_db_id,
313+
target_entity_id=target_db_id,
314+
relationship_type=rel.relationship_type,
315+
label=rel.label,
316+
strength=rel.strength,
317+
evidence_excerpt=(
318+
rel.evidence_excerpt if rel.evidence_excerpt else None
319+
),
320+
source_finding_ids=(
321+
rel.source_finding_ids if rel.source_finding_ids else None
322+
),
323+
temporal_context=(
324+
rel.temporal_context if rel.temporal_context else None
325+
),
326+
confidence=rel.confidence,
327+
corroboration_count=corroboration,
328+
source_execution_id=execution_id,
329+
)
330+
db.add(kg_rel)
331+
await db.flush()
332+
322333
relationships_written += 1
323334
except Exception:
324335
logger.warning(
@@ -331,11 +342,10 @@ async def write_kg_from_llm_output(
331342
exc_info=True,
332343
)
333344

334-
await db.flush()
335-
336345
# Compute entity degrees after all relationships are written
337-
await compute_entity_degrees(case_uuid, db)
338-
await db.flush()
346+
if entities_written > 0:
347+
await compute_entity_degrees(case_uuid, db)
348+
await db.flush()
339349

340350
logger.info(
341351
"Wrote curated KG data for case=%s: %d entities, %d relationships "

backend/app/services/pipeline.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1022,6 +1022,9 @@ async def run_analysis_workflow(
10221022
logger.exception("KG Builder failed for case=%s: %s", case_id, exc)
10231023
kg_entities_created = 0
10241024
kg_relationships_created = 0
1025+
# Clear poisoned session state so db.commit() below doesn't
1026+
# cascade-fail with PendingRollbackError and kill the pipeline.
1027+
await db.rollback()
10251028
await emit_agent_error(
10261029
case_id=case_id,
10271030
agent_type="kg_builder",

0 commit comments

Comments
 (0)