diff --git a/behave_framework/src/minifi_test_framework/containers/directory.py b/behave_framework/src/minifi_test_framework/containers/directory.py index ac07da32cf..50be5a4314 100644 --- a/behave_framework/src/minifi_test_framework/containers/directory.py +++ b/behave_framework/src/minifi_test_framework/containers/directory.py @@ -16,10 +16,12 @@ # class Directory: - def __init__(self, path): + def __init__(self, path, files: dict[str, str] | None = None, mode="rw"): self.path = path self.files: dict[str, str] = {} - self.mode = "rw" + if files is not None: + self.files = files + self.mode = mode def add_file(self, file_name: str, content: str): self.files[file_name] = content diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py index 032b09e776..e2ac9df508 100644 --- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py @@ -90,6 +90,15 @@ def step_impl(context: MinifiTestContext, property_name: str, processor_name: st processor.add_property(property_name, property_value) +@step('the "{property_name}" property of the {controller_name} controller service is set to "{property_value}"') +def step_impl(context: MinifiTestContext, property_name: str, controller_name: str, property_value: str): + controller_service = context.get_or_create_default_minifi_container().flow_definition.get_controller_service(controller_name) + if property_value == "(not set)": + controller_service.remove_property(property_name) + else: + controller_service.add_property(property_name, property_value) + + @step('a Funnel with the name "{funnel_name}" is set up') def step_impl(context: MinifiTestContext, funnel_name: str): context.get_or_create_default_minifi_container().flow_definition.add_funnel(Funnel(funnel_name)) diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh index 
a3b5c1973e..4a6e9b1820 100755 --- a/docker/RunBehaveTests.sh +++ b/docker/RunBehaveTests.sh @@ -203,4 +203,5 @@ exec \ "${docker_dir}/../extensions/kafka/tests/features" \ "${docker_dir}/../extensions/couchbase/tests/features" \ "${docker_dir}/../extensions/elasticsearch/tests/features" \ - "${docker_dir}/../extensions/splunk/tests/features" + "${docker_dir}/../extensions/splunk/tests/features" \ + "${docker_dir}/../extensions/gcp/tests/features" diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 873489999f..d7cb787c35 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -21,7 +21,6 @@ from .containers.KinesisServerContainer import KinesisServerContainer from .containers.S3ServerContainer import S3ServerContainer from .containers.AzureStorageServerContainer import AzureStorageServerContainer -from .containers.FakeGcsServerContainer import FakeGcsServerContainer from .containers.HttpProxyContainer import HttpProxyContainer from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer from .containers.MqttBrokerContainer import MqttBrokerContainer @@ -144,14 +143,6 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c network=self.network, image_store=self.image_store, command=command)) - elif engine == 'fake-gcs-server': - return self.containers.setdefault(container_name, - FakeGcsServerContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) elif engine == 'postgresql-server': return self.containers.setdefault(container_name, PostgreSQLServerContainer(feature_context=feature_context, diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index 6450a78e9d..8747084206 100644 --- 
a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -28,7 +28,6 @@ from .MinifiControllerExecutor import MinifiControllerExecutor from .checkers.AwsChecker import AwsChecker from .checkers.AzureChecker import AzureChecker -from .checkers.GcsChecker import GcsChecker from .checkers.PostgresChecker import PostgresChecker from .checkers.PrometheusChecker import PrometheusChecker from .checkers.GrafanaLokiChecker import GrafanaLokiChecker @@ -45,7 +44,6 @@ def __init__(self, context, feature_id): self.container_store = ContainerStore(self.container_communicator.create_docker_network(feature_id), context.image_store, context.kubernetes_proxy, feature_id=feature_id) self.aws_checker = AwsChecker(self.container_communicator) self.azure_checker = AzureChecker(self.container_communicator) - self.gcs_checker = GcsChecker(self.container_communicator) self.postgres_checker = PostgresChecker(self.container_communicator) self.prometheus_checker = PrometheusChecker() self.grafana_loki_checker = GrafanaLokiChecker() @@ -234,14 +232,6 @@ def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_s def check_azure_blob_storage_is_empty(self, timeout_seconds): return self.azure_checker.check_azure_blob_storage_is_empty(timeout_seconds) - def check_google_cloud_storage(self, gcs_container_name, content): - gcs_container_name = self.container_store.get_container_name_with_postfix(gcs_container_name) - return self.gcs_checker.check_google_cloud_storage(gcs_container_name, content) - - def is_gcs_bucket_empty(self, container_name): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.gcs_checker.is_gcs_bucket_empty(container_name) - def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds): postgresql_container_name = self.container_store.get_container_name_with_postfix(postgresql_container_name) return 
self.postgres_checker.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds) diff --git a/docker/test/integration/cluster/checkers/GcsChecker.py b/docker/test/integration/cluster/checkers/GcsChecker.py deleted file mode 100644 index 923c079366..0000000000 --- a/docker/test/integration/cluster/checkers/GcsChecker.py +++ /dev/null @@ -1,30 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from utils import retry_check - - -class GcsChecker: - def __init__(self, container_communicator): - self.container_communicator = container_communicator - - @retry_check() - def check_google_cloud_storage(self, gcs_container_name, content): - (code, _) = self.container_communicator.execute_command(gcs_container_name, ["grep", "-r", content, "/storage"]) - return code == 0 - - @retry_check() - def is_gcs_bucket_empty(self, container_name): - (code, output) = self.container_communicator.execute_command(container_name, ["ls", "/storage/test-bucket"]) - return code == 0 and output == "" diff --git a/docker/test/integration/cluster/containers/FakeGcsServerContainer.py b/docker/test/integration/cluster/containers/FakeGcsServerContainer.py deleted file mode 100644 index 96f8ab70f3..0000000000 --- a/docker/test/integration/cluster/containers/FakeGcsServerContainer.py +++ /dev/null @@ -1,40 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import logging -import os -from .Container import Container - - -class FakeGcsServerContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, 'fake-gcs-server', vols, network, image_store, command) - - def get_startup_finished_log_entry(self): - return "server started at http" - - def deploy(self): - if not self.set_deployed(): - return - - logging.info('Creating and running google cloud storage server docker container...') - self.client.containers.run( - "fsouza/fake-gcs-server:1.45.1", - detach=True, - name=self.name, - network=self.network.name, - entrypoint=self.command, - volumes=[os.environ['TEST_DIRECTORY'] + "/resources/fake-gcs-server-data:/data"], - command=f'-scheme http -host fake-gcs-server-{self.feature_context.id}') - logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index eb15dded3e..9373c9b99e 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -297,12 +297,6 @@ def check_http_proxy_access(self, http_proxy_container_name, url): def check_azure_storage_server_data(self, azure_container_name, object_data): assert self.cluster.check_azure_storage_server_data(azure_container_name, object_data) or self.cluster.log_app_output() - def check_google_cloud_storage(self, gcs_container_name, content): - assert self.cluster.check_google_cloud_storage(gcs_container_name, content) or self.cluster.log_app_output() - - def check_empty_gcs_bucket(self, gcs_container_name): - assert self.cluster.is_gcs_bucket_empty(gcs_container_name) or self.cluster.log_app_output() - def check_minifi_log_contents(self, line, timeout_seconds=60, count=1): self.check_container_log_contents("minifi-cpp", line, timeout_seconds, count) diff --git 
a/docker/test/integration/features/google_cloud_storage.feature b/docker/test/integration/features/google_cloud_storage.feature deleted file mode 100644 index 2d0b49c86e..0000000000 --- a/docker/test/integration/features/google_cloud_storage.feature +++ /dev/null @@ -1,64 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -@ENABLE_GCP -Feature: Sending data to Google Cloud Storage using PutGCSObject - - Background: - Given the content of "/tmp/output" is monitored - - Scenario: A MiNiFi instance can upload data to Google Cloud storage - Given a GetFile processor with the "Input Directory" property set to "/tmp/input" - And a file with the content "hello_gcs" is present in "/tmp/input" - And a Google Cloud storage server is set up - And a PutGCSObject processor - And the PutGCSObject processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server - And a PutFile processor with the "Directory" property set to "/tmp/output" - And the "success" relationship of the GetFile processor is connected to the PutGCSObject - And the "success" relationship of the PutGCSObject processor is connected to the PutFile - And the "failure" relationship of the PutGCSObject processor is connected to the PutGCSObject - - When all instances start up - - Then a flowfile with the content "hello_gcs" is placed in the monitored directory in less than 45 seconds - And an object with the content "hello_gcs" is present in the Google Cloud storage - - Scenario: A MiNiFi instance can fetch the listed objects from Google Cloud storage bucket - Given a Google Cloud storage server is set up and a single object with contents "preloaded data" is present - And a ListGCSBucket processor - And a FetchGCSObject processor - And the ListGCSBucket and the FetchGCSObject processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server - And a PutFile processor with the "Directory" property set to "/tmp/output" - And the "success" relationship of the ListGCSBucket processor is connected to the FetchGCSObject - And the "success" relationship of the FetchGCSObject processor is connected to the PutFile - - When all instances start up - - Then a flowfile with the content "preloaded data" is placed in the monitored directory in less than 
10 seconds - - - Scenario: A MiNiFi instance can delete the listed objects from Google Cloud storage bucket - Given a Google Cloud storage server is set up with some test data - And a ListGCSBucket processor - And a DeleteGCSObject processor - And the ListGCSBucket and the DeleteGCSObject processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server - And a PutFile processor with the "Directory" property set to "/tmp/output" - And the "success" relationship of the ListGCSBucket processor is connected to the DeleteGCSObject - And the "success" relationship of the DeleteGCSObject processor is connected to the PutFile - - When all instances start up - - Then the test bucket of Google Cloud Storage is empty - And at least one empty flowfile is placed in the monitored directory in less than 10 seconds diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index c0305746f0..f317294f20 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -18,7 +18,6 @@ from minifi.core.Funnel import Funnel from minifi.controllers.SSLContextService import SSLContextService -from minifi.controllers.GCPCredentialsControllerService import GCPCredentialsControllerService from minifi.controllers.ODBCService import ODBCService from minifi.controllers.KubernetesControllerService import KubernetesControllerService from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter @@ -525,14 +524,6 @@ def step_impl(context, protocol): context.test.acquire_container(context=context, name=client_name, engine=client_name) -# google cloud storage setup -@given("a Google Cloud storage server is set up") -@given("a Google Cloud storage server is set up with some test data") -@given('a Google Cloud storage server is set up and a single object with contents "preloaded data" is present') -def step_impl(context): - 
context.test.acquire_container(context=context, name="fake-gcs-server", engine="fake-gcs-server") - - def setUpSslContextServiceForProcessor(context, processor_name: str): minifi_crt_file = '/tmp/resources/minifi_client.crt' minifi_key_file = '/tmp/resources/minifi_client.key' @@ -560,31 +551,6 @@ def step_impl(context): context.test.acquire_container(context=context, name="tcp-client", engine="tcp-client") -@given(u'the {processor_one} processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server') -def step_impl(context, processor_one): - gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials") - p1 = context.test.get_node_by_name(processor_one) - p1.controller_services.append(gcp_controller_service) - p1.set_property("GCP Credentials Provider Service", gcp_controller_service.name) - processor = context.test.get_node_by_name(processor_one) - processor.set_property("Endpoint Override URL", f"fake-gcs-server-{context.feature_id}:4443") - - -@given(u'the {processor_one} and the {processor_two} processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server') -def step_impl(context, processor_one, processor_two): - gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials") - p1 = context.test.get_node_by_name(processor_one) - p2 = context.test.get_node_by_name(processor_two) - p1.controller_services.append(gcp_controller_service) - p1.set_property("GCP Credentials Provider Service", gcp_controller_service.name) - p2.controller_services.append(gcp_controller_service) - p2.set_property("GCP Credentials Provider Service", gcp_controller_service.name) - processor_one = context.test.get_node_by_name(processor_one) - processor_one.set_property("Endpoint Override URL", f"fake-gcs-server-{context.feature_id}:4443") - processor_two = context.test.get_node_by_name(processor_two) - 
processor_two.set_property("Endpoint Override URL", f"fake-gcs-server-{context.feature_id}:4443") - - # SQL @given("an ODBCService is setup up for {processor_name} with the name \"{service_name}\"") def step_impl(context, processor_name, service_name): @@ -874,17 +840,6 @@ def step_impl(context, minifi_container_name, log_pattern, duration): context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, humanfriendly.parse_timespan(duration), count=1) -# Google Cloud Storage -@then('an object with the content \"{content}\" is present in the Google Cloud storage') -def step_imp(context, content): - context.test.check_google_cloud_storage("fake-gcs-server", content) - - -@then("the test bucket of Google Cloud Storage is empty") -def step_impl(context): - context.test.check_empty_gcs_bucket("fake-gcs-server") - - # Prometheus @given("a Prometheus server is set up") def step_impl(context): diff --git a/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py b/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py deleted file mode 100644 index 18428e6ec5..0000000000 --- a/docker/test/integration/minifi/controllers/GCPCredentialsControllerService.py +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-# See the License for the specific language governing permissions and -# limitations under the License. -from ..core.ControllerService import ControllerService - - -class GCPCredentialsControllerService(ControllerService): - def __init__(self, name=None, credentials_location=None, json_path=None, raw_json=None): - super(GCPCredentialsControllerService, self).__init__(name=name) - - self.service_class = 'GCPCredentialsControllerService' - - if credentials_location is not None: - self.properties['Credentials Location'] = credentials_location - - if json_path is not None: - self.properties['Service Account JSON File'] = json_path - - if raw_json is not None: - self.properties['Service Account JSON'] = raw_json diff --git a/docker/test/integration/minifi/processors/DeleteGCSObject.py b/docker/test/integration/minifi/processors/DeleteGCSObject.py deleted file mode 100644 index b39e77bff4..0000000000 --- a/docker/test/integration/minifi/processors/DeleteGCSObject.py +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from ..core.Processor import Processor - - -class DeleteGCSObject(Processor): - def __init__(self, context): - super(DeleteGCSObject, self).__init__( - context=context, - clazz='DeleteGCSObject', - properties={ - 'Bucket': 'test-bucket', - 'Endpoint Override URL': f'fake-gcs-server-{context.feature_id}:4443', - 'Number of retries': 2 - }, - auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/minifi/processors/FetchGCSObject.py b/docker/test/integration/minifi/processors/FetchGCSObject.py deleted file mode 100644 index 31583cf7c4..0000000000 --- a/docker/test/integration/minifi/processors/FetchGCSObject.py +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from ..core.Processor import Processor - - -class FetchGCSObject(Processor): - def __init__(self, context): - super(FetchGCSObject, self).__init__( - context=context, - clazz='FetchGCSObject', - properties={ - 'Bucket': 'test-bucket', - 'Endpoint Override URL': f'fake-gcs-server-{context.feature_id}:4443', - 'Number of retries': 2 - }, - auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/minifi/processors/PutGCSObject.py b/docker/test/integration/minifi/processors/PutGCSObject.py deleted file mode 100644 index 05a7fc6dfd..0000000000 --- a/docker/test/integration/minifi/processors/PutGCSObject.py +++ /dev/null @@ -1,28 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-from ..core.Processor import Processor - - -class PutGCSObject(Processor): - def __init__(self, context): - super(PutGCSObject, self).__init__( - context=context, - clazz='PutGCSObject', - properties={ - 'Bucket': 'test-bucket', - 'Endpoint Override URL': f'fake-gcs-server-{context.feature_id}:4443', - 'Number of retries': 2 - }, - auto_terminate=["success", "failure"]) diff --git a/docker/test/integration/resources/fake-gcs-server-data/test-bucket/test-file b/docker/test/integration/resources/fake-gcs-server-data/test-bucket/test-file deleted file mode 100644 index 1aee3ef7f2..0000000000 --- a/docker/test/integration/resources/fake-gcs-server-data/test-bucket/test-file +++ /dev/null @@ -1 +0,0 @@ -preloaded data diff --git a/docker/test/integration/minifi/processors/ListGCSBucket.py b/extensions/gcp/tests/features/environment.py similarity index 62% rename from docker/test/integration/minifi/processors/ListGCSBucket.py rename to extensions/gcp/tests/features/environment.py index f6fa67447b..ab35b52a56 100644 --- a/docker/test/integration/minifi/processors/ListGCSBucket.py +++ b/extensions/gcp/tests/features/environment.py @@ -12,17 +12,13 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from ..core.Processor import Processor +from minifi_test_framework.core.hooks import common_before_scenario +from minifi_test_framework.core.hooks import common_after_scenario -class ListGCSBucket(Processor): - def __init__(self, context): - super(ListGCSBucket, self).__init__( - context=context, - clazz='ListGCSBucket', - properties={ - 'Bucket': 'test-bucket', - 'Endpoint Override URL': f'fake-gcs-server-{context.feature_id}:4443', - 'Number of retries': 2 - }, - auto_terminate=["success"]) +def before_scenario(context, scenario): + common_before_scenario(context, scenario) + + +def after_scenario(context, scenario): + common_after_scenario(context, scenario) diff --git a/extensions/gcp/tests/features/google_cloud_storage.feature b/extensions/gcp/tests/features/google_cloud_storage.feature new file mode 100644 index 0000000000..2c19abcf33 --- /dev/null +++ b/extensions/gcp/tests/features/google_cloud_storage.feature @@ -0,0 +1,91 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +@ENABLE_GCP +Feature: Sending data to Google Cloud Storage using PutGCSObject + + Scenario: A MiNiFi instance can upload data to Google Cloud storage + Given a GetFile processor with the "Input Directory" property set to "/tmp/input" + And a file with the content "hello_gcs" is present in "/tmp/input" + And a Google Cloud storage server is set up + And a PutGCSObject processor + And PutGCSObject is EVENT_DRIVEN + And a GCPCredentialsControllerService controller service is set up + And the "Credentials Location" property of the GCPCredentialsControllerService controller service is set to "Use Anonymous credentials" + And the "GCP Credentials Provider Service" property of the PutGCSObject processor is set to "GCPCredentialsControllerService" + And the "Bucket" property of the PutGCSObject processor is set to "test-bucket" + And the "Number of retries" property of the PutGCSObject processor is set to "2" + And the "Endpoint Override URL" property of the PutGCSObject processor is set to "fake-gcs-server-${scenario_id}:4443" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN + And the "success" relationship of the GetFile processor is connected to the PutGCSObject + And the "success" relationship of the PutGCSObject processor is connected to the PutFile + And the "failure" relationship of the PutGCSObject processor is connected to the PutGCSObject + And PutFile's success relationship is auto-terminated + + When the MiNiFi instance starts up + + Then a single file with the content "hello_gcs" is placed in the "/tmp/output" directory in less than 45 seconds + And an object with the content "hello_gcs" is present in the Google Cloud storage + + Scenario: A MiNiFi instance can fetch the listed objects from Google Cloud storage bucket + Given a Google Cloud storage server is set up and a single object with contents "preloaded data" is present + And a GCPCredentialsControllerService controller service is set up + And 
the "Credentials Location" property of the GCPCredentialsControllerService controller service is set to "Use Anonymous credentials" + And a ListGCSBucket processor + And the "Bucket" property of the ListGCSBucket processor is set to "test-bucket" + And the "Number of retries" property of the ListGCSBucket processor is set to "2" + And the "Endpoint Override URL" property of the ListGCSBucket processor is set to "fake-gcs-server-${scenario_id}:4443" + And the "GCP Credentials Provider Service" property of the ListGCSBucket processor is set to "GCPCredentialsControllerService" + And a FetchGCSObject processor + And FetchGCSObject is EVENT_DRIVEN + And the "Bucket" property of the FetchGCSObject processor is set to "test-bucket" + And the "Number of retries" property of the FetchGCSObject processor is set to "2" + And the "Endpoint Override URL" property of the FetchGCSObject processor is set to "fake-gcs-server-${scenario_id}:4443" + And the "GCP Credentials Provider Service" property of the FetchGCSObject processor is set to "GCPCredentialsControllerService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN + And the "success" relationship of the ListGCSBucket processor is connected to the FetchGCSObject + And the "success" relationship of the FetchGCSObject processor is connected to the PutFile + And PutFile's success relationship is auto-terminated + + When the MiNiFi instance starts up + + Then a single file with the content "preloaded data" is placed in the "/tmp/output" directory in less than 10 seconds + + Scenario: A MiNiFi instance can delete the listed objects from Google Cloud storage bucket + Given a Google Cloud storage server is set up with some test data + And a GCPCredentialsControllerService controller service is set up + And the "Credentials Location" property of the GCPCredentialsControllerService controller service is set to "Use Anonymous credentials" + And a ListGCSBucket processor + And 
the "Bucket" property of the ListGCSBucket processor is set to "test-bucket" + And the "Number of retries" property of the ListGCSBucket processor is set to "2" + And the "Endpoint Override URL" property of the ListGCSBucket processor is set to "fake-gcs-server-${scenario_id}:4443" + And the "GCP Credentials Provider Service" property of the ListGCSBucket processor is set to "GCPCredentialsControllerService" + And a DeleteGCSObject processor + And DeleteGCSObject is EVENT_DRIVEN + And the "Bucket" property of the DeleteGCSObject processor is set to "test-bucket" + And the "Number of retries" property of the DeleteGCSObject processor is set to "2" + And the "Endpoint Override URL" property of the DeleteGCSObject processor is set to "fake-gcs-server-${scenario_id}:4443" + And the "GCP Credentials Provider Service" property of the DeleteGCSObject processor is set to "GCPCredentialsControllerService" + And a PutFile processor with the "Directory" property set to "/tmp/output" + And the "success" relationship of the ListGCSBucket processor is connected to the DeleteGCSObject + And the "success" relationship of the DeleteGCSObject processor is connected to the PutFile + And PutFile's success relationship is auto-terminated + + When the MiNiFi instance starts up + + Then the test bucket of Google Cloud Storage is empty + And at least one empty file is placed in the "/tmp/output" directory in less than 10 seconds diff --git a/extensions/gcp/tests/features/steps/fake_gcs_server_container.py b/extensions/gcp/tests/features/steps/fake_gcs_server_container.py new file mode 100644 index 0000000000..132931878c --- /dev/null +++ b/extensions/gcp/tests/features/steps/fake_gcs_server_container.py @@ -0,0 +1,48 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
import logging
from minifi_test_framework.core.helpers import wait_for_condition, retry_check
from minifi_test_framework.containers.container import Container
from minifi_test_framework.containers.directory import Directory
from minifi_test_framework.core.minifi_test_context import MinifiTestContext


