Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 5 additions & 9 deletions .github/workflows/build_workflow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ jobs:
- name: Checkout Code Repository
uses: actions/checkout@v3

- name: Set up Python 3.13
- name: Set up Python 3.14
uses: actions/setup-python@v4
with:
python-version: "3.13"
python-version: "3.14"

# Run all pre-commit hooks on all the files.
# Getting only staged files can be tricky in case a new PR is opened
Expand All @@ -35,8 +35,9 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false # Continue all jobs even if one fails
matrix:
python-version: ["3.11", "3.12", "3.13"]
python-version: ["3.11", "3.12", "3.13", "3.14"]
defaults:
run:
shell: bash -l {0}
Expand Down Expand Up @@ -72,11 +73,6 @@ jobs:
conda list
# Ensure we have the right Python version
python --version
# Fix pip issues for Python 3.12+
if [[ "${{ matrix.python-version }}" == "3.12" ]] || [[ "${{ matrix.python-version }}" == "3.13" ]]; then
python -m ensurepip --upgrade || true
python -m pip install --upgrade --force-reinstall pip setuptools wheel
fi

- name: Install `zstash` Package
run: |
Expand Down Expand Up @@ -121,7 +117,7 @@ jobs:
environment-file: conda/dev.yml
channel-priority: flexible # Changed from strict to flexible
auto-update-conda: true
python-version: "3.13" # Use stable Python version for docs
python-version: "3.14" # Use stable Python version for docs

# sphinx-multiversion allows for version docs.
- name: Build Sphinx Docs
Expand Down
3 changes: 2 additions & 1 deletion conda/dev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ dependencies:
# Base
# =================
- pip
- python >=3.11,<3.14
- python >=3.11,<3.15
- setuptools
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The addition of setuptools as an explicit dependency is good for Python 3.12+ compatibility, as setuptools is no longer automatically installed with pip. However, consider specifying a minimum version to ensure compatibility with the package's setup.py requirements. For example, setuptools >=65.0.0 would ensure modern setuptools features are available.

Suggested change
- setuptools
- setuptools >=65.0.0

Copilot uses AI. Check for mistakes.
- sqlite
- six >=1.16.0
- globus-sdk >=3.15.0,<4.0
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ exclude =
venv

