Skip to content

Commit 46ffa55

Browse files
committed
Merge branch 'main' into package-image
2 parents dcc8a70 + dab1010 commit 46ffa55

File tree

164 files changed

+4274
-1388
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

164 files changed

+4274
-1388
lines changed

components/clp-package-utils/clp_package_utils/general.py

Lines changed: 167 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@
99
import typing
1010
import uuid
1111
from enum import auto
12-
from typing import List, Optional, Tuple
12+
from typing import Dict, List, Optional, Tuple
1313

1414
import yaml
1515
from clp_py_utils.clp_config import (
1616
CLP_DEFAULT_CREDENTIALS_FILE_PATH,
17+
CLP_SHARED_CONFIG_FILENAME,
1718
CLPConfig,
1819
DB_COMPONENT_NAME,
20+
QueryEngine,
1921
QUEUE_COMPONENT_NAME,
2022
REDIS_COMPONENT_NAME,
2123
REDUCER_COMPONENT_NAME,
@@ -94,6 +96,14 @@ def __init__(self, clp_home: pathlib.Path, docker_clp_home: pathlib.Path):
9496
self.archives_output_dir: typing.Optional[DockerMount] = None
9597
self.stream_output_dir: typing.Optional[DockerMount] = None
9698
self.aws_config_dir: typing.Optional[DockerMount] = None
99+
self.generated_config_file: typing.Optional[DockerMount] = None
100+
101+
102+
def _validate_data_directory(data_dir: pathlib.Path, component_name: str) -> None:
103+
try:
104+
validate_path_could_be_dir(data_dir)
105+
except ValueError as ex:
106+
raise ValueError(f"{component_name} data directory is invalid: {ex}")
97107

98108

99109
def get_clp_home():
@@ -175,6 +185,13 @@ def is_container_exited(container_name):
175185
return False
176186

177187

188+
def validate_log_directory(logs_dir: pathlib.Path, component_name: str) -> None:
189+
try:
190+
validate_path_could_be_dir(logs_dir)
191+
except ValueError as ex:
192+
raise ValueError(f"{component_name} logs directory is invalid: {ex}")
193+
194+
178195
def validate_port(port_name: str, hostname: str, port: int):
179196
try:
180197
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
@@ -270,6 +287,18 @@ def generate_container_config(
270287
container_clp_config.stream_output.get_directory(),
271288
)
272289

290+
if not is_path_already_mounted(
291+
clp_home,
292+
CONTAINER_CLP_HOME,
293+
clp_config.get_shared_config_file_path(),
294+
container_clp_config.get_shared_config_file_path(),
295+
):
296+
docker_mounts.generated_config_file = DockerMount(
297+
DockerMountType.BIND,
298+
clp_config.get_shared_config_file_path(),
299+
container_clp_config.get_shared_config_file_path(),
300+
)
301+
273302
# Only create the mount if the directory exists
274303
if clp_config.aws_config_directory is not None:
275304
container_clp_config.aws_config_directory = CONTAINER_AWS_CONFIG_DIRECTORY
@@ -293,33 +322,57 @@ def generate_worker_config(clp_config: CLPConfig) -> WorkerConfig:
293322
return worker_config
294323

295324

325+
def get_container_config_filename(container_name: str) -> str:
326+
return f".{container_name}-config.yml"
327+
328+
296329
def dump_container_config(
297-
container_clp_config: CLPConfig, clp_config: CLPConfig, container_name: str
298-
) -> Tuple[pathlib.Path, pathlib.Path]:
330+
container_clp_config: CLPConfig, clp_config: CLPConfig, config_filename: str
331+
):
299332
"""
300-
Writes the given config to the logs directory so that it's accessible in the container.
333+
Writes the given container config to the logs directory, so that it's accessible in the
334+
container.
335+
301336
:param container_clp_config: The config to write.
302337
:param clp_config: The corresponding config on the host (used to determine the logs directory).
303-
:param container_name:
338+
:param config_filename:
304339
:return: The path to the config file in the container and on the host.
305340
"""
306-
container_config_filename = f".{container_name}-config.yml"
307-
config_file_path_on_host = clp_config.logs_directory / container_config_filename
308-
config_file_path_on_container = container_clp_config.logs_directory / container_config_filename
341+
config_file_path_on_host = clp_config.logs_directory / config_filename
342+
config_file_path_on_container = container_clp_config.logs_directory / config_filename
309343
with open(config_file_path_on_host, "w") as f:
310344
yaml.safe_dump(container_clp_config.dump_to_primitive_dict(), f)
311345

312346
return config_file_path_on_container, config_file_path_on_host
313347

314348