class FakeGcsServerContainer(Container):
    """Test container wrapping the fsouza/fake-gcs-server image.

    The server is started in plain-HTTP mode and its /data directory is
    pre-seeded with a "test-bucket" holding a single object, so that
    list/fetch/delete scenarios have data to operate on.
    """

    def __init__(self, test_context: MinifiTestContext):
        # One uniquely-named server per scenario; the -host flag must match
        # the container name so in-network clients can reach it by DNS name.
        container_name = f"fake-gcs-server-{test_context.scenario_id}"
        super().__init__("fsouza/fake-gcs-server:1.45.1",
                         container_name,
                         test_context.network,
                         command=f'-scheme http -host {container_name}')
        seed_bucket = Directory(path="/data/test-bucket",
                                files={"test-file": "preloaded data\n"})
        self.dirs.append(seed_bucket)

    def deploy(self):
        """Start the container and block until the server reports readiness.

        Returns True once the readiness marker appears in the container
        logs within 30 seconds, False otherwise (or if the container exits).
        """
        super().deploy()
        # fake-gcs-server prints this line once it is accepting requests.
        ready_marker = "server started at http"
        return wait_for_condition(
            condition=lambda: ready_marker in self.get_logs(),
            timeout_seconds=30,
            bail_condition=lambda: self.exited,
            context=None)

    @retry_check()
    def check_google_cloud_storage(self, content):
        """Return True if any stored object contains *content* (recursive grep)."""
        code, output = self.exec_run(["grep", "-r", content, "/storage"])
        logging.info(f"GCS storage contents matching '{content}': {output}")
        return code == 0

    @retry_check()
    def is_gcs_bucket_empty(self):
        """Return True if the test bucket exists and holds no objects.

        NOTE(review): assumes exec_run yields a str and an empty listing is
        exactly "" (no trailing newline) — confirm against the framework.
        """
        code, output = self.exec_run(["ls", "/storage/test-bucket"])
        logging.info(f"GCS bucket contents: {output}")
        return code == 0 and output == ""
