Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a91262d
fix(rdf): converge Fuseki state on weekly rebuilds and isolate API la…
harshach May 14, 2026
9b6e112
fix(rdf): address PR review — preserve relationships, scope DELETEs, …
harshach May 14, 2026
26f743b
test(rdf): expect per-source clear on batches whose relationships are…
harshach May 15, 2026
10fe181
fix(rdf): address remaining PR review nits
harshach May 15, 2026
1676feb
fix(rdf): surface cleanup failures, sync fallback predicates, time-bo…
harshach May 15, 2026
4567706
fix(rdf): qualify EntityRelationship in test to fix compile
harshach May 15, 2026
2fc1147
Merge branch 'main' into harshach/rdf-fuseki-duplicate-relations
harshach May 15, 2026
ef9bb30
fix(rdf): drop QueryExecution.setTimeout — removed in Jena 5 used by …
harshach May 15, 2026
7a1fae7
fix(rdf): align ontology-loaded check, predicate URIs, and CURIE fall…
harshach May 15, 2026
e2575d5
fix(rdf): schema default + migration force entities=[all] for safe fu…
harshach May 15, 2026
22d5825
fix(rdf): scope storeEntity DELETE to translator-managed predicates
harshach May 15, 2026
857c097
fix(rdf): scope reconciliation DELETE to relationship-hook predicates…
harshach May 15, 2026
63d9864
fix(rdf): scope bulk reconciliation to batch entities, not all relati…
harshach May 15, 2026
66884e2
fix(rdf): time-bound HTTP request bodies via CompletableFuture wrapper
harshach May 15, 2026
0c4345b
docs(rdf): document RdfUpdater async-ordering trade-off in submitAsync
harshach May 15, 2026
4242c15
fix(rdf): atomic clear+insert, broader fallback predicate set, close …
harshach May 15, 2026
03c5d4f
fix(rdf): make buildPredicateInList public so JenaFusekiStorage can u…
harshach May 15, 2026
9eeca99
fix(rdf): normalise sourcesToReconcile to empty-set to prevent NPE in…
harshach May 15, 2026
28fb585
test(rdf): update RdfIndexAppTest verifications for the new bulkAddRe…
harshach May 15, 2026
53a83b7
fix(rdf): four follow-up findings from Copilot review 4299978111
harshach May 15, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1 +1,29 @@
-- Post data migration script for Task workflow cutover - OpenMetadata 2.0.1

-- RdfIndexApp: switch to weekly Saturday cron and full-rebuild every run.
-- Previous defaults (daily, incremental) were producing unbounded triple growth
-- because relationship-removal paths weren't fully reconciled. With per-run
-- CLEAR ALL the dataset always converges to MySQL state; weekly cadence keeps
-- per-run cost from saturating Fuseki.
--
-- Also rewrite `entities` to `["all"]`. Pre-upgrade, an operator could have
-- narrowed RDF indexing to a subset of entity types; the new recreateIndex=true
-- semantics issues a CLEAR ALL before indexing, which would otherwise wipe
-- triples for entity types still in MySQL but missing from the subset list.
-- Forcing the subset list back to `["all"]` ensures the post-CLEAR-ALL run
-- repopulates the graph fully; operators can re-narrow after the migration if
-- they need partial indexing.
-- A single JSON_SET applies its path/value pairs left to right, so this is
-- equivalent to the nested form: recreateIndex and the cron expression are
-- written first, then the entities array is forced back to ["all"].
UPDATE installed_apps
SET json = JSON_SET(
    json,
    '$.appConfiguration.recreateIndex', CAST('true' AS JSON),
    '$.appSchedule.cronExpression', '0 0 * * 6',
    '$.appConfiguration.entities', JSON_ARRAY('all')
)
WHERE name = 'RdfIndexApp';

-- Mirror the recreateIndex=true default in the marketplace listing so that
-- fresh installs of RdfIndexApp also start in full-rebuild mode. Only the
-- recreateIndex flag is rewritten here; schedule and entity-subset settings
-- are per-install state and are handled in the installed_apps update above.
-- NOTE(review): assumes apps_marketplace rows carry the same
-- appConfiguration shape as installed_apps — confirm against the app schema.
UPDATE apps_marketplace
SET json = JSON_SET(json, '$.appConfiguration.recreateIndex', CAST('true' AS JSON))
WHERE name = 'RdfIndexApp';
Original file line number Diff line number Diff line change
@@ -1 +1,30 @@
-- Post data migration script for Task workflow cutover - OpenMetadata 2.0.1

-- RdfIndexApp: switch to weekly Saturday cron and full-rebuild every run.
-- Previous defaults (daily, incremental) were producing unbounded triple growth
-- because relationship-removal paths weren't fully reconciled. With per-run
-- CLEAR ALL the dataset always converges to MySQL state; weekly cadence keeps
-- per-run cost from saturating Fuseki.
--
-- Also rewrite `entities` to `["all"]`. Pre-upgrade, an operator could have
-- narrowed RDF indexing to a subset of entity types; the new recreateIndex=true
-- semantics issues a CLEAR ALL before indexing, which would otherwise wipe
-- triples for entity types still in MySQL but missing from the subset list.
-- Forcing the subset list back to `["all"]` ensures the post-CLEAR-ALL run
-- repopulates the graph fully; operators can re-narrow after the migration if
-- they need partial indexing.
-- Nested jsonb_set calls apply innermost first: recreateIndex, then the
-- weekly Saturday cron expression, then the entities array forced back to
-- ["all"]. Replacement values carry explicit ::jsonb casts for clarity;
-- the literals themselves are unchanged.
UPDATE installed_apps
SET json = jsonb_set(
    jsonb_set(
        jsonb_set(json::jsonb, '{appConfiguration,recreateIndex}', 'true'::jsonb),
        '{appSchedule,cronExpression}',
        '"0 0 * * 6"'::jsonb
    ),
    '{appConfiguration,entities}',
    '["all"]'::jsonb
)
WHERE name = 'RdfIndexApp';

