Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,16 @@ public interface NiFiServiceFacade {
Set<ControllerServiceEntity> getConnectorControllerServices(String connectorId, String processGroupId, boolean includeAncestorGroups,
boolean includeDescendantGroups, boolean includeReferencingComponents);

/**
* Returns the parameter context bound to the specified process group within the connector's hierarchy. Sensitive parameter values are masked
* by the underlying DTO factory.
*
* @param connectorId the connector id
* @param processGroupId the process group id within the connector's hierarchy
* @return the parameter context entity with effective parameters (inherited included), or {@code null} if the process group has no bound parameter context
*/
ParameterContextEntity getConnectorParameterContext(String connectorId, String processGroupId);

void verifyCanVerifyConnectorConfigurationStep(String connectorId, String configurationStepName);

List<ConfigVerificationResultDTO> performConnectorConfigurationStepVerification(String connectorId, String configurationStepName, ConfigurationStepConfigurationDTO configurationStepConfiguration);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3950,6 +3950,27 @@ public Set<ControllerServiceEntity> getConnectorControllerServices(final String
.collect(Collectors.toSet());
}

@Override
public ParameterContextEntity getConnectorParameterContext(final String connectorId, final String processGroupId) {
final ConnectorNode connectorNode = connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY);
final ProcessGroup managedProcessGroup = connectorNode.getActiveFlowContext().getManagedProcessGroup();
final ProcessGroup targetProcessGroup = managedProcessGroup.findProcessGroup(processGroupId);
if (targetProcessGroup == null) {
throw new ResourceNotFoundException("Process Group with ID " + processGroupId + " was not found within Connector " + connectorId);
}

final ParameterContext parameterContext = targetProcessGroup.getParameterContext();
if (parameterContext == null) {
return null;
}

// Connector-managed parameter contexts (and any contexts they inherit from) are not registered with the
// global flow's ParameterContextManager, so a DAO-backed lookup would fail to resolve inherited parameters.
// The DTO factory walks the in-memory inheritance graph reachable from the supplied context to resolve
// parameter source contexts for connector-managed flows, making an empty lookup safe here.
return createParameterContextEntity(parameterContext, true, NiFiUserUtils.getNiFiUser(), ParameterContextLookup.EMPTY);
}