349+
def dump_shared_container_config(
350+
container_clp_config: CLPConfig, clp_config: CLPConfig
351+
) -> Tuple[pathlib.Path, pathlib.Path]:
352+
"""
353+
Dumps the given container config to `CLP_SHARED_CONFIG_FILENAME` in the logs directory, so that
354+
it's accessible in the container.
355+
356+
:param container_clp_config:
357+
:param clp_config:
358+
"""
359+
return dump_container_config(container_clp_config, clp_config, CLP_SHARED_CONFIG_FILENAME)
360+
361+
315362
def generate_container_start_cmd(
316-
container_name: str, container_mounts: List[Optional[DockerMount]], container_image: str
363+
container_name: str,
364+
container_mounts: List[Optional[DockerMount]],
365+
container_image: str,
366+
extra_env_vars: Optional[Dict[str, str]] = None,
317367
) -> List[str]:
318368
"""
319-
Generates the command to start a container with the given mounts and name.
369+
Generates the command to start a container with the given mounts, environment variables, and
370+
name.
371+
320372
:param container_name:
321373
:param container_mounts:
322374
:param container_image:
375+
:param extra_env_vars: Environment variables to set on top of the predefined ones.
323376
:return: The command.
324377
"""
325378
clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
@@ -335,6 +388,12 @@ def generate_container_start_cmd(
335388
"--name", container_name,
336389
"--log-driver", "local"
337390
]
391+
env_vars = {
392+
"PYTHONPATH": clp_site_packages_dir,
393+
**(extra_env_vars if extra_env_vars is not None else {}),
394+
}
395+
for key, value in env_vars.items():
396+
container_start_cmd.extend(["-e", f"{key}={value}"])
338397
for mount in container_mounts:
339398
if mount:
340399
container_start_cmd.append("--mount")
@@ -413,58 +472,41 @@ def validate_and_load_db_credentials_file(
413472
clp_config: CLPConfig, clp_home: pathlib.Path, generate_default_file: bool
414473
):
415474
validate_credentials_file_path(clp_config, clp_home, generate_default_file)
416-
clp_config.load_database_credentials_from_file()
475+
clp_config.database.load_credentials_from_file(clp_config.credentials_file_path)
417476

418477

419478
def validate_and_load_queue_credentials_file(
420479
clp_config: CLPConfig, clp_home: pathlib.Path, generate_default_file: bool
421480
):
422481
validate_credentials_file_path(clp_config, clp_home, generate_default_file)
423-
clp_config.load_queue_credentials_from_file()
482+
clp_config.queue.load_credentials_from_file(clp_config.credentials_file_path)
424483

425484

426485
def validate_and_load_redis_credentials_file(
427486
clp_config: CLPConfig, clp_home: pathlib.Path, generate_default_file: bool
428487
):
429488
validate_credentials_file_path(clp_config, clp_home, generate_default_file)
430-
clp_config.load_redis_credentials_from_file()
489+
clp_config.redis.load_credentials_from_file(clp_config.credentials_file_path)
431490

432491

433492
def validate_db_config(clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path):
434-
try:
435-
validate_path_could_be_dir(data_dir)
436-
except ValueError as ex:
437-
raise ValueError(f"{DB_COMPONENT_NAME} data directory is invalid: {ex}")
438-
439-
try:
440-
validate_path_could_be_dir(logs_dir)
441-
except ValueError as ex:
442-
raise ValueError(f"{DB_COMPONENT_NAME} logs directory is invalid: {ex}")
493+
_validate_data_directory(data_dir, DB_COMPONENT_NAME)
494+
validate_log_directory(logs_dir, DB_COMPONENT_NAME)
443495

444496
validate_port(f"{DB_COMPONENT_NAME}.port", clp_config.database.host, clp_config.database.port)
445497

446498

447499
def validate_queue_config(clp_config: CLPConfig, logs_dir: pathlib.Path):
448-
try:
449-
validate_path_could_be_dir(logs_dir)
450-
except ValueError as ex:
451-
raise ValueError(f"{QUEUE_COMPONENT_NAME} logs directory is invalid: {ex}")
500+
validate_log_directory(logs_dir, QUEUE_COMPONENT_NAME)
452501

453502
validate_port(f"{QUEUE_COMPONENT_NAME}.port", clp_config.queue.host, clp_config.queue.port)
454503

455504

456505
def validate_redis_config(
457506
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path, base_config: pathlib.Path
458507
):
459-
try:
460-
validate_path_could_be_dir(data_dir)
461-
except ValueError as ex:
462-
raise ValueError(f"{REDIS_COMPONENT_NAME} data directory is invalid {ex}")
463-
464-
try:
465-
validate_path_could_be_dir(logs_dir)
466-
except ValueError as ex:
467-
raise ValueError(f"{REDIS_COMPONENT_NAME} logs directory is invalid: {ex}")
508+
_validate_data_directory(data_dir, REDIS_COMPONENT_NAME)
509+
validate_log_directory(logs_dir, REDIS_COMPONENT_NAME)
468510

