|
19 | 19 | # with Crate these terms will supersede the license and you may use the |
20 | 20 | # software solely pursuant to the terms of the relevant commercial agreement. |
21 | 21 |
|
22 | | -import json |
23 | 22 | import logging |
24 | 23 | from typing import Any, Dict, List, Optional |
25 | 24 |
|
26 | | -from aiohttp.client_exceptions import WSServerHandshakeError |
27 | 25 | from kopf import TemporaryError |
28 | | -from kubernetes_asyncio.client import ApiException, CoreV1Api |
29 | | -from kubernetes_asyncio.stream import WsApiClient |
| 26 | +from kubernetes_asyncio.client import CoreV1Api |
30 | 27 |
|
31 | 28 | from crate.operator.config import config |
32 | 29 | from crate.operator.constants import ( |
|
36 | 33 | SYSTEM_USERNAME, |
37 | 34 | ) |
38 | 35 | from crate.operator.cratedb import create_user, get_connection |
| 36 | +from crate.operator.sql import execute_sql_via_crate_control |
39 | 37 | from crate.operator.utils import crate |
40 | 38 | from crate.operator.utils.k8s_api_client import GlobalApiClient |
41 | 39 | from crate.operator.utils.kopf import StateBasedSubHandler |
@@ -78,121 +76,121 @@ async def bootstrap_system_user( |
78 | 76 | when SSL/TLS is enabled, and encrypted connections aren't possible when |
79 | 77 | no SSL/TLS is configured. |
80 | 78 | """ |
81 | | - scheme = "https" if has_ssl else "http" |
82 | 79 | password = await get_system_user_password(core, namespace, name) |
83 | 80 |
|
84 | | - def get_curl_command(payload: dict) -> List[str]: |
85 | | - return [ |
86 | | - "curl", |
87 | | - "-s", |
88 | | - "-k", |
89 | | - "-X", |
90 | | - "POST", |
91 | | - f"{scheme}://localhost:4200/_sql", |
92 | | - "-H", |
93 | | - "Content-Type: application/json", |
94 | | - "-d", |
95 | | - json.dumps(payload), |
96 | | - "-w", |
97 | | - "\\n", |
98 | | - ] |
99 | | - |
100 | | - command_create_user = get_curl_command( |
101 | | - { |
102 | | - "stmt": 'CREATE USER "{}" WITH (password = $1)'.format(SYSTEM_USERNAME), |
103 | | - "args": [password], |
104 | | - } |
105 | | - ) |
106 | | - command_alter_user = get_curl_command( |
| 81 | + control_host = f"crate-control-{name}.{namespace}.svc.cluster.local" |
| 82 | + bootstrap_token = await resolve_secret_key_ref( |
| 83 | + core, |
| 84 | + namespace, |
107 | 85 | { |
108 | | - "stmt": 'ALTER USER "{}" SET (password = $1)'.format(SYSTEM_USERNAME), |
109 | | - "args": [password], |
110 | | - } |
| 86 | + "key": "token", |
| 87 | + "name": f"crate-control-{name}", |
| 88 | + }, |
111 | 89 | ) |
112 | | - command_grant = get_curl_command( |
113 | | - {"stmt": 'GRANT ALL PRIVILEGES TO "{}" '.format(SYSTEM_USERNAME)} |
| 90 | + logger.info("************************** new handling **************************") |
| 91 | + logger.info( |
| 92 | + "Bootstrapping system user '%s' via sidecar at %s", |
| 93 | + SYSTEM_USERNAME, |
| 94 | + control_host, |
114 | 95 | ) |
| 96 | + logger.info("bootstrap_token: %s", bootstrap_token) |
| 97 | + |
| 98 | + command_create_user: Dict[str, Any] = { |
| 99 | + "stmt": f'CREATE USER "{SYSTEM_USERNAME}" WITH (password = ?)', |
| 100 | + "args": [password], |
| 101 | + } |
| 102 | + |
| 103 | + command_alter_user: Dict[str, Any] = { |
| 104 | + "stmt": f'ALTER USER "{SYSTEM_USERNAME}" SET (password = ?)', |
| 105 | + "args": [password], |
| 106 | + } |
| 107 | + |
| 108 | + # Grant doesn't usually need args since it's all identifiers |
| 109 | + command_grant: Dict[str, Any] = { |
| 110 | + "stmt": f'GRANT ALL PRIVILEGES TO "{SYSTEM_USERNAME}"', |
| 111 | + "args": [], |
| 112 | + } |
| 113 | + |
115 | 114 | exception_logger = logger.exception if config.TESTING else logger.error |
116 | 115 |
|
117 | | - async def pod_exec(cmd): |
118 | | - return await core_ws.connect_get_namespaced_pod_exec( |
| 116 | + needs_update = False |
| 117 | + try: |
| 118 | + logger.info("Trying to create system user ...") |
| 119 | + result = await execute_sql_via_crate_control( |
119 | 120 | namespace=namespace, |
120 | | - name=master_node_pod, |
121 | | - command=cmd, |
122 | | - container="crate", |
123 | | - stderr=True, |
124 | | - stdin=False, |
125 | | - stdout=True, |
126 | | - tty=False, |
| 121 | + name=name, |
| 122 | + sql=command_create_user["stmt"], |
| 123 | + args=command_create_user["args"], |
| 124 | + logger=logger, |
127 | 125 | ) |
| 126 | + except Exception as e: |
| 127 | + # We don't use `logger.exception()` to not accidentally include the |
| 128 | + # password in the log messages which might be part of the string |
| 129 | + # representation of the exception. |
| 130 | + exception_logger("... failed. %s", str(e)) |
| 131 | + raise _temporary_error() |
| 132 | + else: |
| 133 | + logger.info("Create user result: %s", result) |
| 134 | + if "rowcount" in result: |
| 135 | + logger.info("... success") |
| 136 | + elif ( |
| 137 | + "error" in result |
| 138 | + and "RoleAlreadyExistsException" in result["error"]["message"] |
| 139 | + ): |
| 140 | + needs_update = True |
| 141 | + logger.info("... success. Already present") |
| 142 | + else: |
| 143 | + logger.info("... error. %s", result) |
| 144 | + raise _temporary_error() |
128 | 145 |
|
129 | | - needs_update = False |
130 | | - async with WsApiClient() as ws_api_client: |
131 | | - core_ws = CoreV1Api(ws_api_client) |
| 146 | + if needs_update: |
132 | 147 | try: |
133 | | - logger.info("Trying to create system user ...") |
134 | | - result = await pod_exec(command_create_user) |
135 | | - except ApiException as e: |
136 | | - # We don't use `logger.exception()` to not accidentally include the |
137 | | - # password in the log messages which might be part of the string |
138 | | - # representation of the exception. |
139 | | - exception_logger("... failed. Status: %s Reason: %s", e.status, e.reason) |
140 | | - raise _temporary_error() |
141 | | - except WSServerHandshakeError as e: |
| 148 | + logger.info("Trying to update system user password ...") |
| 149 | + result = await execute_sql_via_crate_control( |
| 150 | + namespace=namespace, |
| 151 | + name=name, |
| 152 | + sql=command_alter_user["stmt"], |
| 153 | + args=command_alter_user["args"], |
| 154 | + logger=logger, |
| 155 | + ) |
| 156 | + except Exception as e: |
142 | 157 | # We don't use `logger.exception()` to not accidentally include the |
143 | 158 | # password in the log messages which might be part of the string |
144 | 159 | # representation of the exception. |
145 | | - exception_logger("... failed. Status: %s Message: %s", e.status, e.message) |
| 160 | + exception_logger("... failed: %s", str(e)) |
146 | 161 | raise _temporary_error() |
147 | 162 | else: |
148 | 163 | if "rowcount" in result: |
149 | 164 | logger.info("... success") |
150 | | - elif "AlreadyExistsException" in result: |
151 | | - needs_update = True |
152 | | - logger.info("... success. Already present") |
153 | 165 | else: |
154 | 166 | logger.info("... error. %s", result) |
155 | 167 | raise _temporary_error() |
156 | 168 |
|
157 | | - if needs_update: |
158 | | - try: |
159 | | - logger.info("Trying to update system user password ...") |
160 | | - result = await pod_exec(command_alter_user) |
161 | | - except ApiException as e: |
162 | | - # We don't use `logger.exception()` to not accidentally include the |
163 | | - # password in the log messages which might be part of the string |
164 | | - # representation of the exception. |
165 | | - exception_logger( |
166 | | - "... failed. Status: %s Reason: %s", e.status, e.reason |
167 | | - ) |
168 | | - raise _temporary_error() |
169 | | - except WSServerHandshakeError as e: |
170 | | - # We don't use `logger.exception()` to not accidentally include the |
171 | | - # password in the log messages which might be part of the string |
172 | | - # representation of the exception. |
173 | | - exception_logger( |
174 | | - "... failed. Status: %s Message: %s", e.status, e.message |
175 | | - ) |
176 | | - raise _temporary_error() |
177 | | - else: |
178 | | - if "rowcount" in result: |
179 | | - logger.info("... success") |
180 | | - else: |
181 | | - logger.info("... error. %s", result) |
182 | | - raise _temporary_error() |
183 | | - |
184 | | - try: |
185 | | - logger.info("Trying to grant system user all privileges ...") |
186 | | - result = await pod_exec(command_grant) |
187 | | - except (ApiException, WSServerHandshakeError): |
188 | | - logger.exception("... failed") |
189 | | - raise _temporary_error() |
| 169 | + try: |
| 170 | + logger.info("Trying to grant system user all privileges ...") |
| 171 | + result = await execute_sql_via_crate_control( |
| 172 | + namespace=namespace, |
| 173 | + name=name, |
| 174 | + sql=command_grant["stmt"], |
| 175 | + args=command_grant["args"], |
| 176 | + logger=logger, |
| 177 | + ) |
| 178 | + except Exception: |
| 179 | + logger.exception("... failed") |
| 180 | + raise _temporary_error() |
| 181 | + else: |
| 182 | + if "rowcount" in result: |
| 183 | + logger.info("... success") |
190 | 184 | else: |
191 | | - if "rowcount" in result: |
192 | | - logger.info("... success") |
193 | | - else: |
194 | | - logger.info("... error. %s", result) |
195 | | - raise _temporary_error() |
| 185 | + logger.info("... error. %s", result) |
| 186 | + raise _temporary_error() |
| 187 | + logger.info( |
| 188 | + "************************** new handling finished **************************" |
| 189 | + ) |
| 190 | + |
| 191 | + |
| 192 | +def get_control_service_host(namespace: str, name: str) -> str: |
| 193 | + return f"crate-control-{name}.{namespace}.svc.cluster.local" |
196 | 194 |
|
197 | 195 |
|
198 | 196 | async def bootstrap_gc_admin_user(core: CoreV1Api, namespace: str, name: str): |
|
0 commit comments