Skip to content

Commit 33f379d

Browse files
authored
Merge pull request #416 from E3SM-Project/issue-374-refactor-tar-deletion
Refactor to resolve tars not deleting when `--non-blocking` is set
2 parents c14b8ee + d57c311 commit 33f379d

File tree

10 files changed

+1155
-578
lines changed

10 files changed

+1155
-578
lines changed

tests/integration/bash_tests/run_from_any/test_globus_tar_deletion.bash

Lines changed: 120 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -137,16 +137,17 @@ test_globus_tar_deletion()
137137
keep_flag=""
138138
fi
139139

140-
zstash create ${blocking_flag} ${keep_flag} --hpss=${globus_path}/${case_name} --maxsize 128 zstash_demo 2>&1 | tee ${case_name}.log
140+
# Use -v so debug logs show up.
141+
zstash create ${blocking_flag} ${keep_flag} --hpss=${globus_path}/${case_name} --maxsize 128 -v zstash_demo 2>&1 | tee ${case_name}.log
141142
if [ $? != 0 ]; then
142-
echo "${case_name} failed. Check ${case_name}_create.log for details. Cannot continue."
143+
echo "${case_name} failed. Check ${case_name}.log for details. Cannot continue."
143144
return 1
144145
fi
145146
echo "${case_name} completed successfully. Checking ${case_name}.log now."
146147
check_log_has "Creating new tar archive 000000.tar" ${case_name}.log || return 2
147148

148149
echo ""
149-
echo "Checking directory status after 'zstash create' has completed. src should only have index.db. dst should have tar and index.db."
150+
echo "Checking directory status after 'zstash create' has completed."
150151
echo "Checking logs in current directory: ${PWD}"
151152

152153
echo ""
@@ -181,6 +182,98 @@ test_globus_tar_deletion()
181182
return 0 # Success
182183
}
183184

