Skip to content
Original file line number Diff line number Diff line change
@@ -0,0 +1,244 @@
package org.openmetadata.it.tests;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.parallel.Execution;
import org.junit.jupiter.api.parallel.ExecutionMode;
import org.openmetadata.it.factories.DashboardServiceTestFactory;
import org.openmetadata.it.util.SdkClients;
import org.openmetadata.it.util.TestNamespace;
import org.openmetadata.it.util.TestNamespaceExtension;
import org.openmetadata.schema.api.policies.CreatePolicy;
import org.openmetadata.schema.api.services.ingestionPipelines.CreateIngestionPipeline;
import org.openmetadata.schema.api.teams.CreateRole;
import org.openmetadata.schema.api.teams.CreateUser;
import org.openmetadata.schema.entity.policies.Policy;
import org.openmetadata.schema.entity.policies.accessControl.Rule;
import org.openmetadata.schema.entity.services.DashboardService;
import org.openmetadata.schema.entity.services.ingestionPipelines.AirflowConfig;
import org.openmetadata.schema.entity.services.ingestionPipelines.IngestionPipeline;
import org.openmetadata.schema.entity.services.ingestionPipelines.PipelineType;
import org.openmetadata.schema.entity.teams.Role;
import org.openmetadata.schema.entity.teams.User;
import org.openmetadata.schema.metadataIngestion.DashboardServiceMetadataPipeline;
import org.openmetadata.schema.metadataIngestion.SourceConfig;
import org.openmetadata.schema.type.EntityReference;
import org.openmetadata.schema.type.MetadataOperation;
import org.openmetadata.sdk.client.OpenMetadataClient;
import org.openmetadata.sdk.network.HttpMethod;

