Skip to content

Commit 2ac456e

Browse files
feat(clp-package)!: Add compress-from-s3 Python and shell scripts for S3-object and S3-key-prefix compression; Restrict compress.py to accept local file paths only. (#1476)
Co-authored-by: LinZhihao-723 <[email protected]>
1 parent 6c6de99 commit 2ac456e

File tree

5 files changed: 519 additions and 85 deletions.

components/clp-package-utils/clp_package_utils/general.py

Lines changed: 3 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,6 +49,9 @@
4949

5050
DOCKER_MOUNT_TYPE_STRINGS = ["bind"]
5151

52+
S3_KEY_PREFIX_COMPRESSION = "s3-key-prefix"
53+
S3_OBJECT_COMPRESSION = "s3-object"
54+
5255

5356
class DockerDependencyError(OSError):
5457
"""Base class for errors related to Docker dependencies."""

components/clp-package-utils/clp_package_utils/scripts/compress.py

Lines changed: 51 additions & 61 deletions
Original file line number | Diff line number | Diff line change
@@ -13,8 +13,8 @@
1313
CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH,
1414
CLP_DEFAULT_DATASET_NAME,
1515
StorageEngine,
16+
StorageType,
1617
)
17-
from job_orchestration.scheduler.job_config import InputType
1818

1919
from clp_package_utils.general import (
2020
CONTAINER_INPUT_LOGS_ROOT_DIR,
@@ -30,43 +30,45 @@
3030
validate_dataset_name,
3131
)
3232

33-
logger = logging.getLogger(__file__)
33+
logger = logging.getLogger(__name__)
3434

3535

3636
def _generate_logs_list(
37-
input_type: InputType,
3837
container_logs_list_path: pathlib.Path,
3938
parsed_args: argparse.Namespace,
40-
) -> None:
41-
if InputType.FS == input_type:
42-
host_logs_list_path = parsed_args.path_list
43-
with open(container_logs_list_path, "w") as container_logs_list_file:
44-
if host_logs_list_path is not None:
45-
with open(host_logs_list_path, "r") as host_logs_list_file:
46-
for line in host_logs_list_file:
47-
stripped_path_str = line.rstrip()
48-
if "" == stripped_path_str:
49-
# Skip empty paths
50-
continue
51-
resolved_path = pathlib.Path(stripped_path_str).resolve()
52-
mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
53-
resolved_path.anchor
54-
)
55-
container_logs_list_file.write(f"{mounted_path}\n")
56-
39+
) -> bool:
40+
"""
41+
Generates logs list file for the native compression script.
42+
43+
:param container_logs_list_path: Path to write logs list.
44+
:param parsed_args: Parsed command-line arguments.
45+
:return: Whether any paths were written to the logs list.
46+
"""
47+
host_logs_list_path = parsed_args.path_list
48+
with open(container_logs_list_path, "w") as container_logs_list_file:
49+
if host_logs_list_path is None:
5750
for path in parsed_args.paths:
5851
resolved_path = pathlib.Path(path).resolve()
5952
mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
6053
resolved_path.anchor
6154
)
6255
container_logs_list_file.write(f"{mounted_path}\n")
63-
64-
elif InputType.S3 == input_type:
65-
with open(container_logs_list_path, "w") as container_logs_list_file:
66-
container_logs_list_file.write(f"{parsed_args.paths[0]}\n")
67-
68-
else:
69-
raise ValueError(f"Unsupported input type: {input_type}.")
56+
return len(parsed_args.paths) != 0
57+
58+
no_path_found = True
59+
with open(host_logs_list_path, "r") as host_logs_list_file:
60+
for line in host_logs_list_file:
61+
stripped_path_str = line.rstrip()
62+
if "" == stripped_path_str:
63+
# Skip empty paths
64+
continue
65+
no_path_found = False
66+
resolved_path = pathlib.Path(stripped_path_str).resolve()
67+
mounted_path = CONTAINER_INPUT_LOGS_ROOT_DIR / resolved_path.relative_to(
68+
resolved_path.anchor
69+
)
70+
container_logs_list_file.write(f"{mounted_path}\n")
71+
return not no_path_found
7072

7173