469511
if not base_config.exists():
470512
raise ValueError(
@@ -475,10 +517,7 @@ def validate_redis_config(
475517

476518

477519
def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_workers: int):
478-
try:
479-
validate_path_could_be_dir(logs_dir)
480-
except ValueError as ex:
481-
raise ValueError(f"{REDUCER_COMPONENT_NAME} logs directory is invalid: {ex}")
520+
validate_log_directory(logs_dir, REDUCER_COMPONENT_NAME)
482521

483522
for i in range(0, num_workers):
484523
validate_port(
@@ -491,15 +530,8 @@ def validate_reducer_config(clp_config: CLPConfig, logs_dir: pathlib.Path, num_w
491530
def validate_results_cache_config(
492531
clp_config: CLPConfig, data_dir: pathlib.Path, logs_dir: pathlib.Path
493532
):
494-
try:
495-
validate_path_could_be_dir(data_dir)
496-
except ValueError as ex:
497-
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} data directory is invalid: {ex}")
498-
499-
try:
500-
validate_path_could_be_dir(logs_dir)
501-
except ValueError as ex:
502-
raise ValueError(f"{RESULTS_CACHE_COMPONENT_NAME} logs directory is invalid: {ex}")
533+
_validate_data_directory(data_dir, RESULTS_CACHE_COMPONENT_NAME)
534+
validate_log_directory(logs_dir, RESULTS_CACHE_COMPONENT_NAME)
503535

504536
validate_port(
505537
f"{RESULTS_CACHE_COMPONENT_NAME}.port",
@@ -508,8 +540,11 @@ def validate_results_cache_config(
508540
)
509541

510542

511-
def validate_worker_config(clp_config: CLPConfig):
543+
def validate_logs_input_config(clp_config: CLPConfig) -> None:
512544
clp_config.validate_logs_input_config()
545+
546+
547+
def validate_output_storage_config(clp_config: CLPConfig) -> None:
513548
clp_config.validate_archive_output_config()
514549
clp_config.validate_stream_output_config()
515550

@@ -590,3 +625,86 @@ def validate_dataset_name(clp_table_prefix: str, dataset_name: str) -> None:
590625
f"Invalid dataset name: `{dataset_name}`. Names can only be a maximum of"
591626
f" {dataset_name_max_len} characters long."
592627
)
628+
629+
630+
def validate_retention_config(clp_config: CLPConfig) -> None:
631+
clp_query_engine = clp_config.package.query_engine
632+
if is_retention_period_configured(clp_config) and clp_query_engine == QueryEngine.PRESTO:
633+
raise ValueError(
634+
f"Retention control is not supported with query_engine `{clp_query_engine}`"
635+
)
636+
637+
638+
def is_retention_period_configured(clp_config: CLPConfig) -> bool:
639+
if clp_config.archive_output.retention_period is not None:
640+
return True
641+
642+
if clp_config.results_cache.retention_period is not None:
643+
return True
644+
645+
return False
646+
647+
648+
def get_common_env_vars_list(
649+
include_clp_home_env_var=True,
650+
) -> List[str]:
651+
"""
652+
:param include_clp_home_env_var:
653+
:return: A list of common environment variables for Docker containers, in the format
654+
"KEY=VALUE".
655+
"""
656+
clp_site_packages_dir = CONTAINER_CLP_HOME / "lib" / "python3" / "site-packages"
657+
env_vars = [f"PYTHONPATH={clp_site_packages_dir}"]
658+
659+
if include_clp_home_env_var:
660+
env_vars.append(f"CLP_HOME={CONTAINER_CLP_HOME}")
661+
662+
return env_vars
663+
664+
665+
def get_credential_env_vars_list(
666+
container_clp_config: CLPConfig,
667+
include_db_credentials=False,
668+
include_queue_credentials=False,
669+
include_redis_credentials=False,
670+
) -> List[str]:
671+
"""
672+
:param container_clp_config:
673+
:param include_db_credentials:
674+
:param include_queue_credentials:
675+
:param include_redis_credentials:
676+
:return: A list of credential environment variables for Docker containers, in the format
677+
"KEY=VALUE".
678+
"""
679+
env_vars = []
680+
681+
if include_db_credentials:
682+
env_vars.append(f"CLP_DB_USER={container_clp_config.database.username}")
683+
env_vars.append(f"CLP_DB_PASS={container_clp_config.database.password}")
684+
685+
if include_queue_credentials:
686+
env_vars.append(f"CLP_QUEUE_USER={container_clp_config.queue.username}")
687+
env_vars.append(f"CLP_QUEUE_PASS={container_clp_config.queue.password}")
688+
689+
if include_redis_credentials:
690+
env_vars.append(f"CLP_REDIS_PASS={container_clp_config.redis.password}")
691+
692+
return env_vars
693+
694+
695+
def get_celery_connection_env_vars_list(container_clp_config: CLPConfig) -> List[str]:
696+
"""
697+
:param container_clp_config:
698+
:return: A list of Celery connection environment variables for Docker containers, in the format
699+
"KEY=VALUE".
700+
"""
701+
env_vars = [
702+
f"BROKER_URL=amqp://"
703+
f"{container_clp_config.queue.username}:{container_clp_config.queue.password}@"
704+
f"{container_clp_config.queue.host}:{container_clp_config.queue.port}",
705+
f"RESULT_BACKEND=redis://default:{container_clp_config.redis.password}@"
706+
f"{container_clp_config.redis.host}:{container_clp_config.redis.port}/"
707+
f"{container_clp_config.redis.query_backend_database}",
708+
]
709+
710+
return env_vars

0 commit comments

Comments
 (0)