a/extensions/gcp/tests/features/steps/steps.py b/extensions/gcp/tests/features/steps/steps.py new file mode 100644 index 0000000000..639434d9d7 --- /dev/null +++ b/extensions/gcp/tests/features/steps/steps.py @@ -0,0 +1,41 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
from behave import step, then

from minifi_test_framework.steps import checking_steps  # noqa: F401
from minifi_test_framework.steps import configuration_steps  # noqa: F401
from minifi_test_framework.steps import core_steps  # noqa: F401
from minifi_test_framework.steps import flow_building_steps  # noqa: F401
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from minifi_test_framework.core.helpers import log_due_to_failure
from fake_gcs_server_container import FakeGcsServerContainer


@step("a Google Cloud storage server is set up")
@step("a Google Cloud storage server is set up with some test data")
@step('a Google Cloud storage server is set up and a single object with contents "preloaded data" is present')
def step_impl(context: MinifiTestContext):
    """Start a fake GCS server container for the scenario and wait for readiness.

    The container pre-seeds its test bucket, so all three step phrasings map
    to the same implementation. Fails the step if deployment does not succeed.
    """
    context.containers["fake-gcs-server"] = FakeGcsServerContainer(context)
    assert context.containers["fake-gcs-server"].deploy()


# Fixed typo: function was named "step_imp"; renamed to "step_impl" for
# consistency with the behave convention used by every other step here
# (behave registers steps via the decorator, so the name itself is inert).
# Also dropped the redundant \" escapes in the decorator string — inside a
# single-quoted literal they produce the identical pattern text.
@then('an object with the content "{content}" is present in the Google Cloud storage')
def step_impl(context: MinifiTestContext, content: str):
    """Assert that some object stored in the fake GCS server contains *content*."""
    assert context.containers["fake-gcs-server"].check_google_cloud_storage(content) or log_due_to_failure(context)


@then("the test bucket of Google Cloud Storage is empty")
def step_impl(context: MinifiTestContext):
    """Assert that the pre-created test bucket no longer holds any objects."""
    assert context.containers["fake-gcs-server"].is_gcs_bucket_empty() or log_due_to_failure(context)