Skip to content

Commit c10e4a5

Browse files
committed
MINIFICPP-2679 Move HTTP tests to modular docker tests
1 parent 0306957 commit c10e4a5

File tree

13 files changed

+627
-148
lines changed

13 files changed

+627
-148
lines changed

behave_framework/src/minifi_test_framework/containers/container.py

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131

3232
class Container:
33-
def __init__(self, image_name: str, container_name: str, network: Network, command: str | None = None):
33+
def __init__(self, image_name: str, container_name: str, network: Network, command: str | None = None, entrypoint: str | None = None):
3434
self.image_name: str = image_name
3535
self.container_name: str = container_name
3636
self.network: Network = network
@@ -42,6 +42,7 @@ def __init__(self, image_name: str, container_name: str, network: Network, comma
4242
self.host_files: list[HostFile] = []
4343
self.volumes = {}
4444
self.command: str | None = command
45+
self.entrypoint: str | None = entrypoint
4546
self._temp_dir: tempfile.TemporaryDirectory | None = None
4647
self.ports: dict[str, int] | None = None
4748
self.environment: list[str] = []
@@ -91,7 +92,7 @@ def deploy(self) -> bool:
9192
self.container = self.client.containers.run(
9293
image=self.image_name, name=self.container_name, ports=self.ports,
9394
environment=self.environment, volumes=self.volumes, network=self.network.name,
94-
command=self.command, user=self.user, detach=True)
95+
command=self.command, entrypoint=self.entrypoint, user=self.user, detach=True)
9596
except Exception as e:
9697
logging.error(f"Error starting container: {e}")
9798
raise
@@ -418,3 +419,25 @@ def directory_contains_file_with_json_content(self, directory_path: str, expecte
418419
continue
419420

420421
return False
422+
423+
def directory_contains_file_with_minimum_size(self, directory_path: str, expected_size: int) -> bool:
424+
if not self.container or not self.not_empty_dir_exists(directory_path):
425+
return False
426+
427+
command = "sh -c {}".format(shlex.quote(f"find {directory_path} -maxdepth 1 -type f -exec stat -c %s {{}} \\;"))
428+
429+
exit_code, output = self.exec_run(command)
430+
if exit_code != 0:
431+
logging.error(f"Error running command to get file sizes: {output}")
432+
return False
433+
sizes = output.strip().split('\n')
434+
for size_str in sizes:
435+
try:
436+
size = int(size_str)
437+
if size >= expected_size:
438+
return True
439+
except ValueError:
440+
logging.error(f"Error parsing size '{size_str}' as integer for file size comparison.")
441+
continue
442+
443+
return False

behave_framework/src/minifi_test_framework/containers/minifi_container.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,21 @@
2020

2121
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
2222
from minifi_test_framework.containers.file import File
23-
from minifi_test_framework.minifi.flow_definition import FlowDefinition
24-
from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage, make_client_cert
23+
from minifi_test_framework.minifi.minifi_flow_definition import MinifiFlowDefinition
24+
from minifi_test_framework.core.ssl_utils import make_cert_without_extended_usage, make_client_cert, make_server_cert
2525
from .container import Container
2626

2727

2828
class MinifiContainer(Container):
2929
def __init__(self, container_name: str, test_context: MinifiTestContext):
3030
super().__init__(test_context.minifi_container_image, f"{container_name}-{test_context.scenario_id}", test_context.network)
3131
self.flow_config_str: str = ""
32-
self.flow_definition = FlowDefinition()
32+
self.flow_definition = MinifiFlowDefinition()
3333
self.properties: dict[str, str] = {}
3434
self.log_properties: dict[str, str] = {}
3535

3636
minifi_client_cert, minifi_client_key = make_cert_without_extended_usage(common_name=self.container_name, ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key)
37+
self.files.append(File("/usr/local/share/certs/ca-root-nss.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert)))
3738
self.files.append(File("/tmp/resources/root_ca.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=test_context.root_ca_cert)))
3839
self.files.append(File("/tmp/resources/minifi_client.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_client_cert)))
3940
self.files.append(File("/tmp/resources/minifi_client.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_client_key)))
@@ -42,6 +43,10 @@ def __init__(self, container_name: str, test_context: MinifiTestContext):
4243
self.files.append(File("/tmp/resources/clientuser.crt", crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=clientuser_cert)))
4344
self.files.append(File("/tmp/resources/clientuser.key", crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=clientuser_key)))
4445

46+
minifi_server_cert, minifi_server_key = make_server_cert(common_name=f"server-{test_context.scenario_id}", ca_cert=test_context.root_ca_cert, ca_key=test_context.root_ca_key)
47+
self.files.append(File("/tmp/resources/minifi_server.crt",
48+
crypto.dump_certificate(type=crypto.FILETYPE_PEM, cert=minifi_server_cert) + crypto.dump_privatekey(type=crypto.FILETYPE_PEM, pkey=minifi_server_key)))
49+
4550
self.is_fhs = 'MINIFI_INSTALLATION_TYPE=FHS' in str(self.client.images.get(test_context.minifi_container_image).history())
4651

4752
self._fill_default_properties()
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one or more
2+
# contributor license agreements. See the NOTICE file distributed with
3+
# this work for additional information regarding copyright ownership.
4+
# The ASF licenses this file to You under the Apache License, Version 2.0
5+
# (the "License"); you may not use this file except in compliance with
6+
# the License. You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
import io
17+
import gzip
18+
from typing import List, Optional
19+
from minifi_test_framework.containers.file import File
20+
from minifi_test_framework.containers.container import Container
21+
from minifi_test_framework.core.helpers import wait_for_condition
22+
from minifi_test_framework.core.minifi_test_context import MinifiTestContext
23+
from minifi_test_framework.minifi.nifi_flow_definition import NifiFlowDefinition
24+
25+
26+
class NifiContainer(Container):
27+
NIFI_VERSION = '2.2.0'
28+
29+
def __init__(self, test_context: MinifiTestContext, command: Optional[List[str]] = None, use_ssl: bool = False):
30+
self.flow_definition = NifiFlowDefinition()
31+
name = f"nifi-{test_context.scenario_id}"
32+
if use_ssl:
33+
entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' /opt/nifi/nifi-current/conf/nifi.properties && "
34+
r"sed -i -e 's/^\(nifi.remote.input.secure\)=.*/\1=true/' /opt/nifi/nifi-current/conf/nifi.properties && "
35+
r"sed -i -e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' /opt/nifi/nifi-current/conf/nifi.properties && "
36+
r"sed -i -e 's/^\(nifi.web.https.port\)=.*/\1=8443/' /opt/nifi/nifi-current/conf/nifi.properties && "
37+
r"sed -i -e 's/^\(nifi.web.https.host\)=.*/\1={name}/' /opt/nifi/nifi-current/conf/nifi.properties && "
38+
r"sed -i -e 's/^\(nifi.security.keystore\)=.*/\1=\/tmp\/resources\/keystore.jks/' /opt/nifi/nifi-current/conf/nifi.properties && "
39+
r"sed -i -e 's/^\(nifi.security.keystoreType\)=.*/\1=jks/' /opt/nifi/nifi-current/conf/nifi.properties && "
40+
r"sed -i -e 's/^\(nifi.security.keystorePasswd\)=.*/\1=passw0rd1!/' /opt/nifi/nifi-current/conf/nifi.properties && "
41+
r"sed -i -e 's/^\(nifi.security.keyPasswd\)=.*/#\1=passw0rd1!/' /opt/nifi/nifi-current/conf/nifi.properties && "
42+
r"sed -i -e 's/^\(nifi.security.truststore\)=.*/\1=\/tmp\/resources\/truststore.jks/' /opt/nifi/nifi-current/conf/nifi.properties && "
43+
r"sed -i -e 's/^\(nifi.security.truststoreType\)=.*/\1=jks/' /opt/nifi/nifi-current/conf/nifi.properties && "
44+
r"sed -i -e 's/^\(nifi.security.truststorePasswd\)=.*/\1=passw0rd1!/' /opt/nifi/nifi-current/conf/nifi.properties && "
45+
r"sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=10443/' /opt/nifi/nifi-current/conf/nifi.properties && "
46+
r"cp /tmp/nifi_config/flow.json.gz /opt/nifi/nifi-current/conf && /opt/nifi/nifi-current/bin/nifi.sh run & "
47+
r"nifi_pid=$! &&"
48+
r"tail -F --pid=${{nifi_pid}} /opt/nifi/nifi-current/logs/nifi-app.log").format(name=name)
49+
else:
50+
entry_command = (r"sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' /opt/nifi/nifi-current/conf/nifi.properties && "
51+
r"sed -i -e 's/^\(nifi.sensitive.props.key\)=.*/\1=secret_key_12345/' /opt/nifi/nifi-current/conf/nifi.properties && "
52+
r"sed -i -e 's/^\(nifi.remote.input.secure\)=.*/\1=false/' /opt/nifi/nifi-current/conf/nifi.properties && "
53+
r"sed -i -e 's/^\(nifi.web.http.port\)=.*/\1=8080/' /opt/nifi/nifi-current/conf/nifi.properties && "
54+
r"sed -i -e 's/^\(nifi.web.https.port\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
55+
r"sed -i -e 's/^\(nifi.web.https.host\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
56+
r"sed -i -e 's/^\(nifi.web.http.host\)=.*/\1={name}/' /opt/nifi/nifi-current/conf/nifi.properties && "
57+
r"sed -i -e 's/^\(nifi.security.keystore\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
58+
r"sed -i -e 's/^\(nifi.security.keystoreType\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
59+
r"sed -i -e 's/^\(nifi.security.keystorePasswd\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
60+
r"sed -i -e 's/^\(nifi.security.keyPasswd\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
61+
r"sed -i -e 's/^\(nifi.security.truststore\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
62+
r"sed -i -e 's/^\(nifi.security.truststoreType\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
63+
r"sed -i -e 's/^\(nifi.security.truststorePasswd\)=.*/\1=/' /opt/nifi/nifi-current/conf/nifi.properties && "
64+
r"sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=10000/' /opt/nifi/nifi-current/conf/nifi.properties && "
65+
r"cp /tmp/nifi_config/flow.json.gz /opt/nifi/nifi-current/conf && /opt/nifi/nifi-current/bin/nifi.sh run & "
66+
r"nifi_pid=$! &&"
67+
r"tail -F --pid=${{nifi_pid}} /opt/nifi/nifi-current/logs/nifi-app.log").format(name=name)
68+
if not command:
69+
command = ["/bin/sh", "-c", entry_command]
70+
71+
super().__init__("apache/nifi:" + self.NIFI_VERSION, name, test_context.network, entrypoint=command)
72+
73+
def deploy(self):
74+
flow_config = self.flow_definition.to_json()
75+
buffer = io.BytesIO()
76+
77+
with gzip.GzipFile(fileobj=buffer, mode='wb') as gz_file:
78+
gz_file.write(flow_config.encode())
79+
80+
gzipped_bytes = buffer.getvalue()
81+
self.files.append(File("/tmp/nifi_config/flow.json.gz", gzipped_bytes))
82+
83+
super().deploy()
84+
finished_str = "Started Application in"
85+
return wait_for_condition(
86+
condition=lambda: finished_str in self.get_logs(),
87+
timeout_seconds=300,
88+
bail_condition=lambda: self.exited,
89+
context=None)

behave_framework/src/minifi_test_framework/minifi/flow_definition.py

Lines changed: 5 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
# limitations under the License.
1616
#
1717

18-
import yaml
18+
from abc import ABC
1919

2020
from .connection import Connection
2121
from .controller_service import ControllerService
@@ -24,7 +24,7 @@
2424
from .processor import Processor
2525

2626

27-
class FlowDefinition:
27+
class FlowDefinition(ABC):
2828
def __init__(self, flow_name: str = "MiNiFi Flow"):
2929
self.flow_name = flow_name
3030
self.processors: list[Processor] = []
@@ -53,42 +53,10 @@ def add_connection(self, connection: Connection):
5353
self.connections.append(connection)
5454

5555
def to_yaml(self) -> str:
56-
"""Serializes the entire flow definition into the MiNiFi YAML format."""
56+
raise NotImplementedError("to_yaml method must be implemented in subclasses")
5757

58-
# Create a quick lookup map of processor names to their objects
59-
# This is crucial for finding the source/destination IDs for connections
60-
processors_by_name = {p.name: p for p in self.processors}
61-
funnels_by_name = {f.name: f for f in self.funnels}
62-
63-
connectables_by_name = {**processors_by_name, **funnels_by_name}
64-
65-
if len(self.parameter_contexts) > 0:
66-
parameter_context_name = self.parameter_contexts[0].name
67-
else:
68-
parameter_context_name = ''
69-
# Build the final dictionary structure
70-
config = {'MiNiFi Config Version': 3, 'Flow Controller': {'name': self.flow_name},
71-
'Parameter Contexts': [p.to_yaml_dict() for p in self.parameter_contexts],
72-
'Processors': [p.to_yaml_dict() for p in self.processors],
73-
'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [],
74-
'Controller Services': [c.to_yaml_dict() for c in self.controller_services],
75-
'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name}
76-
77-
# Build the connections list by looking up processor IDs
78-
for conn in self.connections:
79-
source_proc = connectables_by_name.get(conn.source_name)
80-
dest_proc = connectables_by_name.get(conn.target_name)
81-
82-
if not source_proc or not dest_proc:
83-
raise ValueError(
84-
f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'")
85-
86-
config['Connections'].append(
87-
{'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id,
88-
'source id': source_proc.id, 'source relationship name': conn.source_relationship,
89-
'destination id': dest_proc.id})
90-
91-
return yaml.dump(config, sort_keys=False, indent=2, width=120)
58+
def to_json(self) -> str:
59+
raise NotImplementedError("to_json method must be implemented in subclasses")
9260

9361
def __repr__(self):
9462
return f"FlowDefinition(Processors: {self.processors}, Controller Services: {self.controller_services})"
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one or more
3+
# contributor license agreements. See the NOTICE file distributed with
4+
# this work for additional information regarding copyright ownership.
5+
# The ASF licenses this file to You under the Apache License, Version 2.0
6+
# (the "License"); you may not use this file except in compliance with
7+
# the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing, software
12+
# distributed under the License is distributed on an "AS IS" BASIS,
13+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
# See the License for the specific language governing permissions and
15+
# limitations under the License.
16+
#
17+
18+
import yaml
19+
20+
from .flow_definition import FlowDefinition
21+
22+
23+
class MinifiFlowDefinition(FlowDefinition):
24+
def __init__(self, flow_name: str = "MiNiFi Flow"):
25+
super().__init__(flow_name)
26+
27+
def to_yaml(self) -> str:
28+
"""Serializes the entire flow definition into the MiNiFi YAML format."""
29+
30+
# Create a quick lookup map of processor names to their objects
31+
# This is crucial for finding the source/destination IDs for connections
32+
processors_by_name = {p.name: p for p in self.processors}
33+
funnels_by_name = {f.name: f for f in self.funnels}
34+
35+
connectables_by_name = {**processors_by_name, **funnels_by_name}
36+
37+
if len(self.parameter_contexts) > 0:
38+
parameter_context_name = self.parameter_contexts[0].name
39+
else:
40+
parameter_context_name = ''
41+
# Build the final dictionary structure
42+
config = {'MiNiFi Config Version': 3, 'Flow Controller': {'name': self.flow_name},
43+
'Parameter Contexts': [p.to_yaml_dict() for p in self.parameter_contexts],
44+
'Processors': [p.to_yaml_dict() for p in self.processors],
45+
'Funnels': [f.to_yaml_dict() for f in self.funnels], 'Connections': [],
46+
'Controller Services': [c.to_yaml_dict() for c in self.controller_services],
47+
'Remote Processing Groups': [], 'Parameter Context Name': parameter_context_name}
48+
49+
# Build the connections list by looking up processor IDs
50+
for conn in self.connections:
51+
source_proc = connectables_by_name.get(conn.source_name)
52+
dest_proc = connectables_by_name.get(conn.target_name)
53+
54+
if not source_proc or not dest_proc:
55+
raise ValueError(
56+
f"Could not find processors for connection from '{conn.source_name}' to '{conn.target_name}'")
57+
58+
config['Connections'].append(
59+
{'name': f"{conn.source_name}/{conn.source_relationship}/{conn.target_name}", 'id': conn.id,
60+
'source id': source_proc.id, 'source relationship name': conn.source_relationship,
61+
'destination id': dest_proc.id})
62+
63+
return yaml.dump(config, sort_keys=False, indent=2, width=120)

0 commit comments

Comments
 (0)