Skip to content

Commit 327d81b

Browse files
committed
MINIFICPP-2470 Add SSL and mTLS authentication support to CouchbaseClusterService
1 parent 6b8d35c commit 327d81b

File tree

14 files changed

+415
-53
lines changed

14 files changed

+415
-53
lines changed

docker/test/integration/cluster/DockerTestDirectoryBindings.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import hashlib
1919
import subprocess
2020
import OpenSSL.crypto
21-
from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert
21+
from ssl_utils.SSL_cert_utils import make_self_signed_cert, make_cert_without_extended_usage, make_server_cert, make_client_cert
2222

2323

2424
class DockerTestDirectoryBindings:
@@ -214,3 +214,11 @@ def create_cert_files(self):
214214
os.path.join(base, "root_ca.crt"),
215215
]
216216
subprocess.run(cmd, check=True)
217+
218+
clientuser_cert, clientuser_key = make_client_cert("clientuser", ca_cert=self.root_ca_cert, ca_key=self.root_ca_key)
219+
self.put_test_resource('clientuser.crt',
220+
OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM,
221+
cert=clientuser_cert))
222+
self.put_test_resource('clientuser.key',
223+
OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM,
224+
pkey=clientuser_key))

docker/test/integration/cluster/containers/CouchbaseServerContainer.py

Lines changed: 84 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,72 @@
1212
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1313
# See the License for the specific language governing permissions and
1414
# limitations under the License.
15+
import os
16+
import OpenSSL.crypto
17+
import tempfile
18+
import docker
19+
import requests
20+
import logging
21+
from requests.auth import HTTPBasicAuth
1522
from .Container import Container
1623
from utils import retry_check
24+
from ssl_utils.SSL_cert_utils import make_server_cert
1725

1826

1927
class CouchbaseServerContainer(Container):
20-
def __init__(self, feature_context, name, vols, network, image_store, command=None):
21-
super().__init__(feature_context, name, 'couchbase-server', vols, network, image_store, command)
28+
def __init__(self, feature_context, name, vols, network, image_store, command=None, ssl=False):
29+
self.ssl = ssl
30+
engine = "couchbase-server" if not ssl else "couchbase-server-ssl"
31+
super().__init__(feature_context, name, engine, vols, network, image_store, command)
32+
couchbase_cert, couchbase_key = make_server_cert(f"couchbase-server-{feature_context.id}", feature_context.root_ca_cert, feature_context.root_ca_key)
33+
34+
self.root_ca_file = tempfile.NamedTemporaryFile(delete=False)
35+
self.root_ca_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=feature_context.root_ca_cert))
36+
self.root_ca_file.close()
37+
os.chmod(self.root_ca_file.name, 0o666)
38+
39+
self.couchbase_cert_file = tempfile.NamedTemporaryFile(delete=False)
40+
self.couchbase_cert_file.write(OpenSSL.crypto.dump_certificate(type=OpenSSL.crypto.FILETYPE_PEM, cert=couchbase_cert))
41+
self.couchbase_cert_file.close()
42+
os.chmod(self.couchbase_cert_file.name, 0o666)
43+
44+
self.couchbase_key_file = tempfile.NamedTemporaryFile(delete=False)
45+
self.couchbase_key_file.write(OpenSSL.crypto.dump_privatekey(type=OpenSSL.crypto.FILETYPE_PEM, pkey=couchbase_key))
46+
self.couchbase_key_file.close()
47+
os.chmod(self.couchbase_key_file.name, 0o666)
2248

2349
def get_startup_finished_log_entry(self):
2450
# after startup the logs are only available in the container, only this message is shown
2551
return "logs available in"
2652