7274
def _generate_compress_cmd(
@@ -81,6 +83,7 @@ def _generate_compress_cmd(
8183
"python3",
8284
"-m", "clp_package_utils.scripts.native.compress",
8385
"--config", str(config_path),
86+
"--input-type", "fs",
8487
]
8588
# fmt: on
8689
if parsed_args.verbose:
@@ -116,22 +119,6 @@ def _validate_fs_input_args(
116119
args_parser.error("Paths cannot be specified on the command line AND through a file.")
117120

118121

119-
def _validate_s3_input_args(
120-
parsed_args: argparse.Namespace,
121-
args_parser: argparse.ArgumentParser,
122-
storage_engine: StorageEngine,
123-
) -> None:
124-
if StorageEngine.CLP_S != storage_engine:
125-
args_parser.error(
126-
f"Input type {InputType.S3} is only supported for the storage engine"
127-
f" {StorageEngine.CLP_S}."
128-
)
129-
if len(parsed_args.paths) != 1:
130-
args_parser.error(f"Only one URL can be specified for input type {InputType.S3}.")
131-
if parsed_args.path_list is not None:
132-
args_parser.error(f"Path list file is unsupported for input type {InputType.S3}.")
133-
134-
135122
def main(argv):
136123
clp_home = get_clp_home()
137124
default_config_file_path = clp_home / CLP_DEFAULT_CONFIG_FILE_RELATIVE_PATH
@@ -167,9 +154,7 @@ def main(argv):
167154
args_parser.add_argument(
168155
"--no-progress-reporting", action="store_true", help="Disables progress reporting."
169156
)
170-
args_parser.add_argument(
171-
"paths", metavar="PATH", nargs="*", help="Paths or an S3 URL to compress."
172-
)
157+
args_parser.add_argument("paths", metavar="PATH", nargs="*", help="Paths to compress.")
173158
args_parser.add_argument(
174159
"-f", "--path-list", dest="path_list", help="A file listing all paths to compress."
175160
)
@@ -188,10 +173,20 @@ def main(argv):
188173

189174
# Validate and load necessary credentials
190175
validate_and_load_db_credentials_file(clp_config, clp_home, False)
191-
except:
176+
except Exception:
192177
logger.exception("Failed to load config.")
193178
return -1
194179

180+
# Validate logs_input type is FS
181+
if clp_config.logs_input.type != StorageType.FS:
182+
logger.error(
183+
"Filesystem compression expects `logs_input.type` to be `%s`, but `%s` is found. For S3"
184+
" compression, use `compress-from-s3.sh` instead.",
185+
StorageType.FS,
186+
clp_config.logs_input.type,
187+
)
188+
return -1
189+
195190
storage_engine: StorageEngine = clp_config.package.storage_engine
196191
dataset = parsed_args.dataset
197192
if StorageEngine.CLP_S == storage_engine:
@@ -212,13 +207,8 @@ def main(argv):
212207
logger.error(f"Dataset selection is not supported for storage engine: {storage_engine}.")
213208
return -1
214209

215-
input_type = clp_config.logs_input.type
216-
if InputType.FS == input_type:
217-
_validate_fs_input_args(parsed_args, args_parser)
218-
elif InputType.S3 == input_type:
219-
_validate_s3_input_args(parsed_args, args_parser, storage_engine)
220-
else:
221-
raise ValueError(f"Unsupported input type: {input_type}.")
210+
# Validate filesystem input arguments
211+
_validate_fs_input_args(parsed_args, args_parser)
222212

223213
container_name = generate_container_name(str(JobType.COMPRESSION))
224214

@@ -227,9 +217,7 @@ def main(argv):
227217
container_clp_config, clp_config, get_container_config_filename(container_name)
228218
)
229219

230-
necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir]
231-
if InputType.FS == input_type:
232-
necessary_mounts.append(mounts.input_logs_dir)
220+
necessary_mounts = [mounts.clp_home, mounts.data_dir, mounts.logs_dir, mounts.input_logs_dir]
233221

234222
# Write compression logs to a file
235223
while True:
@@ -242,7 +230,9 @@ def main(argv):
242230
if not container_logs_list_path.exists():
243231
break
244232

245-
_generate_logs_list(clp_config.logs_input.type, container_logs_list_path, parsed_args)
233+
if not _generate_logs_list(container_logs_list_path, parsed_args):
234+
logger.error("No filesystem paths given for compression.")
235+
return -1
246236

247237
extra_env_vars = {
248238
CLP_DB_USER_ENV_VAR_NAME: clp_config.database.username,
@@ -259,13 +249,13 @@ def main(argv):
259249

260250
proc = subprocess.run(cmd)
261251
ret_code = proc.returncode
262-
if 0 != ret_code:
252+
if ret_code != 0:
263253
logger.error("Compression failed.")
264254
logger.debug(f"Docker command failed: {shlex.join(cmd)}")
255+
else:
256+
container_logs_list_path.unlink()
265257

266-
# Remove generated files
267258
generated_config_path_on_host.unlink()
268-
269259
return ret_code
270260

271261

0 commit comments

Comments
 (0)