diff --git a/behave_framework/src/minifi_test_framework/containers/minifi_container.py b/behave_framework/src/minifi_test_framework/containers/minifi_container.py index 55d7309573..4d1a1a5b31 100644 --- a/behave_framework/src/minifi_test_framework/containers/minifi_container.py +++ b/behave_framework/src/minifi_test_framework/containers/minifi_container.py @@ -28,7 +28,6 @@ class MinifiContainer(Container): def __init__(self, container_name: str, test_context: MinifiTestContext): super().__init__(test_context.minifi_container_image, f"{container_name}-{test_context.scenario_id}", test_context.network) - self.flow_config_str: str = "" self.flow_definition = FlowDefinition() self.properties: dict[str, str] = {} self.log_properties: dict[str, str] = {} diff --git a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py index c616ebf094..e53e8d0e70 100644 --- a/behave_framework/src/minifi_test_framework/minifi/flow_definition.py +++ b/behave_framework/src/minifi_test_framework/minifi/flow_definition.py @@ -39,6 +39,9 @@ def add_processor(self, processor: Processor): def get_processor(self, processor_name: str) -> Processor | None: return next((proc for proc in self.processors if proc.name == processor_name), None) + def get_controller_service(self, controller_service_name: str) -> ControllerService | None: + return next((controller for controller in self.controller_services if controller.name == controller_service_name), None) + def get_parameter_context(self, parameter_context_name: str) -> ParameterContext | None: return next((parameter_context for parameter_context in self.parameter_contexts if parameter_context.name == parameter_context_name), None) diff --git a/behave_framework/src/minifi_test_framework/minifi/processor.py b/behave_framework/src/minifi_test_framework/minifi/processor.py index fabbc733f1..d91bedf289 100644 --- 
a/behave_framework/src/minifi_test_framework/minifi/processor.py +++ b/behave_framework/src/minifi_test_framework/minifi/processor.py @@ -20,12 +20,13 @@ class Processor: def __init__(self, class_name: str, proc_name: str, scheduling_strategy: str = "TIMER_DRIVEN", - scheduling_period: str = "1 sec", ): + scheduling_period: str = "1 sec", penalization_period: str = "1 sec"): self.class_name = class_name self.id: str = str(uuid.uuid4()) self.name = proc_name self.scheduling_strategy: str = scheduling_strategy self.scheduling_period: str = scheduling_period + self.penalization_period: str = penalization_period self.properties: dict[str, str] = {} self.auto_terminated_relationships: list[str] = [] @@ -46,6 +47,7 @@ def to_yaml_dict(self) -> dict: 'class': self.class_name, 'scheduling strategy': self.scheduling_strategy, 'scheduling period': self.scheduling_period, + 'penalization period': self.penalization_period, } if self.auto_terminated_relationships: data['auto-terminated relationships list'] = self.auto_terminated_relationships diff --git a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py index 11e19d1041..032b09e776 100644 --- a/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py +++ b/behave_framework/src/minifi_test_framework/steps/flow_building_steps.py @@ -16,6 +16,7 @@ # import logging +import uuid from behave import given, step from minifi_test_framework.containers.directory import Directory @@ -246,8 +247,18 @@ def step_impl(context: MinifiTestContext, property_name: str, processor_name: st processor.add_property(property_name, filtering) +@given("the \"{property_name}\" properties of the {processor_name_one} and {processor_name_two} processors are set to the same random UUID") +def step_impl(context, property_name, processor_name_one, processor_name_two): + uuid_str = str(uuid.uuid4()) + 
context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name_one).add_property(property_name, uuid_str) + context.get_or_create_default_minifi_container().flow_definition.get_processor(processor_name_two).add_property(property_name, uuid_str) + + # TLS def add_ssl_context_service_for_minifi(context: MinifiTestContext, cert_name: str): + ssl_context_service = context.get_or_create_default_minifi_container().flow_definition.get_controller_service("SSLContextService") + if ssl_context_service is not None: + return controller_service = ControllerService(class_name="SSLContextService", service_name="SSLContextService") controller_service.add_property("Client Certificate", f"/tmp/resources/{cert_name}.crt") controller_service.add_property("Private Key", f"/tmp/resources/{cert_name}.key") diff --git a/docker/RunBehaveTests.sh b/docker/RunBehaveTests.sh index aaf80a82c1..a3b5c1973e 100755 --- a/docker/RunBehaveTests.sh +++ b/docker/RunBehaveTests.sh @@ -202,4 +202,5 @@ exec \ "${docker_dir}/../extensions/opc/tests/features" \ "${docker_dir}/../extensions/kafka/tests/features" \ "${docker_dir}/../extensions/couchbase/tests/features" \ - "${docker_dir}/../extensions/elasticsearch/tests/features" + "${docker_dir}/../extensions/elasticsearch/tests/features" \ + "${docker_dir}/../extensions/splunk/tests/features" diff --git a/docker/test/integration/cluster/ContainerStore.py b/docker/test/integration/cluster/ContainerStore.py index 7c1c0d6c08..873489999f 100644 --- a/docker/test/integration/cluster/ContainerStore.py +++ b/docker/test/integration/cluster/ContainerStore.py @@ -25,7 +25,6 @@ from .containers.HttpProxyContainer import HttpProxyContainer from .containers.PostgreSQLServerContainer import PostgreSQLServerContainer from .containers.MqttBrokerContainer import MqttBrokerContainer -from .containers.SplunkContainer import SplunkContainer from .containers.SyslogUdpClientContainer import SyslogUdpClientContainer from 
.containers.SyslogTcpClientContainer import SyslogTcpClientContainer from .containers.MinifiAsPodInKubernetesCluster import MinifiAsPodInKubernetesCluster @@ -169,14 +168,6 @@ def acquire_container(self, context, container_name: str, engine='minifi-cpp', c network=self.network, image_store=self.image_store, command=command)) - elif engine == 'splunk': - return self.containers.setdefault(container_name, - SplunkContainer(feature_context=feature_context, - name=container_name, - vols=self.vols, - network=self.network, - image_store=self.image_store, - command=command)) elif engine == "syslog-udp-client": return self.containers.setdefault(container_name, SyslogUdpClientContainer( diff --git a/docker/test/integration/cluster/DockerTestCluster.py b/docker/test/integration/cluster/DockerTestCluster.py index df94c893bb..6450a78e9d 100644 --- a/docker/test/integration/cluster/DockerTestCluster.py +++ b/docker/test/integration/cluster/DockerTestCluster.py @@ -31,7 +31,6 @@ from .checkers.GcsChecker import GcsChecker from .checkers.PostgresChecker import PostgresChecker from .checkers.PrometheusChecker import PrometheusChecker -from .checkers.SplunkChecker import SplunkChecker from .checkers.GrafanaLokiChecker import GrafanaLokiChecker from .checkers.ModbusChecker import ModbusChecker from .checkers.MqttHelper import MqttHelper @@ -48,7 +47,6 @@ def __init__(self, context, feature_id): self.azure_checker = AzureChecker(self.container_communicator) self.gcs_checker = GcsChecker(self.container_communicator) self.postgres_checker = PostgresChecker(self.container_communicator) - self.splunk_checker = SplunkChecker(self.container_communicator) self.prometheus_checker = PrometheusChecker() self.grafana_loki_checker = GrafanaLokiChecker() self.minifi_controller_executor = MinifiControllerExecutor(self.container_communicator) @@ -236,22 +234,6 @@ def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_s def check_azure_blob_storage_is_empty(self, 
timeout_seconds): return self.azure_checker.check_azure_blob_storage_is_empty(timeout_seconds) - def check_splunk_event(self, container_name, query): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.splunk_checker.check_splunk_event(container_name, query) - - def check_splunk_event_with_attributes(self, container_name, query, attributes): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.splunk_checker.check_splunk_event_with_attributes(container_name, query, attributes) - - def enable_splunk_hec_indexer(self, container_name, hec_name): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.splunk_checker.enable_splunk_hec_indexer(container_name, hec_name) - - def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem): - container_name = self.container_store.get_container_name_with_postfix(container_name) - return self.splunk_checker.enable_splunk_hec_ssl(container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem) - def check_google_cloud_storage(self, gcs_container_name, content): gcs_container_name = self.container_store.get_container_name_with_postfix(gcs_container_name) return self.gcs_checker.check_google_cloud_storage(gcs_container_name, content) diff --git a/docker/test/integration/cluster/ImageStore.py b/docker/test/integration/cluster/ImageStore.py index 88d06cff96..4e52f7deb8 100644 --- a/docker/test/integration/cluster/ImageStore.py +++ b/docker/test/integration/cluster/ImageStore.py @@ -65,8 +65,6 @@ def get_image(self, container_engine): image = self.__build_postgresql_server_image() elif container_engine == "mqtt-broker": image = self.__build_mqtt_broker_image() - elif container_engine == "splunk": - image = self.__build_splunk_image() elif container_engine == "kinesis-server": image = self.__build_kinesis_image() elif container_engine == "reverse-proxy": 
@@ -295,9 +293,6 @@ def __build_mqtt_broker_image(self): def __build_kinesis_image(self): return self.__build_image_by_path(self.test_dir + "/resources/kinesis-mock", 'kinesis-server') - def __build_splunk_image(self): - return self.__build_image_by_path(self.test_dir + "/resources/splunk-hec", 'minifi-splunk') - def __build_reverse_proxy_image(self): return self.__build_image_by_path(self.test_dir + "/resources/reverse-proxy", 'reverse-proxy') diff --git a/docker/test/integration/cluster/checkers/SplunkChecker.py b/docker/test/integration/cluster/checkers/SplunkChecker.py deleted file mode 100644 index 8cc2e27587..0000000000 --- a/docker/test/integration/cluster/checkers/SplunkChecker.py +++ /dev/null @@ -1,80 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import json -from utils import retry_check - - -class SplunkChecker: - def __init__(self, container_communicator): - self.container_communicator = container_communicator - - @retry_check() - def check_splunk_event(self, container_name, query): - (code, output) = self.container_communicator.execute_command(container_name, ["sudo", "/opt/splunk/bin/splunk", "search", query, "-auth", "admin:splunkadmin"]) - if code != 0: - return False - return query in output.decode("utf-8") - - @retry_check() - def check_splunk_event_with_attributes(self, container_name, query, attributes): - (code, output) = self.container_communicator.execute_command(container_name, ["sudo", "/opt/splunk/bin/splunk", "search", query, "-output", "json", "-auth", "admin:splunkadmin"]) - if code != 0: - return False - result_lines = output.splitlines() - for result_line in result_lines: - try: - result_line_json = json.loads(result_line) - except json.decoder.JSONDecodeError: - continue - if "result" not in result_line_json: - continue - if "host" in attributes: - if result_line_json["result"]["host"] != attributes["host"]: - continue - if "source" in attributes: - if result_line_json["result"]["source"] != attributes["source"]: - continue - if "sourcetype" in attributes: - if result_line_json["result"]["sourcetype"] != attributes["sourcetype"]: - continue - if "index" in attributes: - if result_line_json["result"]["index"] != attributes["index"]: - continue - return True - return False - - def enable_splunk_hec_indexer(self, container_name, hec_name): - (code, _) = self.container_communicator.execute_command(container_name, ["sudo", - "/opt/splunk/bin/splunk", "http-event-collector", - "update", hec_name, - "-uri", "https://localhost:8089", - "-use-ack", "1", - "-disabled", "0", - "-auth", "admin:splunkadmin"]) - return code == 0 - - def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem): - assert 
self.container_communicator.write_content_to_container(splunk_cert_pem.decode() + splunk_key_pem.decode() + root_ca_cert_pem.decode(), container_name, '/opt/splunk/etc/auth/splunk_cert.pem') - assert self.container_communicator.write_content_to_container(root_ca_cert_pem.decode(), container_name, '/opt/splunk/etc/auth/root_ca.pem') - (code, _) = self.container_communicator.execute_command(container_name, ["sudo", - "/opt/splunk/bin/splunk", "http-event-collector", - "update", - "-uri", "https://localhost:8089", - "-enable-ssl", "1", - "-server-cert", "/opt/splunk/etc/auth/splunk_cert.pem", - "-ca-cert-file", "/opt/splunk/etc/auth/root_ca.pem", - "-require-client-cert", "1", - "-auth", "admin:splunkadmin"]) - return code == 0 diff --git a/docker/test/integration/cluster/containers/SplunkContainer.py b/docker/test/integration/cluster/containers/SplunkContainer.py deleted file mode 100644 index 9a57342765..0000000000 --- a/docker/test/integration/cluster/containers/SplunkContainer.py +++ /dev/null @@ -1,44 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- - -import logging -from .Container import Container - - -class SplunkContainer(Container): - def __init__(self, feature_context, name, vols, network, image_store, command=None): - super().__init__(feature_context, name, 'splunk', vols, network, image_store, command) - - def get_startup_finished_log_entry(self): - return "Ansible playbook complete, will begin streaming splunkd_stderr.log" - - def deploy(self): - if not self.set_deployed(): - return - - logging.info('Creating and running Splunk docker container...') - self.client.containers.run( - self.image_store.get_image(self.get_engine()), - detach=True, - name=self.name, - network=self.network.name, - environment=[ - "SPLUNK_LICENSE_URI=Free", - "SPLUNK_START_ARGS=--accept-license", - "SPLUNK_PASSWORD=splunkadmin" - ], - entrypoint=self.command) - logging.info('Added container \'%s\'', self.name) diff --git a/docker/test/integration/features/MiNiFi_integration_test_driver.py b/docker/test/integration/features/MiNiFi_integration_test_driver.py index 7a42e23202..eb15dded3e 100644 --- a/docker/test/integration/features/MiNiFi_integration_test_driver.py +++ b/docker/test/integration/features/MiNiFi_integration_test_driver.py @@ -64,12 +64,6 @@ def acquire_container(self, context, name, engine='minifi-cpp', command=None): def acquire_transient_minifi(self, context, name, engine='minifi-cpp'): return self.cluster.acquire_transient_minifi(context=context, name=name, engine=engine) - def start_splunk(self, context): - self.cluster.acquire_container(context=context, name='splunk', engine='splunk') - self.cluster.deploy_container(name='splunk') - assert self.cluster.wait_for_container_startup_to_finish('splunk') or self.cluster.log_app_output() - assert self.cluster.enable_splunk_hec_indexer('splunk', 'splunk_hec_token') or self.cluster.log_app_output() - def start_minifi_c2_server(self, context): self.cluster.acquire_container(context=context, name="minifi-c2-server", engine="minifi-c2-server") 
self.cluster.deploy_container('minifi-c2-server') @@ -303,12 +297,6 @@ def check_http_proxy_access(self, http_proxy_container_name, url): def check_azure_storage_server_data(self, azure_container_name, object_data): assert self.cluster.check_azure_storage_server_data(azure_container_name, object_data) or self.cluster.log_app_output() - def check_splunk_event(self, splunk_container_name, query): - assert self.cluster.check_splunk_event(splunk_container_name, query) or self.cluster.log_app_output() - - def check_splunk_event_with_attributes(self, splunk_container_name, query, attributes): - assert self.cluster.check_splunk_event_with_attributes(splunk_container_name, query, attributes) or self.cluster.log_app_output() - def check_google_cloud_storage(self, gcs_container_name, content): assert self.cluster.check_google_cloud_storage(gcs_container_name, content) or self.cluster.log_app_output() @@ -384,9 +372,6 @@ def enable_prometheus_in_minifi(self): def enable_prometheus_with_ssl_in_minifi(self): self.cluster.enable_prometheus_with_ssl_in_minifi() - def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem): - self.cluster.enable_splunk_hec_ssl(container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem) - def enable_sql_in_minifi(self): self.cluster.enable_sql_in_minifi() diff --git a/docker/test/integration/features/steps/steps.py b/docker/test/integration/features/steps/steps.py index e57e4e9876..c0305746f0 100644 --- a/docker/test/integration/features/steps/steps.py +++ b/docker/test/integration/features/steps/steps.py @@ -15,7 +15,6 @@ from filesystem_validation.FileSystemObserver import FileSystemObserver from minifi.core.RemoteProcessGroup import RemoteProcessGroup -from ssl_utils.SSL_cert_utils import make_server_cert from minifi.core.Funnel import Funnel from minifi.controllers.SSLContextService import SSLContextService @@ -36,7 +35,6 @@ import time import uuid import humanfriendly -import OpenSSL.crypto import 
os @@ -127,7 +125,6 @@ def step_impl(context, processor_type, minifi_container_name): @given("a {processor_type} processor set up to communicate with the same s3 server") @given("a {processor_type} processor set up to communicate with an Azure blob storage") @given("a {processor_type} processor set up to communicate with an MQTT broker instance") -@given("a {processor_type} processor set up to communicate with the Splunk HEC instance") @given("a {processor_type} processor set up to communicate with the kinesis server") def step_impl(context, processor_type): __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow") @@ -557,35 +554,12 @@ def setUpSslContextServiceForRPG(context, rpg_name: str): rpg.add_property("SSL Context Service", ssl_context_service.name) -# splunk hec -@given("a Splunk HEC is set up and running") -def step_impl(context): - context.test.start_splunk(context) - - # TCP client @given('a TCP client is set up to send a test TCP message to minifi') def step_impl(context): context.test.acquire_container(context=context, name="tcp-client", engine="tcp-client") -@given("SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus") -def step_impl(context): - minifi_crt_file = '/tmp/resources/minifi_client.crt' - minifi_key_file = '/tmp/resources/minifi_client.key' - root_ca_crt_file = '/tmp/resources/root_ca.crt' - ssl_context_service = SSLContextService(name='SSLContextService', cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file) - - splunk_cert, splunk_key = make_server_cert(context.test.get_container_name_with_postfix("splunk"), context.root_ca_cert, context.root_ca_key) - put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP") - put_splunk_http.controller_services.append(ssl_context_service) - put_splunk_http.set_property("SSL Context Service", ssl_context_service.name) - query_splunk_indexing_status = 
context.test.get_node_by_name("QuerySplunkIndexingStatus") - query_splunk_indexing_status.controller_services.append(ssl_context_service) - query_splunk_indexing_status.set_property("SSL Context Service", ssl_context_service.name) - context.test.enable_splunk_hec_ssl('splunk', OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, splunk_cert), OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM, splunk_key), OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM, context.root_ca_cert)) - - @given(u'the {processor_one} processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server') def step_impl(context, processor_one): gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials") @@ -911,18 +885,6 @@ def step_impl(context): context.test.check_empty_gcs_bucket("fake-gcs-server") -# Splunk -@then('an event is registered in Splunk HEC with the content \"{content}\"') -def step_imp(context, content): - context.test.check_splunk_event("splunk", content) - - -@then('an event is registered in Splunk HEC with the content \"{content}\" with \"{source}\" set as source and \"{source_type}\" set as sourcetype and \"{host}\" set as host') -def step_imp(context, content, source, source_type, host): - attr = {"source": source, "sourcetype": source_type, "host": host} - context.test.check_splunk_event_with_attributes("splunk", content, attr) - - # Prometheus @given("a Prometheus server is set up") def step_impl(context): diff --git a/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py b/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py deleted file mode 100644 index e4638cec89..0000000000 --- a/docker/test/integration/minifi/processors/QuerySplunkIndexingStatus.py +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. 
See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ..core.Processor import Processor - - -class QuerySplunkIndexingStatus(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN', 'penalization period': '1 sec'}): - super(QuerySplunkIndexingStatus, self).__init__( - context=context, - clazz='QuerySplunkIndexingStatus', - properties={ - 'Hostname': 'splunk', - 'Port': '8088', - 'Token': 'Splunk 176fae97-f59d-4f08-939a-aa6a543f2485' # Token of the default splunk_hec_token HTTP Event Collector in the Splunk container image - }, - auto_terminate=['acknowledged', 'unacknowledged', 'undetermined', 'failure'], - schedule=schedule) diff --git a/docker/test/integration/resources/splunk-hec/Dockerfile b/docker/test/integration/resources/splunk-hec/Dockerfile deleted file mode 100644 index a56160096c..0000000000 --- a/docker/test/integration/resources/splunk-hec/Dockerfile +++ /dev/null @@ -1,2 +0,0 @@ -FROM splunk/splunk:9.2.1-patch2 -ADD conf/default.yml /tmp/defaults/default.yml diff --git a/docker/test/integration/resources/splunk-hec/conf/default.yml b/docker/test/integration/resources/splunk-hec/conf/default.yml deleted file mode 100644 index a59ce2ac5c..0000000000 --- a/docker/test/integration/resources/splunk-hec/conf/default.yml +++ /dev/null @@ -1,6 +0,0 @@ -splunk: - hec: - enable: True - ssl: False - 
port: 8088 - token: 176fae97-f59d-4f08-939a-aa6a543f2485 diff --git a/extensions/splunk/PutSplunkHTTP.cpp b/extensions/splunk/PutSplunkHTTP.cpp index 953966e4e8..3e59934ff2 100644 --- a/extensions/splunk/PutSplunkHTTP.cpp +++ b/extensions/splunk/PutSplunkHTTP.cpp @@ -66,31 +66,52 @@ std::optional getContentType(core::ProcessContext& context, const c return context.getProperty(PutSplunkHTTP::ContentType) | utils::toOptional() | utils::orElse ([&flow_file] {return flow_file.getAttribute("mime.type");}); } -bool setAttributesFromClientResponse(core::FlowFile& flow_file, http::HTTPClient& client) { +bool setAttributesFromClientResponse(core::FlowFile& flow_file, http::HTTPClient& client, const std::shared_ptr& logger) { rapidjson::Document response_json; rapidjson::ParseResult parse_result = response_json.Parse(client.getResponseBody().data()); bool result = true; - if (parse_result.IsError()) + if (parse_result.IsError()) { + logger->log_error("Failed to parse Splunk HEC response JSON"); return false; + } - if (response_json.HasMember("code") && response_json["code"].IsInt()) - flow_file.setAttribute(SPLUNK_RESPONSE_CODE, std::to_string(response_json["code"].GetInt())); - else + if (response_json.HasMember("code") && response_json["code"].IsInt()) { + auto code = response_json["code"].GetInt(); + flow_file.setAttribute(SPLUNK_RESPONSE_CODE, std::to_string(code)); + if (code != 0) { + logger->log_error("Splunk HEC returned error code: {}", code); + result = false; + } + } else { + logger->log_error("Splunk HEC response JSON does not contain a valid 'code' field"); result = false; + } - if (response_json.HasMember("ackId") && response_json["ackId"].IsUint64()) + if (response_json.HasMember("ackId") && response_json["ackId"].IsUint64()) { flow_file.setAttribute(SPLUNK_ACK_ID, std::to_string(response_json["ackId"].GetUint64())); - else + } else { + logger->log_error("Splunk HEC response JSON does not contain a valid 'ackId' field"); result = false; + } return result; } 
-bool enrichFlowFileWithAttributes(core::FlowFile& flow_file, http::HTTPClient& client) { +bool enrichFlowFileWithAttributes(core::FlowFile& flow_file, http::HTTPClient& client, const std::shared_ptr& logger) { flow_file.setAttribute(SPLUNK_STATUS_CODE, std::to_string(client.getResponseCode())); flow_file.setAttribute(SPLUNK_RESPONSE_TIME, std::to_string(std::chrono::duration_cast(std::chrono::system_clock::now().time_since_epoch()).count())); - return setAttributesFromClientResponse(flow_file, client) && client.getResponseCode() == 200; + auto result = true; + if (client.getResponseCode() != 200) { + logger->log_error("Received failure response code from Splunk HEC: {}", client.getResponseCode()); + result = false; + } + + if (!setAttributesFromClientResponse(flow_file, client, logger)) { + return false; + } + + return result; } void setFlowFileAsPayload(core::ProcessSession& session, @@ -142,8 +163,11 @@ void PutSplunkHTTP::onTrigger(core::ProcessContext& context, core::ProcessSessio setFlowFileAsPayload(session, context, *client, flow_file); bool success = false; - if (client->submit()) - success = enrichFlowFileWithAttributes(*flow_file, *client); + if (client->submit()) { + success = enrichFlowFileWithAttributes(*flow_file, *client, logger_); + } else { + logger_->log_error("Failed to submit HTTP request to Splunk HEC"); + } session.transfer(flow_file, success ? Success : Failure); } diff --git a/docker/test/integration/minifi/processors/PutSplunkHTTP.py b/extensions/splunk/tests/features/environment.py similarity index 55% rename from docker/test/integration/minifi/processors/PutSplunkHTTP.py rename to extensions/splunk/tests/features/environment.py index 1bf26e59db..347b35688a 100644 --- a/docker/test/integration/minifi/processors/PutSplunkHTTP.py +++ b/extensions/splunk/tests/features/environment.py @@ -13,19 +13,22 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import platform -from ..core.Processor import Processor - - -class PutSplunkHTTP(Processor): - def __init__(self, context, schedule={'scheduling strategy': 'EVENT_DRIVEN'}): - super(PutSplunkHTTP, self).__init__( - context=context, - clazz='PutSplunkHTTP', - properties={ - 'Hostname': 'splunk', - 'Port': '8088', - 'Token': 'Splunk 176fae97-f59d-4f08-939a-aa6a543f2485' # Token of the default splunk_hec_token HTTP Event Collector in the Splunk container image - }, - auto_terminate=['success', 'failure'], - schedule=schedule) +from minifi_test_framework.core.hooks import common_before_scenario +from minifi_test_framework.core.hooks import common_after_scenario + + +def before_feature(context, feature): + if "x86_x64_only" in feature.tags: + is_x86 = platform.machine() in ("i386", "AMD64", "x86_64") + if not is_x86: + feature.skip("This feature is only x86/x64 compatible") + + +def before_scenario(context, scenario): + common_before_scenario(context, scenario) + + +def after_scenario(context, scenario): + common_after_scenario(context, scenario) diff --git a/docker/test/integration/features/splunk.feature b/extensions/splunk/tests/features/splunk.feature similarity index 66% rename from docker/test/integration/features/splunk.feature rename to extensions/splunk/tests/features/splunk.feature index a6f99285e6..8f929e3156 100644 --- a/docker/test/integration/features/splunk.feature +++ b/extensions/splunk/tests/features/splunk.feature @@ -17,51 +17,67 @@ @ENABLE_SPLUNK Feature: Sending data to Splunk HEC using PutSplunkHTTP - Background: - Given the content of "/tmp/output" is monitored - Scenario: A MiNiFi instance transfers data to a Splunk HEC Given a Splunk HEC is set up and running And a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "foobar" is present in "/tmp/input" - And a PutSplunkHTTP processor set up to communicate with the Splunk HEC instance - And a QuerySplunkIndexingStatus processor set up to 
communicate with the Splunk HEC Instance - And the "Splunk Request Channel" properties of the PutSplunkHTTP and QuerySplunkIndexingStatus processors are set to the same random guid + And a PutSplunkHTTP processor + And PutSplunkHTTP is EVENT_DRIVEN + And a QuerySplunkIndexingStatus processor + And QuerySplunkIndexingStatus is EVENT_DRIVEN + And the "Splunk Request Channel" properties of the PutSplunkHTTP and QuerySplunkIndexingStatus processors are set to the same random UUID And the "Source" property of the PutSplunkHTTP processor is set to "my-source" And the "Source Type" property of the PutSplunkHTTP processor is set to "my-source-type" And the "Host" property of the PutSplunkHTTP processor is set to "my-host" + And the "Hostname" property of the PutSplunkHTTP processor is set to "http://splunk-${scenario_id}" + And the "Port" property of the PutSplunkHTTP processor is set to "8088" + And the "Token" property of the PutSplunkHTTP processor is set to "Splunk 176fae97-f59d-4f08-939a-aa6a543f2485" + And the "Hostname" property of the QuerySplunkIndexingStatus processor is set to "http://splunk-${scenario_id}" + And the "Port" property of the QuerySplunkIndexingStatus processor is set to "8088" + And the "Token" property of the QuerySplunkIndexingStatus processor is set to "Splunk 176fae97-f59d-4f08-939a-aa6a543f2485" And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN + And the "success" relationship of the GetFile processor is connected to the PutSplunkHTTP And the "success" relationship of the PutSplunkHTTP processor is connected to the QuerySplunkIndexingStatus And the "undetermined" relationship of the QuerySplunkIndexingStatus processor is connected to the QuerySplunkIndexingStatus And the "acknowledged" relationship of the QuerySplunkIndexingStatus processor is connected to the PutFile - And the "Hostname" property of the PutSplunkHTTP processor is set to "http://splunk-${feature_id}" - And the "Hostname" 
property of the QuerySplunkIndexingStatus processor is set to "http://splunk-${feature_id}" + And PutFile's success relationship is auto-terminated - When both instances start up - Then a flowfile with the content "foobar" is placed in the monitored directory in less than 20 seconds + When the MiNiFi instance starts up + Then a single file with the content "foobar" is placed in the "/tmp/output" directory in less than 20 seconds And an event is registered in Splunk HEC with the content "foobar" with "my-source" set as source and "my-source-type" set as sourcetype and "my-host" set as host - Scenario: A MiNiFi instance transfers data to a Splunk HEC with SSL enabled Given a Splunk HEC is set up and running And a GetFile processor with the "Input Directory" property set to "/tmp/input" And a file with the content "foobar" is present in "/tmp/input" - And a PutSplunkHTTP processor set up to communicate with the Splunk HEC instance - And a QuerySplunkIndexingStatus processor set up to communicate with the Splunk HEC Instance - And the "Splunk Request Channel" properties of the PutSplunkHTTP and QuerySplunkIndexingStatus processors are set to the same random guid + And a PutSplunkHTTP processor + And PutSplunkHTTP is EVENT_DRIVEN + And a QuerySplunkIndexingStatus processor + And QuerySplunkIndexingStatus is EVENT_DRIVEN + And the "Splunk Request Channel" properties of the PutSplunkHTTP and QuerySplunkIndexingStatus processors are set to the same random UUID And the "Source" property of the PutSplunkHTTP processor is set to "my-source" And the "Source Type" property of the PutSplunkHTTP processor is set to "my-source-type" And the "Host" property of the PutSplunkHTTP processor is set to "my-host" + And the "Hostname" property of the PutSplunkHTTP processor is set to "https://splunk-${scenario_id}" + And the "Port" property of the PutSplunkHTTP processor is set to "8088" + And the "Token" property of the PutSplunkHTTP processor is set to "Splunk 
176fae97-f59d-4f08-939a-aa6a543f2485" + And the "Hostname" property of the QuerySplunkIndexingStatus processor is set to "https://splunk-${scenario_id}" + And the "Port" property of the QuerySplunkIndexingStatus processor is set to "8088" + And the "Token" property of the QuerySplunkIndexingStatus processor is set to "Splunk 176fae97-f59d-4f08-939a-aa6a543f2485" + And an ssl context service is set up for PutSplunkHTTP + And an ssl context service is set up for QuerySplunkIndexingStatus And a PutFile processor with the "Directory" property set to "/tmp/output" + And PutFile is EVENT_DRIVEN + And the "success" relationship of the GetFile processor is connected to the PutSplunkHTTP And the "success" relationship of the PutSplunkHTTP processor is connected to the QuerySplunkIndexingStatus And the "undetermined" relationship of the QuerySplunkIndexingStatus processor is connected to the QuerySplunkIndexingStatus And the "acknowledged" relationship of the QuerySplunkIndexingStatus processor is connected to the PutFile + And PutFile's success relationship is auto-terminated And SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus - And the "Hostname" property of the PutSplunkHTTP processor is set to "https://splunk-${feature_id}" - And the "Hostname" property of the QuerySplunkIndexingStatus processor is set to "https://splunk-${feature_id}" - When both instances start up - Then a flowfile with the content "foobar" is placed in the monitored directory in less than 20 seconds + When the MiNiFi instance starts up + Then a single file with the content "foobar" is placed in the "/tmp/output" directory in less than 20 seconds And an event is registered in Splunk HEC with the content "foobar" with "my-source" set as source and "my-source-type" set as sourcetype and "my-host" set as host diff --git a/extensions/splunk/tests/features/steps/splunk_container.py 
import json

from OpenSSL import crypto
from minifi_test_framework.containers.container import Container
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from minifi_test_framework.core.helpers import wait_for_condition, retry_check
from minifi_test_framework.containers.file import File
from minifi_test_framework.core.ssl_utils import make_server_cert


class SplunkContainer(Container):
    """Splunk container exposing an HTTP Event Collector (HEC) for the Splunk integration tests.

    The constructor stages the HEC configuration and the TLS material as container files;
    the remaining methods drive the Splunk CLI inside the running container to toggle HEC
    features and to search for indexed events.
    """

    # The password appears both in the SPLUNK_PASSWORD environment entry and in the
    # ``-auth`` argument of every CLI call, so it is defined once to keep them in sync.
    _ADMIN_USER = "admin"
    _ADMIN_PASSWORD = "splunkadmin"
    _SPLUNK_CLI = "/opt/splunk/bin/splunk"
    _MANAGEMENT_URI = "https://localhost:8089"
    _SERVER_CERT_PATH = "/opt/splunk/etc/auth/splunk_cert.pem"
    _ROOT_CA_PATH = "/opt/splunk/etc/auth/root_ca.pem"
    # Result fields that check_splunk_event_with_attributes() compares when requested.
    _MATCHED_FIELDS = ("host", "source", "sourcetype", "index")

    def __init__(self, test_context: MinifiTestContext):
        """Prepare the container definition; nothing is started until deploy() is called.

        Args:
            test_context: Supplies the scenario id (used in the container name), the
                network to attach to, and the root CA certificate/key used to sign
                the Splunk server certificate.
        """
        super().__init__("splunk/splunk:9.2.1-patch2", f"splunk-{test_context.scenario_id}", test_context.network)
        # NOTE(review): presumably lets the image run with its own default user - confirm against Container.
        self.user = None

        self.environment = ["SPLUNK_LICENSE_URI=Free",
                            "SPLUNK_START_ARGS=--accept-license",
                            f"SPLUNK_PASSWORD={self._ADMIN_PASSWORD}"]

        splunk_config_content = """
splunk:
  hec:
    enable: True
    ssl: False
    port: 8088
    token: 176fae97-f59d-4f08-939a-aa6a543f2485
"""
        self.files.append(File("/tmp/defaults/default.yml", splunk_config_content, mode="rw", permissions=0o644))

        splunk_cert, splunk_key = make_server_cert(self.container_name, test_context.root_ca_cert, test_context.root_ca_key)
        splunk_cert_content = crypto.dump_certificate(crypto.FILETYPE_PEM, splunk_cert)
        splunk_key_content = crypto.dump_privatekey(crypto.FILETYPE_PEM, splunk_key)
        root_ca_content = crypto.dump_certificate(crypto.FILETYPE_PEM, test_context.root_ca_cert)
        # The server certificate file is a single PEM bundle: certificate, private key, then the root CA.
        server_bundle = splunk_cert_content.decode() + splunk_key_content.decode() + root_ca_content.decode()
        self.files.append(File(self._SERVER_CERT_PATH, server_bundle, permissions=0o644))
        self.files.append(File(self._ROOT_CA_PATH, root_ca_content.decode(), permissions=0o644))

    def deploy(self):
        """Start the container and wait (up to 300 seconds) for the provisioning-finished log line.

        Returns:
            Whatever wait_for_condition() reports for the log check; callers assert on it.
        """
        super().deploy()
        finished_str = "Ansible playbook complete, will begin streaming splunkd_stderr.log"
        return wait_for_condition(
            condition=lambda: finished_str in self.get_logs(),
            timeout_seconds=300,
            bail_condition=lambda: False,
            context=None)

    def _run_splunk_cli(self, *args: str):
        """Run the Splunk CLI via sudo with the admin credentials appended; return exec_run()'s (code, output)."""
        return self.exec_run(["sudo", self._SPLUNK_CLI, *args,
                              "-auth", f"{self._ADMIN_USER}:{self._ADMIN_PASSWORD}"])

    @staticmethod
    def _as_text(output) -> str:
        """Return command output as text.

        exec_run()'s output type is not visible from this file, so both raw bytes and
        already-decoded strings are accepted.
        """
        if isinstance(output, (bytes, bytearray)):
            return output.decode("utf-8")
        return output

    @retry_check()
    def check_splunk_event(self, query: str) -> bool:
        """Search Splunk for ``query`` and report whether the query text occurs in the search output.

        Returns False when the CLI exits with a non-zero code.
        """
        code, output = self._run_splunk_cli("search", query)
        if code != 0:
            return False
        return query in self._as_text(output)

    @retry_check()
    def check_splunk_event_with_attributes(self, query: str, attributes: dict[str, str]) -> bool:
        """Search Splunk for ``query`` and report whether any result matches ``attributes``.

        The search is run with JSON output and parsed line by line. Only the keys
        "host", "source", "sourcetype" and "index" of ``attributes`` are compared; other
        keys are ignored. Lines that are not JSON objects carrying a "result" entry, or
        whose result lacks a requested field, count as non-matching instead of raising.

        Returns:
            True as soon as one result satisfies every requested field, otherwise False
            (including when the CLI exits with a non-zero code).
        """
        code, output = self._run_splunk_cli("search", query, "-output", "json")
        if code != 0:
            return False
        expected = {field: attributes[field] for field in self._MATCHED_FIELDS if field in attributes}
        for line in self._as_text(output).splitlines():
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            # A valid JSON line may still be a bare number/string/list; only objects can hold a result.
            if not isinstance(record, dict) or "result" not in record:
                continue
            result = record["result"]
            fields = result if isinstance(result, dict) else {}
            if all(field in fields and fields[field] == value for field, value in expected.items()):
                return True
        return False

    def enable_splunk_hec_indexer(self, hec_name: str) -> bool:
        """Update the HEC input ``hec_name`` with ``-use-ack 1`` and ``-disabled 0``.

        Returns:
            True when the CLI command exits with code 0.
        """
        code, _ = self._run_splunk_cli("http-event-collector",
                                       "update", hec_name,
                                       "-uri", self._MANAGEMENT_URI,
                                       "-use-ack", "1",
                                       "-disabled", "0")
        return code == 0

    def enable_splunk_hec_ssl(self) -> bool:
        """Turn on SSL for the HEC using the staged certificate files and require client certificates.

        Returns:
            True when the CLI command exits with code 0.
        """
        code, _ = self._run_splunk_cli("http-event-collector",
                                       "update",
                                       "-uri", self._MANAGEMENT_URI,
                                       "-enable-ssl", "1",
                                       "-server-cert", self._SERVER_CERT_PATH,
                                       "-ca-cert-file", self._ROOT_CA_PATH,
                                       "-require-client-cert", "1")
        return code == 0
from behave import step, then

# The framework step modules are imported only for their side effect of registering steps.
from minifi_test_framework.steps import checking_steps  # noqa: F401
from minifi_test_framework.steps import configuration_steps  # noqa: F401
from minifi_test_framework.steps import core_steps  # noqa: F401
from minifi_test_framework.steps import flow_building_steps  # noqa: F401
from minifi_test_framework.core.helpers import log_due_to_failure
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
from splunk_container import SplunkContainer


@step("a Splunk HEC is set up and running")
def step_impl(context: MinifiTestContext):
    """Create and deploy the Splunk container, then enable the 'splunk_hec_token' HEC input."""
    splunk = SplunkContainer(context)
    context.containers["splunk"] = splunk
    assert splunk.deploy()
    # NOTE(review): log_app_output() presumably returns a falsy value, so the assert still fails after logging - confirm.
    hec_enabled = splunk.enable_splunk_hec_indexer('splunk_hec_token')
    assert hec_enabled or splunk.log_app_output()


@then('an event is registered in Splunk HEC with the content \"{content}\"')
def step_imp(context, content):
    """Assert that a Splunk search for ``content`` finds the event."""
    splunk = context.containers["splunk"]
    found = splunk.check_splunk_event(content)
    assert found or log_due_to_failure(context)


@then('an event is registered in Splunk HEC with the content \"{content}\" with \"{source}\" set as source and \"{source_type}\" set as sourcetype and \"{host}\" set as host')
def step_imp(context, content, source, source_type, host):
    """Assert that a Splunk search for ``content`` finds an event with the given source, sourcetype and host."""
    splunk = context.containers["splunk"]
    expected_attributes = dict(source=source, sourcetype=source_type, host=host)
    found = splunk.check_splunk_event_with_attributes(content, expected_attributes)
    assert found or log_due_to_failure(context)


@step("SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus")
def step_impl(context):
    """Enable SSL on the Splunk HEC; this function only calls into the Splunk container."""
    splunk = context.containers["splunk"]
    ssl_enabled = splunk.enable_splunk_hec_ssl()
    assert ssl_enabled or splunk.log_app_output()