Skip to content

Commit aa7c1f4

Browse files
committed
Local executor was not cleaning its files. Also, add a mechanism to clean
old submit scripts to avoid gazillions of small files in ~/.psij/work.
1 parent d20fd45 commit aa7c1f4

File tree

3 files changed

+52
-2
lines changed

3 files changed

+52
-2
lines changed

src/psij/executors/batch/batch_scheduler_executor.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from psij import JobExecutor, JobExecutorConfig, Launcher, Job, SubmitException, \
1717
JobStatus, JobState
1818
from psij.executors.batch.template_function_library import ALL as FUNCTION_LIBRARY
19-
from psij.utils import _StatusUpdater
19+
from psij.utils import _StatusUpdater, _FileCleaner
2020

2121
UNKNOWN_ERROR = 'PSIJ: Unknown error'
2222

@@ -206,6 +206,7 @@ def __init__(self, url: Optional[str] = None,
206206
assert config
207207
self.work_directory = config.work_directory / self.name
208208
self.work_directory.mkdir(parents=True, exist_ok=True)
209+
cast(_FileCleaner, _FileCleaner.get_instance()).clean(self.work_directory)
209210

210211
def submit(self, job: Job) -> None:
211212
"""See :func:`~psij.JobExecutor.submit`."""

src/psij/executors/local.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from psij import JobExecutor
2020
from psij.executors.batch.batch_scheduler_executor import _env_to_mustache
2121
from psij.executors.batch.script_generator import TemplatedScriptGenerator
22-
from psij.utils import SingletonThread, _StatusUpdater
22+
from psij.utils import SingletonThread, _StatusUpdater, _FileCleaner
2323

2424
from psij.executors.batch.template_function_library import ALL as FUNCTION_LIBRARY
2525

@@ -232,6 +232,7 @@ def __init__(self, url: Optional[str] = None,
232232
self._reaper = _ProcessReaper.get_instance()
233233
self._work_dir = Path.home() / '.psij' / 'work' / 'local'
234234
self._work_dir.mkdir(parents=True, exist_ok=True)
235+
cast(_FileCleaner, _FileCleaner.get_instance()).clean(self._work_dir)
235236
self._status_updater = cast(_StatusUpdater, _StatusUpdater.get_instance())
236237
self.generator = TemplatedScriptGenerator(config, Path(__file__).parent / 'local'
237238
/ 'local.mustache')
@@ -339,13 +340,22 @@ def _process_done(self, p: _ProcessEntry) -> None:
339340
message = p.out
340341
state = JobState.FAILED
341342

343+
if state.final:
344+
self._clean_submit_file(p.job)
342345
# We need to ensure that the status updater has processed all updates that
343346
# have been sent up to this point
344347
self._status_updater.flush()
345348
self._status_updater.unregister_job(p.job)
346349
self._set_job_status(p.job, JobStatus(state, time=p.done_time, exit_code=p.exit_code,
347350
message=message))
348351

352+
def _clean_submit_file(self, job: Job) -> None:
353+
submit_file_path = self._work_dir / (job.id + '.job')
354+
try:
355+
submit_file_path.unlink()
356+
except FileNotFoundError:
357+
pass
358+
349359
def list(self) -> List[str]:
350360
"""
351361
Return a list of ids representing jobs that are running on the underlying implementation.

src/psij/utils.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
import io
33
import logging
44
import os
5+
import queue
56
import random
67
import socket
78
import tempfile
89
import threading
910
import time
11+
from datetime import datetime, timedelta
1012
from pathlib import Path
1113
from typing import Type, Dict, Optional, Tuple, Set, List
1214

@@ -17,6 +19,9 @@
1719
logger = logging.getLogger(__name__)
1820

1921

22+
_MAX_FILE_AGE_DAYS = 30
23+
24+
2025
class SingletonThread(threading.Thread):
2126
"""
2227
A convenience class to return a thread that is guaranteed to be unique to this process.
@@ -207,3 +212,37 @@ def _process_update_data(self, data: bytes) -> None:
207212
pass
208213
if job:
209214
executor._set_job_status(job, JobStatus(state))
215+
216+
217+
class _FileCleaner(SingletonThread):
218+
def __init__(self) -> None:
219+
super().__init__()
220+
self.name = 'File Cleaner'
221+
self.daemon = True
222+
self.queue: queue.SimpleQueue[Path] = queue.SimpleQueue()
223+
224+
def clean(self, path: Path) -> None:
225+
self.queue.put(path)
226+
227+
def run(self) -> None:
228+
while True:
229+
try:
230+
path = self.queue.get(block=True, timeout=1)
231+
try:
232+
self._do_clean(path)
233+
except Exception as ex:
234+
print(f'Warning: cannot clean {path}: {ex}')
235+
except queue.Empty:
236+
pass
237+
238+
def _do_clean(self, path: Path) -> None:
239+
now = datetime.now()
240+
max_age = timedelta(days=_MAX_FILE_AGE_DAYS)
241+
for child in path.iterdir():
242+
m_time = datetime.fromtimestamp(child.lstat().st_mtime)
243+
if now - m_time > max_age:
244+
try:
245+
child.unlink()
246+
except FileNotFoundError:
247+
# we try our best
248+
pass

0 commit comments

Comments
 (0)