10 changes: 9 additions & 1 deletion docker/test/integration/cluster/DockerTestDirectoryBindings.py
@@ -18,7 +18,7 @@
import hashlib
import subprocess
import OpenSSL.crypto
from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert
from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert, make_client_cert


class DockerTestDirectoryBindings:
@@ -214,3 +214,11 @@ def create_cert_files(self):
os.path.join(base, "root_ca.crt"),
]
subprocess.run(cmd, check=True)

clientuser_cert, clientuser_key = make_client_cert("clientuser", ca_cert=self.root_ca_cert, ca_key=self.root_ca_key)
self.put_test_resource('clientuser.crt',
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
cert=clientuser_cert))
self.put_test_resource('clientuser.key',
OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
pkey=clientuser_key))
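
Note: make_client_cert is imported from ssl_utils/SSL_cert_utils.py, which is not part of this diff. For readers unfamiliar with that helper, a minimal pyOpenSSL sketch of what such a function could look like is shown below; the signature matches the call above, but the extension choices and validity period are assumptions, not the actual implementation.

import OpenSSL.crypto

def make_client_cert(common_name, ca_cert, ca_key):
    # generate a fresh key pair for the client
    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, 2048)

    cert = OpenSSL.crypto.X509()
    cert.set_version(2)  # X.509 v3, required for extensions
    cert.set_serial_number(1)
    cert.get_subject().CN = common_name
    cert.set_issuer(ca_cert.get_subject())
    cert.set_pubkey(key)
    cert.gmtime_adj_notBefore(0)
    cert.gmtime_adj_notAfter(365 * 24 * 60 * 60)  # valid for one year
    # clientAuth marks the certificate as usable for TLS client authentication
    cert.add_extensions([OpenSSL.crypto.X509Extension(b"extendedKeyUsage", False, b"clientAuth")])
    cert.sign(ca_key, "sha256")
    return cert, key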
@@ -12,19 +12,67 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import OpenSSL.crypto
import tempfile
import docker
import requests
import logging
from requests.auth import HTTPBasicAuth
from .Container import Container
from utils import retry_check
from ssl_utils.SSL_cert_utils import make_server_cert


class CouchbaseServerContainer(Container):
def __init__(self, feature_context, name, vols, network, image_store, command=None):
super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command)
super().__init__(feature_context, name, "couchbase-server", vols, network, image_store, command)
couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key)

self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert))
self.root_ca_file.close()
os.chmod(self.root_ca_file.name, 0o666)

self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False)
self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=couchbase_cert))
self.couchbase_cert_file.close()
os.chmod(self.couchbase_cert_file.name, 0o666)

self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False)
self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, pkey=couchbase_key))
self.couchbase_key_file.close()
os.chmod(self.couchbase_key_file.name, 0o666)

def get_startup_finished_log_entry(self):
# after startup, the Couchbase logs are only available inside the container; this is the only message shown in the docker log output
return "logs available in"

@retry_check(15, 2)
@retry_check(max_tries=12, retry_interval=5)
def _run_couchbase_cli_command(self, command):
(code, _) = self.client.containers.get(self.name).exec_run(command)
if code != 0:
logging.error(f"Failed to run command '{command}', returned error code: {code}")
return False
return True

def _run_couchbase_cli_commands(self, commands):
return all(self._run_couchbase_cli_command(command) for command in commands)

@retry_check(max_tries=15, retry_interval=2)
def _load_couchbase_certs(self):
response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123"))
if response.status_code != 200:
logging.error(f"Failed to load CA certificates, with status code: {response.status_code}")
return False

response = requests.post("http://localhost:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123"))
if response.status_code != 200:
logging.error(f"Failed to reload certificates, with status code: {response.status_code}")
return False

return True

def run_post_startup_commands(self):
if self.post_startup_commands_finished:
return True
@@ -33,23 +81,45 @@ def run_post_startup_commands(self):
["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query",
"--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase",
"--bucket-ramsize", "1024", "--max-ttl", "36000"]
"--bucket-ramsize", "1024", "--max-ttl", "36000"],
["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123",
"--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"],
["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''],
['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json']
]
for command in commands:
(code, _) = self.client.containers.get(self.name).exec_run(command)
if code != 0:
return False
if not self._run_couchbase_cli_commands(commands):
return False

if not self._load_couchbase_certs():
return False

self.post_startup_commands_finished = True
return True

def deploy(self):
if not self.set_deployed():
return

mounts = [
docker.types.Mount(
type='bind',
source=self.couchbase_key_file.name,
target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'),
docker.types.Mount(
type='bind',
source=self.couchbase_cert_file.name,
target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'),
docker.types.Mount(
type='bind',
source=self.root_ca_file.name,
target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt')
]

self.docker_container = self.client.containers.run(
"couchbase:enterprise-7.2.5",
detach=True,
name=self.name,
network=self.network.name,
ports={'11210/tcp': 11210},
entrypoint=self.command)
ports={'8091/tcp': 8091, '11210/tcp': 11210},
entrypoint=self.command,
mounts=mounts)
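
Note: two of the new post-startup commands above work together to enable certificate-based logins. The user-manage call creates a local RBAC user named clientuser with read/write access to test_bucket, and the ssl-manage --set-client-auth call uploads the /tmp/auth.json written by the tee command, which tells Couchbase to derive the authenticated user name from the client certificate's subject CN. The payload is reproduced below as an annotated Python dict; the comments are an informal summary of Couchbase's client-certificate settings, not authoritative documentation.

# Contents of /tmp/auth.json as written by the tee command above (annotated):
CLIENT_CERT_AUTH_SETTINGS = {
    "state": "enable",            # accept client certificates without making them mandatory
    "prefixes": [{
        "path": "subject.cn",     # take the user name from the certificate's subject CN
        "prefix": "",             # strip no prefix from the extracted value
        "delimiter": ".",         # truncate the extracted value at the first '.'
    }],
}

With the clientuser certificate generated in DockerTestDirectoryBindings above, the CN maps straight onto the clientuser RBAC user, which is why the mTLS scenario below needs no user name or password.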
57 changes: 57 additions & 0 deletions docker/test/integration/features/couchbase.feature
@@ -173,3 +173,60 @@ Feature: Executing Couchbase operations from MiNiFi-C++
And all instances start up

Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds

Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using SSL connection
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
And the "Keep Source File" property of the GetFile processor is set to "true"
And the scheduling period of the GetFile processor is set to "20 seconds"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is set up with SSL connection with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds

Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using mTLS authentication
Given a MiNiFi CPP server with yaml config
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService"
And a PutFile processor with the "Directory" property set to "/tmp/output"
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
And a CouchbaseClusterService is set up using mTLS authentication with the name "CouchbaseClusterService"

And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
And the "success" relationship of the PutFile processor is connected to the LogAttribute

When a Couchbase server is started
And all instances start up

Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
30 changes: 29 additions & 1 deletion docker/test/integration/features/steps/steps.py
@@ -573,7 +573,7 @@ def step_impl(context):
minifi_crt_file = '/tmp/resources/minifi_client.crt'
minifi_key_file = '/tmp/resources/minifi_client.key'
root_ca_crt_file = '/tmp/resources/root_ca.crt'
ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)
ssl_context_service = SSLContextService(name='SSLContextService', cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)

splunk_cert, splunk_key = make_server_cert(context.test.get_container_name_with_postfix("splunk"), context.root_ca_cert, context.root_ca_key)
put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP")
@@ -1384,6 +1384,34 @@ def step_impl(context, service_name):
container.add_controller(couchbase_cluster_controller_service)


@given("a CouchbaseClusterService is set up up with SSL connection with the name \"{service_name}\"")
def step_impl(context, service_name):
ssl_context_service = SSLContextService(name="SSLContextService",
ca_cert='/tmp/resources/root_ca.crt')
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container.add_controller(ssl_context_service)
couchbase_cluster_controller_service = CouchbaseClusterService(
name=service_name,
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
ssl_context_service=ssl_context_service)
container.add_controller(couchbase_cluster_controller_service)


@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str):
context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type)


@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"")
def step_impl(context, service_name):
ssl_context_service = SSLContextService(name="SSLContextService",
cert='/tmp/resources/clientuser.crt',
key='/tmp/resources/clientuser.key',
ca_cert='/tmp/resources/root_ca.crt')
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
container.add_controller(ssl_context_service)
couchbase_cluster_controller_service = CouchbaseClusterService(
name=service_name,
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
ssl_context_service=ssl_context_service)
container.add_controller(couchbase_cluster_controller_service)
@@ -18,10 +18,13 @@


class CouchbaseClusterService(ControllerService):
def __init__(self, name, connection_string):
def __init__(self, name, connection_string, ssl_context_service=None):
super(CouchbaseClusterService, self).__init__(name=name)

self.service_class = 'CouchbaseClusterService'
self.properties['Connection String'] = connection_string
self.properties['User Name'] = "Administrator"
self.properties['User Password'] = "password123"
if ssl_context_service:
self.linked_services.append(ssl_context_service)
if not ssl_context_service or 'Client Certificate' not in ssl_context_service.properties:
self.properties['User Name'] = "Administrator"
self.properties['User Password'] = "password123"
1 change: 1 addition & 0 deletions docker/test/integration/minifi/core/ControllerService.py
@@ -34,3 +34,4 @@ def __init__(self, name=None, properties=None):
properties = {}

self.properties = properties
self.linked_services = []
@@ -118,12 +118,7 @@ def serialize_node(self, connectable, root, visited):
continue

visited.append(svc)
root['controllerServices'].append({
'name': svc.name,
'identifier': svc.id,
'type': svc.service_class,
'properties': svc.properties
})
self.serialize_controller(svc, root)

if isinstance(connectable, Funnel):
root['funnels'].append({
@@ -159,3 +154,9 @@ def serialize_controller(self, controller, root):
'type': controller.service_class,
'properties': controller.properties
})

if controller.linked_services:
if len(controller.linked_services) == 1:
root['controllerServices'][-1]['properties']['Linked Services'] = controller.linked_services[0].name
else:
root['controllerServices'][-1]['properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services]
@@ -119,12 +119,7 @@ def serialize_node(self, connectable, res=None, visited=None):
continue

visited.append(svc)
res['Controller Services'].append({
'name': svc.name,
'id': svc.id,
'class': svc.service_class,
'Properties': svc.properties
})
self._add_controller_service_node(svc, res)

if isinstance(connectable, Funnel):
res['Funnels'].append({
@@ -160,6 +155,25 @@ def serialize_node(self, connectable, res=None, visited=None):

return (res, visited)

def _add_controller_service_node(self, controller, parent):
if hasattr(controller, 'name'):
connectable_name = controller.name
else:
connectable_name = str(controller.uuid)

parent['Controller Services'].append({
'name': connectable_name,
'id': controller.id,
'class': controller.service_class,
'Properties': controller.properties
})

if controller.linked_services:
if len(controller.linked_services) == 1:
parent['Controller Services'][-1]['Properties']['Linked Services'] = controller.linked_services[0].name
else:
parent['Controller Services'][-1]['Properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services]

def serialize_controller(self, controller, root=None):
if root is None:
res = {
@@ -175,16 +189,6 @@ def serialize_controller(self, controller, root=None):
else:
res = root

if hasattr(controller, 'name'):
connectable_name = controller.name
else:
connectable_name = str(controller.uuid)

res['Controller Services'].append({
'name': connectable_name,
'id': controller.id,
'class': controller.service_class,
'Properties': controller.properties
})
self._add_controller_service_node(controller, res)

return res
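
Note: to illustrate what the new linked_services handling produces, the YAML-config serializer above would now append an entry roughly like the following for the mTLS scenario. The id and host suffix are made up for the example, and the omission of User Name/User Password assumes the test SSLContextService exposes its certificate under the 'Client Certificate' property.

# Hypothetical entry appended to res['Controller Services'] for the mTLS scenario:
EXAMPLE_CONTROLLER_SERVICE_ENTRY = {
    'name': 'CouchbaseClusterService',
    'id': '00000000-0000-0000-0000-000000000000',   # generated per test run
    'class': 'CouchbaseClusterService',
    'Properties': {
        'Connection String': 'couchbases://couchbase-server-<feature-id>',
        # a single linked service is serialized as a plain name,
        # multiple linked services as a list of {"value": name} dicts
        'Linked Services': 'SSLContextService',
    },
}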