185+
test_globus_progressive_deletion()
186+
{
187+
local path_to_repo=$1
188+
local dst_endpoint=$2
189+
local dst_dir=$3
190+
local blocking_str=$4
191+
192+
src_dir=${path_to_repo}/tests/utils/globus_tar_deletion
193+
rm -rf ${src_dir}
194+
mkdir -p ${src_dir}
195+
dst_endpoint_uuid=$(get_endpoint ${dst_endpoint})
196+
globus_path=globus://${dst_endpoint_uuid}/${dst_dir}
197+
198+
case_name=${blocking_str}_progressive_deletion
199+
echo "Running test_globus_progressive_deletion on case=${case_name}"
200+
echo "Exit codes: 0 -- success, 1 -- zstash failed, 2 -- grep check failed"
201+
202+
setup ${case_name} "${src_dir}"
203+
204+
# Create files totaling >2 GB to trigger multiple tars with maxsize=1 GB
205+
# Each file is ~700 MB, so we'll get 3 tars
206+
echo "Creating large test files (this may take a minute)..."
207+
dd if=/dev/zero of=zstash_demo/file1.dat bs=1M count=700 2>/dev/null # 700 MB
208+
dd if=/dev/zero of=zstash_demo/file2.dat bs=1M count=700 2>/dev/null # 700 MB
209+
dd if=/dev/zero of=zstash_demo/file3.dat bs=1M count=700 2>/dev/null # 700 MB
210+
echo "✓ Test files created"
211+
212+
if [ "$blocking_str" == "non-blocking" ]; then
213+
blocking_flag="--non-blocking"
214+
else
215+
blocking_flag=""
216+
fi
217+
218+
# Run with maxsize=1 GB to create multiple tars
219+
echo "Running zstash create (this may take several minutes due to file size and transfers)..."
220+
zstash create ${blocking_flag} --hpss=${globus_path}/${case_name} --maxsize 1 -v zstash_demo 2>&1 | tee ${case_name}.log
221+
if [ $? != 0 ]; then
222+
echo "${case_name} failed."
223+
return 1
224+
fi
225+
226+
# Check that multiple tar files were created
227+
tar_count=$(grep -c "Creating new tar archive" ${case_name}.log)
228+
if [ ${tar_count} -lt 2 ]; then
229+
echo "Expected at least 2 tar archives to be created, found ${tar_count}"
230+
return 2
231+
fi
232+
echo "✓ Created ${tar_count} tar archives"
233+
234+
# Check that files were deleted progressively
235+
deletion_count=$(grep -c "Deleting .* files from successful transfer" ${case_name}.log)
236+
237+
if [ "$blocking_str" == "blocking" ]; then
238+
# In blocking mode, we should see deletion after each tar transfer
239+
if [ ${deletion_count} -lt $((tar_count - 1)) ]; then
240+
echo "Expected at least $((tar_count - 1)) deletion events in blocking mode, found ${deletion_count}"
241+
return 2
242+
fi
243+
echo "✓ Files deleted progressively (${deletion_count} deletion events)"
244+
else
245+
# In non-blocking mode, deletions happen when we check status
246+
if [ ${deletion_count} -lt 1 ]; then
247+
echo "Expected at least 1 deletion event in non-blocking mode, found ${deletion_count}"
248+
return 2
249+
fi
250+
echo "✓ Files deleted (${deletion_count} deletion events in non-blocking mode)"
251+
fi
252+
253+
# Verify that NO tar files remain in source after completion
254+
echo "Checking that no tar files remain in source"
255+
ls ${src_dir}/${case_name}/zstash_demo/zstash/*.tar 2>&1 | tee ls_tar_check.log
256+
if grep -q "\.tar" ls_tar_check.log && ! grep -q "No such file" ls_tar_check.log; then
257+
echo "Found tar files that should have been deleted!"
258+
return 2
259+
fi
260+
echo "✓ All tar files successfully deleted from source"
261+
262+
# Verify tar files exist in destination
263+
if [ "$blocking_str" == "non-blocking" ]; then
264+
wait_for_directory "${dst_dir}/${case_name}" || return 1
265+
fi
266+
267+
dst_tar_count=$(ls ${dst_dir}/${case_name}/*.tar 2>/dev/null | wc -l)
268+
if [ ${dst_tar_count} -ne ${tar_count} ]; then
269+
echo "Expected ${tar_count} tar files in destination, found ${dst_tar_count}"
270+
return 2
271+
fi
272+
echo "✓ All ${tar_count} tar files present in destination"
273+
274+
return 0
275+
}
276+
184277
# Follow these directions #####################################################
185278

186279
# Example usage:
@@ -232,7 +325,14 @@ run_test_with_tracking() {
232325
echo "Running: ${test_name}"
233326
echo "=========================================="
234327

235-
if test_globus_tar_deletion "${args[@]}"; then
328+
# Determine which test function to call based on test name
329+
if [[ "${test_name}" == *"progressive"* ]]; then
330+
test_func=test_globus_progressive_deletion
331+
else
332+
test_func=test_globus_tar_deletion
333+
fi
334+
335+
if ${test_func} "${args[@]}"; then
236336
# Print test result in the output block AND at the end
237337
echo "${test_name} PASSED"
238338
test_results+=("${test_name} PASSED") # Uses Global variable
@@ -252,15 +352,29 @@ tests_passed=0
252352
tests_failed=0
253353
test_results=() # Global variable to hold test results
254354

255-
echo "Primary tests: single authentication code tests for each endpoint"
355+
echo "Primary tests: basic functionality tests"
256356
echo "If a test hangs, check if https://app.globus.org/activity reports any errors on your transfers."
257357

258-
# Run all tests independently
358+
# Run basic tests
359+
# These check that AT THE END of the run,
360+
# we either still have the files (keep) or the files are deleted (non-keep).
259361
run_test_with_tracking "blocking_non-keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" "non-keep" || true
260362
run_test_with_tracking "non-blocking_non-keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" "non-keep" || true
261363
run_test_with_tracking "blocking_keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" "keep" || true
262364
run_test_with_tracking "non-blocking_keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" "keep" || true
263365

366+
echo ""
367+
echo "Progressive deletion tests: verify files are deleted as transfers complete"
368+
echo "WARNING: These tests create ~2GB of data and will take several minutes"
369+
370+
# Run progressive deletion tests
371+
# These check that DURING the run,
372+
# files are deleted after successful transfers (non-keep only).
373+
# Blocking -- get files, transfer files, delete at src, start next transfer.
374+
# Non-blocking -- get files, transfer files, get next set of files, transfer those files, check if previous transfer is done (and if so, delete at src).
375+
run_test_with_tracking "blocking_progressive_deletion" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" || true
376+
run_test_with_tracking "non-blocking_progressive_deletion" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" || true
377+
264378
# Print summary
265379
echo ""
266380
echo "=========================================="

tests/unit/test_optimized_update.py

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -563,40 +563,6 @@ def test_time_tolerance_check(self):
563563
assert is_within_tolerance == should_match
564564

565565

566-
class TestBackwardCompatibility:
567-
"""Tests to ensure backward compatibility with existing code."""
568-
569-
def test_get_files_to_archive_still_works(self, tmp_path):
570-
"""Test that legacy get_files_to_archive function still works."""
571-
from zstash.utils import get_files_to_archive
572-
573-
(tmp_path / "file.txt").write_text("content")
574-
575-
os.chdir(tmp_path)
576-
result = get_files_to_archive("cache", None, None)
577-
578-
# Should return list of strings
579-
assert isinstance(result, list)
580-
assert len(result) > 0
581-
assert all(isinstance(item, str) for item in result)
582-
583-
def test_output_format_matches_original(self, tmp_path):
584-
"""Test that file paths are normalized the same way as original."""
585-
subdir = tmp_path / "subdir"
586-
subdir.mkdir()
587-
(subdir / "file.txt").write_text("content")
588-
589-
os.chdir(tmp_path)
590-
591-
from zstash.utils import get_files_to_archive
592-
593-
legacy_result = get_files_to_archive("cache", None, None)
594-
new_result = list(get_files_to_archive_with_stats("cache", None, None).keys())
595-
596-
# Should produce same file list
597-
assert legacy_result == new_result
598-
599-
600566
@pytest.fixture
601567
def mock_database():
602568
"""Fixture providing a mock database cursor."""

zstash/create.py

Lines changed: 42 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -6,17 +6,19 @@
66
import os.path
77
import sqlite3
88
import sys
9-
from typing import Any, List, Tuple
9+
from datetime import datetime
10+
from typing import Any, Dict, List, Tuple
1011

1112
from six.moves.urllib.parse import urlparse
1213

1314
from .globus import globus_activate, globus_finalize
1415
from .hpss import hpss_put
15-
from .hpss_utils import add_files
16+
from .hpss_utils import DevOptions, construct_tars
1617
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
18+
from .transfer_tracking import TransferManager
1719
from .utils import (
1820
create_tars_table,
19-
get_files_to_archive,
21+
get_files_to_archive_with_stats,
2022
run_command,
2123
tars_table_exists,
2224
ts_utc,
@@ -52,12 +54,13 @@ def create():
5254
logger.error(input_path_error_str)
5355
raise NotADirectoryError(input_path_error_str)
5456

57+
transfer_manager: TransferManager = TransferManager()
5558
if hpss != "none":
5659
url = urlparse(hpss)
5760
if url.scheme == "globus":
5861
# identify globus endpoints
5962
logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)")
60-
globus_activate(hpss)
63+
transfer_manager.globus_config = globus_activate(hpss)
6164
else:
6265
# config.hpss is not "none", so we need to
6366
# create target HPSS directory
@@ -88,14 +91,21 @@ def create():
8891

8992
# Create and set up the database
9093
logger.debug(f"{ts_utc()}: Calling create_database()")
91-
failures: List[str] = create_database(cache, args)
94+
failures: List[str] = create_database(cache, args, transfer_manager)
9295

9396
# Transfer to HPSS. Always keep a local copy.
9497
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
95-
hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)
98+
hpss_put(
99+
hpss,
100+
get_db_filename(cache),
101+
cache,
102+
transfer_manager,
103+
keep=args.keep,
104+
is_index=True,
105+
)
96106

97107
logger.debug(f"{ts_utc()}: calling globus_finalize()")
98-
globus_finalize(non_blocking=args.non_blocking)
108+
globus_finalize(transfer_manager, args.keep)
99109

100110
if len(failures) > 0:
101111
# List the failures
@@ -204,7 +214,9 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
204214
return cache, args
205215

206216

207-
def create_database(cache: str, args: argparse.Namespace) -> List[str]:
217+
def create_database(
218+
cache: str, args: argparse.Namespace, transfer_manager: TransferManager
219+
) -> List[str]:
208220
# Create new database
209221
logger.debug(f"{ts_utc()}:Creating index database")
210222
if os.path.exists(get_db_filename(cache)):
@@ -260,44 +272,30 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
260272
cur.execute("insert into config values (?,?)", (attr, value))
261273
con.commit()
262274

263-
files: List[str] = get_files_to_archive(cache, args.include, args.exclude)
275+
file_stats: Dict[str, Tuple[int, datetime]] = get_files_to_archive_with_stats(
276+
cache, args.include, args.exclude
277+
)
264278

265279
failures: List[str]
266-
if args.follow_symlinks:
267-
try:
268-
# Add files to archive
269-
failures = add_files(
270-
cur,
271-
con,
272-
-1,
273-
files,
274-
cache,
275-
args.keep,
276-
args.follow_symlinks,
277-
skip_tars_md5=args.no_tars_md5,
278-
non_blocking=args.non_blocking,
279-
error_on_duplicate_tar=args.error_on_duplicate_tar,
280-
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
281-
force_database_corruption=args.for_developers_force_database_corruption,
282-
)
283-
except FileNotFoundError:
284-
raise Exception("Archive creation failed due to broken symlink.")
285-
else:
286-
# Add files to archive
287-
failures = add_files(
288-
cur,
289-
con,
290-
-1,
291-
files,
292-
cache,
293-
args.keep,
294-
args.follow_symlinks,
295-
skip_tars_md5=args.no_tars_md5,
296-
non_blocking=args.non_blocking,
297-
error_on_duplicate_tar=args.error_on_duplicate_tar,
298-
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
299-
force_database_corruption=args.for_developers_force_database_corruption,
300-
)
280+
dev_options: DevOptions = DevOptions(
281+
error_on_duplicate_tar=args.error_on_duplicate_tar,
282+
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
283+
force_database_corruption=args.for_developers_force_database_corruption,
284+
)
285+
# Add files to archive
286+
failures = construct_tars(
287+
cur,
288+
con,
289+
-1,
290+
file_stats,
291+
cache,
292+
args.keep,
293+
args.follow_symlinks,
294+
dev_options,
295+
transfer_manager,
296+
skip_tars_table=args.no_tars_md5,
297+
non_blocking=args.non_blocking,
298+
)
301299

302300
# Close database
303301
con.commit()

0 commit comments

Comments (0)