-- Mirror the recreateIndex=true default in the marketplace listing so that
-- fresh installs of RdfIndexApp also start in full-rebuild mode. Only the
-- recreateIndex flag is rewritten here; schedule and entity-subset settings
-- are per-install state and are handled in the installed_apps update above.
-- NOTE(review): assumes apps_marketplace rows carry the same
-- appConfiguration shape as installed_apps — confirm against the app schema.
UPDATE apps_marketplace
SET json = jsonb_set(json::jsonb, '{appConfiguration,recreateIndex}', 'true')
WHERE name = 'RdfIndexApp';
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.openmetadata.service.apps.bundles.rdf;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.UUID;
Expand Down Expand Up @@ -208,18 +209,38 @@ public RelationshipProcessingResult processBatchRelationships(
}
}

if (!allRelationships.isEmpty()) {
try {
rdfRepository.bulkAddRelationships(allRelationships);
} catch (Exception e) {
LOG.error(
"Failed to bulk add {} relationships for entity type {}",
allRelationships.size(),
entityType,
e);
failures += allRelationships.size();
lastError = describeBulkError(entityType, "bulkRelationships", e);
}
// Reconcile EVERY entity in the batch — not just those with current
// outgoing relationships. An entity whose last outgoing relationship was
// removed in MySQL contributes zero RelationshipData entries to
// allRelationships; we pass it explicitly via batchSources so
// bulkAddRelationships' per-source DELETE still fires for it.
//
// The clear+insert run in a SINGLE SPARQL update inside
// JenaFusekiStorage.bulkStoreRelationships, so the operation is atomic
// at the Fuseki side — a transient error can't leave the graph wiped
// without the replacement edges in place. (Previously the clear ran in
// a separate call to clearOutgoingEntityRelationships; if the
// subsequent bulkAdd failed, batch sources lost their relationships
// until the next weekly recreate-index.)
Set<RdfRepository.EntitySourceRef> batchSources = new HashSet<>();
for (EntityInterface entity : entities) {
batchSources.add(new RdfRepository.EntitySourceRef(entityType, entity.getId()));
}
try {
// Pass batchSources so bulkStoreRelationships only reconciles edges
// for entities IN this batch. Incoming-lineage rows can carry source
// IDs that are outside the batch (the `from` of an UPSTREAM edge
// where this batch's entity is the `to`); reconciling those would
// wipe the outside-batch entity's unrelated outgoing edges.
rdfRepository.bulkAddRelationships(allRelationships, batchSources);
} catch (Exception e) {
Comment thread
harshach marked this conversation as resolved.
LOG.error(
"Failed to bulk add {} relationships for entity type {}",
allRelationships.size(),
entityType,
e);
failures += allRelationships.size();
lastError = describeBulkError(entityType, "bulkRelationships", e);
}
} catch (Exception e) {
LOG.error("Failed to process batch relationships for entity type {}", entityType, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,22 @@ private void initializeJob(JobExecutionContext jobExecutionContext) {
rdfIndexStats.set(initializeTotalRecords(jobData.getEntities()));
jobData.setStats(rdfIndexStats.get());

// bulkAddGlossaryTermRelations has no per-batch DELETE side, so stale
// glossary-term relations would accumulate forever across reindex runs.
// When recreateIndex=true clearAll() already wipes everything, so we
// only need this targeted cleanup on incremental runs.
//
// Let the failure propagate: clearAllGlossaryTermRelations rethrows on
// failure precisely so the indexer can fail loudly instead of silently
// marking a job successful while the graph still has stale predicates.
// The outer try/catch in execute() will set the run status to FAILED.
if (!Boolean.TRUE.equals(jobData.getRecreateIndex())
&& jobData.getEntities() != null
&& jobData.getEntities().contains(Entity.GLOSSARY_TERM)) {
LOG.info("Clearing existing glossary term relations before re-indexing");
rdfRepository.clearAllGlossaryTermRelations();
}

if (Boolean.TRUE.equals(jobData.getUseDistributedIndexing())) {
sendUpdates(jobExecutionContext, true);
return;
Expand Down Expand Up @@ -242,6 +258,10 @@ private void clearRdfData() {
try {
rdfRepository.clearAll();
LOG.info("Cleared all RDF data");
// CLEAR ALL wipes the ontology and shapes graphs as well; reload them
// before indexing starts so SPARQL queries that depend on the ontology
// (inference, federated, etc.) work after the wipe.
rdfRepository.reloadOntologies();
} catch (Exception e) {
LOG.error("Failed to clear RDF data", e);
throw new RuntimeException("Failed to clear RDF data", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ public boolean areOntologiesLoaded() {
String checkQuery = "ASK { GRAPH <" + ONTOLOGY_GRAPH + "> { ?s ?p ?o } }";
String result =
rdfRepository.executeSparqlQuery(checkQuery, "application/sparql-results+json");
return result.contains("\"boolean\" : true");
// JenaFusekiStorage formats ASK results as `{"head": {}, "boolean": true}`
// (no space before the colon), so a literal-substring check that includes
// a space would never match. Normalise whitespace and match either form.
String normalised = result.replaceAll("\\s+", "");
return normalised.contains("\"boolean\":true");
} catch (Exception e) {
LOG.error("Failed to check if ontologies are loaded", e);
return false;
Expand Down
Loading
Loading