53+
@retry_check(12, 5)
54+
def _run_couchbase_cli_command(self, command):
55+
(code, _) = self.client.containers.get(self.name).exec_run(command)
56+
if code != 0:
57+
logging.error(f"Failed to run command '{command}', returned error code: {code}")
58+
return False
59+
return True
60+
61+
def _run_couchbase_cli_commands(self, commands):
62+
for command in commands:
63+
if not self._run_couchbase_cli_command(command):
64+
return False
65+
return True
66+
2767
@retry_check(15, 2)
68+
def _load_couchbase_certs(self):
69+
response = requests.post("http://localhost:8091/node/controller/loadTrustedCAs", auth=HTTPBasicAuth("Administrator", "password123"))
70+
if response.status_code != 200:
71+
logging.error(f"Failed to load CA certificates, with status code: {response.status_code}")
72+
return False
73+
74+
response = requests.post("http://localhost:8091/node/controller/reloadCertificate", auth=HTTPBasicAuth("Administrator", "password123"))
75+
if response.status_code != 200:
76+
logging.error(f"Failed to reload certificates, with status code: {response.status_code}")
77+
return False
78+
79+
return True
80+
2881
def run_post_startup_commands(self):
2982
if self.post_startup_commands_finished:
3083
return True
@@ -33,23 +86,45 @@ def run_post_startup_commands(self):
3386
["couchbase-cli", "cluster-init", "-c", "localhost", "--cluster-username", "Administrator", "--cluster-password", "password123", "--services", "data,index,query",
3487
"--cluster-ramsize", "2048", "--cluster-index-ramsize", "256"],
3588
["couchbase-cli", "bucket-create", "-c", "localhost", "--username", "Administrator", "--password", "password123", "--bucket", "test_bucket", "--bucket-type", "couchbase",
36-
"--bucket-ramsize", "1024", "--max-ttl", "36000"]
89+
"--bucket-ramsize", "1024", "--max-ttl", "36000"],
90+
["couchbase-cli", "user-manage", "-c", "localhost", "-u", "Administrator", "-p", "password123", "--set", "--rbac-username", "clientuser", "--rbac-password", "password123",
91+
"--roles", "data_reader[test_bucket],data_writer[test_bucket]", "--auth-domain", "local"],
92+
["bash", "-c", 'tee /tmp/auth.json <<< \'{"state": "enable", "prefixes": [ {"path": "subject.cn", "prefix": "", "delimiter": "."}]}\''],
93+
['couchbase-cli', 'ssl-manage', '-c', 'localhost', '-u', 'Administrator', '-p', 'password123', '--set-client-auth', '/tmp/auth.json']
3794
]
38-
for command in commands:
39-
(code, _) = self.client.containers.get(self.name).exec_run(command)
40-
if code != 0:
41-
return False
95+
if not self._run_couchbase_cli_commands(commands):
96+
return False
97+
98+
if not self._load_couchbase_certs():
99+
return False
100+
42101
self.post_startup_commands_finished = True
43102
return True
44103

45104
def deploy(self):
46105
if not self.set_deployed():
47106
return
48107

108+
mounts = [
109+
docker.types.Mount(
110+
type='bind',
111+
source=self.couchbase_key_file.name,
112+
target='/opt/couchbase/var/lib/couchbase/inbox/pkey.key'),
113+
docker.types.Mount(
114+
type='bind',
115+
source=self.couchbase_cert_file.name,
116+
target='/opt/couchbase/var/lib/couchbase/inbox/chain.pem'),
117+
docker.types.Mount(
118+
type='bind',
119+
source=self.root_ca_file.name,
120+
target='/opt/couchbase/var/lib/couchbase/inbox/CA/root_ca.crt')
121+
]
122+
49123
self.docker_container = self.client.containers.run(
50124
"couchbase:enterprise-7.2.5",
51125
detach=True,
52126
name=self.name,
53127
network=self.network.name,
54-
ports={'11210/tcp': 11210},
55-
entrypoint=self.command)
128+
ports={'8091/tcp': 8091, '11210/tcp': 11210},
129+
entrypoint=self.command,
130+
mounts=mounts)

docker/test/integration/features/MiNiFi_integration_test_driver.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ def start_minifi_c2_server(self, context):
8686
self.cluster.deploy_container('minifi-c2-server')
8787
assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server') or self.cluster.log_app_output()
8888

