Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions CONTROLLERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ limitations under the License.
- [NetworkPrioritizerService](#NetworkPrioritizerService)
- [ODBCService](#ODBCService)
- [PersistentMapStateStorage](#PersistentMapStateStorage)
- [ProxyConfigurationService](#ProxyConfigurationService)
- [RocksDbStateStorage](#RocksDbStateStorage)
- [SmbConnectionControllerService](#SmbConnectionControllerService)
- [SSLContextService](#SSLContextService)
Expand Down Expand Up @@ -244,6 +245,24 @@ In the list below, the names of required properties appear in bold. Any other pr
| **File** | | | Path to a file to store state |


## ProxyConfigurationService

### Description

Provides a set of configurations for different MiNiFi C++ components to use a proxy server. Currently these properties can only be used for HTTP proxy configuration, not other protocols are supported at this time.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Provides a set of configurations for different MiNiFi C++ components to use a proxy server. Currently these properties can only be used for HTTP proxy configuration, not other protocols are supported at this time.
Provides a set of configurations for various MiNiFi C++ components to use a proxy server. Currently these properties can only be used for HTTP proxy configuration, no other protocols are supported at this time.


### Properties

In the list below, the names of required properties appear in bold. Any other properties (not in bold) are considered optional. The table also indicates any default values, and whether a property supports the NiFi Expression Language.

| Name | Default Value | Allowable Values | Description |
|-----------------------|---------------|------------------|--------------------------------------------------------------------------------------------|
| **Proxy Server Host** | | | Proxy server hostname or ip-address. |
| Proxy Server Port | | | Proxy server port number. |
| Proxy User Name | | | The name of the proxy client for user authentication. |
| Proxy User Password | | | The password of the proxy client for user authentication.<br/>**Sensitive Property: true** |


## RocksDbStateStorage

### Description
Expand Down
20 changes: 16 additions & 4 deletions PROCESSORS.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -192,10 +192,25 @@ def step_impl(context: MinifiTestContext):


@step("the http proxy server is set up")
def step_impl(context):
def step_impl(context: MinifiTestContext):
context.containers["http-proxy"] = HttpProxy(context)


@given("a ProxyConfigurationService controller service is set up with HTTP proxy configuration in the \"{container_name}\" flow")
def step_impl(context: MinifiTestContext, container_name: str):
controller_service = ControllerService(class_name="ProxyConfigurationService", service_name="ProxyConfigurationService")
controller_service.add_property("Proxy Server Host", f"http-proxy-{context.scenario_id}")
controller_service.add_property("Proxy Server Port", "3128")
controller_service.add_property("Proxy User Name", "admin")
controller_service.add_property("Proxy User Password", "test101")
context.get_or_create_minifi_container(container_name).flow_definition.controller_services.append(controller_service)


@given("a ProxyConfigurationService controller service is set up with HTTP proxy configuration")
def step_impl(context: MinifiTestContext):
context.execute_steps(f"given a ProxyConfigurationService controller service is set up with HTTP proxy configuration in the \"{DEFAULT_MINIFI_CONTAINER_NAME}\" flow")


@step("the processors are connected up as described here")
def step_impl(context: MinifiTestContext):
for row in context.table:
Expand Down
34 changes: 34 additions & 0 deletions core-framework/include/controllers/ProxyConfiguration.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include <string>
#include <optional>

#include "minifi-cpp/controllers/ProxyConfigurationServiceInterface.h"

namespace org::apache::nifi::minifi::controllers {

struct ProxyConfiguration {
ProxyType proxy_type;
std::string proxy_host;
std::optional<uint16_t> proxy_port;
std::optional<std::string> proxy_user;
std::optional<std::string> proxy_password;
};

} // namespace org::apache::nifi::minifi::controllers
13 changes: 13 additions & 0 deletions docker/test/integration/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from minifi.controllers.XMLReader import XMLReader
from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter
from minifi.controllers.XMLReader import XMLReader
from minifi.controllers.ProxyConfigurationService import ProxyConfigurationService

from behave import given, then, when
from behave.model_describe import ModelDescriptor
Expand Down Expand Up @@ -406,6 +407,18 @@ def step_impl(context):
context.test.acquire_container(context=context, name="http-proxy", engine="http-proxy")


@given("a ProxyConfigurationService controller service is set up with HTTP proxy configuration in the \"{container_name}\" flow")
def step_impl(context, container_name):
proxy_service = ProxyConfigurationService("ProxyConfigurationService", host=f"http-proxy-{context.feature_id}", port=3128, username="admin", password="test101")
container = context.test.acquire_container(context=context, name=container_name)
container.add_controller(proxy_service)


@given("a ProxyConfigurationService controller service is set up with HTTP proxy configuration")
def step_impl(context):
context.execute_steps("given a ProxyConfigurationService controller service is set up with HTTP proxy configuration in the \"{container_name}\" flow".format(container_name="minifi-cpp-flow"))


# TLS
@given("an ssl context service is set up for {processor_name}")
@given("an ssl context service with a manual CA cert file is set up for {processor_name}")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from ..core.ControllerService import ControllerService


class ProxyConfigurationService(ControllerService):
def __init__(self, name, host, port=None, username=None, password=None):
super(ProxyConfigurationService, self).__init__(name=name)

self.service_class = 'ProxyConfigurationService'

self.properties['Proxy Server Host'] = host

if port is not None:
self.properties['Proxy Server Port'] = port

if username is not None:
self.properties['Proxy User Name'] = username

if password is not None:
self.properties['Proxy User Password'] = password
19 changes: 15 additions & 4 deletions extensions/aws/processors/AwsProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,10 +69,21 @@ std::optional<Aws::Auth::AWSCredentials> AwsProcessor::getAWSCredentials(
aws::ProxyOptions AwsProcessor::getProxy(core::ProcessContext& context, const core::FlowFile* const flow_file) {
aws::ProxyOptions proxy;

proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or("");
proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0));
proxy.username = minifi::utils::parseOptionalProperty(context, ProxyUsername, flow_file).value_or("");
proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or("");
auto proxy_controller_service = minifi::utils::parseOptionalControllerService<minifi::controllers::ProxyConfigurationServiceInterface>(context, ProxyConfigurationService, getUUID());
if (proxy_controller_service) {
proxy.host = proxy_controller_service->getHost();
auto port_opt = proxy_controller_service->getPort();
proxy.port = port_opt ? *port_opt : 0;
auto username_opt = proxy_controller_service->getUsername();
proxy.username = username_opt ? *username_opt : "";
auto password_opt = proxy_controller_service->getPassword();
proxy.password = password_opt ? *password_opt : "";
Comment on lines +74 to +80
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

very minor, but why not

Suggested change
proxy.host = proxy_controller_service->getHost();
auto port_opt = proxy_controller_service->getPort();
proxy.port = port_opt ? *port_opt : 0;
auto username_opt = proxy_controller_service->getUsername();
proxy.username = username_opt ? *username_opt : "";
auto password_opt = proxy_controller_service->getPassword();
proxy.password = password_opt ? *password_opt : "";
proxy.host = proxy_controller_service->getHost();
proxy.port = proxy_controller_service->getPort().value_or(0);
proxy.username = proxy_controller_service->getUsername().value_or("");
proxy.password = proxy_controller_service->getPassword().value_or("");

as it is done in the else branch?

} else {
proxy.host = minifi::utils::parseOptionalProperty(context, ProxyHost, flow_file).value_or("");
proxy.port = gsl::narrow<uint32_t>(minifi::utils::parseOptionalU64Property(context, ProxyPort, flow_file).value_or(0));
proxy.username = minifi::utils::parseOptionalProperty(context, ProxyUsername, flow_file).value_or("");
proxy.password = minifi::utils::parseOptionalProperty(context, ProxyPassword, flow_file).value_or("");
}

if (!proxy.host.empty()) {
logger_->log_info("Proxy for AwsProcessor was set.");
Expand Down
7 changes: 7 additions & 0 deletions extensions/aws/processors/AwsProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "core/PropertyDefinitionBuilder.h"
#include "minifi-cpp/core/PropertyValidator.h"
#include "core/ProcessorImpl.h"
#include "minifi-cpp/controllers/ProxyConfigurationServiceInterface.h"


namespace org::apache::nifi::minifi::aws::processors {
Expand Down Expand Up @@ -148,6 +149,11 @@ class AwsProcessor : public core::ProcessorImpl { // NOLINT(cppcoreguidelines-s
.supportsExpressionLanguage(true)
.isSensitive(true)
.build();
EXTENSIONAPI static constexpr auto ProxyConfigurationService = core::PropertyDefinitionBuilder<>::createProperty("Proxy Configuration Service")
.withDescription("Specifies the Proxy Configuration Controller Service to proxy network requests. When used, "
"this will override any values specified for Proxy Host, Proxy Port, Proxy Username, and Proxy Password properties.")
.withAllowedTypes<minifi::controllers::ProxyConfigurationServiceInterface>()
.build();
EXTENSIONAPI static constexpr auto UseDefaultCredentials = core::PropertyDefinitionBuilder<>::createProperty("Use Default Credentials")
.withDescription("If true, uses the Default Credential chain, including EC2 instance profiles or roles, environment variables, default user credentials, etc.")
.withValidator(core::StandardPropertyValidators::BOOLEAN_VALIDATOR)
Expand All @@ -166,6 +172,7 @@ class AwsProcessor : public core::ProcessorImpl { // NOLINT(cppcoreguidelines-s
ProxyPort,
ProxyUsername,
ProxyPassword,
ProxyConfigurationService,
UseDefaultCredentials
});

Expand Down
7 changes: 6 additions & 1 deletion extensions/aws/tests/DeleteS3ObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,12 @@ TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Non blank validator tests") {

TEST_CASE_METHOD(DeleteS3ObjectTestsFixture, "Test proxy setting", "[awsS3Proxy]") {
setRequiredProperties();
setProxy();
SECTION("Use proxy configuration service") {
setProxy(true);
}
SECTION("Use processor properties") {
setProxy(false);
}
test_controller.runSession(plan, true);
checkProxySettings();
}
Expand Down
7 changes: 6 additions & 1 deletion extensions/aws/tests/FetchS3ObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,12 @@ TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Non blank validator tests") {

TEST_CASE_METHOD(FetchS3ObjectTestsFixture, "Test proxy setting", "[awsS3Proxy]") {
setRequiredProperties();
setProxy();
SECTION("Use proxy configuration service") {
setProxy(true);
}
SECTION("Use processor properties") {
setProxy(false);
}
test_controller.runSession(plan, true);
checkProxySettings();
}
Expand Down
7 changes: 6 additions & 1 deletion extensions/aws/tests/ListS3Tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ TEST_CASE_METHOD(ListS3TestsFixture, "Non blank validator tests") {

TEST_CASE_METHOD(ListS3TestsFixture, "Test proxy setting", "[awsS3Proxy]") {
setRequiredProperties();
setProxy();
SECTION("Use proxy configuration service") {
setProxy(true);
}
SECTION("Use processor properties") {
setProxy(false);
}
test_controller.runSession(plan, true);
checkProxySettings();
}
Expand Down
7 changes: 6 additions & 1 deletion extensions/aws/tests/PutS3ObjectTests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,12 @@ TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test multiple user metadata", "[awsS3

TEST_CASE_METHOD(PutS3ObjectTestsFixture, "Test proxy setting", "[awsS3Proxy]") {
setRequiredProperties();
setProxy();
SECTION("Use proxy configuration service") {
setProxy(true);
}
SECTION("Use processor properties") {
setProxy(false);
}
test_controller.runSession(plan);
checkProxySettings();
}
Expand Down
51 changes: 35 additions & 16 deletions extensions/aws/tests/S3TestsFixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class S3TestsFixture {

virtual void setAccesKeyCredentialsInProcessor() = 0;
virtual void setBucket() = 0;
virtual void setProxy() = 0;
virtual void setProxy(bool use_controller_service) = 0;

void setRequiredProperties() {
setAccesKeyCredentialsInProcessor();
Expand Down Expand Up @@ -136,7 +136,8 @@ class FlowProcessorS3TestsFixture : public S3TestsFixture<T> {
auto mock_s3_request_sender = std::make_unique<MockS3RequestSender>();
this->mock_s3_request_sender_ptr = mock_s3_request_sender.get();
auto uuid = utils::IdGenerator::getIdGenerator()->generate();
auto impl = std::unique_ptr<T>(new T(core::ProcessorMetadata{.uuid = uuid, .name = "S3Processor", .logger = core::logging::LoggerFactory<T>::getLogger(uuid)}, std::move(mock_s3_request_sender)));
auto impl = std::unique_ptr<T>(new T(core::ProcessorMetadata{ // NOLINT(clang-analyzer-cplusplus.NewDeleteLeaks)
.uuid = uuid, .name = "S3Processor", .logger = core::logging::LoggerFactory<T>::getLogger(uuid)}, std::move(mock_s3_request_sender)));
auto s3_processor_unique_ptr = std::make_unique<core::Processor>("S3Processor", uuid, std::move(impl));
this->s3_processor = s3_processor_unique_ptr.get();

Expand Down Expand Up @@ -178,15 +179,24 @@ class FlowProcessorS3TestsFixture : public S3TestsFixture<T> {
this->plan->setProperty(this->s3_processor, "Bucket", "${test.bucket}");
}

void setProxy() override {
this->plan->setDynamicProperty(update_attribute, "test.proxyHost", "host");
this->plan->setProperty(this->s3_processor, "Proxy Host", "${test.proxyHost}");
this->plan->setDynamicProperty(update_attribute, "test.proxyPort", "1234");
this->plan->setProperty(this->s3_processor, "Proxy Port", "${test.proxyPort}");
this->plan->setDynamicProperty(update_attribute, "test.proxyUsername", "username");
this->plan->setProperty(this->s3_processor, "Proxy Username", "${test.proxyUsername}");
this->plan->setDynamicProperty(update_attribute, "test.proxyPassword", "password");
this->plan->setProperty(this->s3_processor, "Proxy Password", "${test.proxyPassword}");
void setProxy(bool use_controller_service) override {
if (use_controller_service) {
auto proxy_configuration_service = this->plan->addController("ProxyConfigurationService", "ProxyConfigurationService");
this->plan->setProperty(proxy_configuration_service, "Proxy Server Host", "host");
this->plan->setProperty(proxy_configuration_service, "Proxy Server Port", "1234");
this->plan->setProperty(proxy_configuration_service, "Proxy User Name", "username");
this->plan->setProperty(proxy_configuration_service, "Proxy User Password", "password");
this->plan->setProperty(this->s3_processor, "Proxy Configuration Service", "ProxyConfigurationService");
} else {
this->plan->setDynamicProperty(update_attribute, "test.proxyHost", "host");
this->plan->setProperty(this->s3_processor, "Proxy Host", "${test.proxyHost}");
this->plan->setDynamicProperty(update_attribute, "test.proxyPort", "1234");
this->plan->setProperty(this->s3_processor, "Proxy Port", "${test.proxyPort}");
this->plan->setDynamicProperty(update_attribute, "test.proxyUsername", "username");
this->plan->setProperty(this->s3_processor, "Proxy Username", "${test.proxyUsername}");
this->plan->setDynamicProperty(update_attribute, "test.proxyPassword", "password");
this->plan->setProperty(this->s3_processor, "Proxy Password", "${test.proxyPassword}");
}
}

protected:
Expand Down Expand Up @@ -224,10 +234,19 @@ class FlowProducerS3TestsFixture : public S3TestsFixture<T> {
this->plan->setProperty(this->s3_processor, "Bucket", this->S3_BUCKET);
}

void setProxy() override {
this->plan->setProperty(this->s3_processor, "Proxy Host", "host");
this->plan->setProperty(this->s3_processor, "Proxy Port", "1234");
this->plan->setProperty(this->s3_processor, "Proxy Username", "username");
this->plan->setProperty(this->s3_processor, "Proxy Password", "password");
void setProxy(bool use_controller_service) override {
if (use_controller_service) {
auto proxy_configuration_service = this->plan->addController("ProxyConfigurationService", "ProxyConfigurationService");
this->plan->setProperty(proxy_configuration_service, "Proxy Server Host", "host");
this->plan->setProperty(proxy_configuration_service, "Proxy Server Port", "1234");
this->plan->setProperty(proxy_configuration_service, "Proxy User Name", "username");
this->plan->setProperty(proxy_configuration_service, "Proxy User Password", "password");
this->plan->setProperty(this->s3_processor, "Proxy Configuration Service", "ProxyConfigurationService");
} else {
this->plan->setProperty(this->s3_processor, "Proxy Host", "host");
this->plan->setProperty(this->s3_processor, "Proxy Port", "1234");
this->plan->setProperty(this->s3_processor, "Proxy Username", "username");
this->plan->setProperty(this->s3_processor, "Proxy Password", "password");
}
}
};
Loading
Loading