diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java index d6f1434cfc9b..e02f2a803441 100644 --- a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/WorkflowDefinitionResourceIT.java @@ -5991,6 +5991,215 @@ void test_AutoApprovalForEntitiesWithoutReviewers(TestNamespace ns) } } + @Test + @Order(43) + void test_PortChangesOnDataProductTriggerWorkflow(TestNamespace ns) throws Exception { + LOG.info("Starting test_PortChangesOnDataProductTriggerWorkflow"); + + OpenMetadataClient client = SdkClients.adminClient(); + String suffix = String.valueOf(System.currentTimeMillis()); + + Domain domain; + try { + domain = client.domains().getByName("port_test_domain"); + } catch (Exception e) { + domain = + client + .domains() + .create( + new CreateDomain() + .withName("port_test_domain") + .withDescription("Domain for port trigger tests") + .withDomainType(CreateDomain.DomainType.AGGREGATE)); + } + + CreateUser createReviewer = + new CreateUser() + .withName("port_rvwr_" + suffix) + .withEmail("port_rvwr_" + suffix + "@example.com") + .withDisplayName("Port Test Reviewer") + .withPassword("password123"); + User reviewer = client.users().create(createReviewer); + EntityReference reviewerRef = reviewer.getEntityReference(); + OpenMetadataClient reviewerClient = + SdkClients.createClient(reviewer.getName(), reviewer.getEmail(), new String[] {}); + + String portWorkflowName = "PortTriggerWf_" + suffix; + + CreateDatabaseService createDbService = + new CreateDatabaseService() + .withName("port_dbs_" + suffix) + .withServiceType(CreateDatabaseService.DatabaseServiceType.Mysql) + .withConnection( + new org.openmetadata.schema.api.services.DatabaseConnection() + .withConfig(new MysqlConnection())) + .withDomains(List.of(domain.getFullyQualifiedName())); + DatabaseService dbService = client.databaseServices().create(createDbService); + + CreateDatabase createDatabase = + new CreateDatabase() + .withName("port_db") + .withService(dbService.getFullyQualifiedName()) + .withDomains(List.of(domain.getFullyQualifiedName())); + Database database = client.databases().create(createDatabase); + + CreateDatabaseSchema createSchema = + new CreateDatabaseSchema() + .withName("port_sc") + .withDatabase(database.getFullyQualifiedName()); + DatabaseSchema schema = client.databaseSchemas().create(createSchema); + + CreateTable createTable = + new CreateTable() + .withName("port_in_table") + .withDatabaseSchema(schema.getFullyQualifiedName()) + .withColumns( + List.of( + new Column().withName("id").withDataType(ColumnDataType.INT), + new Column().withName("name").withDataType(ColumnDataType.STRING))) + .withDomains(List.of(domain.getFullyQualifiedName())); + Table inputTable = client.tables().create(createTable); + + CreateTable createTable2 = + new CreateTable() + .withName("port_out_table") + .withDatabaseSchema(schema.getFullyQualifiedName()) + .withColumns( + List.of( + new Column().withName("id").withDataType(ColumnDataType.INT), + new Column().withName("value").withDataType(ColumnDataType.STRING))) + .withDomains(List.of(domain.getFullyQualifiedName())); + Table outputTable = client.tables().create(createTable2); + LOG.debug("Created tables: {}, {}", inputTable.getName(), outputTable.getName()); + + org.openmetadata.schema.api.domains.CreateDataProduct createDataProduct = + new org.openmetadata.schema.api.domains.CreateDataProduct() + .withName("port_dp_" + suffix) + .withDescription("Data product for port trigger test") + .withDomains(List.of(domain.getFullyQualifiedName())) + .withReviewers(List.of(reviewerRef)); + org.openmetadata.schema.entity.domains.DataProduct dataProduct = + client.dataProducts().create(createDataProduct); + LOG.debug("Created data product: {}", dataProduct.getName()); + + org.openmetadata.schema.type.api.BulkAssets assetsBulk = + new org.openmetadata.schema.type.api.BulkAssets() + .withAssets(List.of(inputTable.getEntityReference(), outputTable.getEntityReference())); + client.dataProducts().bulkAddAssets(dataProduct.getFullyQualifiedName(), assetsBulk); + + simulateWork(5000); + + // Create workflow AFTER entity setup so it only catches port-change events + String portWorkflowJson = + String.format( + """ + { + "name": "%s", + "displayName": "Port Trigger Workflow", + "description": "Verifies inputPorts and outputPorts changes trigger workflow", + "trigger": { + "type": "eventBasedEntity", + "config": { + "entityTypes": ["dataProduct"], + "events": ["Updated"], + "include": ["inputPorts", "outputPorts"], + "filter": {} + }, + "output": ["relatedEntity", "updatedBy"] + }, + "nodes": [ + {"type": "startEvent", "subType": "startEvent", "name": "Start", "displayName": "Start"}, + {"type": "endEvent", "subType": "endEvent", "name": "End", "displayName": "End"}, + { + "type": "userTask", + "subType": "userApprovalTask", + "name": "UserApproval", + "displayName": "User Approval", + "config": { + "assignees": {"addReviewers": true, "addOwners": false, "candidates": []}, + "approvalThreshold": 1, + "rejectionThreshold": 1 + }, + "input": ["relatedEntity"], + "inputNamespaceMap": {"relatedEntity": "global"}, + "output": ["updatedBy"], + "branches": ["true", "false"] + } + ], + "edges": [ + {"from": "Start", "to": "UserApproval"}, + {"from": "UserApproval", "to": "End", "condition": "true"}, + {"from": "UserApproval", "to": "End", "condition": "false"} + ], + "config": {"storeStageStatus": true} + } + """, + portWorkflowName); + + CreateWorkflowDefinition portWorkflow = + org.openmetadata.schema.utils.JsonUtils.readValue( + portWorkflowJson, CreateWorkflowDefinition.class); + client + .getHttpClient() + .executeForString( + HttpMethod.POST, BASE_PATH, portWorkflow, RequestOptions.builder().build()); + LOG.debug("Created port trigger workflow: {}", portWorkflowName); + waitForWorkflowDeployment(client, portWorkflowName); + + org.openmetadata.schema.api.tasks.ResolveTask resolveApproved = + new org.openmetadata.schema.api.tasks.ResolveTask() + .withResolutionType(TaskResolutionType.Approved); + + // inputPorts add — ChangeEvent fieldsAdded[inputPorts] must trigger the workflow + org.openmetadata.schema.type.api.BulkAssets inputPortAssets = + new org.openmetadata.schema.type.api.BulkAssets() + .withAssets(List.of(inputTable.getEntityReference())); + client.dataProducts().bulkAddInputPorts(dataProduct.getFullyQualifiedName(), inputPortAssets); + LOG.debug("Added inputTable as inputPort"); + await() + .atMost(Duration.ofMinutes(2)) + .pollInterval(Duration.ofSeconds(2)) + .until( + () -> + !listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()) + .getData() + .isEmpty()); + Task inputPortTask = + listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()).getData().get(0); + reviewerClient.tasks().resolve(inputPortTask.getId().toString(), resolveApproved); + LOG.info("inputPorts add triggered and resolved approval task"); + + // outputPorts add — outputTable is a data product asset, satisfying the prerequisite + // Using a different table so it doesn't conflict with inputTable already in inputPorts + org.openmetadata.schema.type.api.BulkAssets outputPortAssets = + new org.openmetadata.schema.type.api.BulkAssets() + .withAssets(List.of(outputTable.getEntityReference())); + client.dataProducts().bulkAddOutputPorts(dataProduct.getFullyQualifiedName(), outputPortAssets); + LOG.debug("Added outputTable as outputPort"); + await() + .atMost(Duration.ofMinutes(2)) + .pollInterval(Duration.ofSeconds(2)) + .until( + () -> + !listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()) + .getData() + .isEmpty()); + Task outputPortTask = + listOpenApprovalTasks(reviewerClient, dataProduct.getFullyQualifiedName()).getData().get(0); + reviewerClient.tasks().resolve(outputPortTask.getId().toString(), resolveApproved); + LOG.info("outputPorts add triggered and resolved approval task"); + + try { + WorkflowDefinition wd = client.workflowDefinitions().getByName(portWorkflowName, null); + client.workflowDefinitions().delete(wd.getId()); + LOG.debug("Deleted port trigger workflow"); + } catch (Exception e) { + LOG.warn("Error deleting port trigger workflow: {}", e.getMessage()); + } + + LOG.info("test_PortChangesOnDataProductTriggerWorkflow completed successfully"); + } + @Test @Order(38) void test_CreateWorkflowWithoutEntityTypes() { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java index d2cce73054de..2bc8008ef2e5 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/DataProductRepository.java @@ -251,27 +251,35 @@ public BulkOperationResult bulkRemoveAssets( return result; } - public BulkOperationResult bulkAddInputPorts(String dataProductName, BulkAssets request) { - return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, true); + public BulkOperationResult bulkAddInputPorts( + String dataProductName, BulkAssets request, String updatedBy) { + return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, true, updatedBy); } - public BulkOperationResult bulkRemoveInputPorts(String dataProductName, BulkAssets request) { - return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, false); + public BulkOperationResult bulkRemoveInputPorts( + String dataProductName, BulkAssets request, String updatedBy) { + return bulkPortsOperation(dataProductName, request, Relationship.INPUT_PORT, false, updatedBy); } - public BulkOperationResult bulkAddOutputPorts(String dataProductName, BulkAssets request) { - return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, true); + public BulkOperationResult bulkAddOutputPorts( + String dataProductName, BulkAssets request, String updatedBy) { + return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, true, updatedBy); } - public BulkOperationResult bulkRemoveOutputPorts(String dataProductName, BulkAssets request) { - return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, false); + public BulkOperationResult bulkRemoveOutputPorts( + String dataProductName, BulkAssets request, String updatedBy) { + return bulkPortsOperation(dataProductName, request, Relationship.OUTPUT_PORT, false, updatedBy); } @Transaction private BulkOperationResult bulkPortsOperation( - String dataProductNameOrId, BulkAssets request, Relationship relationship, boolean isAdd) { + String dataProductNameOrId, + BulkAssets request, + Relationship relationship, + boolean isAdd, + String updatedBy) { DataProduct dataProduct = resolveDataProduct(dataProductNameOrId); - return executeBulkPortsOperation(dataProduct, request, relationship, isAdd); + return executeBulkPortsOperation(dataProduct, request, relationship, isAdd, updatedBy); } private DataProduct resolveDataProduct(String nameOrId) { @@ -284,7 +292,11 @@ private DataProduct resolveDataProduct(String nameOrId) { } private BulkOperationResult executeBulkPortsOperation( - DataProduct dataProduct, BulkAssets request, Relationship relationship, boolean isAdd) { + DataProduct dataProduct, + BulkAssets request, + Relationship relationship, + boolean isAdd, + String updatedBy) { BulkOperationResult result = new BulkOperationResult().withStatus(ApiStatus.SUCCESS).withDryRun(false); List success = new ArrayList<>(); @@ -382,8 +394,13 @@ private BulkOperationResult executeBulkPortsOperation( change.getFieldsDeleted().get(0).setName(fieldName); } ChangeEvent changeEvent = - getChangeEvent(dataProduct, change, DATA_PRODUCT, dataProduct.getVersion()); + getChangeEvent(dataProduct, change, DATA_PRODUCT, dataProduct.getVersion(), updatedBy); Entity.getCollectionDAO().changeEventDAO().insert(JsonUtils.pojoToJson(changeEvent)); + DataProduct entityToUpdate = get(null, dataProduct.getId(), getFields("*")); + entityToUpdate.setChangeDescription(change); + entityToUpdate.setUpdatedBy(updatedBy); + storeEntity(entityToUpdate, true); + invalidate(entityToUpdate); } return result; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index d962bd042425..64185467245c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -4571,7 +4571,7 @@ private void markEntityNotFound(T entity) { protected void entitySpecificCleanup(T entityInterface) {} - private void invalidate(T entity) { + void invalidate(T entity) { CACHE_WITH_ID.invalidate(new ImmutablePair<>(entityType, entity.getId())); CACHE_WITH_NAME.invalidate(cacheNameKey(entityType, entity.getFullyQualifiedName())); RequestEntityCache.invalidate(entityType, entity.getId(), entity.getFullyQualifiedName()); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/domains/DataProductResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/domains/DataProductResource.java index 967d470c9ab5..c19e026e6424 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/domains/DataProductResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/domains/DataProductResource.java @@ -470,7 +470,8 @@ public Response bulkAddInputPorts( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(name)); - return buildBulkOperationResponse(repository.bulkAddInputPorts(name, request)); + return buildBulkOperationResponse( + repository.bulkAddInputPorts(name, request, securityContext.getUserPrincipal().getName())); } @PUT @@ -508,7 +509,9 @@ public Response bulkRemoveInputPorts( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(name)); - return buildBulkOperationResponse(repository.bulkRemoveInputPorts(name, request)); + return buildBulkOperationResponse( + repository.bulkRemoveInputPorts( + name, request, securityContext.getUserPrincipal().getName())); } @PUT @@ -546,7 +549,8 @@ public Response bulkAddOutputPorts( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(name)); - return buildBulkOperationResponse(repository.bulkAddOutputPorts(name, request)); + return buildBulkOperationResponse( + repository.bulkAddOutputPorts(name, request, securityContext.getUserPrincipal().getName())); } @PUT @@ -584,7 +588,9 @@ public Response bulkRemoveOutputPorts( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(name)); - return buildBulkOperationResponse(repository.bulkRemoveOutputPorts(name, request)); + return buildBulkOperationResponse( + repository.bulkRemoveOutputPorts( + name, request, securityContext.getUserPrincipal().getName())); } @PUT @@ -624,7 +630,8 @@ public Response bulkAddInputPortsByName( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn)); - return buildBulkOperationResponse(repository.bulkAddInputPorts(fqn, request)); + return buildBulkOperationResponse( + repository.bulkAddInputPorts(fqn, request, securityContext.getUserPrincipal().getName())); } @PUT @@ -664,7 +671,9 @@ public Response bulkRemoveInputPortsByName( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn)); - return buildBulkOperationResponse(repository.bulkRemoveInputPorts(fqn, request)); + return buildBulkOperationResponse( + repository.bulkRemoveInputPorts( + fqn, request, securityContext.getUserPrincipal().getName())); } @PUT @@ -704,7 +713,8 @@ public Response bulkAddOutputPortsByName( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn)); - return buildBulkOperationResponse(repository.bulkAddOutputPorts(fqn, request)); + return buildBulkOperationResponse( + repository.bulkAddOutputPorts(fqn, request, securityContext.getUserPrincipal().getName())); } @PUT @@ -744,7 +754,9 @@ public Response bulkRemoveOutputPortsByName( OperationContext operationContext = new OperationContext(entityType, MetadataOperation.EDIT_ALL); authorizer.authorize(securityContext, operationContext, getResourceContextByName(fqn)); - return buildBulkOperationResponse(repository.bulkRemoveOutputPorts(fqn, request)); + return buildBulkOperationResponse( + repository.bulkRemoveOutputPorts( + fqn, request, securityContext.getUserPrincipal().getName())); } @PATCH diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/tasks/TaskWorkflowHandler.java b/openmetadata-service/src/main/java/org/openmetadata/service/tasks/TaskWorkflowHandler.java index 380c58280df0..fc4dafea8a0c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/tasks/TaskWorkflowHandler.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/tasks/TaskWorkflowHandler.java @@ -43,6 +43,7 @@ import org.openmetadata.schema.type.change.ChangeSource; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.service.Entity; +import org.openmetadata.service.governance.workflows.WorkflowEventConsumer; import org.openmetadata.service.governance.workflows.WorkflowHandler; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.TaskRepository; @@ -881,7 +882,8 @@ private void applyPatchEntityFieldAction( user, action.entityField(), value == null ? null : String.valueOf(value), - true); + true, + WorkflowEventConsumer.GOVERNANCE_BOT); } catch (Exception e) { LOG.error( "[TaskWorkflowHandler] Failed to apply patchEntityField action for task '{}': {}", diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImplTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImplTest.java index bf5c037531b0..969d0428472f 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImplTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/governance/workflows/elements/triggers/impl/FilterEntityImplTest.java @@ -92,6 +92,26 @@ void testNewDataProductFieldsAreRecognizedAsTriggerFields() throws Exception { assertTrue(invokeFilter(List.of(fieldChange("lifecycleStage")), null, null)); } + @Test + void testInputOutputPortsAndGlossaryTermsAreRecognizedAsTriggerFields() throws Exception { + assertTrue(invokeFilter(List.of(fieldChange("inputPorts")), null, null)); + assertTrue(invokeFilter(List.of(fieldChange("outputPorts")), null, null)); + assertTrue(invokeFilter(List.of(fieldChange("glossaryTerms")), null, null)); + } + + @Test + void testInputOutputPortsCanBeIncludedOrExcluded() throws Exception { + List includePortFields = List.of("inputPorts", "outputPorts"); + assertTrue(invokeFilter(List.of(fieldChange("inputPorts")), includePortFields, null)); + assertTrue(invokeFilter(List.of(fieldChange("outputPorts")), includePortFields, null)); + assertFalse(invokeFilter(List.of(fieldChange("description")), includePortFields, null)); + + List excludePortFields = List.of("inputPorts", "outputPorts"); + assertFalse(invokeFilter(List.of(fieldChange("inputPorts")), null, excludePortFields)); + assertFalse(invokeFilter(List.of(fieldChange("outputPorts")), null, excludePortFields)); + assertTrue(invokeFilter(List.of(fieldChange("description")), null, excludePortFields)); + } + @Test void testUnknownFieldIsNotRecognizedAsTriggerField() throws Exception { assertFalse(invokeFilter(List.of(fieldChange("someUnknownField")), null, null)); diff --git a/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json b/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json index aef712623bd0..16bf6fabc83c 100644 --- a/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json +++ b/openmetadata-spec/src/main/resources/json/schema/type/workflowTriggerFields.json @@ -37,6 +37,9 @@ "latestResult", "consumesFrom", "providesTo", - "lifecycleStage" + "lifecycleStage", + "glossaryTerms", + "inputPorts", + "outputPorts" ] } \ No newline at end of file diff --git a/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts b/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts index 51de55eeedb9..1fdf188a04f1 100644 --- a/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts +++ b/openmetadata-ui/src/main/resources/ui/src/generated/type/workflowTriggerFields.ts @@ -26,10 +26,13 @@ export enum WorkflowTriggerFields { Extension = "extension", FullyQualifiedName = "fullyQualifiedName", Glossary = "glossary", + GlossaryTerms = "glossaryTerms", + InputPorts = "inputPorts", LatestResult = "latestResult", LifeCycle = "lifeCycle", LifecycleStage = "lifecycleStage", Name = "name", + OutputPorts = "outputPorts", Owners = "owners", Parent = "parent", ProvidesTo = "providesTo",