89-
def start_couchbase_server(self, context):
90-
self.cluster.acquire_container(context=context, name='couchbase-server', engine='couchbase-server')
89+
def start_couchbase_server(self, context, ssl=False):
90+
engine = 'couchbase-server-ssl' if ssl else 'couchbase-server'
91+
self.cluster.acquire_container(context=context, name='couchbase-server', engine=engine)
9192
self.cluster.deploy_container('couchbase-server')
9293
assert self.cluster.wait_for_container_startup_to_finish('couchbase-server') or self.cluster.log_app_output()
9394

docker/test/integration/features/couchbase.feature

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,3 +173,60 @@ Feature: Executing Couchbase operations from MiNiFi-C++
173173
And all instances start up
174174

175175
Then the Minifi logs contain the following message: "Failed to get content for document 'test_doc_id' from collection 'test_bucket._default._default' with the following exception: 'raw_binary_transcoder expects document to have BINARY common flags" in less than 60 seconds
176+
177+
Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using SSL connection
178+
Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
179+
And the "Keep Source File" property of the GetFile processor is set to "true"
180+
And the scheduling period of the GetFile processor is set to "20 seconds"
181+
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
182+
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
183+
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
184+
And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
185+
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
186+
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
187+
And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService"
188+
And a PutFile processor with the "Directory" property set to "/tmp/output"
189+
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
190+
And a CouchbaseClusterService is setup up with SSL connection with the name "CouchbaseClusterService"
191+
192+
And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
193+
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
194+
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
195+
And the "success" relationship of the PutFile processor is connected to the LogAttribute
196+
197+
When a Couchbase server is started
198+
And all instances start up
199+
200+
Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds
201+
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
202+
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
203+
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
204+
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds
205+
206+
Scenario: A MiNiFi instance can get data from test bucket with GetCouchbaseKey processor using mTLS authentication
207+
Given a MiNiFi CPP server with yaml config
208+
And a GetFile processor with the "Input Directory" property set to "/tmp/input"
209+
And a file with the content '{"field1": "value1", "field2": "value2"}' is present in '/tmp/input'
210+
And a PutCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
211+
And the "Document Id" property of the PutCouchbaseKey processor is set to "test_doc_id"
212+
And the "Couchbase Cluster Controller Service" property of the PutCouchbaseKey processor is set to "CouchbaseClusterService"
213+
And a GetCouchbaseKey processor with the "Bucket Name" property set to "test_bucket"
214+
And the "Document Id" property of the GetCouchbaseKey processor is set to "test_doc_id"
215+
And the "Couchbase Cluster Controller Service" property of the GetCouchbaseKey processor is set to "CouchbaseClusterService"
216+
And a PutFile processor with the "Directory" property set to "/tmp/output"
217+
And a LogAttribute processor with the "FlowFiles To Log" property set to "0"
218+
And a CouchbaseClusterService is setup up using mTLS authentication with the name "CouchbaseClusterService"
219+
220+
And the "success" relationship of the GetFile processor is connected to the PutCouchbaseKey
221+
And the "success" relationship of the PutCouchbaseKey processor is connected to the GetCouchbaseKey
222+
And the "success" relationship of the GetCouchbaseKey processor is connected to the PutFile
223+
And the "success" relationship of the PutFile processor is connected to the LogAttribute
224+
225+
When a Couchbase server is started
226+
And all instances start up
227+
228+
Then a flowfile with the JSON content '{"field1": "value1", "field2": "value2"}' is placed in the monitored directory in less than 6000 seconds
229+
And the Minifi logs contain the following message: "key:couchbase.bucket value:test_bucket" in less than 10 seconds
230+
And the Minifi logs contain the following message: "key:couchbase.doc.id value:test_doc_id" in less than 1 seconds
231+
And the Minifi logs match the following regex: "key:couchbase.doc.cas value:[1-9][0-9]*" in less than 1 seconds
232+
And the Minifi logs match the following regex: "key:couchbase.doc.expiry value:\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d{3}" in less than 1 seconds

