diff --git a/behave_framework/src/minifi_test_framework/containers/container.py b/behave_framework/src/minifi_test_framework/containers/container.py index 07abded28f..4b6de86341 100644 --- a/behave_framework/src/minifi_test_framework/containers/container.py +++ b/behave_framework/src/minifi_test_framework/containers/container.py @@ -30,7 +30,7 @@ class Container: - def __init__(self, image_name: str, container_name: str, network: Network, command: str | None = None): + def __init__(self, image_name: str, container_name: str, network: Network, command: str | None = None, entrypoint: str | None = None): self.image_name: str = image_name self.container_name: str = container_name self.network: Network = network @@ -42,6 +42,7 @@ def __init__(self, image_name: str, container_name: str, network: Network, comma self.host_files: list[HostFile] = [] self.volumes = {} self.command: str | None = command + self.entrypoint: str | None = entrypoint self._temp_dir: tempfile.TemporaryDirectory | None = None self.ports: dict[str, int] | None = None self.environment: list[str] = [] @@ -91,7 +92,7 @@ def deploy(self) -> bool: self.container = self.client.containers.run( image=self.image_name, name=self.container_name, ports=self.ports, environment=self.environment, volumes=self.volumes, network=self.network.name, - command=self.command, user=self.user, detach=True) + command=self.command, entrypoint=self.entrypoint, user=self.user, detach=True) except Exception as e: logging.error(f"Error starting container: {e}") raise @@ -430,3 +431,18 @@ def directory_contains_file_with_json_content(self, directory_path: str, expecte continue return False + + def directory_contains_file_with_minimum_size(self, directory_path: str, expected_size: int) -> bool: + if not self.container or not self.nonempty_dir_exists(directory_path): + return False + + command = f"find \"{directory_path}\" -maxdepth 1 -type f -size +{expected_size}c" + + exit_code, output = self.exec_run(command) + if exit_code != 0: + logging.error(f"Error running command to get file sizes: {output}") + return False + if len(output.strip()) > 0: + return True + + return False diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py index 4d1a1a5b31..f6951519f3 100644 --- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py +++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py @@ -20,19 +20,20 @@ from minifi_test_framework.core.minifi_test_context import MinifiTestContext from minifi_test_framework.containers.file import File -from minifi_test_framework.minifi.flow_definition import FlowDefinition -from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage, make_client_cert +from minifi_test_framework.minifi.minifi_flow_definition import MinifiFlowDefinition +from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage, make_client_cert, make_server_cert from .container import Container class MinifiContainer(Container): def __init__(self, container_name: str, test_context: MinifiTestContext): super().__init__(test_context.minifi_container_image, f"{container_name}-{test_context.scenario_id}", test_context.network) - self.flow_definition = FlowDefinition() + self.flow_definition = MinifiFlowDefinition() self.properties: dict[str, str] = {} self.log_properties: dict[str, str] = {} minifi_client_cert, minifi_client_key = make_cert_without_extended_usage(common_name=self.container_name, ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key) + self.files.append(File("/usr/local/share/certs/ca-root-nss.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert))) self.files.append(File("/tmp/resources/root_ca.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert))) self.files.append(File("/tmp/resources/minifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_client_cert))) self.files.append(File("/tmp/resources/minifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_client_key))) @@ -41,6 +42,10 @@ def __init__(self, container_name: str, test_context: MinifiTestContext): self.files.append(File("/tmp/resources/clientuser.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=clientuser_cert))) self.files.append(File("/tmp/resources/clientuser.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=clientuser_key))) + minifi_server_cert, minifi_server_key = make_server_cert(common_name=f"server-{test_context.scenario_id}", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key) + self.files.append(File("/tmp/resources/minifi_server.crt", + crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_server_cert) + crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_server_key))) + self.is_fhs = 'MINIFI_INSTALLATION_TYPE=FHS' in str(self.client.images.get(test_context.minifi_container_image).history()) self._fill_default_properties() diff --git a/behave_framework/src/minifi_test_framework/containers/nifi_container.py b/behave_framework/src/minifi_test_framework/containers/nifi_container.py new file mode 100644 index 0000000000..396b1e1a75 --- /dev/null +++ b/behave_framework/src/minifi_test_framework/containers/nifi_container.py @@ -0,0 +1,89 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import io +import gzip +from typing import List, Optional +from minifi_test_framework.containers.file import File +from minifi_test_framework.containers.container import Container +from minifi_test_framework.core.helpers import wait_for_condition +from minifi_test_framework.core.minifi_test_context import MinifiTestContext +from minifi_test_framework.minifi.nifi_flow_definition import NifiFlowDefinition + + +class NifiContainer(Container): + NIFI_VERSION = '2.7.2' + + def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]] = None, use_ssl: bool = False): + self.flow_definition = NifiFlowDefinition() + name = f"nifi-{test_context.scenario_id}" + if use_ssl: + entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' " + r"-e 's/^\(nifi.remote.input.secure\)=.*/\1=true/' " + r"-e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' " + r"-e 's/^\(nifi.web.https.port\)=.*/\1=8443/' " + r"-e 's/^\(nifi.web.https.host\)=.*/\1={name}/' " + r"-e 's/^\(nifi.security.keystore\)=.*/\1=\/tmp\/resources\/keystore.jks/' " + r"-e 's/^\(nifi.security.keystoreType\)=.*/\1=jks/' " + r"-e 's/^\(nifi.security.keystorePasswd\)=.*/\1=passw0rd1!/' " + r"-e 's/^\(nifi.security.keyPasswd\)=.*/#\1=passw0rd1!/' " + r"-e 's/^\(nifi.security.truststore\)=.*/\1=\/tmp\/resources\/truststore.jks/' " + r"-e 's/^\(nifi.security.truststoreType\)=.*/\1=jks/' " + r"-e 's/^\(nifi.security.truststorePasswd\)=.*/\1=passw0rd1!/' " + r"-e 's/^\(nifi.remote.input.socket.port\)=.*/\1=10443/' /opt/nifi/nifi-current/conf/nifi.properties && " + r"cp /tmp/nifi_config/flow.json.gz /opt/nifi/nifi-current/conf && /opt/nifi/nifi-current/bin/nifi.sh run & " + r"nifi_pid=$! &&" + r"tail -F --pid=${{nifi_pid}} /opt/nifi/nifi-current/logs/nifi-app.log").format(name=name) + else: + entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' " + r"-e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' " + r"-e 's/^\(nifi.remote.input.secure\)=.*/\1=false/' " + r"-e 's/^\(nifi.web.http.port\)=.*/\1=8080/' " + r"-e 's/^\(nifi.web.https.port\)=.*/\1=/' " + r"-e 's/^\(nifi.web.https.host\)=.*/\1=/' " + r"-e 's/^\(nifi.web.http.host\)=.*/\1={name}/' " + r"-e 's/^\(nifi.security.keystore\)=.*/\1=/' " + r"-e 's/^\(nifi.security.keystoreType\)=.*/\1=/' " + r"-e 's/^\(nifi.security.keystorePasswd\)=.*/\1=/' " + r"-e 's/^\(nifi.security.keyPasswd\)=.*/\1=/' " + r"-e 's/^\(nifi.security.truststore\)=.*/\1=/' " + r"-e 's/^\(nifi.security.truststoreType\)=.*/\1=/' " + r"-e 's/^\(nifi.security.truststorePasswd\)=.*/\1=/' " + r"-e 's/^\(nifi.remote.input.socket.port\)=.*/\1=10000/' /opt/nifi/nifi-current/conf/nifi.properties && " + r"cp /tmp/nifi_config/flow.json.gz /opt/nifi/nifi-current/conf && /opt/nifi/nifi-current/bin/nifi.sh run & " + r"nifi_pid=$! &&" + r"tail -F --pid=${{nifi_pid}} /opt/nifi/nifi-current/logs/nifi-app.log").format(name=name) + if not command: + command = ["/bin/sh", "-c", entry_command] + + super().__init__("apache/nifi:" + self.NIFI_VERSION, name, test_context.network, entrypoint=command) + + def deploy(self): + flow_config = self.flow_definition.to_json() + buffer = io.BytesIO() + + with gzip.GzipFile(fileobj=buffer, mode='wb') as gz_file: + gz_file.write(flow_config.encode()) + + gzipped_bytes = buffer.getvalue() + self.files.append(File("/tmp/nifi_config/flow.json.gz", gzipped_bytes)) + + super().deploy() + finished_str = "Started Application in" + return wait_for_condition( + condition=lambda: finished_str in self.get_logs(), + timeout_seconds=300, + bail_condition=lambda: self.exited, + context=None) diff --git a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py index e53e8d0e70..cbaa4b8de6 100644 --- a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py +++ b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py @@ -15,7 +15,7 @@ # limitations under the License. # -import yaml +from abc import ABC from .connection import Connection from .controller_service import ControllerService @@ -24,7 +24,7 @@ from .processor import Processor -class FlowDefinition: +class FlowDefinition(ABC): def __init__(self, flow_name: str = "MiNiFi Flow"): self.flow_name = flow_name self.processors: list[Processor] = [] @@ -53,42 +53,10 @@ def add_connection(self, connection: Connection): self.connections.append(connection) def to_yaml(self) -> str: - """Serializes the entire flow definition into the MiNiFi YAML format.""" + raise NotImplementedError("to_yaml method must be implemented in subclasses") - # Create a quick lookup map of processor names to their objects - # This is crucial for finding the source/destination IDs for connections - processors_by_name = {p.name: p for p in self.processors} - funnels_by_name = {f.name: f for f in self.funnels} - - connectables_by_name = {**processors_by_name, **funnels_by_name} - - if len(self.parameter_contexts) > 0: - parameter_context_name = self.parameter_contexts[0].name - else: - parameter_context_name = '' - # Build the final dictionary structure - config = {'MiNiFi Config Version': 3, 'Flow Controller': {'name': self.flow_name}, - 'Parameter Contexts': [p.to_yaml_dict() for p in self.parameter_contexts], - 'Processors': [p.to_yaml_dict() for p in self.processors], - 'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [], - 'Controller Services': [c.to_yaml_dict() for c in self.controller_services], - 'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name} - - # Build the connections list by looking up processor IDs - for conn in self.connections: - source_proc = connectables_by_name.get(conn.source_name) - dest_proc = connectables_by_name.get(conn.target_name) - - if not source_proc or not dest_proc: - raise ValueError( - f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'") - - config['Connections'].append( - {'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id, - 'source id': source_proc.id, 'source relationship name': conn.source_relationship, - 'destination id': dest_proc.id}) - - return yaml.dump(config, sort_keys=False, indent=2, width=120) + def to_json(self) -> str: + raise NotImplementedError("to_json method must be implemented in subclasses") def __repr__(self): return f"FlowDefinition(Processors: {self.processors}, Controller Services: {self.controller_services})" diff --git a/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py new file mode 100644 index 0000000000..469a3b8c17 --- /dev/null +++ b/behave_framework/src/minifi_test_framework/minifi/minifi_flow_definition.py @@ -0,0 +1,63 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import yaml + +from .flow_definition import FlowDefinition + + +class MinifiFlowDefinition(FlowDefinition): + def __init__(self, flow_name: str = "MiNiFi Flow"): + super().__init__(flow_name) + + def to_yaml(self) -> str: + """Serializes the entire flow definition into the MiNiFi YAML format.""" + + # Create a quick lookup map of processor names to their objects + # This is crucial for finding the source/destination IDs for connections + processors_by_name = {p.name: p for p in self.processors} + funnels_by_name = {f.name: f for f in self.funnels} + + connectables_by_name = {**processors_by_name, **funnels_by_name} + + if len(self.parameter_contexts) > 0: + parameter_context_name = self.parameter_contexts[0].name + else: + parameter_context_name = '' + # Build the final dictionary structure + config = {'MiNiFi Config Version': 3, 'Flow Controller': {'name': self.flow_name}, + 'Parameter Contexts': [p.to_yaml_dict() for p in self.parameter_contexts], + 'Processors': [p.to_yaml_dict() for p in self.processors], + 'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [], + 'Controller Services': [c.to_yaml_dict() for c in self.controller_services], + 'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name} + + # Build the connections list by looking up processor IDs + for conn in self.connections: + source_proc = connectables_by_name.get(conn.source_name) + dest_proc = connectables_by_name.get(conn.target_name) + + if not source_proc or not dest_proc: + raise ValueError( + f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'") + + config['Connections'].append( + {'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id, + 'source id': source_proc.id, 'source relationship name': conn.source_relationship, + 'destination id': dest_proc.id}) + + return yaml.dump(config, sort_keys=False, indent=2, width=120) diff --git a/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py new file mode 100644 index 0000000000..8e7faab5ad --- /dev/null +++ b/behave_framework/src/minifi_test_framework/minifi/nifi_flow_definition.py @@ -0,0 +1,158 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import json +import uuid +from .flow_definition import FlowDefinition + + +class NifiFlowDefinition(FlowDefinition): + NIFI_VERSION: str = '2.2.0' + + def __init__(self, flow_name: str = "NiFi Flow"): + super().__init__(flow_name) + + def to_json(self) -> str: + config = { + "encodingVersion": { + "majorVersion": 2, + "minorVersion": 0 + }, + "maxTimerDrivenThreadCount": 10, + "maxEventDrivenThreadCount": 1, + "registries": [], + "parameterContexts": [], + "parameterProviders": [], + "controllerServices": [], + "reportingTasks": [], + "templates": [], + "rootGroup": { + "identifier": "9802c873-3322-3b60-a71d-732d02bd60f8", + "instanceIdentifier": str(uuid.uuid4()), + "name": "NiFi Flow", + "comments": "", + "position": { + "x": 0, + "y": 0 + }, + "processGroups": [], + "remoteProcessGroups": [], + "processors": [], + "inputPorts": [], + "outputPorts": [], + "connections": [], + "labels": [], + "funnels": [], + "controllerServices": [], + "defaultFlowFileExpiration": "0 sec", + "defaultBackPressureObjectThreshold": 10000, + "defaultBackPressureDataSizeThreshold": "1 GB", + "scheduledState": "RUNNING", + "executionEngine": "INHERITED", + "maxConcurrentTasks": 1, + "statelessFlowTimeout": "1 min", + "flowFileConcurrency": "UNBOUNDED", + "flowFileOutboundPolicy": "STREAM_WHEN_AVAILABLE", + "componentType": "PROCESS_GROUP" + } + } + + processors_by_name = {p.name: p for p in self.processors} + processors_node = config["rootGroup"]["processors"] + + for proc in self.processors: + processors_node.append({ + "identifier": str(proc.id), + "instanceIdentifier": str(proc.id), + "name": proc.name, + "comments": "", + "position": { + "x": 0, + "y": 0 + }, + "type": 'org.apache.nifi.processors.standard.' + proc.class_name, + "bundle": { + "group": "org.apache.nifi", + "artifact": "nifi-standard-nar", + "version": self.NIFI_VERSION + }, + "properties": {key: value for key, value in proc.properties.items() if key}, + "propertyDescriptors": {}, + "style": {}, + "schedulingPeriod": "0 sec" if proc.scheduling_strategy == "EVENT_DRIVEN" else proc.scheduling_period, + "schedulingStrategy": "TIMER_DRIVEN", + "executionNode": "ALL", + "penaltyDuration": "5 sec", + "yieldDuration": "1 sec", + "bulletinLevel": "WARN", + "runDurationMillis": "0", + "concurrentlySchedulableTaskCount": proc.max_concurrent_tasks if proc.max_concurrent_tasks is not None else 1, + "autoTerminatedRelationships": proc.auto_terminated_relationships, + "scheduledState": "RUNNING", + "retryCount": 10, + "retriedRelationships": [], + "backoffMechanism": "PENALIZE_FLOWFILE", + "maxBackoffPeriod": "10 mins", + "componentType": "PROCESSOR", + "groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8" + }) + + connections_node = config["rootGroup"]["connections"] + + for conn in self.connections: + source_proc = processors_by_name.get(conn.source_name) + dest_proc = processors_by_name.get(conn.target_name) + if not source_proc or not dest_proc: + raise ValueError( + f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'") + + connections_node.append({ + "identifier": conn.id, + "instanceIdentifier": conn.id, + "name": f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", + "source": { + "id": source_proc.id, + "type": "PROCESSOR", + "groupId": "9802c873-3322-3b60-a71d-732d02bd60f8", + "name": conn.source_name, + "comments": "", + "instanceIdentifier": source_proc.id + }, + "destination": { + "id": dest_proc.id, + "type": "PROCESSOR", + "groupId": "9802c873-3322-3b60-a71d-732d02bd60f8", + "name": dest_proc.name, + "comments": "", + "instanceIdentifier": dest_proc.id + }, + "labelIndex": 1, + "zIndex": 0, + "selectedRelationships": [conn.source_relationship], + "backPressureObjectThreshold": 10, + "backPressureDataSizeThreshold": "50 B", + "flowFileExpiration": "0 sec", + "prioritizers": [], + "bends": [], + "loadBalanceStrategy": "DO_NOT_LOAD_BALANCE", + "partitioningAttribute": "", + "loadBalanceCompression": "DO_NOT_COMPRESS", + "componentType": "CONNECTION", + "groupIdentifier": "9802c873-3322-3b60-a71d-732d02bd60f8" + }) + + return json.dumps(config) diff --git a/behave_framework/src/minifi_test_framework/steps/checking_steps.py b/behave_framework/src/minifi_test_framework/steps/checking_steps.py index c964aea465..0ced1816f9 100644 --- a/behave_framework/src/minifi_test_framework/steps/checking_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/checking_steps.py @@ -105,13 +105,18 @@ def step_impl(context: MinifiTestContext, url: str): assert http_proxy_container.check_http_proxy_access(url) or http_proxy_container.log_app_output() -@then('no files are placed in the "{directory}" directory in {duration} of running time') -def step_impl(context, directory, duration): +@then('in the "{container}" container no files are placed in the "{directory}" directory in {duration} of running time') +def step_impl(context, container, directory, duration): duration_seconds = humanfriendly.parse_timespan(duration) - assert check_condition_after_wait(condition=lambda: context.get_default_minifi_container().get_number_of_files(directory) == 0, + assert check_condition_after_wait(condition=lambda: context.get_minifi_container(container).get_number_of_files(directory) == 0, context=context, wait_time=duration_seconds) +@then('no files are placed in the "{directory}" directory in {duration} of running time') +def step_impl(context, directory, duration): + context.execute_steps(f'then in the "{DEFAULT_MINIFI_CONTAINER_NAME}" container no files are placed in the "{directory}" directory in {duration} of running time') + + @then('{num:d} files are placed in the "{directory}" directory in less than {duration}') @then('{num:d} file is placed in the "{directory}" directory in less than {duration}') def step_impl(context: MinifiTestContext, num: int, directory: str, duration: str): @@ -221,3 +226,31 @@ def step_impl(context, directory, duration): assert wait_for_condition( condition=lambda: context.get_default_minifi_container().directory_contains_empty_file(directory), timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_default_minifi_container().exited, context=context) + + +@then("in the \"{container_name}\" container at least one empty file is placed in the \"{directory}\" directory in less than {duration}") +def step_impl(context, container_name, directory, duration): + timeout_in_seconds = humanfriendly.parse_timespan(duration) + if container_name == "nifi": + assert wait_for_condition( + condition=lambda: context.containers["nifi"].directory_contains_empty_file(directory), + timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_minifi_container(container_name).exited, context=context) + return + assert wait_for_condition( + condition=lambda: context.get_minifi_container(container_name).directory_contains_empty_file(directory), + timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_minifi_container(container_name).exited, context=context) + + +@then("in the \"{container_name}\" container at least one file with minimum size of \"{size}\" is placed in the \"{directory}\" directory in less than {duration}") +def step_impl(context, container_name: str, directory: str, size: str, duration: str): + timeout_in_seconds = humanfriendly.parse_timespan(duration) + size_in_bytes = humanfriendly.parse_size(size) + assert wait_for_condition( + condition=lambda: context.get_minifi_container(container_name).directory_contains_file_with_minimum_size(directory, size_in_bytes), + timeout_seconds=timeout_in_seconds, bail_condition=lambda: context.get_minifi_container(container_name).exited, context=context) + + +@then("at least one file with minimum size of \"{size}\" is placed in the \"{directory}\" directory in less than {duration}") +def step_impl(context, directory: str, size: str, duration: str): + context.execute_steps( + f'Then in the "{DEFAULT_MINIFI_CONTAINER_NAME}" container at least one file with minimum size of "{size}" is placed in the "{directory}" directory in less than {duration}') diff --git a/behave_framework/src/minifi_test_framework/steps/core_steps.py b/behave_framework/src/minifi_test_framework/steps/core_steps.py index eb4050ab6b..836b588346 100644 --- a/behave_framework/src/minifi_test_framework/steps/core_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/core_steps.py @@ -25,6 +25,8 @@ import humanfriendly from behave import when, step, given +from minifi_test_framework.containers.http_proxy_container import HttpProxy +from minifi_test_framework.containers.nifi_container import NifiContainer from minifi_test_framework.containers.directory import Directory from minifi_test_framework.containers.file import File from minifi_test_framework.core.minifi_test_context import DEFAULT_MINIFI_CONTAINER_NAME, MinifiTestContext @@ -100,3 +102,13 @@ def step_impl(context: MinifiTestContext): @given("OpenSSL FIPS mode is enabled in MiNiFi") def step_impl(context): context.get_or_create_default_minifi_container().enable_openssl_fips_mode() + + +@step("the http proxy server is set up") +def step_impl(context): + context.containers["http-proxy"] = HttpProxy(context) + + +@step("a NiFi container is set up") +def step_impl(context): + context.containers["nifi"] = NifiContainer(context) diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py index d780673a45..23db9824e8 100644 --- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py @@ -20,7 +20,6 @@ from behave import given, step from minifi_test_framework.containers.directory import Directory -from minifi_test_framework.containers.http_proxy_container import HttpProxy from minifi_test_framework.core.minifi_test_context import DEFAULT_MINIFI_CONTAINER_NAME, MinifiTestContext from minifi_test_framework.minifi.connection import Connection from minifi_test_framework.minifi.controller_service import ControllerService @@ -61,6 +60,9 @@ def step_impl(context: MinifiTestContext, processor_type: str, property_name: st def step_impl(context: MinifiTestContext, processor_type: str, property_name: str, property_value: str, minifi_container_name: str): processor = Processor(processor_type, processor_type) processor.add_property(property_name, property_value) + if minifi_container_name == "nifi": + context.containers["nifi"].flow_definition.add_processor(processor) + return context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_processor(processor) @@ -73,6 +75,9 @@ def step_impl(context: MinifiTestContext, processor_type: str, processor_name: s @given("a {processor_type} processor in the \"{minifi_container_name}\" flow") def step_impl(context: MinifiTestContext, processor_type: str, minifi_container_name: str): processor = Processor(processor_type, processor_type) + if minifi_container_name == "nifi": + context.containers["nifi"].flow_definition.add_processor(processor) + return context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_processor(processor) @@ -81,15 +86,25 @@ def step_impl(context: MinifiTestContext, processor_type: str): context.execute_steps(f'given a {processor_type} processor in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow') -@step('the "{property_name}" property of the {processor_name} processor is set to "{property_value}"') -def step_impl(context: MinifiTestContext, property_name: str, processor_name: str, property_value: str): - processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name) +@given('the "{property_name}" property of the {processor_name} processor is set to "{property_value}" in the "{minifi_container_name}" flow') +def step_impl(context: MinifiTestContext, property_name: str, processor_name: str, property_value: str, minifi_container_name: str): + processor = None + if minifi_container_name == "nifi": + processor = context.containers["nifi"].flow_definition.get_processor(processor_name) + else: + processor = context.get_or_create_minifi_container(minifi_container_name).flow_definition.get_processor(processor_name) if property_value == "(not set)": processor.remove_property(property_name) else: processor.add_property(property_name, property_value) +@given('the "{property_name}" property of the {processor_name} processor is set to "{property_value}"') +def step_impl(context: MinifiTestContext, property_name: str, processor_name: str, property_value: str): + context.execute_steps( + f'given the "{property_name}" property of the {processor_name} processor is set to "{property_value}" in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow') + + @step('the "{property_name}" property of the {controller_name} controller service is set to "{property_value}"') def step_impl(context: MinifiTestContext, property_name: str, controller_name: str, property_value: str): controller_service = context.get_or_create_default_minifi_container().flow_definition.get_controller_service(controller_name) @@ -107,6 +122,9 @@ def step_impl(context: MinifiTestContext, funnel_name: str): @step('in the "{minifi_container_name}" flow the "{relationship_name}" relationship of the {source} processor is connected to the {target}') def step_impl(context: MinifiTestContext, relationship_name: str, source: str, target: str, minifi_container_name: str): connection = Connection(source_name=source, source_relationship=relationship_name, target_name=target) + if minifi_container_name == "nifi": + context.containers["nifi"].flow_definition.add_connection(connection) + return context.get_or_create_minifi_container(minifi_container_name).flow_definition.add_connection(connection) @@ -123,6 +141,9 @@ def step_impl(context: MinifiTestContext, funnel_name: str, target: str): @step("{processor_name}'s {relationship} relationship is auto-terminated in the \"{minifi_container_name}\" flow") def step_impl(context: MinifiTestContext, processor_name: str, relationship: str, minifi_container_name: str): + if minifi_container_name == "nifi": + context.containers["nifi"].flow_definition.get_processor(processor_name).auto_terminated_relationships.append(relationship) + return context.get_or_create_minifi_container(minifi_container_name).flow_definition.get_processor(processor_name).auto_terminated_relationships.append( relationship) @@ -137,6 +158,14 @@ def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().command = ["/bin/sh", "-c", "timeout 10s ./bin/minifi.sh run && sleep 100"] +@step('the scheduling period of the {processor_name} processor is set to "{duration_str}" in the "{minifi_container_name}" flow') +def step_impl(context: MinifiTestContext, processor_name: str, duration_str: str, minifi_container_name: str): + if minifi_container_name == "nifi": + context.containers["nifi"].flow_definition.get_processor(processor_name).scheduling_period = duration_str + return + context.get_or_create_minifi_container(minifi_container_name).flow_definition.get_processor(processor_name).scheduling_period = duration_str + + @step('the scheduling period of the {processor_name} processor is set to "{duration_str}"') def step_impl(context: MinifiTestContext, processor_name: str, duration_str: str): context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name).scheduling_period = duration_str @@ -191,11 +220,6 @@ def step_impl(context: MinifiTestContext): processor.add_property(row["property name"], row["property value"]) -@step("the http proxy server is set up") -def step_impl(context): - context.containers["http-proxy"] = HttpProxy(context) - - @step("the processors are connected up as described here") def step_impl(context: MinifiTestContext): for row in context.table: @@ -270,14 +294,17 @@ def step_impl(context, property_name, processor_name_one, processor_name_two): # TLS -def add_ssl_context_service_for_minifi(context: MinifiTestContext, cert_name: str): +def add_ssl_context_service_for_minifi(context: MinifiTestContext, cert_name: str, use_system_cert_store: bool = False): ssl_context_service = context.get_or_create_default_minifi_container().flow_definition.get_controller_service("SSLContextService") if ssl_context_service is not None: return controller_service = ControllerService(class_name="SSLContextService", service_name="SSLContextService") controller_service.add_property("Client Certificate", f"/tmp/resources/{cert_name}.crt") controller_service.add_property("Private Key", f"/tmp/resources/{cert_name}.key") - controller_service.add_property("CA Certificate", "/tmp/resources/root_ca.crt") + if use_system_cert_store: + controller_service.add_property("Use System Cert Store", "true") + else: + controller_service.add_property("CA Certificate", "/tmp/resources/root_ca.crt") context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service) @@ -292,3 +319,10 @@ def step_impl(context, processor_name): add_ssl_context_service_for_minifi(context, "minifi_client") processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name) processor.add_property('SSL Context Service', 'SSLContextService') + + +@given("an ssl context service using the system CA cert store is set up for {processor_name}") +def step_impl(context: MinifiTestContext, processor_name): + add_ssl_context_service_for_minifi(context, "minifi_client", use_system_cert_store=True) + processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name) + processor.add_property('SSL Context Service', 'SSLContextService') diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh index 32a7d7a68e..9d90bbf8e5 100755 --- a/docker/RunBehaveTests.sh +++ b/docker/RunBehaveTests.sh @@ -206,4 +206,5 @@ exec \ "${docker_dir}/../extensions/splunk/tests/features" \ "${docker_dir}/../extensions/gcp/tests/features" \ "${docker_dir}/../extensions/grafana-loki/tests/features" \ - "${docker_dir}/../extensions/lua/tests/features/" + "${docker_dir}/../extensions/lua/tests/features/" \ + "${docker_dir}/../extensions/civetweb/tests/features/" diff --git a/extensions/civetweb/tests/features/environment.py b/extensions/civetweb/tests/features/environment.py new file mode 100644 index 0000000000..d6c856340a --- /dev/null +++ b/extensions/civetweb/tests/features/environment.py @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from minifi_test_framework.core.hooks import common_before_scenario +from minifi_test_framework.core.hooks import common_after_scenario + + +def before_scenario(context, scenario): + common_before_scenario(context, scenario) + + +def after_scenario(context, scenario): + common_after_scenario(context, scenario) diff --git a/docker/test/integration/features/http.feature b/extensions/civetweb/tests/features/http.feature similarity index 61% rename from docker/test/integration/features/http.feature rename to extensions/civetweb/tests/features/http.feature index 19a9d37458..afd9c6dbc4 100644 --- a/docker/test/integration/features/http.feature +++ b/extensions/civetweb/tests/features/http.feature @@ -19,134 +19,164 @@ Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP As a user of MiNiFi I need to have ListenHTTP and InvokeHTTP processors - Background: - Given the content of "/tmp/output" is monitored - Scenario: A MiNiFi instance transfers data to another MiNiFi instance with message body Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And a file with the content "test" is present in "/tmp/input" - And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${feature_id}:8080/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${scenario_id}:8080/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And the "success" relationship of the GetFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow + And a ListenHTTP processor with the "Listening Port" property set to "8080" in the "secondary" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "secondary" flow + And in the "secondary" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "secondary" flow When both instances start up - Then at least one flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + Then in the "secondary" container at least one file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds Scenario: A MiNiFi instance sends data through a HTTP proxy and another one listens - Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + Given the http proxy server is set up + And a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And a file with the content "test" is present in "/tmp/input" - And a InvokeHTTP processor with the "Remote URL" property set to "http://minifi-listen-${feature_id}:8080/contentListener" - And these processor properties are set to match the http proxy: + And a InvokeHTTP processor with the "Remote URL" property set to "http://minifi-listen-${scenario_id}:8080/contentListener" + And InvokeHTTP is EVENT_DRIVEN + And these processor properties are set | processor name | property name | property value | | InvokeHTTP | HTTP Method | POST | - | InvokeHTTP | Proxy Host | http-proxy-${feature_id} | + | InvokeHTTP | Proxy Host | http-proxy-${scenario_id} | | InvokeHTTP | Proxy Port | 3128 | | InvokeHTTP | invokehttp-proxy-username | admin | | InvokeHTTP | invokehttp-proxy-password | test101 | And the "success" relationship of the GetFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a http proxy server is set up accordingly - - And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "minifi-listen" flow + And a ListenHTTP processor with the "Listening Port" property set to "8080" in the "minifi-listen" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "minifi-listen" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "minifi-listen" flow + And in the "minifi-listen" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "minifi-listen" flow When all instances start up - Then at least one flowfile with the content "test" is placed in the monitored directory in less than 60 seconds - And no errors were generated on the http-proxy regarding "http://minifi-listen-${feature_id}:8080/contentListener" + Then in the "minifi-listen" container at least one file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds + And no errors were generated on the http-proxy regarding "http://minifi-listen-${scenario_id}:8080/contentListener" Scenario: A MiNiFi instance and transfers hashed data to another MiNiFi instance Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And a file with the content "test" is present in "/tmp/input" And a HashContent processor with the "Hash Attribute" property set to "hash" - And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${feature_id}:8080/contentListener" + And HashContent is EVENT_DRIVEN + And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${scenario_id}:8080/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And the "success" relationship of the GetFile processor is connected to the HashContent And the "success" relationship of the HashContent processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow + And a ListenHTTP processor with the "Listening Port" property set to "8080" in the "secondary" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "secondary" flow + And in the "secondary" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "secondary" flow When both instances start up - Then at least one flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + Then in the "secondary" container at least one file with the content "test" is placed in the "/tmp/output" directory in less than 60 seconds Scenario: A MiNiFi instance transfers data to another MiNiFi instance without message body Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And a file with the content "test" is present in "/tmp/input" - And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${feature_id}:8080/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${scenario_id}:8080/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And the "Send Message Body" property of the InvokeHTTP processor is set to "false" And the "success" relationship of the GetFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow + And a ListenHTTP processor with the "Listening Port" property set to "8080" in the "secondary" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "secondary" flow + And in the "secondary" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "secondary" flow When both instances start up - Then at least one empty flowfile is placed in the monitored directory in less than 60 seconds + Then in the "secondary" container at least one empty file is placed in the "/tmp/output" directory in less than 60 seconds Scenario: A MiNiFi instance transfers data to a NiFi instance with message body Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And a file with the content "test" is present in "/tmp/input" - And a InvokeHTTP processor with the "Remote URL" property set to "http://nifi-${feature_id}:8081/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "http://nifi-${scenario_id}:8081/contentListener" And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And the "success" relationship of the GetFile processor is connected to the InvokeHTTP - And a NiFi flow with the name "nifi" is set up + And a NiFi container is set up And a ListenHTTP processor with the "Listening Port" property set to "8081" in the "nifi" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And in the "nifi" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "nifi" flow + And PutFile's failure relationship is auto-terminated in the "nifi" flow When both instances start up - Then at least one flowfile with the content "test" is placed in the monitored directory in less than 60 seconds + Then in the "nifi" container at least one empty file is placed in the "/tmp/output" directory in less than 60 seconds Scenario: A MiNiFi instance transfers data to another MiNiFi instance with message body and limited speed Given a GenerateFlowFile processor with the "File Size" property set to "10 MB" And the scheduling period of the GenerateFlowFile processor is set to "30 sec" - And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${feature_id}:8080/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${scenario_id}:8080/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And the "Connection Timeout" property of the InvokeHTTP processor is set to "30 s" And the "Read Timeout" property of the InvokeHTTP processor is set to "30 s" And the "Upload Speed Limit" property of the InvokeHTTP processor is set to "800 KB/s" And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow + And a ListenHTTP processor with the "Listening Port" property set to "8080" in the "secondary" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "secondary" flow + And in the "secondary" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "secondary" flow When both instances start up - Then at least one flowfile with minimum size of "1 MB" is placed in the monitored directory in less than 60 seconds + Then in the "secondary" container at least one file with minimum size of "1 MB" is placed in the "/tmp/output" directory in less than 60 seconds And the Minifi logs contain the following message: "[warning] InvokeHTTP::onTrigger has been running for" in less than 10 seconds Scenario: A MiNiFi instance retrieves data from another MiNiFi instance with message body and limited speed - Given a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${feature_id}:8080/contentListener&testfile" + Given a InvokeHTTP processor with the "Remote URL" property set to "http://secondary-${scenario_id}:8080/contentListener&testfile" And the scheduling period of the InvokeHTTP processor is set to "3 sec" And the "HTTP Method" property of the InvokeHTTP processor is set to "GET" And the "Connection Timeout" property of the InvokeHTTP processor is set to "30 s" And the "Read Timeout" property of the InvokeHTTP processor is set to "30 s" And the "Download Speed Limit" property of the InvokeHTTP processor is set to "800 KB/s" + And the "Data Format" property of the InvokeHTTP processor is set to "Text" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And the "response" relationship of the InvokeHTTP processor is connected to the PutFile + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated And a GenerateFlowFile processor with the "File Size" property set to "10 MB" in the "secondary" flow - And the "Data Format" property of the InvokeHTTP processor is set to "Text" And a UpdateAttribute processor with the "http.type" property set to "response_body" in the "secondary" flow - And the "filename" property of the UpdateAttribute processor is set to "testfile" - And the scheduling period of the GenerateFlowFile processor is set to "30 sec" - And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow - And the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute - And the "success" relationship of the UpdateAttribute processor is connected to the ListenHTTP + And UpdateAttribute is EVENT_DRIVEN in the "secondary" flow + And the "filename" property of the UpdateAttribute processor is set to "testfile" in the "secondary" flow + And the scheduling period of the GenerateFlowFile processor is set to "30 sec" in the "secondary" flow + And a ListenHTTP processor with the "Listening Port" property set to "8080" in the "secondary" flow + And ListenHTTP is EVENT_DRIVEN in the "secondary" flow + And in the "secondary" flow the "success" relationship of the GenerateFlowFile processor is connected to the UpdateAttribute + And in the "secondary" flow the "success" relationship of the UpdateAttribute processor is connected to the ListenHTTP + And ListenHTTP's success relationship is auto-terminated in the "secondary" flow When both instances start up - Then at least one flowfile with minimum size of "10 MB" is placed in the monitored directory in less than 60 seconds + Then at least one file with minimum size of "10 MB" is placed in the "/tmp/output" directory in less than 60 seconds And the Minifi logs contain the following message: "[warning] InvokeHTTP::onTrigger has been running for" in less than 10 seconds diff --git a/docker/test/integration/features/https.feature b/extensions/civetweb/tests/features/https.feature similarity index 61% rename from docker/test/integration/features/https.feature rename to extensions/civetweb/tests/features/https.feature index 863cc782e4..b3d4eb6bd8 100644 --- a/docker/test/integration/features/https.feature +++ b/extensions/civetweb/tests/features/https.feature @@ -16,135 +16,155 @@ @ENABLE_CIVET Feature: Transfer data from and to MiNiFi using HTTPS - Background: - Given the content of "/tmp/output" is monitored - - Scenario: InvokeHTTP to ListenHTTP without an SSLContextService works (no mutual TLS in this case) Given a GenerateFlowFile processor with the "Data Format" property set to "Text" And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" And the "Custom Text" property of the GenerateFlowFile processor is set to "Lorem ipsum dolor sit amet" - And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${feature_id}:4430/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${scenario_id}:4430/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "server" flow - And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" + And a ListenHTTP processor with the "Listening Port" property set to "4430" in the "server" flow + And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" in the "server" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "server" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "server" flow + And in the "server" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "server" flow When both instances start up - Then a flowfile with the content "Lorem ipsum dolor sit amet" is placed in the monitored directory in less than 10s - + Then in the "server" container at least one file with the content "Lorem ipsum dolor sit amet" is placed in the "/tmp/output" directory in less than 10 seconds Scenario: InvokeHTTP to ListenHTTP without an SSLContextService requires a server cert signed by a CA Given a GenerateFlowFile processor with the "Data Format" property set to "Text" And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" And the "Custom Text" property of the GenerateFlowFile processor is set to "consectetur adipiscing elit" - And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${feature_id}:4430/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${scenario_id}:4430/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "server" flow - And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/self_signed_server.crt" + And a ListenHTTP processor with the "Listening Port" property set to "4430" in the "server" flow + And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/self_signed_server.crt" in the "server" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "server" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And in the "server" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "server" flow When both instances start up - Then no files are placed in the monitored directory in 10s of running time - + Then in the "server" container no files are placed in the "/tmp/output" directory in 10s of running time Scenario: InvokeHTTP to ListenHTTP with mutual TLS, using certificate files Given a GenerateFlowFile processor with the "Data Format" property set to "Text" And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" And the "Custom Text" property of the GenerateFlowFile processor is set to "ut labore et dolore magna aliqua" - And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${feature_id}:4430/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${scenario_id}:4430/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And an ssl context service with a manual CA cert file is set up for InvokeHTTP And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "server" flow - And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" - And the "SSL Certificate Authority" property of the ListenHTTP processor is set to "/usr/local/share/certs/ca-root-nss.crt" - And the "SSL Verify Peer" property of the ListenHTTP processor is set to "yes" + And a ListenHTTP processor with the "Listening Port" property set to "4430" in the "server" flow + And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" in the "server" flow + And the "SSL Certificate Authority" property of the ListenHTTP processor is set to "/usr/local/share/certs/ca-root-nss.crt" in the "server" flow + And the "SSL Verify Peer" property of the ListenHTTP processor is set to "yes" in the "server" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "server" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "server" flow + And in the "server" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "server" flow When both instances start up - Then a flowfile with the content "ut labore et dolore magna aliqua" is placed in the monitored directory in less than 10s - + Then in the "server" container at least one file with the content "ut labore et dolore magna aliqua" is placed in the "/tmp/output" directory in less than 10s Scenario: InvokeHTTP to ListenHTTP without mutual TLS, using the system certificate store Given a GenerateFlowFile processor with the "Data Format" property set to "Text" And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" And the "Custom Text" property of the GenerateFlowFile processor is set to "Ut enim ad minim veniam" - And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${feature_id}:4430/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${scenario_id}:4430/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And an ssl context service using the system CA cert store is set up for InvokeHTTP And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "server" flow - And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" + And a ListenHTTP processor with the "Listening Port" property set to "4430" in the "server" flow + And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" in the "server" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "server" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And in the "server" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "server" flow When both instances start up - Then a flowfile with the content "Ut enim ad minim veniam" is placed in the monitored directory in less than 10s - + Then in the "server" container at least one file with the content "Ut enim ad minim veniam" is placed in the "/tmp/output" directory in less than 10s Scenario: InvokeHTTP to ListenHTTP with mutual TLS, using the system certificate store Given a GenerateFlowFile processor with the "Data Format" property set to "Text" And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" And the "Custom Text" property of the GenerateFlowFile processor is set to "quis nostrud exercitation ullamco laboris nisi" - And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${feature_id}:4430/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${scenario_id}:4430/contentListener" + And InvokeHTTP is EVENT_DRIVEN And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And an ssl context service using the system CA cert store is set up for InvokeHTTP And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "server" flow - And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" - And the "SSL Certificate Authority" property of the ListenHTTP processor is set to "/usr/local/share/certs/ca-root-nss.crt" - And the "SSL Verify Peer" property of the ListenHTTP processor is set to "yes" + And a ListenHTTP processor with the "Listening Port" property set to "4430" in the "server" flow + And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/minifi_server.crt" in the "server" flow + And the "SSL Certificate Authority" property of the ListenHTTP processor is set to "/usr/local/share/certs/ca-root-nss.crt" in the "server" flow + And the "SSL Verify Peer" property of the ListenHTTP processor is set to "yes" in the "server" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "server" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "server" flow + And in the "server" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "server" flow When both instances start up - Then a flowfile with the content "quis nostrud exercitation ullamco laboris nisi" is placed in the monitored directory in less than 10s - + Then in the "server" container at least one file with the content "quis nostrud exercitation ullamco laboris nisi" is placed in the "/tmp/output" directory in less than 10s Scenario: InvokeHTTP to ListenHTTP without mutual TLS, using the system certificate store, requires a server cert signed by a CA Given a GenerateFlowFile processor with the "Data Format" property set to "Text" And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" And the "Custom Text" property of the GenerateFlowFile processor is set to "ut aliquip ex ea commodo consequat" - And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${feature_id}:4430/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${scenario_id}:4430/contentListener" And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And an ssl context service using the system CA cert store is set up for InvokeHTTP And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "server" flow - And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/self_signed_server.crt" + And a ListenHTTP processor with the "Listening Port" property set to "4430" in the "server" flow + And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/self_signed_server.crt" in the "server" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "server" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And in the "server" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "server" flow When both instances start up - Then no files are placed in the monitored directory in 10s of running time - + Then in the "server" container no files are placed in the "/tmp/output" directory in 10s of running time Scenario: InvokeHTTP to ListenHTTP with mutual TLS, using the system certificate store, requires a server cert signed by a CA Given a GenerateFlowFile processor with the "Data Format" property set to "Text" And the "Unique FlowFiles" property of the GenerateFlowFile processor is set to "false" And the "Custom Text" property of the GenerateFlowFile processor is set to "Duis aute irure dolor in reprehenderit in voluptate" - And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${feature_id}:4430/contentListener" + And a InvokeHTTP processor with the "Remote URL" property set to "https://server-${scenario_id}:4430/contentListener" And the "HTTP Method" property of the InvokeHTTP processor is set to "POST" And an ssl context service using the system CA cert store is set up for InvokeHTTP And the "success" relationship of the GenerateFlowFile processor is connected to the InvokeHTTP + And InvokeHTTP's success relationship is auto-terminated + And InvokeHTTP's response relationship is auto-terminated - And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "server" flow - And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/self_signed_server.crt" - And the "SSL Certificate Authority" property of the ListenHTTP processor is set to "/usr/local/share/certs/ca-root-nss.crt" - And the "SSL Verify Peer" property of the ListenHTTP processor is set to "yes" + And a ListenHTTP processor with the "Listening Port" property set to "4430" in the "server" flow + And the "SSL Certificate" property of the ListenHTTP processor is set to "/tmp/resources/self_signed_server.crt" in the "server" flow + And the "SSL Certificate Authority" property of the ListenHTTP processor is set to "/usr/local/share/certs/ca-root-nss.crt" in the "server" flow + And the "SSL Verify Peer" property of the ListenHTTP processor is set to "yes" in the "server" flow And a PutFile processor with the "Directory" property set to "/tmp/output" in the "server" flow - And the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile is EVENT_DRIVEN in the "server" flow + And in the "server" flow the "success" relationship of the ListenHTTP processor is connected to the PutFile + And PutFile's success relationship is auto-terminated in the "server" flow When both instances start up - Then no files are placed in the monitored directory in 10s of running time + Then in the "server" container no files are placed in the "/tmp/output" directory in 10s of running time diff --git a/extensions/civetweb/tests/features/steps/steps.py b/extensions/civetweb/tests/features/steps/steps.py new file mode 100644 index 0000000000..90304a7f8a --- /dev/null +++ b/extensions/civetweb/tests/features/steps/steps.py @@ -0,0 +1,19 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from minifi_test_framework.steps import checking_steps # noqa: F401 +from minifi_test_framework.steps import configuration_steps # noqa: F401 +from minifi_test_framework.steps import core_steps # noqa: F401 +from minifi_test_framework.steps import flow_building_steps # noqa: F401