Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5991,6 +5991,215 @@ void test_AutoApprovalForEntitiesWithoutReviewers(TestNamespace ns)
}
}

@Test
@Order(43)
void test_PortChangesOnDataProductTriggerWorkflow(TestNamespace ns) throws Exception {
LOG.info("Starting test_PortChangesOnDataProductTriggerWorkflow");

OpenMetadataClient client = SdkClients.adminClient();
String suffix = String.valueOf(System.currentTimeMillis());

Domain domain;
try {
domain = client.domains().getByName("port_test_domain");
} catch (Exception e) {
domain =
client
.domains()
.create(
new CreateDomain()
.withName("port_test_domain")
.withDescription("Domain for port trigger tests")
.withDomainType(CreateDomain.DomainType.AGGREGATE));
}

CreateUser createReviewer =
new CreateUser()
.withName("port_rvwr_" + suffix)
.withEmail("port_rvwr_" + suffix + "@example.com")
.withDisplayName("Port Test Reviewer")
.withPassword("password123");
User reviewer = client.users().create(createReviewer);
EntityReference reviewerRef = reviewer.getEntityReference();
OpenMetadataClient reviewerClient =
SdkClients.createClient(reviewer.getName(), reviewer.getEmail(), new String[] {});

String portWorkflowName = "PortTriggerWf_" + suffix;

CreateDatabaseService createDbService =
new CreateDatabaseService()
.withName("port_dbs_" + suffix)
.withServiceType(CreateDatabaseService.DatabaseServiceType.Mysql)
.withConnection(
new org.openmetadata.schema.api.services.DatabaseConnection()
.withConfig(new MysqlConnection()))
.withDomains(List.of(domain.getFullyQualifiedName()));
DatabaseService dbService = client.databaseServices().create(createDbService);

CreateDatabase createDatabase =
new CreateDatabase()
.withName("port_db")
.withService(dbService.getFullyQualifiedName())
.withDomains(List.of(domain.getFullyQualifiedName()));
Database database = client.databases().create(createDatabase);

CreateDatabaseSchema createSchema =
new CreateDatabaseSchema()
.withName("port_sc")
.withDatabase(database.getFullyQualifiedName());
DatabaseSchema schema = client.databaseSchemas().create(createSchema);

CreateTable createTable =
new CreateTable()
.withName("port_in_table")
.withDatabaseSchema(schema.getFullyQualifiedName())
.withColumns(
List.of(
new Column().withName("id").withDataType(ColumnDataType.INT),
new Column().withName("name").withDataType(ColumnDataType.STRING)))
.withDomains(List.of(domain.getFullyQualifiedName()));
Table inputTable = client.tables().create(createTable);

CreateTable createTable2 =
new CreateTable()
.withName("port_out_table")
.withDatabaseSchema(schema.getFullyQualifiedName())
.withColumns(
List.of(
new Column().withName("id").withDataType(ColumnDataType.INT),
new Column().withName("value").withDataType(ColumnDataType.STRING)))
.withDomains(List.of(domain.getFullyQualifiedName()));
Table outputTable = client.tables().create(createTable2);
LOG.debug("Created tables: {}, {}", inputTable.getName(), outputTable.getName());

org.openmetadata.schema.api.domains.CreateDataProduct createDataProduct =
new org.openmetadata.schema.api.domains.CreateDataProduct()
.withName("port_dp_" + suffix)
.withDescription("Data product for port trigger test")
.withDomains(List.of(domain.getFullyQualifiedName()))
.withReviewers(List.of(reviewerRef));
org.openmetadata.schema.entity.domains.DataProduct dataProduct =
client.dataProducts().create(createDataProduct);
LOG.debug("Created data product: {}", dataProduct.getName());

org.openmetadata.schema.type.api.BulkAssets assetsBulk =
new org.openmetadata.schema.type.api.BulkAssets()
.withAssets(List.of(inputTable.getEntityReference(), outputTable.getEntityReference()));
client.dataProducts().bulkAddAssets(dataProduct.getFullyQualifiedName(), assetsBulk);

simulateWork(5000);

// Create workflow AFTER entity setup so it only catches port-change events
String portWorkflowJson =
String.format(
"""
{
"name": "%s",
"displayName": "Port Trigger Workflow",
"description": "Verifies inputPorts and outputPorts changes trigger workflow",
"trigger": {
"type": "eventBasedEntity",
"config": {
"entityTypes": ["dataProduct"],
"events": ["Updated"],
"include": ["inputPorts", "outputPorts"],
"filter": {}
},
"output": ["relatedEntity", "updatedBy"]
},
"nodes": [
{"type": "startEvent", "subType": "startEvent", "name": "Start", "displayName": "Start"},
{"type": "endEvent", "subType": "endEvent", "name": "End", "displayName": "End"},
{
"type": "userTask",
"subType": "userApprovalTask",
"name": "UserApproval",
"displayName": "User Approval",
"config": {
"assignees": {"addReviewers": true, "addOwners": false, "candidates": []},
"approvalThreshold": 1,
"rejectionThreshold": 1
},
"input": ["relatedEntity"],
"inputNamespaceMap": {"relatedEntity": "global"},
"output": ["updatedBy"],
"branches": ["true", "false"]
}
],
"edges": [
{"from": "Start", "to": "UserApproval"},
{"from": "UserApproval", "to": "End", "condition": "true"},
{"from": "UserApproval", "to": "End", "condition": "false"}
],
"config": {"storeStageStatus": true}
}
""",
portWorkflowName);

CreateWorkflowDefinition portWorkflow =
org.openmetadata.schema.utils.JsonUtils.readValue(
portWorkflowJson, CreateWorkflowDefinition.class);
client
.getHttpClient()
.executeForString(
HttpMethod.POST, BASE_PATH, portWorkflow, RequestOptions.builder().build());
LOG.debug("Created port trigger workflow: {}", portWorkflowName);
waitForWorkflowDeployment(client, portWorkflowName);

org.openmetadata.schema.api.tasks.ResolveTask resolveApproved =
new org.openmetadata.schema.api.tasks.ResolveTask()
.withResolutionType(TaskResolutionType.Approved);

// inputPorts add — ChangeEvent fieldsAdded[inputPorts] must trigger the workflow
org.openmetadata.schema.type.api.BulkAssets inputPortAssets =
new org.openmetadata.schema.type.api.BulkAssets()
.withAssets(List.of(inputTable.getEntityReference()));
client.dataProducts().bulkAddInputPorts(dataProduct.getFullyQualifiedName(), inputPortAssets);
LOG.debug("Added inputTable as inputPort");
await()
.atMost(Duration.ofMinutes(2))
.pollInterval(Duration.ofSeconds(2))
.until(
() ->
!listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName())
.getData()
.isEmpty());
Task inputPortTask =
listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()).getData().get(0);
reviewerClient.tasks().resolve(inputPortTask.getId().toString(), resolveApproved);
LOG.info("inputPorts add triggered and resolved approval task");

