Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ test_globus_tar_deletion()
keep_flag=""
fi

zstash create ${blocking_flag} ${keep_flag} --hpss=${globus_path}/${case_name} --maxsize 128 zstash_demo 2>&1 | tee ${case_name}.log
# Use -v so debug logs show up.
zstash create ${blocking_flag} ${keep_flag} --hpss=${globus_path}/${case_name} --maxsize 128 -v zstash_demo 2>&1 | tee ${case_name}.log
if [ $? != 0 ]; then
echo "${case_name} failed. Check ${case_name}_create.log for details. Cannot continue."
return 1
Expand All @@ -146,7 +147,7 @@ test_globus_tar_deletion()
check_log_has "Creating new tar archive 000000.tar" ${case_name}.log || return 2

echo ""
echo "Checking directory status after 'zstash create' has completed. src should only have index.db. dst should have tar and index.db."
echo "Checking directory status after 'zstash create' has completed."
echo "Checking logs in current directory: ${PWD}"

echo ""
Expand Down Expand Up @@ -181,6 +182,98 @@ test_globus_tar_deletion()
return 0 # Success
}

#######################################
# Verify that zstash deletes local tar files progressively as each Globus
# transfer completes (rather than only at the very end of the run).
# Arguments:
#   $1 - path to the zstash repo checkout
#   $2 - destination endpoint name (resolved via get_endpoint)
#   $3 - destination directory on that endpoint
#   $4 - "blocking" or "non-blocking"
# Returns:
#   0 on success, 1 if zstash itself failed, 2 if a log/filesystem check failed
#######################################
test_globus_progressive_deletion()
{
    local path_to_repo=$1
    local dst_endpoint=$2
    local dst_dir=$3
    local blocking_str=$4
    local src_dir dst_endpoint_uuid globus_path case_name blocking_flag
    local zstash_status tar_count deletion_count dst_tar_count

    src_dir="${path_to_repo}/tests/utils/globus_tar_deletion"
    rm -rf "${src_dir}"
    mkdir -p "${src_dir}"
    dst_endpoint_uuid=$(get_endpoint "${dst_endpoint}")
    globus_path="globus://${dst_endpoint_uuid}/${dst_dir}"

    case_name="${blocking_str}_progressive_deletion"
    echo "Running test_globus_progressive_deletion on case=${case_name}"
    echo "Exit codes: 0 -- success, 1 -- zstash failed, 2 -- grep check failed"

    setup "${case_name}" "${src_dir}"

    # Create files totaling >2 GB to trigger multiple tars with maxsize=1 GB
    # Each file is ~700 MB, so we'll get 3 tars
    echo "Creating large test files (this may take a minute)..."
    dd if=/dev/zero of=zstash_demo/file1.dat bs=1M count=700 2>/dev/null # 700 MB
    dd if=/dev/zero of=zstash_demo/file2.dat bs=1M count=700 2>/dev/null # 700 MB
    dd if=/dev/zero of=zstash_demo/file3.dat bs=1M count=700 2>/dev/null # 700 MB
    echo "✓ Test files created"

    if [ "$blocking_str" == "non-blocking" ]; then
        blocking_flag="--non-blocking"
    else
        blocking_flag=""
    fi

    # Run with maxsize=1 GB to create multiple tars.
    echo "Running zstash create (this may take several minutes due to file size and transfers)..."
    # NOTE: ${blocking_flag} is intentionally unquoted so that an empty value
    # contributes no argument at all (quoting it would pass an empty string).
    zstash create ${blocking_flag} --hpss="${globus_path}/${case_name}" --maxsize 1 -v zstash_demo 2>&1 | tee "${case_name}.log"
    # BUGFIX: $? after a pipeline is tee's status, which is ~always 0.
    # Use PIPESTATUS[0] to check zstash's own exit status.
    zstash_status=${PIPESTATUS[0]}
    if [ "${zstash_status}" != 0 ]; then
        echo "${case_name} failed."
        return 1
    fi

    # Check that multiple tar files were created
    tar_count=$(grep -c "Creating new tar archive" "${case_name}.log")
    if [ "${tar_count}" -lt 2 ]; then
        echo "Expected at least 2 tar archives to be created, found ${tar_count}"
        return 2
    fi
    echo "✓ Created ${tar_count} tar archives"

    # Check that files were deleted progressively
    deletion_count=$(grep -c "Deleting .* files from successful transfer" "${case_name}.log")

    if [ "$blocking_str" == "blocking" ]; then
        # In blocking mode, we should see deletion after each tar transfer
        if [ "${deletion_count}" -lt $((tar_count - 1)) ]; then
            echo "Expected at least $((tar_count - 1)) deletion events in blocking mode, found ${deletion_count}"
            return 2
        fi
        echo "✓ Files deleted progressively (${deletion_count} deletion events)"
    else
        # In non-blocking mode, deletions happen when we check status
        if [ "${deletion_count}" -lt 1 ]; then
            echo "Expected at least 1 deletion event in non-blocking mode, found ${deletion_count}"
            return 2
        fi
        echo "✓ Files deleted (${deletion_count} deletion events in non-blocking mode)"
    fi

    # Verify that NO tar files remain in source after completion
    echo "Checking that no tar files remain in source"
    ls "${src_dir}/${case_name}"/zstash_demo/zstash/*.tar 2>&1 | tee ls_tar_check.log
    if grep -q "\.tar" ls_tar_check.log && ! grep -q "No such file" ls_tar_check.log; then
        echo "Found tar files that should have been deleted!"
        return 2
    fi
    echo "✓ All tar files successfully deleted from source"

    # Verify tar files exist in destination; in non-blocking mode the final
    # transfer may still be in flight, so wait for the directory to appear.
    if [ "$blocking_str" == "non-blocking" ]; then
        wait_for_directory "${dst_dir}/${case_name}" || return 1
    fi

    dst_tar_count=$(ls "${dst_dir}/${case_name}"/*.tar 2>/dev/null | wc -l)
    if [ "${dst_tar_count}" -ne "${tar_count}" ]; then
        echo "Expected ${tar_count} tar files in destination, found ${dst_tar_count}"
        return 2
    fi
    echo "✓ All ${tar_count} tar files present in destination"

    return 0
}

# Follow these directions #####################################################

# Example usage:
Expand Down Expand Up @@ -232,7 +325,14 @@ run_test_with_tracking() {
echo "Running: ${test_name}"
echo "=========================================="

if test_globus_tar_deletion "${args[@]}"; then
# Determine which test function to call based on test name
if [[ "${test_name}" == *"progressive"* ]]; then
test_func=test_globus_progressive_deletion
else
test_func=test_globus_tar_deletion
fi

if ${test_func} "${args[@]}"; then
# Print test result in the output block AND at the end
echo "✓ ${test_name} PASSED"
test_results+=("✓ ${test_name} PASSED") # Uses Global variable
Expand All @@ -252,15 +352,29 @@ tests_passed=0
tests_failed=0
test_results=() # Global variable to hold test results

echo "Primary tests: single authentication code tests for each endpoint"
echo "Primary tests: basic functionality tests"
echo "If a test hangs, check if https://app.globus.org/activity reports any errors on your transfers."

# Run all tests independently
# Run basic tests
# These check that AT THE END of the run,
# we either still have the files (keep) or the files are deleted (non-keep).
run_test_with_tracking "blocking_non-keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" "non-keep" || true
run_test_with_tracking "non-blocking_non-keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" "non-keep" || true
run_test_with_tracking "blocking_keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" "keep" || true
run_test_with_tracking "non-blocking_keep" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" "keep" || true

echo ""
echo "Progressive deletion tests: verify files are deleted as transfers complete"
echo "WARNING: These tests create ~2GB of data and will take several minutes"

# Run progressive deletion tests
# These check that DURING the run,
# files are deleted after successful transfers (non-keep only).
# Blocking -- get files, transfer files, delete at src, start next transfer.
# Non-blocking -- get files, transfer files, get next set of files, transfer those files, check if previous transfer is done (and if so, delete at src).
run_test_with_tracking "blocking_progressive_deletion" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "blocking" || true
run_test_with_tracking "non-blocking_progressive_deletion" ${path_to_repo} ${endpoint_str} ${machine_dst_dir} "non-blocking" || true

# Print summary
echo ""
echo "=========================================="
Expand Down
48 changes: 23 additions & 25 deletions zstash/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .hpss import hpss_put
from .hpss_utils import add_files
from .settings import DEFAULT_CACHE, config, get_db_filename, logger
from .transfer_tracking import TransferManager
from .utils import (
create_tars_table,
get_files_to_archive,
Expand Down Expand Up @@ -52,12 +53,13 @@ def create():
logger.error(input_path_error_str)
raise NotADirectoryError(input_path_error_str)

transfer_manager: TransferManager = TransferManager()
if hpss != "none":
url = urlparse(hpss)
if url.scheme == "globus":
# identify globus endpoints
logger.debug(f"{ts_utc()}:Calling globus_activate(hpss)")
globus_activate(hpss)
transfer_manager.globus_config = globus_activate(hpss)
else:
# config.hpss is not "none", so we need to
# create target HPSS directory
Expand Down Expand Up @@ -88,14 +90,21 @@ def create():

# Create and set up the database
logger.debug(f"{ts_utc()}: Calling create_database()")
failures: List[str] = create_database(cache, args)
failures: List[str] = create_database(cache, args, transfer_manager)

# Transfer to HPSS. Always keep a local copy.
logger.debug(f"{ts_utc()}: calling hpss_put() for {get_db_filename(cache)}")
hpss_put(hpss, get_db_filename(cache), cache, keep=args.keep, is_index=True)
hpss_put(
hpss,
get_db_filename(cache),
cache,
keep=args.keep,
is_index=True,
transfer_manager=transfer_manager,
)

logger.debug(f"{ts_utc()}: calling globus_finalize()")
globus_finalize(non_blocking=args.non_blocking)
globus_finalize(transfer_manager, non_blocking=args.non_blocking)

if len(failures) > 0:
# List the failures
Expand Down Expand Up @@ -204,7 +213,9 @@ def setup_create() -> Tuple[str, argparse.Namespace]:
return cache, args


def create_database(cache: str, args: argparse.Namespace) -> List[str]:
def create_database(
cache: str, args: argparse.Namespace, transfer_manager: TransferManager
) -> List[str]:
# Create new database
logger.debug(f"{ts_utc()}:Creating index database")
if os.path.exists(get_db_filename(cache)):
Expand Down Expand Up @@ -263,26 +274,7 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
files: List[str] = get_files_to_archive(cache, args.include, args.exclude)

failures: List[str]
if args.follow_symlinks:
try:
# Add files to archive
failures = add_files(
cur,
con,
-1,
files,
cache,
args.keep,
args.follow_symlinks,
skip_tars_md5=args.no_tars_md5,
non_blocking=args.non_blocking,
error_on_duplicate_tar=args.error_on_duplicate_tar,
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
force_database_corruption=args.for_developers_force_database_corruption,
)
except FileNotFoundError:
raise Exception("Archive creation failed due to broken symlink.")
else:
try:
# Add files to archive
failures = add_files(
cur,
Expand All @@ -297,7 +289,13 @@ def create_database(cache: str, args: argparse.Namespace) -> List[str]:
error_on_duplicate_tar=args.error_on_duplicate_tar,
overwrite_duplicate_tars=args.overwrite_duplicate_tars,
force_database_corruption=args.for_developers_force_database_corruption,
transfer_manager=transfer_manager,
)
except FileNotFoundError as e:
if args.follow_symlinks:
raise Exception("Archive creation failed due to broken symlink.")
else:
raise e

# Close database
con.commit()
Expand Down
Loading
Loading