diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py index 8678bdc0f3..55d7309573 100644 --- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py +++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py @@ -21,7 +21,7 @@ from minifi_test_framework.core.minifi_test_context import MinifiTestContext from minifi_test_framework.containers.file import File from minifi_test_framework.minifi.flow_definition import FlowDefinition -from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage +from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage, make_client_cert from .container import Container @@ -38,6 +38,10 @@ def __init__(self, container_name: str, test_context: MinifiTestContext): self.files.append(File("/tmp/resources/minifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_client_cert))) self.files.append(File("/tmp/resources/minifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_client_key))) + clientuser_cert, clientuser_key = make_client_cert("clientuser", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key) + self.files.append(File("/tmp/resources/clientuser.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=clientuser_cert))) + self.files.append(File("/tmp/resources/clientuser.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=clientuser_key))) + self.is_fhs = 'MINIFI_INSTALLATION_TYPE=FHS' in str(self.client.images.get(test_context.minifi_container_image).history()) self._fill_default_properties() diff --git a/behave_framework/src/minifi_test_framework/core/helpers.py b/behave_framework/src/minifi_test_framework/core/helpers.py index e6ae51857c..7b79920562 100644 --- a/behave_framework/src/minifi_test_framework/core/helpers.py +++ b/behave_framework/src/minifi_test_framework/core/helpers.py @@ -19,6 +19,7 @@ import logging import time +import functools from collections.abc import Callable import docker @@ -82,3 +83,21 @@ def run_cmd_in_docker_image(image_name: str, cmd: str | list, network: str) -> s def run_shell_cmd_in_docker_image(image_name: str, cmd: str, network: str) -> str: return run_cmd_in_docker_image(image_name, ["/bin/sh", "-c", cmd], network) + + +def retry_check(max_tries: int = 5, retry_interval_seconds: int = 1): + """ + Decorator for retrying a checker function that returns a boolean. The decorated function is called repeatedly until it returns True + or the maximum number of attempts is reached. The maximum number of attempts and the interval between attempts in seconds can be configured. 
+ """ + def retry_check_func(func): + @functools.wraps(func) + def retry_wrapper(*args, **kwargs): + for i in range(max_tries): + if func(*args, **kwargs): + return True + if i < max_tries - 1: + time.sleep(retry_interval_seconds) + return False + return retry_wrapper + return retry_check_func diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py index 22c3c604d5..11e19d1041 100644 --- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py @@ -29,6 +29,11 @@ from minifi_test_framework.minifi.processor import Processor +@given("a MiNiFi CPP server with yaml config") +def step_impl(context: MinifiTestContext): + pass # TODO(lordgamez): Needs to be implemented after JSON config is set to be default + + @given("a transient MiNiFi flow with a LogOnDestructionProcessor processor") def step_impl(context: MinifiTestContext): context.get_or_create_default_minifi_container().command = ["/bin/sh", "-c", "timeout 10s ./bin/minifi.sh run && sleep 100"] @@ -140,6 +145,7 @@ def step_impl(context: MinifiTestContext, parameter_name: str, parameter_value: @step('a directory at "{directory}" has a file with the content "{content}" in the "{flow_name}" flow') +@step("a directory at '{directory}' has a file with the content '{content}' in the '{flow_name}' flow") def step_impl(context: MinifiTestContext, directory: str, content: str, flow_name: str): new_content = content.replace("\\n", "\n") new_dir = Directory(directory) @@ -148,6 +154,7 @@ def step_impl(context: MinifiTestContext, directory: str, content: str, flow_nam @step('a directory at "{directory}" has a file with the content "{content}"') +@step("a directory at '{directory}' has a file with the content '{content}'") def step_impl(context: MinifiTestContext, directory: str, content: str): context.execute_steps(f'given a directory at "{directory}" has a file with the content "{content}" in the "{DEFAULT_MINIFI_CONTAINER_NAME}" flow') @@ -240,14 +247,22 @@ def step_impl(context: MinifiTestContext, property_name: str, processor_name: st # TLS -@given("an ssl context service is set up for {processor_name}") -@given("an ssl context service with a manual CA cert file is set up for {processor_name}") -def step_impl(context, processor_name): +def add_ssl_context_service_for_minifi(context: MinifiTestContext, cert_name: str): controller_service = ControllerService(class_name="SSLContextService", service_name="SSLContextService") - controller_service.add_property("Client Certificate", "/tmp/resources/minifi_client.crt") - controller_service.add_property("Private Key", "/tmp/resources/minifi_client.key") + controller_service.add_property("Client Certificate", f"/tmp/resources/{cert_name}.crt") + controller_service.add_property("Private Key", f"/tmp/resources/{cert_name}.key") controller_service.add_property("CA Certificate", "/tmp/resources/root_ca.crt") context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service) + +@given("an ssl context service is set up") +def step_impl(context: MinifiTestContext): + add_ssl_context_service_for_minifi(context, "minifi_client") + + +@given("an ssl context service is set up for {processor_name}") +@given("an ssl context service with a manual CA cert file is set up for {processor_name}") +def step_impl(context, processor_name): + add_ssl_context_service_for_minifi(context, 
"minifi_client") processor = context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name) processor.add_property('SSL Context Service', 'SSLContextService') diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh index 786e9e6b3f..2f9f4b9498 100755 --- a/docker/RunBehaveTests.sh +++ b/docker/RunBehaveTests.sh @@ -200,4 +200,5 @@ exec \ "${docker_dir}/../extensions/sql/tests/features" \ "${docker_dir}/../extensions/llamacpp/tests/features" \ "${docker_dir}/../extensions/opc/tests/features" \ - "${docker_dir}/../extensions/kafka/tests/features" + "${docker_dir}/../extensions/kafka/tests/features" \ + "${docker_dir}/../extensions/couchbase/tests/features" diff --git a/docker/requirements.txt b/docker/requirements.txt index 2a2da597a8..77c4c2fdc6 100644 --- a/docker/requirements.txt +++ b/docker/requirements.txt @@ -10,5 +10,4 @@ azure-storage-blob==12.24.1 prometheus-api-client==0.5.5 humanfriendly==10.0 requests<2.29 # https://github.com/docker/docker-py/issues/3113 -couchbase==4.3.5 paho-mqtt==2.1.0 diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 31e1be1b56..f225e18642 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -36,7 +36,6 @@ from .containers.GrafanaLokiContainer import GrafanaLokiContainer from .containers.GrafanaLokiContainer import GrafanaLokiOptions from .containers.ReverseProxyContainer import ReverseProxyContainer -from .containers.CouchbaseServerContainer import CouchbaseServerContainer from .FeatureContext import FeatureContext @@ -266,14 +265,6 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c network=self.network, image_store=self.image_store, command=command)) - elif engine == "couchbase-server": - return self.containers.setdefault(container_name, - CouchbaseServerContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) else: raise Exception('invalid flow engine: \'%s\'' % engine) diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 58902f1323..2d0aa4aadb 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -35,7 +35,6 @@ from .checkers.SplunkChecker import SplunkChecker from .checkers.GrafanaLokiChecker import GrafanaLokiChecker from .checkers.ModbusChecker import ModbusChecker -from .checkers.CouchbaseChecker import CouchbaseChecker from .checkers.MqttHelper import MqttHelper from utils import get_peak_memory_usage, get_minifi_pid, get_memory_usage, retry_check @@ -56,7 +55,6 @@ def __init__(self, context, feature_id): self.grafana_loki_checker = GrafanaLokiChecker() self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator) self.modbus_checker = ModbusChecker(self.container_communicator) - self.couchbase_checker = CouchbaseChecker() self.mqtt_helper = MqttHelper() def cleanup(self): @@ -447,8 +445,5 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd): def enable_ssl_in_nifi(self): self.container_store.enable_ssl_in_nifi() - def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str): - return self.couchbase_checker.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type) - def 
publish_test_mqtt_message(self, topic: str, message: str): self.mqtt_helper.publish_test_mqtt_message(topic, message) diff --git a/docker/test/integration/cluster/checkers/CouchbaseChecker.py b/docker/test/integration/cluster/checkers/CouchbaseChecker.py deleted file mode 100644 index 07a332cbf6..0000000000 --- a/docker/test/integration/cluster/checkers/CouchbaseChecker.py +++ /dev/null @@ -1,69 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -import json -from couchbase.cluster import Cluster -from couchbase.options import ClusterOptions -from couchbase.auth import PasswordAuthenticator -from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder - - -class CouchbaseChecker: - def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str): - try: - cluster = Cluster('couchbase://localhost', ClusterOptions( - PasswordAuthenticator('Administrator', 'password123'))) - - bucket = cluster.bucket(bucket_name) - collection = bucket.default_collection() - - if expected_data_type.lower() == "binary": - binary_flag = 0x03 << 24 - result = collection.get(doc_id, transcoder=RawBinaryTranscoder()) - flags = result.flags - if not flags & binary_flag: - logging.error(f"Expected binary data for document '{doc_id}' but no binary flags were found.") - return False - - content = result.content_as[bytes] - return content.decode('utf-8') == expected_data - - if expected_data_type.lower() == "json": - json_flag = 0x02 << 24 - result = collection.get(doc_id) - flags = result.flags - if not flags & json_flag: - logging.error(f"Expected JSON data for document '{doc_id}' but no JSON flags were found.") - return False - - content = result.content_as[dict] - return content == json.loads(expected_data) - - if expected_data_type.lower() == "string": - string_flag = 0x04 << 24 - result = collection.get(doc_id, transcoder=RawStringTranscoder()) - flags = result.flags - if not flags & string_flag: - logging.error(f"Expected string data for document '{doc_id}' but no string flags were found.") - return False - - content = result.content_as[str] - return content == expected_data - - logging.error(f"Unsupported data type '{expected_data_type}'") - return False - except Exception as e: - logging.error(f"Error while fetching document '{doc_id}' from bucket '{bucket_name}': {e}") - return False diff --git a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py b/docker/test/integration/cluster/containers/CouchbaseServerContainer.py deleted file mode 100644 index b2f13b7329..0000000000 --- a/docker/test/integration/cluster/containers/CouchbaseServerContainer.py +++ /dev/null @@ -1,125 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import os -import OpenSSL.crypto -import tempfile -import docker -import requests -import logging -from requests.auth import HTTPBasicAuth -from .Container import Container -from utils import retry_check -from ssl_utils.SSL_cert_utils import make_server_cert - - -class CouchbaseServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, "couchbase-server", vols, network, image_store, command) - couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key) - - self.root_ca_file = tempfile.NamedTemporaryFile(delete=False) - self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert)) - self.root_ca_file.close() - os.chmod(self.root_ca_file.name, 0o666) - - self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False) - self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=couchbase_cert)) - self.couchbase_cert_file.close() - os.chmod(self.couchbase_cert_file.name, 0o666) - - self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False) - self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, pkey=couchbase_key)) - self.couchbase_key_file.close() - os.chmod(self.couchbase_key_file.name, 0o666) - - def get_startup_finished_log_entry(self): - # after startup the logs are only available in the container, only this message is shown - return "logs available in" - - @retry_check(max_tries=12, retry_interval=5) - def _run_couchbase_cli_command(self, command): - (code, _) = self.client.containers.get(self.name).exec_run(command) - if code != 0: - logging.error(f"Failed to run command '{command}', returned error code: {code}") - return False - return True - - def _run_couchbase_cli_commands(self, commands): - return all(self._run_couchbase_cli_command(command) for command in commands) - - @retry_check(max_tries=15, retry_interval=2) - def _load_couchbase_certs(self): - response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123")) - if response.status_code != 200: - logging.error(f"Failed to load CA certificates, with status code: {response.status_code}") - return False - - response = requests.post("http://localhost:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123")) - if response.status_code != 200: - logging.error(f"Failed to reload certificates, with status code: {response.status_code}") - return False - - return True - - def run_post_startup_commands(self): - if self.post_startup_commands_finished: - return True - - commands = [ - ["couchbase-cli", "cluster-init", "-c", 
"localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", - "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], - ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", - "--bucket-ramsize", "1024", "--max-ttl", "36000"], - ["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123", - "--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"], - ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''], - ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json'] - ] - if not self._run_couchbase_cli_commands(commands): - return False - - if not self._load_couchbase_certs(): - return False - - self.post_startup_commands_finished = True - return True - - def deploy(self): - if not self.set_deployed(): - return - - mounts = [ - docker.types.Mount( - type='bind', - source=self.couchbase_key_file.name, - target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'), - docker.types.Mount( - type='bind', - source=self.couchbase_cert_file.name, - target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'), - docker.types.Mount( - type='bind', - source=self.root_ca_file.name, - target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt') - ] - - self.docker_container = self.client.containers.run( - "couchbase:enterprise-7.2.5", - detach=True, - name=self.name, - network=self.network.name, - ports={'8091/tcp': 8091, '11210/tcp': 11210}, - entrypoint=self.command, - mounts=mounts) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 9e07421c6f..f04d0b6dca 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -85,11 +85,6 @@ def start_minifi_c2_server(self, context): self.cluster.deploy_container('minifi-c2-server') assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output() - def start_couchbase_server(self, context): - self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server') - self.cluster.deploy_container('couchbase-server') - assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output() - def start_nifi(self, context): self.cluster.acquire_container(context=context, name='nifi', engine='nifi') self.cluster.deploy_container('nifi') @@ -507,8 +502,5 @@ def set_value_on_plc_with_modbus(self, container_name, modbus_cmd): def enable_ssl_in_nifi(self): self.cluster.enable_ssl_in_nifi() - def check_is_data_present_on_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str): - assert self.cluster.is_data_present_in_couchbase(doc_id, bucket_name, expected_data, expected_data_type) - def publish_test_mqtt_message(self, topic, message): self.cluster.publish_test_mqtt_message(topic, message) diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index 9483b05801..4f06d8b7e4 100644 --- 
a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -27,7 +27,6 @@ from minifi.controllers.JsonTreeReader import JsonTreeReader from minifi.controllers.XMLReader import XMLReader from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter -from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService from minifi.controllers.XMLReader import XMLReader from behave import given, then, when @@ -1297,54 +1296,6 @@ def step_impl(context, parameter_context_name): container.set_parameter_context_name(parameter_context_name) -# Couchbase -@when(u'a Couchbase server is started') -def step_impl(context): - context.test.start_couchbase_server(context) - - -@given("a CouchbaseClusterService is setup up with the name \"{service_name}\"") -def step_impl(context, service_name): - couchbase_cluster_controller_service = CouchbaseClusterService( - name=service_name, - connection_string="couchbase://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server"))) - container = context.test.acquire_container(context=context, name="minifi-cpp-flow") - container.add_controller(couchbase_cluster_controller_service) - - -@given("a CouchbaseClusterService is set up up with SSL connection with the name \"{service_name}\"") -def step_impl(context, service_name): - ssl_context_service = SSLContextService(name="SSLContextService", - ca_cert='/tmp/resources/root_ca.crt') - container = context.test.acquire_container(context=context, name="minifi-cpp-flow") - container.add_controller(ssl_context_service) - couchbase_cluster_controller_service = CouchbaseClusterService( - name=service_name, - connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), - ssl_context_service=ssl_context_service) - container.add_controller(couchbase_cluster_controller_service) - - -@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase") -def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str): - context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type) - - -@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"") -def step_impl(context, service_name): - ssl_context_service = SSLContextService(name="SSLContextService", - cert='/tmp/resources/clientuser.crt', - key='/tmp/resources/clientuser.key', - ca_cert='/tmp/resources/root_ca.crt') - container = context.test.acquire_container(context=context, name="minifi-cpp-flow") - container.add_controller(ssl_context_service) - couchbase_cluster_controller_service = CouchbaseClusterService( - name=service_name, - connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")), - ssl_context_service=ssl_context_service) - container.add_controller(couchbase_cluster_controller_service) - - @given("a LlamaCpp model is present on the MiNiFi host") def step_impl(context): context.test.llama_model_is_downloaded_in_minifi() diff --git a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py b/docker/test/integration/minifi/controllers/CouchbaseClusterService.py deleted file mode 100644 index 94494fe175..0000000000 --- a/docker/test/integration/minifi/controllers/CouchbaseClusterService.py +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the 
Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.ControllerService import ControllerService - - -class CouchbaseClusterService(ControllerService): - def __init__(self, name, connection_string, ssl_context_service=None): - super(CouchbaseClusterService, self).__init__(name=name) - - self.service_class = 'CouchbaseClusterService' - self.properties['Connection String'] = connection_string - if ssl_context_service: - self.linked_services.append(ssl_context_service) - if not ssl_context_service or ssl_context_service and 'Client Certificate' not in ssl_context_service.properties: - self.properties['User Name'] = "Administrator" - self.properties['User Password'] = "password123" diff --git a/docker/test/integration/minifi/processors/GetCouchbaseKey.py b/docker/test/integration/minifi/processors/GetCouchbaseKey.py deleted file mode 100644 index 0b48dcbdd2..0000000000 --- a/docker/test/integration/minifi/processors/GetCouchbaseKey.py +++ /dev/null @@ -1,24 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from ..core.Processor import Processor - - -class GetCouchbaseKey(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(GetCouchbaseKey, self).__init__( - context=context, - clazz='GetCouchbaseKey', - auto_terminate=['success', 'failure', 'retry'], - schedule=schedule) diff --git a/docker/test/integration/features/couchbase.feature b/extensions/couchbase/tests/features/couchbase.feature similarity index 81% rename from docker/test/integration/features/couchbase.feature rename to extensions/couchbase/tests/features/couchbase.feature index ecf4ce8a1f..3464f0410a 100644 --- a/docker/test/integration/features/couchbase.feature +++ b/extensions/couchbase/tests/features/couchbase.feature @@ -15,26 +15,27 @@ @ENABLE_COUCHBASE Feature: Executing Couchbase operations from MiNiFi-C++ - Background: - Given the content of "/tmp/output" is monitored Scenario: A MiNiFi instance can insert json data to test bucket with PutCouchbaseKey processor Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "Json" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "retry" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds @@ -46,21 +47,24 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can insert binary data to test bucket with PutCouchbaseKey processor Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of 
the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "retry" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "success" relationship of the PutCouchbaseKey processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up Then the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds @@ -72,16 +76,20 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -89,11 +97,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON 
content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds @@ -101,18 +110,22 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using binary storage Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -120,11 +133,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds @@ -132,19 +146,23 @@ 
Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor and put the result in an attribute Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "String" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the GetCouchbaseKey processor is set to "String" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And the "Put Value to Attribute" property of the GetCouchbaseKey processor is set to "get_couchbase_result" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -152,11 +170,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds @@ -165,24 +184,27 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Scenario: GetCouchbaseKey transfers FlowFile to failure relationship on Couchbase value type mismatch Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at 
'/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the PutCouchbaseKey processor is set to "String" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Document Type" property of the GetCouchbaseKey processor is set to "Binary" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" - And a CouchbaseClusterService is setup up with the name "CouchbaseClusterService" + And a CouchbaseClusterService controller service is set up to communicate with the Couchbase server And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "retry" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey + And GetCouchbaseKey's failure relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 100 seconds @@ -190,16 +212,21 @@ Feature: Executing Couchbase operations from MiNiFi-C++ Given a GetFile processor with the "Input Directory" property set to "/tmp/input" And the "Keep Source File" property of the GetFile processor is set to "true" And the scheduling period of the GetFile processor is set to "20 seconds" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is set up up with SSL connection with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + + And a CouchbaseClusterService is set up using SSL connection And the "success" relationship of the 
GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -207,29 +234,33 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using mTLS authentication - Given a MiNiFi CPP server with yaml config - And a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input' + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a directory at '/tmp/input' has a file with the content '{"field1": "value1", "field2": "value2"}' And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And PutCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService" And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket" + And GetCouchbaseKey is EVENT_DRIVEN And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id" And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN And a LogAttribute processor with the "FlowFiles To Log" property set to "0" - And a CouchbaseClusterService is setup up using mTLS authentication with the name "CouchbaseClusterService" + And LogAttribute is EVENT_DRIVEN + And a CouchbaseClusterService is setup up using mTLS authentication And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey And the "failure" relationship of the PutCouchbaseKey processor is connected to the PutCouchbaseKey @@ -237,11 +268,12 @@ Feature: Executing Couchbase operations from MiNiFi-C++ And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile And the "success" relationship of the 
PutFile processor is connected to the LogAttribute + And LogAttribute's success relationship is auto-terminated When a Couchbase server is started - And all instances start up + And the MiNiFi instance starts up - Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 100 seconds + Then a file with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the '/tmp/output' directory in less than 100 seconds And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds diff --git a/docker/test/integration/minifi/processors/PutCouchbaseKey.py b/extensions/couchbase/tests/features/environment.py similarity index 53% rename from docker/test/integration/minifi/processors/PutCouchbaseKey.py rename to extensions/couchbase/tests/features/environment.py index 341338771b..01c4d4791b 100644 --- a/docker/test/integration/minifi/processors/PutCouchbaseKey.py +++ b/extensions/couchbase/tests/features/environment.py @@ -12,13 +12,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from ..core.Processor import Processor +from minifi_test_framework.containers.docker_image_builder import DockerImageBuilder +from minifi_test_framework.core.hooks import common_before_scenario +from minifi_test_framework.core.hooks import common_after_scenario -class PutCouchbaseKey(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PutCouchbaseKey, self).__init__( - context=context, - clazz='PutCouchbaseKey', - auto_terminate=['success', 'failure', 'retry'], - schedule=schedule) +def before_all(context): + dockerfile = """ + FROM python:3.13-slim-bookworm + RUN pip install couchbase==4.3.5 requests""" + builder = DockerImageBuilder( + image_tag="minifi-couchbase-helper:latest", + dockerfile_content=dockerfile + ) + builder.build() + + +def before_scenario(context, scenario): + common_before_scenario(context, scenario) + + +def after_scenario(context, scenario): + common_after_scenario(context, scenario) diff --git a/extensions/couchbase/tests/features/steps/couchbase_server_container.py b/extensions/couchbase/tests/features/steps/couchbase_server_container.py new file mode 100644 index 0000000000..96cddf821d --- /dev/null +++ b/extensions/couchbase/tests/features/steps/couchbase_server_container.py @@ -0,0 +1,165 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from OpenSSL import crypto +from minifi_test_framework.core.helpers import wait_for_condition, retry_check +from minifi_test_framework.core.ssl_utils import make_server_cert +from minifi_test_framework.containers.container import Container +from minifi_test_framework.containers.file import File +from minifi_test_framework.core.minifi_test_context import MinifiTestContext +from docker.errors import ContainerError + + +class CouchbaseServerContainer(Container): + def __init__(self, test_context: MinifiTestContext): + super().__init__("couchbase:enterprise-7.2.5", f"couchbase-server-{test_context.scenario_id}", test_context.network) + + couchbase_cert, couchbase_key = make_server_cert(self.container_name, test_context.root_ca_cert, test_context.root_ca_key) + + root_ca_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert) + self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt", root_ca_content, permissions=0o666)) + couchbase_cert_content = crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=couchbase_cert) + self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/chain.pem", couchbase_cert_content, permissions=0o666)) + couchbase_key_content = crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=couchbase_key) + self.files.append(File("/opt/couchbase/var/lib/couchbase/inbox/pkey.key", couchbase_key_content, permissions=0o666)) + + def deploy(self): + super().deploy() + finished_str = "logs available in" + assert wait_for_condition( + condition=lambda: finished_str in self.get_logs(), + timeout_seconds=15, + bail_condition=lambda: self.exited, + context=None) + return self.run_post_startup_commands() + + def run_post_startup_commands(self): + commands = [ + ["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query", + "--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"], + ["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase", + "--bucket-ramsize", "1024", "--max-ttl", "36000"], + ["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123", + "--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"], + ["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''], + ['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json'] + ] + if not self._run_couchbase_cli_commands(commands): + return False + + if not self._load_couchbase_certs(): + return False + + return True + + @retry_check(max_tries=12, retry_interval_seconds=5) + def _run_couchbase_cli_command(self, command): + (code, output) = self.exec_run(command) + if code != 0: + logging.error(f"Failed to run command '{command}', returned error code: {code}, output: '{output}'") + return False + return True + + def _run_couchbase_cli_commands(self, commands): + return all(self._run_couchbase_cli_command(command) for command in commands) + + def _run_python_in_couchbase_helper_docker(self, command: str): + try: +
self.client.containers.run("minifi-couchbase-helper:latest", ["python", "-c", command], remove=True, stdout=True, stderr=True, network=self.network.name) + return True + except ContainerError as e: + stdout = e.stdout.decode("utf-8", errors="replace") if e.stdout else "" + stderr = e.stderr.decode("utf-8", errors="replace") if e.stderr else "" + logging.error(f"Python command '{command}' failed in couchbase helper docker with error: '{e}', stdout: '{stdout}', stderr: '{stderr}'") + return False + except Exception as e: + logging.error(f"Unexpected error while running python command '{command}' in couchbase helper docker: '{e}'") + return False + + @retry_check(max_tries=15, retry_interval_seconds=2) + def _load_couchbase_certs(self): + python_command = f""" +import requests +import sys +from requests.auth import HTTPBasicAuth +response = requests.post(f"http://{self.container_name}:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123")) +if response.status_code != 200: + sys.exit(1) + +response = requests.post(f"http://{self.container_name}:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123")) +if response.status_code != 200: + sys.exit(1) +sys.exit(0) + """ + return self._run_python_in_couchbase_helper_docker(python_command) + + def is_data_present_in_couchbase(self, doc_id: str, bucket_name: str, expected_data: str, expected_data_type: str): + python_command = f""" +from couchbase.cluster import Cluster +from couchbase.options import ClusterOptions +from couchbase.auth import PasswordAuthenticator +from couchbase.transcoder import RawBinaryTranscoder, RawStringTranscoder +import json +import sys + +try: + cluster = Cluster("couchbase://{self.container_name}", ClusterOptions( + PasswordAuthenticator('Administrator', 'password123'))) + + bucket = cluster.bucket("{bucket_name}") + collection = bucket.default_collection() + + if "{expected_data_type}".lower() == "binary": + binary_flag = 0x03 << 24 + result = collection.get("{doc_id}", transcoder=RawBinaryTranscoder()) + flags = result.flags + if not flags & binary_flag: + print("Expected binary data for document '{doc_id}' but no binary flags were found.") + sys.exit(1) + + content = result.content_as[bytes] + if content.decode('utf-8') == '{expected_data}': + sys.exit(0) + + if "{expected_data_type}".lower() == "json": + json_flag = 0x02 << 24 + result = collection.get("{doc_id}") + flags = result.flags + if not flags & json_flag: + print("Expected JSON data for document '{doc_id}' but no JSON flags were found.") + sys.exit(1) + + content = result.content_as[dict] + if content == json.loads('{expected_data}'): + sys.exit(0) + if "{expected_data_type}".lower() == "string": + string_flag = 0x04 << 24 + result = collection.get("{doc_id}", transcoder=RawStringTranscoder()) + flags = result.flags + if not flags & string_flag: + print("Expected string data for document '{doc_id}' but no string flags were found.") + sys.exit(1) + + content = result.content_as[str] + if content == '{expected_data}': + sys.exit(0) + + sys.exit(1) +except Exception as e: + sys.exit(1) + """ + return self._run_python_in_couchbase_helper_docker(python_command) diff --git a/extensions/couchbase/tests/features/steps/steps.py b/extensions/couchbase/tests/features/steps/steps.py new file mode 100644 index 0000000000..f20cfda2e4 --- /dev/null +++ b/extensions/couchbase/tests/features/steps/steps.py @@ -0,0 +1,67 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license
agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from behave import step, then + +from minifi_test_framework.steps import checking_steps # noqa: F401 +from minifi_test_framework.steps import configuration_steps # noqa: F401 +from minifi_test_framework.steps import core_steps # noqa: F401 +from minifi_test_framework.steps import flow_building_steps # noqa: F401 +from minifi_test_framework.steps.flow_building_steps import add_ssl_context_service_for_minifi +from minifi_test_framework.core.minifi_test_context import MinifiTestContext +from minifi_test_framework.core.helpers import log_due_to_failure +from minifi_test_framework.minifi.controller_service import ControllerService +from couchbase_server_container import CouchbaseServerContainer + + +@step("a Couchbase server is started") +def step_impl(context: MinifiTestContext): + context.containers["couchbase-server"] = CouchbaseServerContainer(context) + assert context.containers["couchbase-server"].deploy() + + +@step("a CouchbaseClusterService controller service is set up to communicate with the Couchbase server") +def step_impl(context: MinifiTestContext): + controller_service = ControllerService(class_name="CouchbaseClusterService", service_name="CouchbaseClusterService") + controller_service.add_property("Connection String", f"couchbase://couchbase-server-{context.scenario_id}") + controller_service.add_property("User Name", "Administrator") + controller_service.add_property("User Password", "password123") + context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service) + + +@step("a CouchbaseClusterService is set up using SSL connection") +def step_impl(context): + ssl_context_service = ControllerService(class_name="SSLContextService", service_name="SSLContextService") + ssl_context_service.add_property("CA Certificate", "/tmp/resources/root_ca.crt") + context.get_or_create_default_minifi_container().flow_definition.controller_services.append(ssl_context_service) + couchbase_cluster_service = ControllerService(class_name="CouchbaseClusterService", service_name="CouchbaseClusterService") + couchbase_cluster_service.add_property("Connection String", f"couchbases://couchbase-server-{context.scenario_id}") + couchbase_cluster_service.add_property("User Name", "Administrator") + couchbase_cluster_service.add_property("User Password", "password123") + couchbase_cluster_service.add_property("Linked Services", "SSLContextService") + context.get_or_create_default_minifi_container().flow_definition.controller_services.append(couchbase_cluster_service) + + +@step("a CouchbaseClusterService is setup up using mTLS authentication") +def step_impl(context: MinifiTestContext): + add_ssl_context_service_for_minifi(context, "clientuser") + controller_service = ControllerService(class_name="CouchbaseClusterService", service_name="CouchbaseClusterService") + 
controller_service.add_property("Connection String", f"couchbases://couchbase-server-{context.scenario_id}") + controller_service.add_property("Linked Services", "SSLContextService") + context.get_or_create_default_minifi_container().flow_definition.controller_services.append(controller_service) + + +@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase") +def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str): + assert context.containers["couchbase-server"].is_data_present_in_couchbase(doc_id, bucket_name, data, data_type) or log_due_to_failure(context)