Skip to content

Commit 6e2aea9

Browse files
committed
Add OpenShift sidecar support
1 parent ebb40bc commit 6e2aea9

17 files changed

+878
-292
lines changed

crate/operator/bootstrap.py

Lines changed: 93 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,11 @@
1919
# with Crate these terms will supersede the license and you may use the
2020
# software solely pursuant to the terms of the relevant commercial agreement.
2121

22-
import json
2322
import logging
2423
from typing import Any, Dict, List, Optional
2524

26-
from aiohttp.client_exceptions import WSServerHandshakeError
2725
from kopf import TemporaryError
28-
from kubernetes_asyncio.client import ApiException, CoreV1Api
29-
from kubernetes_asyncio.stream import WsApiClient
26+
from kubernetes_asyncio.client import CoreV1Api
3027

3128
from crate.operator.config import config
3229
from crate.operator.constants import (
@@ -36,6 +33,7 @@
3633
SYSTEM_USERNAME,
3734
)
3835
from crate.operator.cratedb import create_user, get_connection
36+
from crate.operator.sql import execute_sql_via_crate_control
3937
from crate.operator.utils import crate
4038
from crate.operator.utils.k8s_api_client import GlobalApiClient
4139
from crate.operator.utils.kopf import StateBasedSubHandler
@@ -78,121 +76,120 @@ async def bootstrap_system_user(
7876
when SSL/TLS is enabled, and encrypted connections aren't possible when
7977
no SSL/TLS is configured.
8078
"""
81-
scheme = "https" if has_ssl else "http"
8279
password = await get_system_user_password(core, namespace, name)
8380

84-
def get_curl_command(payload: dict) -> List[str]:
85-
return [
86-
"curl",
87-
"-s",
88-
"-k",
89-
"-X",
90-
"POST",
91-
f"{scheme}://localhost:4200/_sql",
92-
"-H",
93-
"Content-Type: application/json",
94-
"-d",
95-
json.dumps(payload),
96-
"-w",
97-
"\\n",
98-
]
99-
100-
command_create_user = get_curl_command(
101-
{
102-
"stmt": 'CREATE USER "{}" WITH (password = $1)'.format(SYSTEM_USERNAME),
103-
"args": [password],
104-
}
105-
)
106-
command_alter_user = get_curl_command(
81+
control_host = f"crate-control-{name}.{namespace}.svc.cluster.local"
82+
bootstrap_token = await resolve_secret_key_ref(
83+
core,
84+
namespace,
10785
{
108-
"stmt": 'ALTER USER "{}" SET (password = $1)'.format(SYSTEM_USERNAME),
109-
"args": [password],
110-
}
86+
"key": "token",
87+
"name": f"crate-control-{name}",
88+
},
11189
)
112-
command_grant = get_curl_command(
113-
{"stmt": 'GRANT ALL PRIVILEGES TO "{}" '.format(SYSTEM_USERNAME)}
90+
logger.info("************************** new handling **************************")
91+
logger.info(
92+
"Bootstrapping system user '%s' via sidecar at %s",
93+
SYSTEM_USERNAME,
94+
control_host,
11495
)
96+
logger.info("bootstrap_token: %s", bootstrap_token)
97+
98+
command_create_user: Dict[str, Any] = {
99+
"stmt": f'CREATE USER "{SYSTEM_USERNAME}" WITH (password = ?)',
100+
"args": [password],
101+
}
102+
103+
command_alter_user: Dict[str, Any] = {
104+
"stmt": f'ALTER USER "{SYSTEM_USERNAME}" SET (password = ?)',
105+
"args": [password],
106+
}
107+
108+
command_grant: Dict[str, Any] = {
109+
"stmt": f'GRANT ALL PRIVILEGES TO "{SYSTEM_USERNAME}"',
110+
"args": [],
111+
}
112+
115113
exception_logger = logger.exception if config.TESTING else logger.error
116114

117-
async def pod_exec(cmd):
118-
return await core_ws.connect_get_namespaced_pod_exec(
115+
needs_update = False
116+
try:
117+
logger.info("Trying to create system user ...")
118+
result = await execute_sql_via_crate_control(
119119
namespace=namespace,
120-
name=master_node_pod,
121-
command=cmd,
122-
container="crate",
123-
stderr=True,
124-
stdin=False,
125-
stdout=True,
126-
tty=False,
120+
name=name,
121+
sql=command_create_user["stmt"],
122+
args=command_create_user["args"],
123+
logger=logger,
127124
)
125+
except Exception as e:
126+
# We don't use `logger.exception()` to not accidentally include the
127+
# password in the log messages which might be part of the string
128+
# representation of the exception.
129+
exception_logger("... failed. %s", str(e))
130+
raise _temporary_error()
131+
else:
132+
logger.info("Create user result: %s", result)
133+
if "rowcount" in result:
134+
logger.info("... success")
135+
elif (
136+
"error" in result
137+
and "RoleAlreadyExistsException" in result["error"]["message"]
138+
):
139+
needs_update = True
140+
logger.info("... success. Already present")
141+
else:
142+
logger.info("... error. %s", result)
143+
raise _temporary_error()
128144

129-
needs_update = False
130-
async with WsApiClient() as ws_api_client:
131-
core_ws = CoreV1Api(ws_api_client)
145+
if needs_update:
132146
try:
133-
logger.info("Trying to create system user ...")
134-
result = await pod_exec(command_create_user)
135-
except ApiException as e:
136-
# We don't use `logger.exception()` to not accidentally include the
137-
# password in the log messages which might be part of the string
138-
# representation of the exception.
139-
exception_logger("... failed. Status: %s Reason: %s", e.status, e.reason)
140-
raise _temporary_error()
141-
except WSServerHandshakeError as e:
147+
logger.info("Trying to update system user password ...")
148+
result = await execute_sql_via_crate_control(
149+
namespace=namespace,
150+
name=name,
151+
sql=command_alter_user["stmt"],
152+
args=command_alter_user["args"],
153+
logger=logger,
154+
)
155+
except Exception as e:
142156
# We don't use `logger.exception()` to not accidentally include the
143157
# password in the log messages which might be part of the string
144158
# representation of the exception.
145-
exception_logger("... failed. Status: %s Message: %s", e.status, e.message)
159+
exception_logger("... failed: %s", str(e))
146160
raise _temporary_error()
147161
else:
148162
if "rowcount" in result:
149163
logger.info("... success")
150-
elif "AlreadyExistsException" in result:
151-
needs_update = True
152-
logger.info("... success. Already present")
153164
else:
154165
logger.info("... error. %s", result)
155166
raise _temporary_error()
156167

157-
if needs_update:
158-
try:
159-
logger.info("Trying to update system user password ...")
160-
result = await pod_exec(command_alter_user)
161-
except ApiException as e:
162-
# We don't use `logger.exception()` to not accidentally include the
163-
# password in the log messages which might be part of the string
164-
# representation of the exception.
165-
exception_logger(
166-
"... failed. Status: %s Reason: %s", e.status, e.reason
167-
)
168-
raise _temporary_error()
169-
except WSServerHandshakeError as e:
170-
# We don't use `logger.exception()` to not accidentally include the
171-
# password in the log messages which might be part of the string
172-
# representation of the exception.
173-
exception_logger(
174-
"... failed. Status: %s Message: %s", e.status, e.message
175-
)
176-
raise _temporary_error()
177-
else:
178-
if "rowcount" in result:
179-
logger.info("... success")
180-
else:
181-
logger.info("... error. %s", result)
182-
raise _temporary_error()
183-
184-
try:
185-
logger.info("Trying to grant system user all privileges ...")
186-
result = await pod_exec(command_grant)
187-
except (ApiException, WSServerHandshakeError):
188-
logger.exception("... failed")
189-
raise _temporary_error()
168+
try:
169+
logger.info("Trying to grant system user all privileges ...")
170+
result = await execute_sql_via_crate_control(
171+
namespace=namespace,
172+
name=name,
173+
sql=command_grant["stmt"],
174+
args=command_grant["args"],
175+
logger=logger,
176+
)
177+
except Exception:
178+
logger.exception("... failed")
179+
raise _temporary_error()
180+
else:
181+
if "rowcount" in result:
182+
logger.info("... success")
190183
else:
191-
if "rowcount" in result:
192-
logger.info("... success")
193-
else:
194-
logger.info("... error. %s", result)
195-
raise _temporary_error()
184+
logger.info("... error. %s", result)
185+
raise _temporary_error()
186+
logger.info(
187+
"************************** new handling finished **************************"
188+
)
189+
190+
191+
def get_control_service_host(namespace: str, name: str) -> str:
192+
return f"crate-control-{name}.{namespace}.svc.cluster.local"
196193

197194

198195
async def bootstrap_gc_admin_user(core: CoreV1Api, namespace: str, name: str):

crate/operator/config.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@ class Config:
6262
#: The Docker image that contains scripts to run cluster backups.
6363
CLUSTER_BACKUP_IMAGE: Optional[str] = None
6464

65+
#: The Docker image that contains CrateControl sidecar.
66+
CRATE_CONTROL_IMAGE: Optional[str] = None
67+
6568
#: The volume size for the ``PersistentVolume`` that is used as a storage
6669
#: location for Java heap dumps.
6770
DEBUG_VOLUME_SIZE: bitmath.Byte = bitmath.GiB(64)
@@ -233,6 +236,10 @@ def load(self):
233236
"CLUSTER_BACKUP_IMAGE", default=self.CLUSTER_BACKUP_IMAGE
234237
)
235238

239+
self.CRATE_CONTROL_IMAGE = self.env(
240+
"CRATE_CONTROL_IMAGE", default=self.CRATE_CONTROL_IMAGE
241+
)
242+
236243
debug_volume_size = self.env(
237244
"DEBUG_VOLUME_SIZE", default=str(self.DEBUG_VOLUME_SIZE)
238245
)

0 commit comments

Comments
 (0)