feat(workflow): add inputPorts, outputPorts, glossaryTerms as WorkflowTriggerFields#28120
Conversation
…wTriggerFields - Add inputPorts, outputPorts, glossaryTerms to workflowTriggerFields.json enum - Unit tests in FilterEntityImplTest verifying all three fields pass the passesFieldBasedFilter check, including include/exclude config - Integration test scenario appended to test_CustomApprovalWorkflowForNewEntities: adds table as inputPort, removes it, adds as outputPort — each port change must produce an approval task proving the workflow trigger fires correctly
✅ TypeScript Types Auto-UpdatedThe generated TypeScript types have been automatically updated based on JSON schema changes in this PR. |
There was a problem hiding this comment.
Pull request overview
This PR adds new workflow trigger fields so data product port changes and glossary term-related changes can participate in event-based workflow filtering.
Changes:
- Adds
glossaryTerms,inputPorts, andoutputPortstoWorkflowTriggerFields. - Adds unit coverage for recognizing the new trigger fields and include/exclude behavior for port fields.
- Appends an integration scenario for data product input/output port workflow triggers.
Reviewed changes
Copilot reviewed 3 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json |
Extends the workflow trigger field enum. |
openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImplTest.java |
Adds unit tests for the new trigger fields. |
openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java |
Adds port-trigger workflow validation to an existing approval workflow test. |
| "glossaryTerms", | ||
| "inputPorts", | ||
| "outputPorts" |
| // Step 11: Verify inputPorts and outputPorts changes trigger the approval workflow | ||
| // "inputPorts" and "outputPorts" are WorkflowTriggerFields, so bulk port operations | ||
| // must produce a ChangeEvent with the correct field name that the workflow filter recognises. | ||
| LOG.info("Step 11: Verifying inputPorts/outputPorts changes trigger the approval workflow"); |
🟡 Playwright Results — all passed (9 flaky)✅ 4140 passed · ❌ 0 failed · 🟡 9 flaky · ⏭️ 89 skipped
🟡 9 flaky test(s) (passed on retry)
How to debug locally# Download playwright-test-results-<shard> artifact and unzip
npx playwright show-trace path/to/trace.zip # view trace |
…rnance workflows - Add inputPorts, outputPorts, glossaryTerms to WorkflowTriggerFields enum - Fix executeBulkPortsOperation to update entity changeDescription so FilterEntityImpl reads the correct changed fields - Add governance-bot impersonation to applyPatchEntityFieldAction to prevent entityStatus updates from triggering spurious workflow signals - Pass caller username through port endpoints to fix self-approval check incorrectly removing the reviewer when entity updatedBy was set to the reviewer from a prior task resolution
| ChangeEvent changeEvent = | ||
| getChangeEvent(dataProduct, change, DATA_PRODUCT, dataProduct.getVersion()); | ||
| getChangeEvent(dataProduct, change, DATA_PRODUCT, dataProduct.getVersion(), updatedBy); | ||
| Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); | ||
| DataProduct entityToUpdate = get(null, dataProduct.getId(), getFields("*")); | ||
| entityToUpdate.setChangeDescription(change); | ||
| entityToUpdate.setUpdatedBy(updatedBy); | ||
| storeEntity(entityToUpdate, true); | ||
| invalidate(entityToUpdate); |
There was a problem hiding this comment.
⚠️ Bug: ChangeEvent embeds stale entity without updated version/updatedBy
In executeBulkPortsOperation, the ChangeEvent at line 397 is constructed with the original dataProduct object which has not been refreshed after relationships were added. Its updatedBy still reflects the previous updater (not the current user), and its version is never incremented. The fresh entity fetched at line 399 gets setUpdatedBy(updatedBy) and setChangeDescription(change) but is only used for storeEntity — the ChangeEvent sent to subscribers contains the stale snapshot.
This means:
- Workflow consumers (like
WorkflowEventConsumer) see an entity snapshot with the wrongupdatedByinside the event payload. - The entity version is never incremented by
storeEntity, so consecutive port operations produce change events with identicalpreviousVersion, which may confuse version-tracking logic.
Consider either (a) re-fetching the entity before constructing the ChangeEvent and using that for both, or (b) incrementing the version explicitly (similar to how bulkAssetsOperation in the same file delegates to the updater path).
Re-fetch entity before building ChangeEvent, increment version, and use the fresh entity in the event payload so subscribers see consistent state.:
if (!success.isEmpty()) {
List<EntityReference> successAssets =
success.stream().map(r -> (EntityReference) r.getRequest()).collect(Collectors.toList());
ChangeDescription change =
addBulkAddRemoveChangeDescription(dataProduct.getVersion(), isAdd, successAssets, null);
if (!change.getFieldsAdded().isEmpty()) {
change.getFieldsAdded().get(0).setName(fieldName);
}
if (!change.getFieldsDeleted().isEmpty()) {
change.getFieldsDeleted().get(0).setName(fieldName);
}
// Fetch fresh entity, update metadata, then build ChangeEvent from it
DataProduct entityToUpdate = get(null, dataProduct.getId(), getFields("*"));
entityToUpdate.setChangeDescription(change);
entityToUpdate.setUpdatedBy(updatedBy);
entityToUpdate.setVersion(EntityUtil.nextVersion(entityToUpdate.getVersion()));
storeEntity(entityToUpdate, true);
invalidate(entityToUpdate);
ChangeEvent changeEvent =
getChangeEvent(entityToUpdate, change, DATA_PRODUCT, dataProduct.getVersion(), updatedBy);
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
}
Was this helpful? React with 👍 / 👎
| .executeForString( | ||
| HttpMethod.POST, BASE_PATH, portWorkflow, RequestOptions.builder().build()); | ||
| LOG.debug("Created port trigger workflow: {}", portWorkflowName); | ||
| waitForWorkflowDeployment(client, portWorkflowName); |
| await() | ||
| .atMost(Duration.ofMinutes(2)) | ||
| .pollInterval(Duration.ofSeconds(2)) | ||
| .until( | ||
| () -> | ||
| !listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()) | ||
| .getData() | ||
| .isEmpty()); | ||
| Task inputPortTask = | ||
| listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()).getData().get(0); | ||
| reviewerClient.tasks().resolve(inputPortTask.getId().toString(), resolveApproved); | ||
| LOG.info("inputPorts add triggered and resolved approval task"); | ||
|
|
||
| // outputPorts add — outputTable is a data product asset, satisfying the prerequisite | ||
| // Using a different table so it doesn't conflict with inputTable already in inputPorts | ||
| org.openmetadata.schema.type.api.BulkAssets outputPortAssets = | ||
| new org.openmetadata.schema.type.api.BulkAssets() | ||
| .withAssets(List.of(outputTable.getEntityReference())); | ||
| client.dataProducts().bulkAddOutputPorts(dataProduct.getFullyQualifiedName(), outputPortAssets); | ||
| LOG.debug("Added outputTable as outputPort"); | ||
| await() | ||
| .atMost(Duration.ofMinutes(2)) | ||
| .pollInterval(Duration.ofSeconds(2)) | ||
| .until( | ||
| () -> | ||
| !listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()) | ||
| .getData() | ||
| .isEmpty()); | ||
| Task outputPortTask = | ||
| listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()).getData().get(0); | ||
| reviewerClient.tasks().resolve(outputPortTask.getId().toString(), resolveApproved); | ||
| LOG.info("outputPorts add triggered and resolved approval task"); | ||
|
|
||
| try { | ||
| WorkflowDefinition wd = client.workflowDefinitions().getByName(portWorkflowName, null); | ||
| client.workflowDefinitions().delete(wd.getId()); | ||
| LOG.debug("Deleted port trigger workflow"); | ||
| } catch (Exception e) { | ||
| LOG.warn("Error deleting port trigger workflow: {}", e.getMessage()); | ||
| } | ||
|
|
||
| LOG.info("test_PortChangesOnDataProductTriggerWorkflow completed successfully"); |
| org.openmetadata.schema.type.api.BulkAssets outputPortAssets = | ||
| new org.openmetadata.schema.type.api.BulkAssets() | ||
| .withAssets(List.of(outputTable.getEntityReference())); | ||
| client.dataProducts().bulkAddOutputPorts(dataProduct.getFullyQualifiedName(), outputPortAssets); |
| DataProduct entityToUpdate = get(null, dataProduct.getId(), getFields("*")); | ||
| entityToUpdate.setChangeDescription(change); | ||
| entityToUpdate.setUpdatedBy(updatedBy); | ||
| storeEntity(entityToUpdate, true); |
| getChangeEvent(dataProduct, change, DATA_PRODUCT, dataProduct.getVersion(), updatedBy); | ||
| Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); | ||
| DataProduct entityToUpdate = get(null, dataProduct.getId(), getFields("*")); | ||
| entityToUpdate.setChangeDescription(change); |
| "providesTo", | ||
| "lifecycleStage" | ||
| "lifecycleStage", | ||
| "glossaryTerms", |
Code Review
|
| Compact |
|
Was this helpful? React with 👍 / 👎 | Gitar
|
|
|
Failed to cherry-pick changes to the 1.13 branch. |
|
Failed to cherry-pick changes to the 1.12.9 branch. |