// outputPorts add — outputTable is a data product asset, satisfying the prerequisite
// Using a different table so it doesn't conflict with inputTable already in inputPorts
org.openmetadata.schema.type.api.BulkAssets outputPortAssets =
new org.openmetadata.schema.type.api.BulkAssets()
.withAssets(List.of(outputTable.getEntityReference()));
client.dataProducts().bulkAddOutputPorts(dataProduct.getFullyQualifiedName(), outputPortAssets);
LOG.debug("Added outputTable as outputPort");
await()
.atMost(Duration.ofMinutes(2))
.pollInterval(Duration.ofSeconds(2))
.until(
() ->
!listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName())
.getData()
.isEmpty());
Task outputPortTask =
listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()).getData().get(0);
reviewerClient.tasks().resolve(outputPortTask.getId().toString(), resolveApproved);
LOG.info("outputPorts add triggered and resolved approval task");

try {
WorkflowDefinition wd = client.workflowDefinitions().getByName(portWorkflowName, null);
client.workflowDefinitions().delete(wd.getId());
LOG.debug("Deleted port trigger workflow");
} catch (Exception e) {
LOG.warn("Error deleting port trigger workflow: {}", e.getMessage());
}

LOG.info("test_PortChangesOnDataProductTriggerWorkflow completed successfully");
Comment on lines +6159 to +6200
}

@Test
@Order(38)
void test_CreateWorkflowWithoutEntityTypes() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,27 +251,35 @@ public BulkOperationResult bulkRemoveAssets(
return result;
}

public BulkOperationResult bulkAddInputPorts(String dataProductName, BulkAssets request) {
return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, true);
public BulkOperationResult bulkAddInputPorts(
String dataProductName, BulkAssets request, String updatedBy) {
return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, true, updatedBy);
}