[mypy]
python_version = 3.13
python_version = 3.14
check_untyped_defs = True
ignore_missing_imports = True
warn_unused_ignores = True
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,6 @@
author_email="forsyth2@llnl.gov, golaz1@llnl.gov, shaheen2@llnl.gov",
description="Long term HPSS archiving software for E3SM",
packages=find_packages(include=["zstash", "zstash.*"]),
python_requires=">=3.11,<3.14",
python_requires=">=3.11,<3.15",
entry_points={"console_scripts": ["zstash=zstash.main:main"]},
)
113 changes: 77 additions & 36 deletions zstash/extract.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,12 @@
import logging
import multiprocessing
import os.path
import queue
import re
import sqlite3
import sys
import tarfile
import time
import traceback
from datetime import datetime
from typing import DefaultDict, List, Optional, Set, Tuple
Expand Down Expand Up @@ -282,10 +284,10 @@ def extract_database(
if args.workers > 1:
logger.debug("Running zstash {} with multiprocessing".format(cmd))
failures = multiprocess_extract(
args.workers, matches, keep_files, keep, cache, cur, args
args.workers, matches, keep_files, keep, cache, args
)
else:
failures = extractFiles(matches, keep_files, keep, cache, cur, args)
failures = extractFiles(matches, keep_files, keep, cache, args, None, cur)

# Close database
logger.debug("Closing index database")
Expand All @@ -300,7 +302,6 @@ def multiprocess_extract(
keep_files: bool,
keep_tars: Optional[bool],
cache: str,
cur: sqlite3.Cursor,
args: argparse.Namespace,
) -> List[FilesRow]:
"""
Expand All @@ -314,12 +315,9 @@ def multiprocess_extract(
# the processes.
tar_to_size_unsorted: DefaultDict[str, float] = collections.defaultdict(float)
db_row: FilesRow
tar: str
size: int
for db_row in matches:
tar, size = db_row.tar, db_row.size
tar_to_size_unsorted[tar] += size
# Sort by the size.
tar_to_size_unsorted[db_row.tar] += db_row.size

tar_to_size: collections.OrderedDict[str, float] = collections.OrderedDict(
sorted(tar_to_size_unsorted.items(), key=lambda x: x[1])
)
Expand All @@ -335,46 +333,52 @@ def multiprocess_extract(
# A min heap, of (work, worker_idx) tuples, work is the size of data
# that worker_idx needs to work on.
# We can efficiently get the worker with the least amount of work.
work_to_workers: List[Tuple[int, int]] = [(0, i) for i in range(num_workers)]
heapq.heapify(workers_to_tars)
work_to_workers: List[Tuple[float, int]] = [(0.0, i) for i in range(num_workers)]
heapq.heapify(work_to_workers)
Copy link
Copy Markdown
Collaborator Author

@forsyth2 forsyth2 Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was apparently a mistake that was fixed in this PR (see #426). That is, it should have always been work_to_workers here, despite that line being added over 6 years ago.


# Using a greedy approach, populate workers_to_tars.
for _, tar in enumerate(tar_to_size):
tar: str
for tar in tar_to_size:
# The worker with the least work should get the current largest amount of work.
workers_work: int
workers_work: float
worker_idx: int
workers_work, worker_idx = heapq.heappop(work_to_workers)
workers_to_tars[worker_idx].add(tar)
# Add this worker back to the heap, with the new amount of work.
worker_tuple: Tuple[float, int] = (workers_work + tar_to_size[tar], worker_idx)
# FIXME: error: Cannot infer type argument 1 of "heappush"
heapq.heappush(work_to_workers, worker_tuple) # type: ignore
heapq.heappush(work_to_workers, worker_tuple)

# For worker i, workers_to_matches[i] is a list of
# matches from the database for it to process.
workers_to_matches: List[List[FilesRow]] = [[] for _ in range(num_workers)]
workers_idx: int
for db_row in matches:
tar = db_row.tar
workers_idx: int
for workers_idx in range(len(workers_to_tars)):
if tar in workers_to_tars[workers_idx]:
# This worker gets this db_row.
if db_row.tar in workers_to_tars[workers_idx]:
workers_to_matches[workers_idx].append(db_row)
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The inner loop at lines 356-358 doesn't break after finding the worker that should process a tar. This means if a tar is somehow in multiple workers' sets, the db_row will be added to multiple workers' match lists, leading to duplicate processing. While this should not happen given the assignment logic, adding a break statement after line 358 would make the code more robust and efficient by avoiding unnecessary iterations once the correct worker is found.

Suggested change
workers_to_matches[workers_idx].append(db_row)
workers_to_matches[workers_idx].append(db_row)
# Once the appropriate worker is found, no need to check others.
break

Copilot uses AI. Check for mistakes.

# Ensure each worker processes tars in order
for worker_matches in workers_to_matches:
worker_matches.sort(key=lambda t: t.tar)

tar_ordering: List[str] = sorted([tar for tar in tar_to_size])
monitor: parallel.PrintMonitor = parallel.PrintMonitor(tar_ordering)
manager = multiprocessing.Manager()
monitor: parallel.PrintMonitor = parallel.PrintMonitor(
tar_ordering, manager=manager
)
Comment on lines +365 to +368
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new multiprocessing.Manager() is created for the print monitor but never explicitly shut down. This can leave the manager server process running longer than necessary and can cause hangs on interpreter shutdown in some environments. After worker processes finish, call manager.shutdown() (and ideally join() worker processes) before returning from multiprocess_extract().

Copilot uses AI. Check for mistakes.

# The return value for extractFiles will be added here.
failure_queue: multiprocessing.Queue[FilesRow] = multiprocessing.Queue()
processes: List[multiprocessing.Process] = []

for matches in workers_to_matches:
tars_for_this_worker: List[str] = list(set(match.tar for match in matches))
worker: parallel.ExtractWorker = parallel.ExtractWorker(
monitor, tars_for_this_worker, failure_queue
)
process: multiprocessing.Process = multiprocessing.Process(
target=extractFiles,
args=(matches, keep_files, keep_tars, cache, cur, args, worker),
args=(matches, keep_files, keep_tars, cache, args, worker, None),
daemon=True,
)
process.start()
Expand All @@ -386,8 +390,21 @@ def multiprocess_extract(
# because we'll be in this loop until completion.
failures: List[FilesRow] = []
while any(p.is_alive() for p in processes):
while not failure_queue.empty():
failures.append(failure_queue.get())
try:
while True:
failures.append(failure_queue.get_nowait())
except queue.Empty:
pass
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding a sleep(0.01) in the main process loop is good for reducing CPU usage while waiting for worker processes. However, this means the main process will check for failures at most 100 times per second. For very short-running jobs, this could add up to 10ms of latency before failures are detected. This is likely acceptable, but consider documenting this trade-off.

Suggested change
pass
pass
# Sleep briefly to avoid busy-waiting while worker processes run.
# This limits failure detection to ~100 checks per second (worst-case
# ~10 ms latency for very short-running jobs), which is an intentional
# trade-off for lower CPU usage in the main process.

Copilot uses AI. Check for mistakes.
time.sleep(0.01)
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another note from Claude, after mentioning "We routinely extract archives 10-20 TB in size. The default individual tar size is 256 GB."

With 40-80 tars running for hours each, this loop spins ~360,000 times per hour doing essentially nothing but checking is_alive(). That's harmless on its own, but consider: if a worker crashes without putting anything in failure_queue, the main process has no timeout mechanism. It will spin forever. At 10-20 TB job sizes, a silent worker death — due to OOM, a storage node going down, an HPSS timeout — is not a hypothetical.


# Drain any remaining failures after all processes have exited.
try:
while True:
failures.append(failure_queue.get_nowait())
except queue.Empty:
pass

manager.shutdown()
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The manager.shutdown() call may prevent proper cleanup of failures. This is called immediately after collecting failures from processes, but the PrintMonitor and ExtractWorker objects may still hold references to manager-created objects like locks and values. Consider moving the shutdown call after all cleanup is complete, or verify that all worker references to manager objects have been released. Additionally, verify that manager.shutdown() is the correct method - it may need to be manager.join() depending on the multiprocessing.Manager() implementation.

Copilot uses AI. Check for mistakes.

# Sort the failures, since they can come in at any order.
failures.sort(key=lambda t: (t.name, t.tar, t.offset))
Expand Down Expand Up @@ -479,9 +496,9 @@ def extractFiles( # noqa: C901
keep_files: bool,
keep_tars: Optional[bool],
cache: str,
cur: sqlite3.Cursor,
args: argparse.Namespace,
multiprocess_worker: Optional[parallel.ExtractWorker] = None,
cur: Optional[sqlite3.Cursor] = None,
Comment on lines 499 to +501
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The parameter order change for extractFiles is not backwards compatible. Moving cur from position 4 to position 6 and making it optional, while adding multiprocess_worker before it, could break any external code that calls this function with positional arguments. While this might be an internal function, it's worth considering whether this breaking change is necessary or if it could be avoided by keeping the parameter order and just making cur optional with a default value of None.

Suggested change
args: argparse.Namespace,
multiprocess_worker: Optional[parallel.ExtractWorker] = None,
cur: Optional[sqlite3.Cursor] = None,
cur: Optional[sqlite3.Cursor] = None,
args: Optional[argparse.Namespace] = None,
multiprocess_worker: Optional[parallel.ExtractWorker] = None,

Copilot uses AI. Check for mistakes.
) -> List[FilesRow]:
"""
Given a list of database rows, extract the files from the
Expand All @@ -498,11 +515,26 @@ def extractFiles( # noqa: C901
that called this function.
We need a reference to it so we can signal it to print
the contents of what's in its print queue.

If cur is None (when running in parallel), a new database connection
will be opened for this worker process.
"""
# Open database connection if not provided (parallel case)
if cur is None:
con: sqlite3.Connection = sqlite3.connect(
get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES
)
cur = con.cursor()
close_db: bool = True
else:
close_db = False
Comment on lines +522 to +530
Copy link

Copilot AI Feb 23, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When cur is None, a new SQLite connection is opened for the worker, but it is only closed on the normal return path. If an exception escapes the main loop (e.g., HPSS retrieval failing after retries), the connection may leak. Wrap the main body in a try/finally that closes cur/con when close_db is true.

Copilot uses AI. Check for mistakes.
Comment on lines +523 to +530
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Potential resource leak: If an exception occurs between lines 524-527 (when opening the database connection), the connection and cursor will not be properly closed because close_db is set to True on line 528, but if an exception happens before that line, the finally block or exception handler won't close the connection. Consider wrapping the connection opening in a try block or using a context manager to ensure proper cleanup.

Suggested change
if cur is None:
con: sqlite3.Connection = sqlite3.connect(
get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES
)
cur = con.cursor()
close_db: bool = True
else:
close_db = False
con: Optional[sqlite3.Connection] = None
close_db: bool = False
if cur is None:
try:
con = sqlite3.connect(
get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES
)
cur = con.cursor()
close_db = True
except Exception:
if con is not None:
con.close()
raise

Copilot uses AI. Check for mistakes.
Comment on lines +522 to +530
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The database cursor handling has been changed to open a new connection per worker process when running in parallel. This is necessary because sqlite3 connections and cursors are not thread-safe or process-safe. However, verify that all test cases pass with this change, particularly tests that involve concurrent database access, as this changes the connection model significantly.

Suggested change
# Open database connection if not provided (parallel case)
if cur is None:
con: sqlite3.Connection = sqlite3.connect(
get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES
)
cur = con.cursor()
close_db: bool = True
else:
close_db = False
# Open database connection if not provided, or always in parallel.
con: Optional[sqlite3.Connection]
close_db: bool = False
if multiprocess_worker is not None:
# In parallel, each worker must use its own sqlite3 connection
# because sqlite3 connections and cursors are not safe to share
# between processes.
con = sqlite3.connect(
get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES
)
cur = con.cursor()
close_db = True
elif cur is None:
con = sqlite3.connect(
get_db_filename(cache), detect_types=sqlite3.PARSE_DECLTYPES
)
cur = con.cursor()
close_db = True

Copilot uses AI. Check for mistakes.

failures: List[FilesRow] = []
tfname: str
newtar: bool = True
nfiles: int = len(files)

# Set up logging redirection for multiprocessing
if multiprocess_worker:
# All messages to the logger will now be sent to
# this queue, instead of sys.stdout.
Expand All @@ -524,17 +556,23 @@ def extractFiles( # noqa: C901
            # Every time we're extracting a new tar, if running in parallel,
# let the process know.
# This is to synchronize the print statements.

if multiprocess_worker:
multiprocess_worker.set_curr_tar(files_row.tar)

if config.hpss is not None:
hpss: str = config.hpss
# Use args.hpss, falling back to config.hpss when not provided
if args.hpss is not None:
hpss: str = args.hpss
elif config.hpss is not None:
hpss = config.hpss
else:
raise TypeError("Invalid config.hpss={}".format(config.hpss))

tries: int = args.retries + 1
# Set to True to test the `--retries` option with a forced failure.
# Then run `python -m unittest tests.test_extract.TestExtract.testExtractRetries`
test_retry: bool = False

while tries > 0:
tries -= 1
do_retrieve: bool
Expand Down Expand Up @@ -653,13 +691,15 @@ def extractFiles( # noqa: C901
logger.error("md5 mismatch for: {}".format(fname))
logger.error("md5 of extracted file: {}".format(md5))
logger.error("md5 of original file: {}".format(files_row_md5))

failures.append(files_row)
else:
logger.debug("Valid md5: {} {}".format(md5, fname))

elif extract_this_file:
tar.extract(tarinfo)
if sys.version_info >= (3, 12):
tar.extract(tarinfo, filter="tar")
Copy link

Copilot AI Feb 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tar extraction filter should use "data" instead of "tar" for Python 3.12+. The "tar" filter was deprecated in Python 3.14 and will be removed. According to Python documentation, "data" is the recommended filter that prevents path traversal attacks while being backwards compatible. This will ensure compatibility with Python 3.14 and future versions.

Suggested change
tar.extract(tarinfo, filter="tar")
tar.extract(tarinfo, filter="data")

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This must be "tar" for tests to pass.

Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tarfile.extract filter parameter was introduced in Python 3.12 to address security concerns. However, the "tar" filter is deprecated as of Python 3.14 (PEP 706). For Python 3.14+ compatibility, consider using filter="data" instead, which is the recommended secure filter. The "data" filter is safer and future-proof. Alternatively, explicitly handle different Python versions if the "tar" filter's behavior is required.

Suggested change
tar.extract(tarinfo, filter="tar")
tar.extract(tarinfo, filter="data")

Copilot uses AI. Check for mistakes.
Copy link
Copy Markdown
Collaborator Author

@forsyth2 forsyth2 Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must use filter="tar" or else we get this test failure:

======================================================================
FAIL: test_hpss_none_fs_off (tests.integration.python_tests.group_by_workflow.test_cache_fs.TestCacheFs.test_hpss_none_fs_off)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/global/u1/f/forsyth/ez/zstash/tests/integration/python_tests/group_by_workflow/test_cache_fs.py", line 152, in test_hpss_none_fs_off
    self.assertTrue(os.path.islink("dir2/file1_soft.txt"))  # DIFFERENT from fs_on
    ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AssertionError: False is not true

----------------------------------------------------------------------
Ran 1 test in 1.045s

FAILED (failures=1)

else:
tar.extract(tarinfo)
# Note: tar.extract() will not restore time stamps of symbolic
# links. Could not find a Python-way to restore it either, so
# relying here on 'touch'. This is not the prettiest solution.
Expand All @@ -676,9 +716,6 @@ def extractFiles( # noqa: C901
logger.error("Retrieving {}".format(files_row.name))
failures.append(files_row)

if multiprocess_worker:
multiprocess_worker.print_contents()

# Close current archive?
if i == nfiles - 1 or files[i].tar != files[i + 1].tar:
# We're either on the last file or the tar is distinct from the tar of the next file.
Expand All @@ -689,6 +726,7 @@ def extractFiles( # noqa: C901

if multiprocess_worker:
multiprocess_worker.done_enqueuing_output_for_tar(files_row.tar)
multiprocess_worker.print_all_contents()

# Open new archive next time
newtar = True
Expand All @@ -700,15 +738,18 @@ def extractFiles( # noqa: C901
else:
raise TypeError("Invalid tfname={}".format(tfname))

if multiprocess_worker:
# If there are things left to print, print them.
multiprocess_worker.print_all_contents()
# Close database connection if we opened it
if close_db:
cur.close()
con.close()
Comment on lines +741 to +744
Copy link

Copilot AI Feb 25, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The database connection cleanup at lines 742-744 only happens if the function exits normally. If an exception occurs in the try block (lines 629-712) or elsewhere in the function, the database connection will not be closed, leading to a resource leak. Consider wrapping the database operations in a try-finally block or using a context manager to ensure the connection is always closed when opened by this function.

Copilot uses AI. Check for mistakes.

    # Add the failures to the queue.
    # When running with multiprocessing, the function multiprocess_extract()
    # that calls this extractFiles() function will return the failures as a list.
if multiprocess_worker:
for f in failures:
multiprocess_worker.failure_queue.put(f)

return failures


Expand Down
Loading
Loading