Summary
inputPorts,outputPorts, andglossaryTermsto theWorkflowTriggerFieldsenum inworkflowTriggerFields.jsonso bulk port operations on Data Products correctly trigger event-based workflowsFilterEntityImpl.passesFieldBasedFiltersilently dropped port change events before this fix because neither field was a recognizedWorkflowTriggerFieldFilterEntityImplTestcovering recognition, include, and exclude filter behaviour for all three new fieldstest_CustomApprovalWorkflowForNewEntities: adds a table asinputPort, removes it, then adds asoutputPort— each operation must produce an approval task, proving the workflow trigger fires end-to-endFixes https://github.com/open-metadata/openmetadata-collate/issues/4092
Test plan
FilterEntityImplTest— 17 tests pass (run locally)WorkflowDefinitionResourceIT#test_CustomApprovalWorkflowForNewEntities— new Step 11 validatesinputPorts/outputPortsport changes trigger the approval workflowSummary by Gitar
NotFoundCache) entry inEntityRepository.javacleanup to prevent race conditions during entity deletion.markEntityNotFoundto short-circuit L1/Redis lookups after invalidation.dryRunin bulk asset operations withinDataProductRepository.javaandEntityRepository.java.dryRunis enabled.TaskWorkflowHandler.java.resolveResolutionTypeto prevent premature task closure onApprovedorGrantedstates.This will update automatically on new commits.