public BulkOperationResult bulkRemoveInputPorts(String dataProductName, BulkAssets request) {
return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, false);
public BulkOperationResult bulkRemoveInputPorts(
String dataProductName, BulkAssets request, String updatedBy) {
return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, false, updatedBy);
}

public BulkOperationResult bulkAddOutputPorts(String dataProductName, BulkAssets request) {
return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, true);
public BulkOperationResult bulkAddOutputPorts(
String dataProductName, BulkAssets request, String updatedBy) {
return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, true, updatedBy);
}

public BulkOperationResult bulkRemoveOutputPorts(String dataProductName, BulkAssets request) {
return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, false);
public BulkOperationResult bulkRemoveOutputPorts(
String dataProductName, BulkAssets request, String updatedBy) {
return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, false, updatedBy);
}

@Transaction
private BulkOperationResult bulkPortsOperation(
String dataProductNameOrId, BulkAssets request, Relationship relationship, boolean isAdd) {
String dataProductNameOrId,
BulkAssets request,
Relationship relationship,
boolean isAdd,
String updatedBy) {
DataProduct dataProduct = resolveDataProduct(dataProductNameOrId);
return executeBulkPortsOperation(dataProduct, request, relationship, isAdd);
return executeBulkPortsOperation(dataProduct, request, relationship, isAdd, updatedBy);
}

private DataProduct resolveDataProduct(String nameOrId) {
Expand All @@ -284,7 +292,11 @@ private DataProduct resolveDataProduct(String nameOrId) {
}

private BulkOperationResult executeBulkPortsOperation(
DataProduct dataProduct, BulkAssets request, Relationship relationship, boolean isAdd) {
DataProduct dataProduct,
BulkAssets request,
Relationship relationship,
boolean isAdd,
String updatedBy) {
BulkOperationResult result =
new BulkOperationResult().withStatus(ApiStatus.SUCCESS).withDryRun(false);
List<BulkResponse> success = new ArrayList<>();
Expand Down Expand Up @@ -382,8 +394,13 @@ private BulkOperationResult executeBulkPortsOperation(
change.getFieldsDeleted().get(0).setName(fieldName);
}
ChangeEvent changeEvent =
getChangeEvent(dataProduct, change, DATA_PRODUCT, dataProduct.getVersion());
getChangeEvent(dataProduct, change, DATA_PRODUCT, dataProduct.getVersion(), updatedBy);
Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
DataProduct entityToUpdate = get(null, dataProduct.getId(), getFields("*"));
entityToUpdate.setChangeDescription(change);
entityToUpdate.setUpdatedBy(updatedBy);
storeEntity(entityToUpdate, true);
Comment on lines +399 to +402
invalidate(entityToUpdate);
Comment on lines 396 to +403
Copy link
Copy Markdown

@gitar-bot gitar-bot Bot May 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Bug: ChangeEvent embeds stale entity without updated version/updatedBy

In executeBulkPortsOperation, the ChangeEvent at line 397 is constructed with the original dataProduct object which has not been refreshed after relationships were added. Its updatedBy still reflects the previous updater (not the current user), and its version is never incremented. The fresh entity fetched at line 399 gets setUpdatedBy(updatedBy) and setChangeDescription(change) but is only used for storeEntity — the ChangeEvent sent to subscribers contains the stale snapshot.

This means:

  • Workflow consumers (like WorkflowEventConsumer) see an entity snapshot with the wrong updatedBy inside the event payload.
  • The entity version is never incremented by storeEntity, so consecutive port operations produce change events with identical previousVersion, which may confuse version-tracking logic.

Consider either (a) re-fetching the entity before constructing the ChangeEvent and using that for both, or (b) incrementing the version explicitly (similar to how bulkAssetsOperation in the same file delegates to the updater path).

Re-fetch entity before building ChangeEvent, increment version, and use the fresh entity in the event payload so subscribers see consistent state.:

if (!success.isEmpty()) {
  List<EntityReference> successAssets =
      success.stream().map(r -> (EntityReference) r.getRequest()).collect(Collectors.toList());
  ChangeDescription change =
      addBulkAddRemoveChangeDescription(dataProduct.getVersion(), isAdd, successAssets, null);
  if (!change.getFieldsAdded().isEmpty()) {
    change.getFieldsAdded().get(0).setName(fieldName);
  }
  if (!change.getFieldsDeleted().isEmpty()) {
    change.getFieldsDeleted().get(0).setName(fieldName);
  }
  // Fetch fresh entity, update metadata, then build ChangeEvent from it
  DataProduct entityToUpdate = get(null, dataProduct.getId(), getFields("*"));
  entityToUpdate.setChangeDescription(change);
  entityToUpdate.setUpdatedBy(updatedBy);
  entityToUpdate.setVersion(EntityUtil.nextVersion(entityToUpdate.getVersion()));
  storeEntity(entityToUpdate, true);
  invalidate(entityToUpdate);
  ChangeEvent changeEvent =
      getChangeEvent(entityToUpdate, change, DATA_PRODUCT, dataProduct.getVersion(), updatedBy);
  Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent));
}