docker/test/integration/features/steps/steps.py

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,7 @@ def step_impl(context):
573573
minifi_crt_file = '/tmp/resources/minifi_client.crt'
574574
minifi_key_file = '/tmp/resources/minifi_client.key'
575575
root_ca_crt_file = '/tmp/resources/root_ca.crt'
576-
ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)
576+
ssl_context_service = SSLContextService(name='SSLContextService', cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)
577577

578578
splunk_cert, splunk_key = make_server_cert(context.test.get_container_name_with_postfix("splunk"), context.root_ca_cert, context.root_ca_key)
579579
put_splunk_http = context.test.get_node_by_name("PutSplunkHTTP")
@@ -1384,6 +1384,34 @@ def step_impl(context, service_name):
13841384
container.add_controller(couchbase_cluster_controller_service)
13851385

13861386

1387+
@given("a CouchbaseClusterService is setup up with SSL connection with the name \"{service_name}\"")
1388+
def step_impl(context, service_name):
1389+
ssl_context_service = SSLContextService(name="SSLContextService",
1390+
ca_cert='/tmp/resources/root_ca.crt')
1391+
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
1392+
container.add_controller(ssl_context_service)
1393+
couchbase_cluster_controller_service = CouchbaseClusterService(
1394+
name=service_name,
1395+
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
1396+
ssl_context_service=ssl_context_service)
1397+
container.add_controller(couchbase_cluster_controller_service)
1398+
1399+
13871400
@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
13881401
def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str):
13891402
context.test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type)
1403+
1404+
1405+
@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"")
1406+
def step_impl(context, service_name):
1407+
ssl_context_service = SSLContextService(name="SSLContextService",
1408+
cert='/tmp/resources/clientuser.crt',
1409+
key='/tmp/resources/clientuser.key',
1410+
ca_cert='/tmp/resources/root_ca.crt')
1411+
container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
1412+
container.add_controller(ssl_context_service)
1413+
couchbase_cluster_controller_service = CouchbaseClusterService(
1414+
name=service_name,
1415+
connection_string="couchbases://{server_hostname}".format(server_hostname=context.test.get_container_name_with_postfix("couchbase-server")),
1416+
ssl_context_service=ssl_context_service)
1417+
container.add_controller(couchbase_cluster_controller_service)

docker/test/integration/minifi/controllers/CouchbaseClusterService.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818

1919

2020
class CouchbaseClusterService(ControllerService):
21-
def __init__(self, name, connection_string):
21+
def __init__(self, name, connection_string, ssl_context_service=None):
2222
super(CouchbaseClusterService, self).__init__(name=name)
2323

2424
self.service_class = 'CouchbaseClusterService'
2525
self.properties['Connection String'] = connection_string
26-
self.properties['User Name'] = "Administrator"
27-
self.properties['User Password'] = "password123"
26+
if ssl_context_service:
27+
self.linked_services.append(ssl_context_service)
28+
if not ssl_context_service or ssl_context_service and 'Client Certificate' not in ssl_context_service.properties:
29+
self.properties['User Name'] = "Administrator"
30+
self.properties['User Password'] = "password123"

docker/test/integration/minifi/core/ControllerService.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,4 @@ def __init__(self, name=None, properties=None):
3434
properties = {}
3535

3636
self.properties = properties
37+
self.linked_services = []

docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,7 @@ def serialize_node(self, connectable, root, visited):
118118
continue
119119

120120
visited.append(svc)
121-
root['controllerServices'].append({
122-
'name': svc.name,
123-
'identifier': svc.id,
124-
'type': svc.service_class,
125-
'properties': svc.properties
126-
})
121+
self.serialize_controller(svc, root)
127122

128123
if isinstance(connectable, Funnel):
129124
root['funnels'].append({
@@ -159,3 +154,9 @@ def serialize_controller(self, controller, root):
159154
'type': controller.service_class,
160155
'properties': controller.properties
161156
})
157+
158+
if controller.linked_services:
159+
if len(controller.linked_services) == 1:
160+
root['controllerServices'][-1]['properties']['Linked Services'] = controller.linked_services[0].name
161+
else:
162+
root['controllerServices'][-1]['properties']['Linked Services'] = [{"value": service.name} for service in controller.linked_services]

0 commit comments

Comments
 (0)