|
11 | 11 | from cephadm.inventory import ( |
12 | 12 | HostCacheStatus, |
13 | 13 | ClientKeyringSpec, |
14 | | - Cert, |
15 | | - PrivKey, |
16 | | - CERT_STORE_CERT_PREFIX, |
17 | | - CERT_STORE_KEY_PREFIX, |
18 | 14 | SpecDescription, |
19 | 15 | ) |
20 | 16 | from cephadm.services.osd import OSD, OSDRemovalQueue, OsdIdClaims |
@@ -1743,207 +1739,6 @@ def _fake_inv(key): |
1743 | 1739 | assert cephadm_module.cache._get_host_cache_entry_status( |
1744 | 1740 | 'host.nothing.com') == HostCacheStatus.stray |
1745 | 1741 |
|
1746 | | - @mock.patch("cephadm.module.CephadmOrchestrator.set_store") |
1747 | | - def test_cert_store_save_cert(self, _set_store, cephadm_module: CephadmOrchestrator): |
1748 | | - cephadm_module.cert_key_store._init_known_cert_key_dicts() |
1749 | | - |
1750 | | - rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert' |
1751 | | - nvmeof_client_cert = 'fake-nvmeof-client-cert' |
1752 | | - nvmeof_server_cert = 'fake-nvmeof-server-cert' |
1753 | | - nvmeof_root_ca_cert = 'fake-nvmeof-root-ca-cert' |
1754 | | - grafana_cert_host_1 = 'grafana-cert-host-1' |
1755 | | - grafana_cert_host_2 = 'grafana-cert-host-2' |
1756 | | - cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True) |
1757 | | - cephadm_module.cert_key_store.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True) |
1758 | | - cephadm_module.cert_key_store.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True) |
1759 | | - cephadm_module.cert_key_store.save_cert('nvmeof_root_ca_cert', nvmeof_root_ca_cert, service_name='nvmeof.foo', user_made=True) |
1760 | | - cephadm_module.cert_key_store.save_cert('grafana_cert', grafana_cert_host_1, host='host-1', user_made=True) |
1761 | | - cephadm_module.cert_key_store.save_cert('grafana_cert', grafana_cert_host_2, host='host-2', user_made=True) |
1762 | | - |
1763 | | - expected_calls = [ |
1764 | | - mock.call(f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert', json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()})), |
1765 | | - mock.call(f'{CERT_STORE_CERT_PREFIX}nvmeof_server_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()})), |
1766 | | - mock.call(f'{CERT_STORE_CERT_PREFIX}nvmeof_client_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()})), |
1767 | | - mock.call(f'{CERT_STORE_CERT_PREFIX}nvmeof_root_ca_cert', json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()})), |
1768 | | - mock.call(f'{CERT_STORE_CERT_PREFIX}grafana_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json()})), |
1769 | | - mock.call(f'{CERT_STORE_CERT_PREFIX}grafana_cert', json.dumps({'host-1': Cert(grafana_cert_host_1, True).to_json(), |
1770 | | - 'host-2': Cert(grafana_cert_host_2, True).to_json()})) |
1771 | | - ] |
1772 | | - _set_store.assert_has_calls(expected_calls) |
1773 | | - |
1774 | | - @mock.patch("cephadm.module.CephadmOrchestrator.set_store") |
1775 | | - def test_cert_store_cert_ls(self, _set_store, cephadm_module: CephadmOrchestrator): |
1776 | | - cephadm_module.cert_key_store._init_known_cert_key_dicts() |
1777 | | - |
1778 | | - expected_ls = { |
1779 | | - 'rgw_frontend_ssl_cert': False, |
1780 | | - 'iscsi_ssl_cert': False, |
1781 | | - 'ingress_ssl_cert': False, |
1782 | | - 'mgmt_gw_cert': False, |
1783 | | - 'oauth2_proxy_cert': False, |
1784 | | - 'cephadm_root_ca_cert': False, |
1785 | | - 'grafana_cert': False, |
1786 | | - 'nvmeof_client_cert': False, |
1787 | | - 'nvmeof_server_cert': False, |
1788 | | - 'nvmeof_root_ca_cert': False, |
1789 | | - } |
1790 | | - assert cephadm_module.cert_key_store.cert_ls() == expected_ls |
1791 | | - |
1792 | | - cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.foo', user_made=True) |
1793 | | - cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', 'xxx', service_name='rgw.bar', user_made=True) |
1794 | | - expected_ls['rgw_frontend_ssl_cert'] = {} |
1795 | | - expected_ls['rgw_frontend_ssl_cert']['rgw.foo'] = True |
1796 | | - expected_ls['rgw_frontend_ssl_cert']['rgw.bar'] = True |
1797 | | - assert cephadm_module.cert_key_store.cert_ls() == expected_ls |
1798 | | - |
1799 | | - cephadm_module.cert_key_store.save_cert('nvmeof_client_cert', 'xxx', service_name='nvmeof.foo', user_made=True) |
1800 | | - cephadm_module.cert_key_store.save_cert('nvmeof_server_cert', 'xxx', service_name='nvmeof.foo', user_made=True) |
1801 | | - cephadm_module.cert_key_store.save_cert('nvmeof_root_ca_cert', 'xxx', service_name='nvmeof.foo', user_made=True) |
1802 | | - expected_ls['nvmeof_client_cert'] = {} |
1803 | | - expected_ls['nvmeof_client_cert']['nvmeof.foo'] = True |
1804 | | - expected_ls['nvmeof_server_cert'] = {} |
1805 | | - expected_ls['nvmeof_server_cert']['nvmeof.foo'] = True |
1806 | | - expected_ls['nvmeof_root_ca_cert'] = {} |
1807 | | - expected_ls['nvmeof_root_ca_cert']['nvmeof.foo'] = True |
1808 | | - assert cephadm_module.cert_key_store.cert_ls() == expected_ls |
1809 | | - |
1810 | | - @mock.patch("cephadm.module.CephadmOrchestrator.set_store") |
1811 | | - def test_cert_store_save_key(self, _set_store, cephadm_module: CephadmOrchestrator): |
1812 | | - cephadm_module.cert_key_store._init_known_cert_key_dicts() |
1813 | | - |
1814 | | - grafana_host1_key = 'fake-grafana-host1-key' |
1815 | | - grafana_host2_key = 'fake-grafana-host2-key' |
1816 | | - nvmeof_client_key = 'nvmeof-client-key' |
1817 | | - nvmeof_server_key = 'nvmeof-server-key' |
1818 | | - nvmeof_encryption_key = 'nvmeof-encryption-key' |
1819 | | - cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1') |
1820 | | - cephadm_module.cert_key_store.save_key('grafana_key', grafana_host2_key, host='host2') |
1821 | | - cephadm_module.cert_key_store.save_key('nvmeof_client_key', nvmeof_client_key, service_name='nvmeof.foo') |
1822 | | - cephadm_module.cert_key_store.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo') |
1823 | | - cephadm_module.cert_key_store.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo') |
1824 | | - |
1825 | | - expected_calls = [ |
1826 | | - mock.call(f'{CERT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json()})), |
1827 | | - mock.call(f'{CERT_STORE_KEY_PREFIX}grafana_key', json.dumps({'host1': PrivKey(grafana_host1_key).to_json(), |
1828 | | - 'host2': PrivKey(grafana_host2_key).to_json()})), |
1829 | | - mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_client_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()})), |
1830 | | - mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_server_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()})), |
1831 | | - mock.call(f'{CERT_STORE_KEY_PREFIX}nvmeof_encryption_key', json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()})), |
1832 | | - ] |
1833 | | - _set_store.assert_has_calls(expected_calls) |
1834 | | - |
1835 | | - @mock.patch("cephadm.module.CephadmOrchestrator.set_store") |
1836 | | - def test_cert_store_key_ls(self, _set_store, cephadm_module: CephadmOrchestrator): |
1837 | | - cephadm_module.cert_key_store._init_known_cert_key_dicts() |
1838 | | - |
1839 | | - expected_ls = { |
1840 | | - 'grafana_key': False, |
1841 | | - 'mgmt_gw_key': False, |
1842 | | - 'oauth2_proxy_key': False, |
1843 | | - 'cephadm_root_ca_key': False, |
1844 | | - 'iscsi_ssl_key': False, |
1845 | | - 'ingress_ssl_key': False, |
1846 | | - 'nvmeof_client_key': False, |
1847 | | - 'nvmeof_server_key': False, |
1848 | | - 'nvmeof_encryption_key': False, |
1849 | | - } |
1850 | | - assert cephadm_module.cert_key_store.key_ls() == expected_ls |
1851 | | - |
1852 | | - cephadm_module.cert_key_store.save_key('nvmeof_client_key', 'xxx', service_name='nvmeof.foo') |
1853 | | - cephadm_module.cert_key_store.save_key('nvmeof_server_key', 'xxx', service_name='nvmeof.foo') |
1854 | | - cephadm_module.cert_key_store.save_key('nvmeof_encryption_key', 'xxx', service_name='nvmeof.foo') |
1855 | | - expected_ls['nvmeof_server_key'] = {} |
1856 | | - expected_ls['nvmeof_server_key']['nvmeof.foo'] = True |
1857 | | - expected_ls['nvmeof_client_key'] = {} |
1858 | | - expected_ls['nvmeof_client_key']['nvmeof.foo'] = True |
1859 | | - expected_ls['nvmeof_encryption_key'] = {} |
1860 | | - expected_ls['nvmeof_encryption_key']['nvmeof.foo'] = True |
1861 | | - assert cephadm_module.cert_key_store.key_ls() == expected_ls |
1862 | | - |
1863 | | - @mock.patch("cephadm.module.CephadmOrchestrator.get_store_prefix") |
1864 | | - def test_cert_store_load(self, _get_store_prefix, cephadm_module: CephadmOrchestrator): |
1865 | | - cephadm_module.cert_key_store._init_known_cert_key_dicts() |
1866 | | - |
1867 | | - rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert' |
1868 | | - grafana_host1_key = 'fake-grafana-host1-cert' |
1869 | | - nvmeof_server_cert = 'nvmeof-server-cert' |
1870 | | - nvmeof_client_cert = 'nvmeof-client-cert' |
1871 | | - nvmeof_root_ca_cert = 'nvmeof-root-ca-cert' |
1872 | | - nvmeof_server_key = 'nvmeof-server-key' |
1873 | | - nvmeof_client_key = 'nvmeof-client-key' |
1874 | | - nvmeof_encryption_key = 'nvmeof-encryption-key' |
1875 | | - |
1876 | | - def _fake_prefix_store(key): |
1877 | | - if key == 'cert_store.cert.': |
1878 | | - return { |
1879 | | - f'{CERT_STORE_CERT_PREFIX}rgw_frontend_ssl_cert': json.dumps({'rgw.foo': Cert(rgw_frontend_rgw_foo_host2_cert, True).to_json()}), |
1880 | | - f'{CERT_STORE_CERT_PREFIX}nvmeof_server_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_server_cert, True).to_json()}), |
1881 | | - f'{CERT_STORE_CERT_PREFIX}nvmeof_client_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_client_cert, True).to_json()}), |
1882 | | - f'{CERT_STORE_CERT_PREFIX}nvmeof_root_ca_cert': json.dumps({'nvmeof.foo': Cert(nvmeof_root_ca_cert, True).to_json()}), |
1883 | | - } |
1884 | | - elif key == 'cert_store.key.': |
1885 | | - return { |
1886 | | - f'{CERT_STORE_KEY_PREFIX}grafana_key': json.dumps({'host1': PrivKey(grafana_host1_key).to_json()}), |
1887 | | - f'{CERT_STORE_KEY_PREFIX}nvmeof_server_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_server_key).to_json()}), |
1888 | | - f'{CERT_STORE_KEY_PREFIX}nvmeof_client_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_client_key).to_json()}), |
1889 | | - f'{CERT_STORE_KEY_PREFIX}nvmeof_encryption_key': json.dumps({'nvmeof.foo': PrivKey(nvmeof_encryption_key).to_json()}), |
1890 | | - } |
1891 | | - else: |
1892 | | - raise Exception(f'Get store with unexpected value {key}') |
1893 | | - |
1894 | | - _get_store_prefix.side_effect = _fake_prefix_store |
1895 | | - cephadm_module.cert_key_store.load() |
1896 | | - assert cephadm_module.cert_key_store.known_certs['rgw_frontend_ssl_cert']['rgw.foo'] == Cert(rgw_frontend_rgw_foo_host2_cert, True) |
1897 | | - assert cephadm_module.cert_key_store.known_certs['nvmeof_server_cert']['nvmeof.foo'] == Cert(nvmeof_server_cert, True) |
1898 | | - assert cephadm_module.cert_key_store.known_certs['nvmeof_client_cert']['nvmeof.foo'] == Cert(nvmeof_client_cert, True) |
1899 | | - assert cephadm_module.cert_key_store.known_certs['nvmeof_root_ca_cert']['nvmeof.foo'] == Cert(nvmeof_root_ca_cert, True) |
1900 | | - assert cephadm_module.cert_key_store.known_keys['grafana_key']['host1'] == PrivKey(grafana_host1_key) |
1901 | | - assert cephadm_module.cert_key_store.known_keys['nvmeof_server_key']['nvmeof.foo'] == PrivKey(nvmeof_server_key) |
1902 | | - assert cephadm_module.cert_key_store.known_keys['nvmeof_client_key']['nvmeof.foo'] == PrivKey(nvmeof_client_key) |
1903 | | - assert cephadm_module.cert_key_store.known_keys['nvmeof_encryption_key']['nvmeof.foo'] == PrivKey(nvmeof_encryption_key) |
1904 | | - |
1905 | | - def test_cert_store_get_cert_key(self, cephadm_module: CephadmOrchestrator): |
1906 | | - cephadm_module.cert_key_store._init_known_cert_key_dicts() |
1907 | | - |
1908 | | - rgw_frontend_rgw_foo_host2_cert = 'fake-rgw-cert' |
1909 | | - nvmeof_client_cert = 'fake-nvmeof-client-cert' |
1910 | | - nvmeof_server_cert = 'fake-nvmeof-server-cert' |
1911 | | - cephadm_module.cert_key_store.save_cert('rgw_frontend_ssl_cert', rgw_frontend_rgw_foo_host2_cert, service_name='rgw.foo', user_made=True) |
1912 | | - cephadm_module.cert_key_store.save_cert('nvmeof_server_cert', nvmeof_server_cert, service_name='nvmeof.foo', user_made=True) |
1913 | | - cephadm_module.cert_key_store.save_cert('nvmeof_client_cert', nvmeof_client_cert, service_name='nvmeof.foo', user_made=True) |
1914 | | - |
1915 | | - assert cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', service_name='rgw.foo') == rgw_frontend_rgw_foo_host2_cert |
1916 | | - assert cephadm_module.cert_key_store.get_cert('nvmeof_server_cert', service_name='nvmeof.foo') == nvmeof_server_cert |
1917 | | - assert cephadm_module.cert_key_store.get_cert('nvmeof_client_cert', service_name='nvmeof.foo') == nvmeof_client_cert |
1918 | | - assert cephadm_module.cert_key_store.get_cert('grafana_cert', host='host1') == '' |
1919 | | - assert cephadm_module.cert_key_store.get_cert('iscsi_ssl_cert', service_name='iscsi.foo') == '' |
1920 | | - assert cephadm_module.cert_key_store.get_cert('nvmeof_root_ca_cert', service_name='nvmeof.foo') == '' |
1921 | | - |
1922 | | - with pytest.raises(OrchestratorError, match='Attempted to access cert for unknown entity'): |
1923 | | - cephadm_module.cert_key_store.get_cert('unknown_entity') |
1924 | | - with pytest.raises(OrchestratorError, match='Need host to access cert for entity'): |
1925 | | - cephadm_module.cert_key_store.get_cert('grafana_cert') |
1926 | | - with pytest.raises(OrchestratorError, match='Need service name to access cert for entity'): |
1927 | | - cephadm_module.cert_key_store.get_cert('rgw_frontend_ssl_cert', host='foo') |
1928 | | - |
1929 | | - grafana_host1_key = 'fake-grafana-host1-cert' |
1930 | | - nvmeof_server_key = 'nvmeof-server-key' |
1931 | | - nvmeof_encryption_key = 'nvmeof-encryption-key' |
1932 | | - cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1') |
1933 | | - cephadm_module.cert_key_store.save_key('grafana_key', grafana_host1_key, host='host1') |
1934 | | - cephadm_module.cert_key_store.save_key('nvmeof_server_key', nvmeof_server_key, service_name='nvmeof.foo') |
1935 | | - cephadm_module.cert_key_store.save_key('nvmeof_encryption_key', nvmeof_encryption_key, service_name='nvmeof.foo') |
1936 | | - |
1937 | | - assert cephadm_module.cert_key_store.get_key('grafana_key', host='host1') == grafana_host1_key |
1938 | | - assert cephadm_module.cert_key_store.get_key('nvmeof_server_key', service_name='nvmeof.foo') == nvmeof_server_key |
1939 | | - assert cephadm_module.cert_key_store.get_key('nvmeof_client_key', service_name='nvmeof.foo') == '' |
1940 | | - assert cephadm_module.cert_key_store.get_key('nvmeof_encryption_key', service_name='nvmeof.foo') == nvmeof_encryption_key |
1941 | | - |
1942 | | - with pytest.raises(OrchestratorError, match='Attempted to access priv key for unknown entity'): |
1943 | | - cephadm_module.cert_key_store.get_key('unknown_entity') |
1944 | | - with pytest.raises(OrchestratorError, match='Need host to access priv key for entity'): |
1945 | | - cephadm_module.cert_key_store.get_key('grafana_key') |
1946 | | - |
1947 | 1742 | @mock.patch("cephadm.serve.CephadmServe._run_cephadm", _run_cephadm('{}')) |
1948 | 1743 | @mock.patch("cephadm.services.nfs.NFSService.run_grace_tool", mock.MagicMock()) |
1949 | 1744 | @mock.patch("cephadm.services.nfs.NFSService.purge", mock.MagicMock()) |
|