Was this helpful? React with 👍 / 👎

}

return result;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4571,7 +4571,7 @@ private void markEntityNotFound(T entity) {

protected void entitySpecificCleanup(T entityInterface) {}

private void invalidate(T entity) {
void invalidate(T entity) {
CACHE_WITH_ID.invalidate(new ImmutablePair<>(entityType, entity.getId()));
CACHE_WITH_NAME.invalidate(cacheNameKey(entityType, entity.getFullyQualifiedName()));
RequestEntityCache.invalidate(entityType, entity.getId(), entity.getFullyQualifiedName());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,8 @@ public Response bulkAddInputPorts(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
return buildBulkOperationResponse(repository.bulkAddInputPorts(name, request));
return buildBulkOperationResponse(
repository.bulkAddInputPorts(name, request, securityContext.getUserPrincipal().getName()));
}

@PUT
Expand Down Expand Up @@ -508,7 +509,9 @@ public Response bulkRemoveInputPorts(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
return buildBulkOperationResponse(repository.bulkRemoveInputPorts(name, request));
return buildBulkOperationResponse(
repository.bulkRemoveInputPorts(
name, request, securityContext.getUserPrincipal().getName()));
}

@PUT
Expand Down Expand Up @@ -546,7 +549,8 @@ public Response bulkAddOutputPorts(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
return buildBulkOperationResponse(repository.bulkAddOutputPorts(name, request));
return buildBulkOperationResponse(
repository.bulkAddOutputPorts(name, request, securityContext.getUserPrincipal().getName()));
}

@PUT
Expand Down Expand Up @@ -584,7 +588,9 @@ public Response bulkRemoveOutputPorts(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(name));
return buildBulkOperationResponse(repository.bulkRemoveOutputPorts(name, request));
return buildBulkOperationResponse(
repository.bulkRemoveOutputPorts(
name, request, securityContext.getUserPrincipal().getName()));
}

@PUT
Expand Down Expand Up @@ -624,7 +630,8 @@ public Response bulkAddInputPortsByName(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn));
return buildBulkOperationResponse(repository.bulkAddInputPorts(fqn, request));
return buildBulkOperationResponse(
repository.bulkAddInputPorts(fqn, request, securityContext.getUserPrincipal().getName()));
}

@PUT
Expand Down Expand Up @@ -664,7 +671,9 @@ public Response bulkRemoveInputPortsByName(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn));
return buildBulkOperationResponse(repository.bulkRemoveInputPorts(fqn, request));
return buildBulkOperationResponse(
repository.bulkRemoveInputPorts(
fqn, request, securityContext.getUserPrincipal().getName()));
}

@PUT
Expand Down Expand Up @@ -704,7 +713,8 @@ public Response bulkAddOutputPortsByName(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn));
return buildBulkOperationResponse(repository.bulkAddOutputPorts(fqn, request));
return buildBulkOperationResponse(
repository.bulkAddOutputPorts(fqn, request, securityContext.getUserPrincipal().getName()));
}

@PUT
Expand Down Expand Up @@ -744,7 +754,9 @@ public Response bulkRemoveOutputPortsByName(
OperationContext operationContext =
new OperationContext(entityType, MetadataOperation.EDIT_ALL);
authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn));
return buildBulkOperationResponse(repository.bulkRemoveOutputPorts(fqn, request));
return buildBulkOperationResponse(
repository.bulkRemoveOutputPorts(
fqn, request, securityContext.getUserPrincipal().getName()));
}

@PATCH
Expand Down
Loading
Loading