/**
* Integration tests for IngestionPipeline owner inheritance and trigger authorization.
*
* <p>Covers two coordinated changes that fix GH-27962 (Pylon-19838):
*
* <ul>
* <li>{@code IngestionPipelineRepository.setInheritedFields} now inherits owners from the
* referenced service / TestSuite / App, so {@code isOwner()} conditions on pipeline policies
* evaluate correctly.
* <li>{@code POST /v1/services/ingestionPipelines/trigger/{id}} now authorizes against {@code
* MetadataOperation.TRIGGER}.
* </ul>
*/
@Execution(ExecutionMode.CONCURRENT)
@ExtendWith(TestNamespaceExtension.class)
public class IngestionPipelineOwnerInheritanceIT {

private static final Date START_DATE = Date.from(Instant.parse("2022-06-10T15:06:47Z"));

@Test
void test_inheritedOwners_fromService(TestNamespace ns) {
OpenMetadataClient adminClient = SdkClients.adminClient();
String unique = UUID.randomUUID().toString().substring(0, 8);
String userName = "ipinhowner_" + unique;
User serviceOwner =
adminClient
.users()
.create(
new CreateUser().withName(userName).withEmail(userName + "@test.openmetadata.org"));

try {
DashboardService service = DashboardServiceTestFactory.createMetabase(ns);
DashboardService fetchedService =
adminClient.dashboardServices().get(service.getId().toString());
fetchedService.setOwners(List.of(serviceOwner.getEntityReference()));
adminClient.dashboardServices().update(service.getId().toString(), fetchedService);

try {
IngestionPipeline pipeline =
adminClient
.ingestionPipelines()
.create(
new CreateIngestionPipeline()
.withName(ns.prefix("ipinhPipeline"))
.withPipelineType(PipelineType.METADATA)
.withService(service.getEntityReference())
.withSourceConfig(
new SourceConfig().withConfig(new DashboardServiceMetadataPipeline()))
.withAirflowConfig(new AirflowConfig().withStartDate(START_DATE)));

try {
IngestionPipeline withOwners =
adminClient.ingestionPipelines().get(pipeline.getId().toString(), "owners");
assertNotNull(withOwners.getOwners(), "Inherited owners should be populated");
assertEquals(1, withOwners.getOwners().size(), "Pipeline should inherit one owner");
EntityReference inherited = withOwners.getOwners().get(0);
assertEquals(
serviceOwner.getId(),
inherited.getId(),
"Inherited owner should match service owner");
assertTrue(
Boolean.TRUE.equals(inherited.getInherited()),
"Owner inherited from the parent service must be marked inherited=true");
} finally {
adminClient.ingestionPipelines().delete(pipeline.getId().toString());
}
} finally {
adminClient
.dashboardServices()
.delete(service.getId().toString(), Map.of("hardDelete", "true", "recursive", "true"));
}
} finally {
adminClient.users().delete(serviceOwner.getId());
}
}

@Test
void test_isOwnerPolicy_appliesToEditAndTrigger(TestNamespace ns) {
OpenMetadataClient adminClient = SdkClients.adminClient();
String unique = UUID.randomUUID().toString().substring(0, 8);

Rule ownerRule =
new Rule()
.withName("pipelineOwnerEditAndTrigger")
.withDescription("Allow owners to edit and trigger ingestion pipelines")
.withEffect(Rule.Effect.ALLOW)
.withOperations(List.of(MetadataOperation.EDIT_ALL, MetadataOperation.TRIGGER))
.withResources(List.of("ingestionPipeline"))
.withCondition("isOwner()");
Policy ownerPolicy =
adminClient
.policies()
.create(
new CreatePolicy()
.withName("ipauthPolicy_" + unique)
.withDescription("Owner-only policy for ingestion pipelines")
.withRules(List.of(ownerRule)));

try {
Role ownerRole =
adminClient
.roles()
.create(
new CreateRole()
.withName("ipauthRole_" + unique)
.withPolicies(List.of(ownerPolicy.getFullyQualifiedName())));

try {
String ownerName = "ipauthowner_" + unique;
User pipelineOwner =
adminClient
.users()
.create(
new CreateUser()
.withName(ownerName)
.withEmail(ownerName + "@test.openmetadata.org")
.withRoles(List.of(ownerRole.getId())));

String otherName = "ipauthother_" + unique;
User otherUser =
adminClient
.users()
.create(
new CreateUser()
.withName(otherName)
.withEmail(otherName + "@test.openmetadata.org"));

try {
DashboardService service = DashboardServiceTestFactory.createMetabase(ns);
DashboardService fetchedService =
adminClient.dashboardServices().get(service.getId().toString());
fetchedService.setOwners(List.of(pipelineOwner.getEntityReference()));
adminClient.dashboardServices().update(service.getId().toString(), fetchedService);

try {
IngestionPipeline pipeline =
adminClient
.ingestionPipelines()
.create(
new CreateIngestionPipeline()
.withName(ns.prefix("ipauthPipeline_" + unique))
.withPipelineType(PipelineType.METADATA)
.withService(service.getEntityReference())
.withSourceConfig(
new SourceConfig()
.withConfig(new DashboardServiceMetadataPipeline()))
.withAirflowConfig(new AirflowConfig().withStartDate(START_DATE)));

try {
OpenMetadataClient ownerClient =
SdkClients.createClient(ownerName, ownerName, new String[] {});
OpenMetadataClient otherClient =
SdkClients.createClient(otherName, otherName, new String[] {});

// Owner can PATCH displayName.
IngestionPipeline ownerEdit =
adminClient.ingestionPipelines().get(pipeline.getId().toString());
ownerEdit.setDisplayName("owner-updated-display-name");
ownerClient.ingestionPipelines().update(pipeline.getId().toString(), ownerEdit);

// Non-owner cannot PATCH displayName.
IngestionPipeline otherEdit =
adminClient.ingestionPipelines().get(pipeline.getId().toString());
otherEdit.setDisplayName("non-owner-attempt");
assertThrows(
Exception.class,
() ->
otherClient
.ingestionPipelines()
.update(pipeline.getId().toString(), otherEdit),
"Non-owner PATCH should be forbidden");

// Owner can trigger.
String triggerPath = "/v1/services/ingestionPipelines/trigger/" + pipeline.getId();
ownerClient.getHttpClient().execute(HttpMethod.POST, triggerPath, null, Void.class);

// Non-owner cannot trigger.
assertThrows(
Exception.class,
() ->
otherClient
.getHttpClient()
.execute(HttpMethod.POST, triggerPath, null, Void.class),
"Non-owner trigger should be forbidden");
} finally {
adminClient.ingestionPipelines().delete(pipeline.getId().toString());
}
} finally {
adminClient
.dashboardServices()
.delete(
service.getId().toString(), Map.of("hardDelete", "true", "recursive", "true"));
}
} finally {
adminClient.users().delete(otherUser.getId());
adminClient.users().delete(pipelineOwner.getId());
}
} finally {
adminClient.roles().delete(ownerRole.getId());
}
} finally {
adminClient.policies().delete(ownerPolicy.getId());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import static org.openmetadata.schema.type.EventType.ENTITY_FIELDS_CHANGED;
import static org.openmetadata.schema.type.EventType.ENTITY_UPDATED;
import static org.openmetadata.schema.type.Include.ALL;
import static org.openmetadata.service.Entity.INGESTION_PIPELINE;

import jakarta.ws.rs.core.Response;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.openmetadata.service.Entity;
import org.openmetadata.service.OpenMetadataApplicationConfig;
import org.openmetadata.service.events.lifecycle.EntityLifecycleEventDispatcher;
import org.openmetadata.service.exception.EntityNotFoundException;
import org.openmetadata.service.logstorage.LogStorageInterface;
import org.openmetadata.service.logstorage.S3LogStorage.LogStreamListener;
import org.openmetadata.service.monitoring.IngestionProgressTracker;
Expand Down Expand Up @@ -150,6 +152,24 @@ public void setFields(
}
}

@Override
public void setInheritedFields(IngestionPipeline ingestionPipeline, Fields fields) {
EntityReference serviceRef = ingestionPipeline.getService();
if (serviceRef == null) {
return;
}
try {
EntityInterface parent = Entity.getEntity(serviceRef, "owners,domains", ALL);
inheritOwners(ingestionPipeline, fields, parent);
inheritDomains(ingestionPipeline, fields, parent);
} catch (EntityNotFoundException e) {
LOG.debug(
"Parent service {} not found for ingestion pipeline {}; skipping owner/domain inheritance",
serviceRef.getFullyQualifiedName(),
ingestionPipeline.getFullyQualifiedName());
}
}

@Override
public void setFieldsInBulk(Fields fields, List<IngestionPipeline> entities) {
if (entities == null || entities.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.openmetadata.service.migration.mysql.v1129;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;

public class Migration extends MigrationProcessImpl {

public Migration(MigrationFile migrationFile) {
super(migrationFile);
}

@Override
@SneakyThrows
public void runDataMigration() {
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.openmetadata.service.migration.mysql.v1130;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
Expand Down Expand Up @@ -31,5 +34,7 @@ public void runDataMigration() {
LOG.error("v1130 glossaryTerm version relatedTerms transform failed; re-run to retry.", e);
}
MigrationUtil.addTableColumnSearchSettings();
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package org.openmetadata.service.migration.postgres.v1129;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
import org.openmetadata.service.migration.utils.MigrationFile;

public class Migration extends MigrationProcessImpl {

public Migration(MigrationFile migrationFile) {
super(migrationFile);
}

@Override
@SneakyThrows
public void runDataMigration() {
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.openmetadata.service.migration.postgres.v1130;

import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerOperationToDefaultBotPolicies;
import static org.openmetadata.service.migration.utils.v1129.MigrationUtil.addTriggerRuleToDataStewardPolicy;

import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.migration.api.MigrationProcessImpl;
Expand Down Expand Up @@ -31,5 +34,7 @@ public void runDataMigration() {
LOG.error("v1130 glossaryTerm version relatedTerms transform failed; re-run to retry.", e);
}
MigrationUtil.addTableColumnSearchSettings();
addTriggerOperationToDefaultBotPolicies(collectionDAO);
addTriggerRuleToDataStewardPolicy(collectionDAO);
}
}
Loading
Loading