fix(rdf): converge Fuseki state on weekly rebuilds and isolate API latency#28117
fix(rdf): converge Fuseki state on weekly rebuilds and isolate API latency#28117harshach wants to merge 2 commits into
Conversation
…tency
RdfIndexApp ran daily and never reconciled removed relationships, so triples
grew unboundedly across runs. When Fuseki crash-looped on the resulting disk
pressure, every entity-write hook blocked synchronously on the unreachable
server (no HTTP connect timeout, 3-retry loop on ConnectException), saturating
the bounded AsyncService pool and pushing login to ~45s.
Storage-side fixes (stop growth):
- Drop the extractRelationshipTriples "preserve forward" path in
RdfRepository.createOrUpdate; the translator is the source of truth and the
surrounding orchestration already rewrites the current relationship set.
This also removes a wasted CONSTRUCT round-trip per entity write.
- bulkStoreRelationships now does per-source-entity DELETE WHERE with a
predicate-exclusion FILTER for lineage edges, so relationships that no
longer exist actually leave the store.
- Wire RdfRepository.clearAllGlossaryTermRelations() into RdfIndexApp's
initializeJob (the method existed but had no callers).
- Flip recreateIndex default to true and move the cron to Saturday midnight
("0 0 * * 6"). Add reloadOntologies() so CLEAR ALL doesn't leave the
ontology graph empty before indexing starts.
- Include a 2.0.1 post-data migration that updates existing installed_apps
rows; the app loader is insert-only on upgrade.
Connectivity / concurrency fixes (isolate API latency from Fuseki health):
- Add 2s connectTimeout to every JenaFusekiStorage HttpClient and fast-fail
on ConnectException / ClosedChannelException / HttpConnectTimeoutException
instead of retrying. Introduce a 5-failure/30s circuit breaker.
- Route all RdfUpdater mutators through AsyncService.execute with a bounded
pendingWrites gate (cap 1000, drop-on-overflow with logged warning) so a
dead Fuseki can no longer block request threads or starve the AsyncService
pool.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
| .append("entity/\") && ?p != <https://open-metadata.org/ontology/UPSTREAM> && ?p") | ||
| .append( | ||
| " != <http://www.w3.org/ns/prov#wasDerivedFrom> && ?p != <https://open-metadata.org/ontology/hasLineageDetails>) } }"); |
There was a problem hiding this comment.
⚠️ Bug: Hardcoded predicate URIs in DELETE filter ignore configurable baseUri
In bulkStoreRelationships, the SPARQL DELETE WHERE filter hardcodes https://open-metadata.org/ontology/UPSTREAM and https://open-metadata.org/ontology/hasLineageDetails to exclude lineage predicates from deletion. However, the INSERT uses the configurable baseUri field for the ontology prefix (PREFIX om: <baseUri + "ontology/">). If baseUri is configured to anything other than https://open-metadata.org/, the predicates stored in the graph won't match the hardcoded exclusion URIs, causing lineage edges to be incorrectly deleted on every reconciliation run.
The storeRelationship method (single-relationship path) also hardcodes the same ontology URI pattern, so this is consistent within the file — but both paths are broken for non-default baseUri configurations.
Use baseUri for ontology predicates in the DELETE filter to match the INSERT path, keeping only the W3C prov URI hardcoded (since it's a well-known external vocabulary).:
deleteUpdate
.append("DELETE { GRAPH <")
.append(KNOWLEDGE_GRAPH)
.append("> { <")
.append(sourceUri)
.append("> ?p ?o } } WHERE { GRAPH <")
.append(KNOWLEDGE_GRAPH)
.append("> { <")
.append(sourceUri)
.append("> ?p ?o . FILTER(isIRI(?o) && STRSTARTS(STR(?o), "")
.append(baseUri)
.append("entity/") && ?p != <")
.append(baseUri)
.append("ontology/UPSTREAM> && ?p")
.append(" != <http://www.w3.org/ns/prov#wasDerivedFrom> && ?p != <")
.append(baseUri)
.append("ontology/hasLineageDetails>) } }");
- Apply fix
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
There was a problem hiding this comment.
Pull request overview
This PR aims to make RDF/Fuseki indexing converge more reliably and reduce platform latency impact when Fuseki is unhealthy.
Changes:
- Changes RDF app defaults to weekly recreate-index runs and adds migrations for existing app rows.
- Adds Fuseki connection timeout/circuit-breaker handling and async RDF updater submission.
- Adjusts RDF reindex cleanup paths, ontology reload after clear, and related unit tests.
Reviewed changes
Copilot reviewed 9 out of 9 changed files in this pull request and generated 11 comments.
Show a summary per file
| File | Description |
|---|---|
openmetadata-service/src/test/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexAppTest.java |
Adds coverage for ontology reload and glossary relation cleanup behavior. |
openmetadata-service/src/main/resources/json/data/appMarketPlaceDefinition/RdfIndexApp.json |
Updates marketplace default recreateIndex to true. |
openmetadata-service/src/main/resources/json/data/app/RdfIndexApp.json |
Updates app default recreateIndex and weekly cron schedule. |
openmetadata-service/src/main/java/org/openmetadata/service/rdf/storage/JenaFusekiStorage.java |
Adds timeout/circuit-breaker state and relationship reconciliation changes. |
openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfUpdater.java |
Moves RDF mutating hooks to bounded async submission. |
openmetadata-service/src/main/java/org/openmetadata/service/rdf/RdfRepository.java |
Adds ontology reload and removes relationship preservation during entity writes. |
openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/RdfIndexApp.java |
Wires glossary relation cleanup and ontology reload after full RDF clear. |
bootstrap/sql/migrations/native/2.0.1/postgres/postDataMigrationSQLScript.sql |
Migrates existing PostgreSQL app rows to new RDF app defaults. |
bootstrap/sql/migrations/native/2.0.1/mysql/postDataMigrationSQLScript.sql |
Migrates existing MySQL app rows to new RDF app defaults. |
| Model rdfModel = translator.toRdf(entity); | ||
|
|
||
| // Preserve existing relationship triples before updating | ||
| // This prevents postCreate() from overwriting relationships added by storeRelationships() | ||
| Model existingModel = storageService.getEntity(entityType, entity.getId()); | ||
| if (existingModel != null && !existingModel.isEmpty()) { | ||
| String entityUri = | ||
| config.getBaseUri().toString() + "entity/" + entityType + "/" + entity.getId(); | ||
| // Extract and preserve relationship triples (where entity is subject and object is a URI) | ||
| Model relationshipTriples = extractRelationshipTriples(existingModel, entityUri); | ||
| if (!relationshipTriples.isEmpty()) { | ||
| rdfModel.add(relationshipTriples); | ||
| LOG.debug( | ||
| "Preserved {} relationship triples for entity {}", | ||
| relationshipTriples.size(), | ||
| entity.getId()); | ||
| } | ||
| } | ||
|
|
||
| storageService.storeEntity(entityType, entity.getId(), rdfModel); |
| submitAsync( | ||
| "updateEntity " + entity.getId(), | ||
| () -> { | ||
| Timer.Sample sample = RequestLatencyContext.startRdfOperation(); | ||
| try { | ||
| rdfRepository.createOrUpdate(entity); |
| submitAsync( | ||
| "removeRelationship", | ||
| () -> { | ||
| Timer.Sample sample = RequestLatencyContext.startRdfOperation(); | ||
| try { | ||
| rdfRepository.removeRelationship(relationship); | ||
| } catch (Exception e) { |
| @@ -371,32 +458,46 @@ public void bulkStoreRelationships(List<RelationshipData> relationships) { | |||
| if (relationships.isEmpty()) { | |||
| return; | |||
| try { | ||
| UpdateRequest deleteRequest = UpdateFactory.create(deleteUpdate.toString()); | ||
| connection.update(deleteRequest); | ||
| } catch (Exception e) { | ||
| if (isConnectError(e)) { | ||
| recordFailure(); | ||
| throw new RuntimeException( | ||
| "Failed to bulk store relationships in RDF (Fuseki unreachable)", e); | ||
| } | ||
| // Tolerate non-connect delete errors — the source entities may not | ||
| // have any prior outgoing edges yet (first-time indexing). | ||
| LOG.debug("Per-source delete completed (some sources may not have had prior edges)"); | ||
| } |
| // bulkAddGlossaryTermRelations has no per-batch DELETE side, so stale | ||
| // glossary-term relations would accumulate forever across reindex runs. | ||
| // When recreateIndex=true clearAll() already wipes everything, so we | ||
| // only need this targeted cleanup on incremental runs. | ||
| if (!Boolean.TRUE.equals(jobData.getRecreateIndex()) | ||
| && jobData.getEntities() != null | ||
| && jobData.getEntities().contains(Entity.GLOSSARY_TERM)) { | ||
| LOG.info("Clearing existing glossary term relations before re-indexing"); | ||
| try { | ||
| rdfRepository.clearAllGlossaryTermRelations(); | ||
| } catch (Exception e) { | ||
| LOG.warn("Failed to clear glossary term relations; continuing with reindex", e); | ||
| } |
| java.net.http.HttpClient httpClient = | ||
| java.net.http.HttpClient.newBuilder() | ||
| .connectTimeout(CONNECT_TIMEOUT) |
| java.net.http.HttpClient httpClient = | ||
| java.net.http.HttpClient.newBuilder().connectTimeout(CONNECT_TIMEOUT).build(); | ||
| this.connection = | ||
| RDFConnectionFuseki.create().destination(endpoint).httpClient(httpClient).build(); |
| verify(mockRdfRepository).clearAll(); | ||
| // CLEAR ALL wipes ontology/shapes graphs; clearRdfData() must reload them | ||
| // so post-wipe SPARQL queries that rely on the ontology keep working. | ||
| verify(mockRdfRepository).reloadOntologies(); |
| try { | ||
| rdfRepository.clearAllGlossaryTermRelations(); | ||
| } catch (Exception e) { | ||
| LOG.warn("Failed to clear glossary term relations; continuing with reindex", e); | ||
| } |
…surface ontology failures PR #28117 review feedback. Addresses 13 findings across gitar-bot and Copilot: Storage correctness: - JenaFusekiStorage.storeEntity now keeps URI-valued triples (relationships) and only refreshes literal-valued triples. A metadata-only PATCH would otherwise wipe every inter-entity edge until the next weekly recreate-index, and async ordering between updateEntity and addRelationship could leave the graph missing edges (Copilot #1, #2). - RdfRepository.removeRelationship wraps the DELETE in the knowledge named graph and uses getRelationshipPredicate so the predicate URI matches what addRelationship actually wrote (e.g. UPSTREAM → prov:wasDerivedFrom). The previous bare DELETE in the default graph was a silent no-op (Copilot #3). - RdfBatchProcessor now calls a new RdfRepository.clearOutgoingEntityRelationships for every entity in the batch, not just those with current edges. An entity whose last outgoing relationship was removed in MySQL contributes zero RelationshipData entries, so bulkStoreRelationships' per-source DELETE never fired for it (Copilot #4). - bulkStoreRelationships no longer swallows non-connect DELETE errors — DELETE WHERE on a source with no edges is a no-op, so exceptions there are real failures (malformed SPARQL, auth, server errors) and should surface (Copilot #5). Visibility: - reloadOntologies() now checks areOntologiesLoaded() after load and throws if still empty. OntologyLoader.loadOntologies catches internally, so the old reloadOntologies always appeared to succeed (Copilot #6). - clearAllGlossaryTermRelations rethrows on failure instead of silently logging — the indexer's caller can now react to cleanup failures (Copilot #10). - clearAllGlossaryTermRelations pulls custom predicate URIs from GlossaryTermRelationSettings and includes them in the DELETE FILTER. The hardcoded list missed any custom predicates an admin configured (Copilot #7). Quality: - Set / LinkedHashSet imported instead of using java.util.* fully qualified in JenaFusekiStorage and RdfBatchProcessor (gitar-bot #2). - RdfIndexAppTest uses InOrder to assert clearAll → reloadOntologies ordering — a plain verify would have accepted a future change that reordered the calls (Copilot #9). - Documented the residual gap that HttpClient.connectTimeout only bounds TCP connect, not request bodies; circuit breaker + bounded pendingWrites contain the blast radius (Copilot #8). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Addressed all 13 review findings in Storage correctness:
Visibility:
Quality:
Documented gaps:
Treated as false positive:
|
| org.openmetadata.schema.configuration.GlossaryTermRelationSettings settings = | ||
| org.openmetadata.service.resources.settings.SettingsCache.getSetting( | ||
| org.openmetadata.schema.settings.SettingsType.GLOSSARY_TERM_RELATION_SETTINGS, |
There was a problem hiding this comment.
💡 Quality: Fully qualified class names in clearAllGlossaryTermRelations
Lines 2546-2548 use fully qualified names (org.openmetadata.schema.configuration.GlossaryTermRelationSettings, org.openmetadata.service.resources.settings.SettingsCache, org.openmetadata.schema.settings.SettingsType) instead of imports. Per project conventions, wildcard and fully-qualified names should be avoided — add proper imports at the top of the file.
Replace fully qualified names with imports for readability and consistency.:
// Add to imports at top of file:
import org.openmetadata.schema.configuration.GlossaryTermRelationSettings;
import org.openmetadata.schema.settings.SettingsType;
import org.openmetadata.service.resources.settings.SettingsCache;
// Then replace FQNs in method body with simple names:
GlossaryTermRelationSettings settings =
SettingsCache.getSetting(
SettingsType.GLOSSARY_TERM_RELATION_SETTINGS,
GlossaryTermRelationSettings.class);
- Apply fix
Check the box to apply the fix or reply for a change | Was this helpful? React with 👍 / 👎
| private static String expandPredicateCurie(String uri) { | ||
| if (uri == null || uri.isEmpty()) { | ||
| return "https://open-metadata.org/ontology/relatedTo"; | ||
| } |
There was a problem hiding this comment.
💡 Edge Case: expandPredicateCurie silently defaults null/empty to relatedTo
expandPredicateCurie (line 2709-2710) returns "https://open-metadata.org/ontology/relatedTo" for null or empty input. In clearAllGlossaryTermRelations, this means a misconfigured relation type with a null rdfPredicate would cause the cleanup to target relatedTo triples even if the configured type never wrote them — or worse, skip cleaning the actual custom predicate. Since the null case is already guarded by the if (rdfPredicate != null) check at line 2553, this default is unreachable in current code but could silently mask bugs if called from elsewhere.
Was this helpful? React with 👍 / 👎
Code Review
|
| Compact |
|
Was this helpful? React with 👍 / 👎 | Gitar
|
🟡 Playwright Results — all passed (15 flaky)✅ 4055 passed · ❌ 0 failed · 🟡 15 flaky · ⏭️ 103 skipped
🟡 15 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |



Describe your changes:
RDF Knowledge Graph indexing was duplicating triples and accumulating disk + memory in Fuseki on every run; when Fuseki crash-looped, every entity-write hook blocked synchronously on the unreachable server (no HTTP timeout, 3-retry loop), saturating the bounded
AsyncServicepool and pushing login latency to ~45 s. The reindex now usesrecreateIndex=trueon a weekly Saturday cadence, every reconciliation path actually deletes removed relationships, and the Fuseki client has a 2 s connect timeout + circuit breaker so a dead Fuseki can no longer block request threads.Type of change:
High-level design:
Storage-side (stop growth):
RdfRepository.createOrUpdateno longer preserves stale relationship triples — the translator is the source of truth and surrounding orchestration rewrites the current set. Also removes a wasted CONSTRUCT round-trip per write.bulkStoreRelationshipsdoes per-source-entityDELETE WHEREwith a predicate-exclusion FILTER for lineage edges, so removed relationships actually leave the store.RdfRepository.clearAllGlossaryTermRelations()is now wired intoRdfIndexApp.initializeJob(the method existed but had no callers).recreateIndexdefault flipped totrue, cron moved to"0 0 * * 6"(Saturday midnight), andreloadOntologies()runs afterclearAll()so the ontology graph isn't left empty.2.0.1/{mysql,postgres}/postDataMigrationSQLScript.sqlto update existinginstalled_appsrows; the app loader is insert-only on upgrade.Connectivity / concurrency (isolate platform from Fuseki health):
JenaFusekiStorageHttpClients now useconnectTimeout=2s; onConnectException/ClosedChannelException/HttpConnectTimeoutExceptionwe fast-fail instead of retrying. A 5-failure/30 s circuit breaker short-circuits subsequent calls until Fuseki recovers (probed viatestConnectionwhich bypasses the breaker).RdfUpdatermutators now go throughAsyncService.execute(...)(the existing virtual-thread pool) with a boundedpendingWritesgate (cap 1000, drop-on-overflow with logged warning) so the request thread returns immediately and a dead Fuseki cannot starveAsyncServicepermits.Tests:
Unit tests
Extended
RdfIndexAppTest:recreateIndex=truetest now also verifiesreloadOntologies()is called afterclearAll().clearAllGlossaryTermRelations()is invoked whenglossaryTermis in the entity set ANDrecreateIndex=false.glossaryTermis absent.Backend integration tests
es.co.elastic.clients.*shading compile issue unrelated to this work that blocks the module build. Once that is fixed, the planned end-to-end tests (re-run indexer twice → triple count unchanged; remove an edge in MySQL → triple disappears in Fuseki; point RDF endpoint at a closed port → write returns <500ms;recreateIndex=true→ ontology graph non-empty after run) should be added.Manual testing performed
Entity.GLOSSARY_TERMconstant, AsyncService API).git stashbaseline to confirm pre-existing compile errors are unrelated to these changes.UI screen recording / screenshots:
Not applicable.
Checklist:
🤖 Generated with Claude Code