@Override
public void verifyCanVerifyConnectorConfigurationStep(final String connectorId, final String configurationStepName) {
connectorDAO.verifyCanVerifyConfigurationStep(connectorId, configurationStepName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@
import org.apache.nifi.web.api.entity.ControllerServiceEntity;
import org.apache.nifi.web.api.entity.ControllerServicesEntity;
import org.apache.nifi.web.api.entity.DropRequestEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ProcessGroupFlowEntity;
import org.apache.nifi.web.api.entity.ProcessGroupStatusEntity;
import org.apache.nifi.web.api.entity.SearchResultsEntity;
Expand Down Expand Up @@ -1874,6 +1875,59 @@ public Response getControllerServicesFromConnectorProcessGroup(
return generateOkResponse(entity).build();
}

/**
* Retrieves the parameter context bound to the specified process group within a connector.
*
* @param connectorId The id of the connector
* @param processGroupId The process group id within the connector's hierarchy
* @return A parameterContextEntity, or 204 No Content if the process group has no bound parameter context
*/
@GET
@Consumes(MediaType.WILDCARD)
@Produces(MediaType.APPLICATION_JSON)
@Path("/{connectorId}/flow/process-groups/{processGroupId}/parameter-context")
@Operation(
summary = "Gets the parameter context bound to a process group within a connector",
responses = {
@ApiResponse(responseCode = "200", content = @Content(schema = @Schema(implementation = ParameterContextEntity.class))),
@ApiResponse(responseCode = "204", description = "The specified process group has no bound parameter context."),
@ApiResponse(responseCode = "400", description = "NiFi was unable to complete the request because it was invalid. The request should not be retried without modification."),
@ApiResponse(responseCode = "401", description = "Client could not be authenticated."),
@ApiResponse(responseCode = "403", description = "Client is not authorized to make this request."),
@ApiResponse(responseCode = "404", description = "The specified resource could not be found."),
@ApiResponse(responseCode = "409", description = "The request was valid but NiFi was not in the appropriate state to process it.")
},
security = {
@SecurityRequirement(name = "Read - /connectors/{uuid}")
},
description = "Returns the parameter context (with effective parameters, including those inherited from other contexts) bound to the " +
"specified process group within the connector's hierarchy. Sensitive parameter values are masked. Returns 204 No Content if the " +
"process group has no bound parameter context."
)
public Response getParameterContextForConnectorProcessGroup(
@Parameter(description = "The connector id.", required = true)
@PathParam("connectorId") final String connectorId,
@Parameter(description = "The process group id.", required = true)
@PathParam("processGroupId") final String processGroupId) {

if (isReplicateRequest()) {
return replicate(HttpMethod.GET);
}

// authorize access to the connector
serviceFacade.authorizeAccess(lookup -> {
final Authorizable connector = lookup.getConnector(connectorId);
connector.authorize(authorizer, RequestAction.READ, NiFiUserUtils.getNiFiUser());
});

final ParameterContextEntity entity = serviceFacade.getConnectorParameterContext(connectorId, processGroupId);
if (entity == null) {
return Response.noContent().build();
}

return generateOkResponse(entity).build();
}

/**
* Retrieves the status for the process group managed by the specified connector.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1620,8 +1620,7 @@ public ParameterDTO createParameterDto(final ParameterContext parameterContext,
final Set<AffectedComponentEntity> referencingComponentEntities = createAffectedComponentEntities(referencingComponents, revisionManager);
dto.setReferencingComponents(referencingComponentEntities);

final ParameterContext containingParameterContext = (parameter.getParameterContextId() == null)
? parameterContext : parameterContextLookup.getParameterContext(parameter.getParameterContextId());
final ParameterContext containingParameterContext = resolveContainingParameterContext(parameterContext, parameter, parameterContextLookup);

dto.setInherited(!containingParameterContext.getIdentifier().equals(parameterContext.getIdentifier()));

Expand All @@ -1631,6 +1630,55 @@ public ParameterDTO createParameterDto(final ParameterContext parameterContext,
return dto;
}

/**
* Resolves the {@link ParameterContext} where the given parameter was originally defined.
*
* <p>For parameters declared directly on the current context (or whose source id matches the current
* context's identifier), the current context is returned without consulting any external lookup. For
* inherited parameters, the source context is found by walking the in-memory inheritance graph reachable
* from the current context via {@link ParameterContext#getInheritedParameterContexts()}. If the source
* context is not reachable on that graph (expected only for legacy callers that pass a registry-backed
* lookup), the provided {@link ParameterContextLookup} is consulted as a fallback. If neither resolves
* the source id (e.g. an empty lookup combined with an unreachable id from a transient data
* inconsistency), the current context is returned so the parameter is reported as locally defined
* rather than producing a {@link NullPointerException} at the call site.</p>
*/
private ParameterContext resolveContainingParameterContext(final ParameterContext parameterContext, final Parameter parameter,
final ParameterContextLookup parameterContextLookup) {
final String sourceId = parameter.getParameterContextId();
if (sourceId == null || sourceId.equals(parameterContext.getIdentifier())) {
return parameterContext;
}

final ParameterContext fromGraph = findInheritedParameterContext(parameterContext, sourceId, new HashSet<>());
if (fromGraph != null) {
return fromGraph;
}

final ParameterContext fromLookup = parameterContextLookup.getParameterContext(sourceId);
return fromLookup != null ? fromLookup : parameterContext;
}

private ParameterContext findInheritedParameterContext(final ParameterContext parameterContext, final String sourceId, final Set<String> visited) {
if (parameterContext == null || !visited.add(parameterContext.getIdentifier())) {
return null;
}
if (sourceId.equals(parameterContext.getIdentifier())) {
return parameterContext;
}
final List<ParameterContext> inherited = parameterContext.getInheritedParameterContexts();
if (inherited == null) {
return null;
}
for (final ParameterContext inheritedContext : inherited) {
final ParameterContext match = findInheritedParameterContext(inheritedContext, sourceId, visited);
if (match != null) {
return match;
}
}
return null;
}

public ReportingTaskDTO createReportingTaskDto(final ReportingTaskNode reportingTaskNode) {
final BundleCoordinate bundleCoordinate = reportingTaskNode.getBundleCoordinate();
final List<Bundle> compatibleBundles = extensionManager.getBundles(reportingTaskNode.getCanonicalClassName()).stream().filter(bundle -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@
import org.apache.nifi.history.History;
import org.apache.nifi.history.HistoryQuery;
import org.apache.nifi.nar.ExtensionManager;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextLookup;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.registry.flow.FlowRegistryUtil;
import org.apache.nifi.registry.flow.RegisteredFlowSnapshot;
Expand Down Expand Up @@ -104,6 +106,7 @@
import org.apache.nifi.web.api.dto.CountersSnapshotDTO;
import org.apache.nifi.web.api.dto.DtoFactory;
import org.apache.nifi.web.api.dto.EntityFactory;
import org.apache.nifi.web.api.dto.ParameterContextDTO;
import org.apache.nifi.web.api.dto.ProcessGroupDTO;
import org.apache.nifi.web.api.dto.RemoteProcessGroupDTO;
import org.apache.nifi.web.api.dto.RevisionDTO;
Expand All @@ -118,6 +121,7 @@
import org.apache.nifi.web.api.entity.ConnectorEntity;
import org.apache.nifi.web.api.entity.CopyRequestEntity;
import org.apache.nifi.web.api.entity.CopyResponseEntity;
import org.apache.nifi.web.api.entity.ParameterContextEntity;
import org.apache.nifi.web.api.entity.ProcessGroupEntity;
import org.apache.nifi.web.api.entity.SecretsEntity;
import org.apache.nifi.web.api.entity.StatusHistoryEntity;
Expand Down Expand Up @@ -2246,4 +2250,100 @@ public void testGetConnectorClusterNodeRequest() {
assertNotNull(entity.getComponent(), "Component should be populated when clusterNodeRequest is true");
assertEquals("RUNNING", entity.getComponent().getState());
}

@Test
public void testGetConnectorParameterContextReturnsEntityWhenContextBound() {
final String connectorId = "connector-id";
final String processGroupId = "process-group-id";
final String parameterContextId = "parameter-context-id";

final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
final DtoFactory dtoFactory = mock(DtoFactory.class);
final RevisionManager revisionManager = mock(RevisionManager.class);
final EntityFactory entityFactory = new EntityFactory();
serviceFacade.setConnectorDAO(connectorDAO);
serviceFacade.setDtoFactory(dtoFactory);
serviceFacade.setRevisionManager(revisionManager);
serviceFacade.setEntityFactory(entityFactory);

final ConnectorNode connectorNode = mock(ConnectorNode.class);
final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class);
final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
final ProcessGroup targetProcessGroup = mock(ProcessGroup.class);
final ParameterContext parameterContext = mock(ParameterContext.class);

when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode);
when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
when(managedProcessGroup.findProcessGroup(processGroupId)).thenReturn(targetProcessGroup);
when(targetProcessGroup.getParameterContext()).thenReturn(parameterContext);
when(parameterContext.getIdentifier()).thenReturn(parameterContextId);

final ParameterContextDTO parameterContextDto = new ParameterContextDTO();
parameterContextDto.setId(parameterContextId);
parameterContextDto.setName("context-name");
when(dtoFactory.createParameterContextDto(eq(parameterContext), eq(revisionManager), eq(true), any(ParameterContextLookup.class)))
.thenReturn(parameterContextDto);
when(dtoFactory.createPermissionsDto(eq(parameterContext), any())).thenReturn(null);
when(dtoFactory.createRevisionDTO(any(Revision.class))).thenReturn(new RevisionDTO());
when(revisionManager.getRevision(parameterContextId)).thenReturn(new Revision(1L, null, parameterContextId));

final ParameterContextEntity entity = serviceFacade.getConnectorParameterContext(connectorId, processGroupId);

assertNotNull(entity);
assertEquals(parameterContextId, entity.getId());

final ArgumentCaptor<ParameterContextLookup> lookupCaptor = ArgumentCaptor.forClass(ParameterContextLookup.class);
verify(dtoFactory).createParameterContextDto(eq(parameterContext), eq(revisionManager), eq(true), lookupCaptor.capture());
assertNotNull(lookupCaptor.getValue());
assertNull(lookupCaptor.getValue().getParameterContext("any-id"));
assertFalse(lookupCaptor.getValue().hasParameterContext("any-id"));
}

@Test
public void testGetConnectorParameterContextReturnsNullWhenNoBoundContext() {
final String connectorId = "connector-id";
final String processGroupId = "process-group-id";

final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
final DtoFactory dtoFactory = mock(DtoFactory.class);
serviceFacade.setConnectorDAO(connectorDAO);
serviceFacade.setDtoFactory(dtoFactory);

final ConnectorNode connectorNode = mock(ConnectorNode.class);
final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class);
final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);
final ProcessGroup targetProcessGroup = mock(ProcessGroup.class);

when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode);
when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
when(managedProcessGroup.findProcessGroup(processGroupId)).thenReturn(targetProcessGroup);
when(targetProcessGroup.getParameterContext()).thenReturn(null);

