-
Notifications
You must be signed in to change notification settings - Fork 10
Refactor to resolve tars not deleting when --non-blocking is set
#416
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
|
Action items:
|
|
All tests are passing now. Self-review guide from Claude: Self-Review Guide for Progressive Tar File Deletion FixOverviewThis diff fixes the issue where tar files weren't being deleted after successful Globus transfers when Key Changes to Review1. New Transfer Tracking System (
|
|
Follow-up: Performance Review for Progressive Tar File DeletionPerformance Concerns to Review1. Status Check OverheadCurrent Implementation: def delete_successfully_transferred_files(self):
for batch in self.batches:
if batch.is_globus and batch.task_id and (batch.task_status != "SUCCEEDED"):
if self.globus_config and self.globus_config.transfer_client:
task = self.globus_config.transfer_client.get_task(batch.task_id)
batch.task_status = task["status"]Issues:
Impact:
Potential Optimizations:
2. Batch List GrowthCurrent Implementation: self.batches: List[TransferBatch] = []
# Grows unbounded throughout the runIssues:
Potential Optimizations:
3. File Deletion PerformanceCurrent Implementation: def delete_files(self):
for src_path in self.file_paths:
if os.path.exists(src_path):
os.remove(src_path)Issues:
Impact:
Potential Optimizations:
4. Non-Blocking Mode EfficiencyCurrent Behavior: # After EVERY hpss_put() call:
if not keep:
transfer_manager.delete_successfully_transferred_files()Issues:
Example:
This means 5 status checks when only 1-2 would be needed. Potential Optimizations:
5. Globus Transfer BatchingCurrent Implementation: # Creates new batch if last one was submitted
if not transfer_manager.batches or transfer_manager.batches[-1].task_id:
new_batch = TransferBatch()Questions:
Looking at the code:
Potential Optimizations:
6. Test Performance ImpactNew Progressive Deletion Tests: dd if=/dev/zero of=zstash_demo/file1.dat bs=1M count=700 # 700 MB
dd if=/dev/zero of=zstash_demo/file2.dat bs=1M count=700 # 700 MB
dd if=/dev/zero of=zstash_demo/file3.dat bs=1M count=700 # 700 MBIssues:
Recommendations:
7. Finalization PerformanceCurrent Implementation: def globus_finalize(transfer_manager: TransferManager, non_blocking: bool = False):
# Submit any pending transfer_data
# Wait for most recent transfer
# Wait for last task (if different)
# Delete successfully transferred filesIssues:
Questions:
Recommended Performance ImprovementsPriority 1: High Impact, Low Effort
def delete_successfully_transferred_files(self):
# Only check batches that haven't been processed yet
batches_to_check = [b for b in self.batches if b.file_paths] # Has files to delete
for batch in batches_to_check:
# ... existing logic
def delete_files(self):
for src_path in self.file_paths:
try:
os.remove(src_path)
except FileNotFoundError:
logger.warning(f"File already deleted: {src_path}")
# In hpss.py::hpss_transfer()
if not keep:
# Only check every 5 files or if this is the last file
if (transfer_manager.cumulative_tarfiles_pushed % 5 == 0) or is_last_file:
transfer_manager.delete_successfully_transferred_files()Priority 2: Medium Impact, Medium Effort
class TransferManager:
def __init__(self):
self.batches: List[TransferBatch] = []
self.last_deletion_check_index: int = 0 # New field
def delete_successfully_transferred_files(self):
# Only check batches from last_deletion_check_index forward
for i in range(self.last_deletion_check_index, len(self.batches)):
batch = self.batches[i]
# ... check and delete logic
if batch.file_paths == []: # Processed
self.last_deletion_check_index = i + 1
def delete_successfully_transferred_files(self):
# ... existing logic
# Remove fully processed batches
self.batches = [b for b in self.batches if b.file_paths or not b.task_id]Priority 3: Lower Priority / More Investigation Needed
class TransferManager:
def __init__(self):
self.last_status_check_time: float = 0
def delete_successfully_transferred_files(self):
now = time.time()
if now - self.last_status_check_time < 30: # Don't check more than every 30s
return
self.last_status_check_time = now
# ... existing logic
Performance Testing Checklist
Questions to Answer
|
Summary
Objectives:
--non-blockingis setIssue resolution:
--non-blockingcompletes #383 (closed because of significant rebase conflicts), Delete transferred files #405 (closed because of design not working properly)Select one: This pull request is...
Big Change
1. Does this do what we want it to do?
Required:
If applicable:
2. Are the implementation details accurate & efficient?
Required:
If applicable:
zstash/conda, not just animportstatement.3. Is this well documented?
Required:
4. Is this code clean?
Required:
If applicable: