2 changes: 2 additions & 0 deletions ambari-agent/conf/unix/ambari-agent.ini
@@ -57,6 +57,8 @@ ssl_verify_cert=0
credential_lib_dir=/var/lib/ambari-agent/cred/lib
credential_conf_dir=/var/lib/ambari-agent/cred/conf
credential_shell_cmd=org.apache.hadoop.security.alias.CredentialShell
agent_secret=default-secret-change-me
agent_salt=ambari-agent-cache-salt

[network]
; this option applies only to Agent communication
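The agent_secret/agent_salt defaults added above are placeholders; a deployment that wants encrypted agent caches should replace agent_secret (and ideally agent_salt) in the [security] section with site-specific values. A minimal sketch of one way to generate a random secret with the Python standard library (an assumed operator workflow, not part of this patch):

import secrets

# Prints a URL-safe random string suitable as a site-specific agent_secret;
# copy the output into the [security] section of ambari-agent.ini.
print(secrets.token_urlsafe(32))
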
16 changes: 16 additions & 0 deletions ambari-agent/src/main/python/ambari_agent/AmbariConfig.py
@@ -463,6 +463,22 @@ def get_ca_cert_file_path(self):
"""
return self.get("security", "ca_cert_path", default="")

def get_agent_secret(self):
"""
Get the agent secret used to derive the encryption key for local cluster cache files.

:return: agent secret string
"""
return self.get('security', 'agent_secret', default="")

def get_agent_salt(self):
"""
Get the agent salt used in key derivation when encrypting local cluster cache files.

:return: agent salt string
"""
return self.get('security', 'agent_salt', default="")

@property
def send_alert_changes_only(self):
return bool(self.get("agent", "send_alert_changes_only", "0"))
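For reference, a minimal standalone sketch (not Ambari code) of how the new getters above map onto the ambari-agent.ini keys, assuming the underlying parsing is configparser-compatible and that an empty string means "no value configured":

import configparser

# Same [security] keys that the ambari-agent.ini change introduces.
conf = configparser.ConfigParser()
conf.read_string("""
[security]
agent_secret=default-secret-change-me
agent_salt=ambari-agent-cache-salt
""")

# Mirrors get_agent_secret()/get_agent_salt(): read the key, default to "".
secret = conf.get("security", "agent_secret", fallback="")
salt = conf.get("security", "agent_salt", fallback="")
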
ambari-agent/src/main/python/ambari_agent/ClusterAlertDefinitionsCache.py
@@ -34,13 +34,13 @@ class ClusterAlertDefinitionsCache(ClusterCache):
differently for every host.
"""

def __init__(self, cluster_cache_dir):
def __init__(self, cluster_cache_dir, secret=None, salt=None):
"""
Initializes the alert definitions cache.
:param cluster_cache_dir:
:return:
"""
super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir)
super(ClusterAlertDefinitionsCache, self).__init__(cluster_cache_dir, secret, salt)

def get_alert_definition_index_by_id(self, alert_dict, cluster_id, alert_id):
definitions = alert_dict[cluster_id]["alertDefinitions"]
64 changes: 57 additions & 7 deletions ambari-agent/src/main/python/ambari_agent/ClusterCache.py
@@ -22,6 +22,11 @@
import os
import threading
from collections import defaultdict
import base64

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

from ambari_agent.Utils import Utils

@@ -38,14 +43,16 @@ class ClusterCache(dict):

file_locks = defaultdict(threading.RLock)

def __init__(self, cluster_cache_dir):
def __init__(self, cluster_cache_dir, secret=None, salt=None):
"""
Initializes the cache.
:param cluster_cache_dir:
:return:
"""

self.cluster_cache_dir = cluster_cache_dir
self.secret = secret
self.salt = salt

self.__current_cache_json_file = os.path.join(
self.cluster_cache_dir, self.get_cache_name() + ".json"
@@ -63,8 +70,10 @@ def __init__(self, cluster_cache_dir):
try:
with self.__file_lock:
if os.path.isfile(self.__current_cache_json_file):
with open(self.__current_cache_json_file, "r") as fp:
cache_dict = json.load(fp)
with open(self.__current_cache_json_file, "rb") as fp:  # read as bytes: the file may contain an encrypted payload
encrypted_data = fp.read()
decrypted_json = self._decrypt_data(encrypted_data)
cache_dict = json.loads(decrypted_json)

if os.path.isfile(self.__current_cache_hash_file):
with open(self.__current_cache_hash_file, "r") as fp:
@@ -83,6 +92,39 @@ def __init__(self, cluster_cache_dir):
logger.exception(f"Loading saved cache for {self.__class__.__name__} failed")
self.rewrite_cache({}, None)

def _get_encryption_key(self):
"""
Derive a Fernet instance from the agent secret and salt using PBKDF2-HMAC-SHA256.
"""
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=self.salt.encode('utf-8'),
iterations=100000,
)

key = base64.urlsafe_b64encode(kdf.derive(self.secret.encode('utf-8')))
return Fernet(key)

def _is_encryption_enabled(self):
return bool(self.secret and self.salt)

def _encrypt_data(self, data):
"""Encrypt a JSON string; returned unchanged when encryption is disabled"""
if not self._is_encryption_enabled():
return data
else:
fernet = self._get_encryption_key()
return fernet.encrypt(data.encode('utf-8'))

def _decrypt_data(self, encrypted_data):
"""Decrypt cache file bytes back to a JSON string; pass-through when encryption is disabled"""
if not self._is_encryption_enabled():
return encrypted_data
else:
fernet = self._get_encryption_key()
return fernet.decrypt(encrypted_data).decode('utf-8')

def get_cluster_indepedent_data(self):
return self[ClusterCache.COMMON_DATA_CLUSTER]

@@ -141,8 +183,16 @@ def persist_cache(self, cache_hash):
os.makedirs(self.cluster_cache_dir)

with self.__file_lock:
with open(self.__current_cache_json_file, "w") as f:
json.dump(self, f, indent=2)
# Serialize the cache and encrypt it when a secret/salt is configured
json_str = json.dumps(self, indent=2)
encrypted_json = self._encrypt_data(json_str)

if self._is_encryption_enabled():
with open(self.__current_cache_json_file, "wb") as f:  # encrypted payload is bytes
f.write(encrypted_json)
else:
with open(self.__current_cache_json_file, "w") as f:  # plain JSON text
f.write(encrypted_json)

if self.hash is not None:
with open(self.__current_cache_hash_file, "w") as fp:
@@ -173,7 +223,7 @@ def get_cache_name(self):
raise NotImplemented()

def __deepcopy__(self, memo):
return self.__class__(self.cluster_cache_dir)
return self.__class__(self.cluster_cache_dir, self.secret, self.salt)

def __copy__(self):
return self.__class__(self.cluster_cache_dir)
return self.__class__(self.cluster_cache_dir, self.secret, self.salt)
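For reference, a self-contained sketch of the encryption scheme introduced above: PBKDF2-HMAC-SHA256 stretches the configured secret and salt into a Fernet key, and Fernet performs the actual encryption and decryption of the serialized cache. It assumes only that the cryptography package pinned in setup.py is installed; it is an illustration, not the patch itself.

import base64

from cryptography.fernet import Fernet
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC


def derive_fernet(secret, salt):
    # Derive a 32-byte key from the shared secret, then urlsafe-base64 encode it as Fernet requires.
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt.encode("utf-8"),
        iterations=100000,
    )
    return Fernet(base64.urlsafe_b64encode(kdf.derive(secret.encode("utf-8"))))


fernet = derive_fernet("super_secret", "super_secret_salt")
token = fernet.encrypt(b'{"a": 1, "b": 2}')          # plaintext bytes -> encrypted token (bytes)
assert fernet.decrypt(token) == b'{"a": 1, "b": 2}'  # round-trip restores the original JSON
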
ambari-agent/src/main/python/ambari_agent/ClusterConfigurationCache.py
@@ -31,13 +31,13 @@ class ClusterConfigurationCache(ClusterCache):
configuration properties.
"""

def __init__(self, cluster_cache_dir):
def __init__(self, cluster_cache_dir, secret=None, salt=None):
"""
Initializes the configuration cache.
:param cluster_cache_dir: directory the changed json are saved
:return:
"""
super(ClusterConfigurationCache, self).__init__(cluster_cache_dir)
super(ClusterConfigurationCache, self).__init__(cluster_cache_dir, secret, salt)

def get_cache_name(self):
return "configurations"
ambari-agent/src/main/python/ambari_agent/ClusterHostLevelParamsCache.py
@@ -34,13 +34,13 @@ class ClusterHostLevelParamsCache(ClusterCache):
differently for every host.
"""

def __init__(self, cluster_cache_dir):
def __init__(self, cluster_cache_dir, secret=None, salt=None):
"""
Initializes the host level params cache.
:param cluster_cache_dir:
:return:
"""
super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir)
super(ClusterHostLevelParamsCache, self).__init__(cluster_cache_dir, secret, salt)

def get_cache_name(self):
return "host_level_params"
ambari-agent/src/main/python/ambari_agent/ClusterMetadataCache.py
@@ -38,7 +38,7 @@ def __init__(self, cluster_cache_dir, config):
:return:
"""
self.config = config
super(ClusterMetadataCache, self).__init__(cluster_cache_dir)
super(ClusterMetadataCache, self).__init__(cluster_cache_dir, config.get_agent_secret(), config.get_agent_salt())

def on_cache_update(self):
try:
ambari-agent/src/main/python/ambari_agent/ClusterTopologyCache.py
@@ -50,7 +50,7 @@ def __init__(self, cluster_cache_dir, config):
self.cluster_local_components = {}
self.cluster_host_info = None
self.component_version_map = {}
super(ClusterTopologyCache, self).__init__(cluster_cache_dir)
super(ClusterTopologyCache, self).__init__(cluster_cache_dir, config.get_agent_secret(), config.get_agent_salt())

def get_cache_name(self):
return "topology"
14 changes: 11 additions & 3 deletions ambari-agent/src/main/python/ambari_agent/InitializerModule.py
@@ -90,11 +90,19 @@ def init(self):
self.config.cluster_cache_dir, self.config
)
self.host_level_params_cache = ClusterHostLevelParamsCache(
self.config.cluster_cache_dir
self.config.cluster_cache_dir,
self.config.get_agent_secret(),
self.config.get_agent_salt()
)
self.configurations_cache = ClusterConfigurationCache(
self.config.cluster_cache_dir,
self.config.get_agent_secret(),
self.config.get_agent_salt()
)
self.configurations_cache = ClusterConfigurationCache(self.config.cluster_cache_dir)
self.alert_definitions_cache = ClusterAlertDefinitionsCache(
self.config.cluster_cache_dir
self.config.cluster_cache_dir,
self.config.get_agent_secret(),
self.config.get_agent_salt()
)
self.configuration_builder = ConfigurationBuilder(self)
self.stale_alerts_monitor = StaleAlertsMonitor(self)
106 changes: 106 additions & 0 deletions ambari-agent/src/test/python/ambari_agent/TestClusterCache.py
@@ -0,0 +1,106 @@
#!/usr/bin/env python3

"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

from ambari_agent import main

main.MEMORY_LEAK_DEBUG_FILEPATH = "/tmp/memory_leak_debug.out"
import os
import tempfile
import shutil
from unittest import TestCase

from ambari_agent.ClusterCache import ClusterCache
from mock.mock import patch, MagicMock
from ambari_commons import OSCheck
from only_for_platform import os_distro_value


class TestClusterCache(TestCase):
"""
Test suite for verifying encryption behavior of ClusterCache.

It covers:
- encryption/decryption round-trip when secret/salt are provided
- behavior when encryption is effectively disabled (no secret/salt)
"""

# Patch OS detection so that ClusterCache initialization is OS-agnostic in this test.
@patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
def setUp(self):
# Create a temporary directory that will be cleaned up after each test.
self.tmpdir = tempfile.mkdtemp()
cluster_cache_dir = self.tmpdir + "/cluster_cache_dir"

# Instance with encryption enabled (secret and salt provided).
self.cluster_cache_encrypted = DummyClusterCache(
cluster_cache_dir,
"super_secret",
"super_secret_salt"
)

# Instance with encryption disabled (no secret or salt).
self.cluster_cache_unencrypted = DummyClusterCache(cluster_cache_dir)

@patch.object(os, "chmod")
def test_enc(self, chmod_mock):
"""
Verify that:
- encrypted instance changes the data and can restore it back
- unencrypted instance is a no-op for encrypt/decrypt
"""
string_json = '{"a": 1, "b": 2}'

# Encrypted cache should not store raw JSON.
encrypted = self.cluster_cache_encrypted._encrypt_data(string_json)
self.assertNotEqual(string_json, encrypted)
# Round-trip must produce original JSON.
decrypted = self.cluster_cache_encrypted._decrypt_data(encrypted)
self.assertEqual(string_json, decrypted)

# For unencrypted cache, encrypt/decrypt should behave as pass-through.
string_json = '{"a": 1, "b": 2}'
encrypted = self.cluster_cache_unencrypted._encrypt_data(string_json)
self.assertEqual(string_json, encrypted)
decrypted = self.cluster_cache_unencrypted._decrypt_data(encrypted)
self.assertEqual(string_json, decrypted)

@patch.object(os, "chmod")
def test_encryption_enable(self, chmod_mock):
"""
Verify that _is_encryption_enabled reflects whether secret/salt were provided.
"""
# When secret/salt are given, encryption should be reported as enabled.
self.assertTrue(self.cluster_cache_encrypted._is_encryption_enabled())

# When no secret/salt are given, encryption should be reported as disabled.
self.assertFalse(self.cluster_cache_unencrypted._is_encryption_enabled())

def tearDown(self):
shutil.rmtree(self.tmpdir)

class DummyClusterCache(ClusterCache):
"""
Minimal ClusterCache subclass used only for unit testing.

It overrides get_cache_name to avoid depending on real production cache names.
"""
def get_cache_name(self):
# Dummy implementation just for tests.
return "configuration"
2 changes: 1 addition & 1 deletion setup.py
@@ -131,7 +131,7 @@ def get_version():
+ get_ambari_server_stack_package()
+ get_extra_common_packages(),
package_dir=create_package_dir_map(),
install_requires=["coilmq==1.0.1"],
install_requires=["coilmq==1.0.1", "cryptography==46.0.3"],
include_package_data=True,
long_description="The Apache Ambari project is aimed at making Hadoop management simpler by developing software for provisioning, managing, and monitoring Apache Hadoop clusters. "
"Ambari provides an intuitive, easy-to-use Hadoop management web UI backed by its RESTful APIs.",