final ParameterContextEntity entity = serviceFacade.getConnectorParameterContext(connectorId, processGroupId);

assertNull(entity);
Mockito.verifyNoInteractions(dtoFactory);
}

@Test
public void testGetConnectorParameterContextThrowsWhenProcessGroupNotFound() {
final String connectorId = "connector-id";
final String processGroupId = "missing-process-group";

final ConnectorDAO connectorDAO = mock(ConnectorDAO.class);
serviceFacade.setConnectorDAO(connectorDAO);

final ConnectorNode connectorNode = mock(ConnectorNode.class);
final FrameworkFlowContext flowContext = mock(FrameworkFlowContext.class);
final ProcessGroup managedProcessGroup = mock(ProcessGroup.class);

when(connectorDAO.getConnector(connectorId, ConnectorSyncMode.LOCAL_ONLY)).thenReturn(connectorNode);
when(connectorNode.getActiveFlowContext()).thenReturn(flowContext);
when(flowContext.getManagedProcessGroup()).thenReturn(managedProcessGroup);
when(managedProcessGroup.findProcessGroup(processGroupId)).thenReturn(null);

assertThrows(ResourceNotFoundException.class, () -> serviceFacade.getConnectorParameterContext(connectorId, processGroupId));
}
}
Loading
Loading