Skip to content

Commit 4c96c42

Browse files
authored
Add SIGHUP handler to pbench indexer (#2114)
Adding SIGHUP Handler Fixes #2060 Re-evaluate the list of tar balls to index after indexing of the current tar ball has finished. Report the current state of indexing - SIGHUP - Current tar ball being processed - No. of remaining TB - No. of completed TB - No. of errors Encountered Handler Behaviour - No exception raised
1 parent a0a3289 commit 4c96c42

File tree

2 files changed

+62
-16
lines changed

2 files changed

+62
-16
lines changed

lib/pbench/server/indexing_tarballs.py

Lines changed: 47 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import tarfile
77
import tempfile
88
from pathlib import Path
9+
from collections import deque
910

1011
from pbench.common.exceptions import (
1112
BadDate,
@@ -50,7 +51,7 @@ def _count_lines(fname):
5051
class Index:
5152
"""class used to collect tarballs and index them"""
5253

53-
def __init__(self, name, options, idxctx, incoming):
54+
def __init__(self, name, options, idxctx, incoming, archive, qdir):
5455

5556
self.options = options
5657
_re_idx = "RE-" if options.re_index else ""
@@ -70,42 +71,44 @@ def __init__(self, name, options, idxctx, incoming):
7071
self.idxctx = idxctx
7172
self.incoming = incoming
7273
self.name = name
74+
self.archive = archive
75+
self.qdir = qdir
7376

74-
def collect_tb(self, ARCHIVE_rp, qdir):
77+
def collect_tb(self):
7578
""" Collect tarballs that needs indexing"""
7679

7780
# find -L $ARCHIVE/*/$linksrc -name '*.tar.xz' -printf "%s\t%p\n" 2>/dev/null | sort -n > $list
7881
tarballs = []
7982
idxctx = self.idxctx
8083
try:
81-
tb_glob = os.path.join(ARCHIVE_rp, "*", self.linksrc, "*.tar.xz")
84+
tb_glob = os.path.join(self.archive, "*", self.linksrc, "*.tar.xz")
8285
for tb in glob.iglob(tb_glob):
8386
try:
8487
rp = Path(tb).resolve(strict=True)
8588
except OSError:
8689
idxctx.logger.warning("{} does not resolve to a real path", tb)
87-
quarantine(qdir, idxctx.logger, tb)
90+
quarantine(self.qdir, idxctx.logger, tb)
8891
continue
8992
controller_path = rp.parent
9093
controller = controller_path.name
9194
archive_path = controller_path.parent
92-
if str(archive_path) != str(ARCHIVE_rp):
95+
if str(archive_path) != str(self.archive):
9396
idxctx.logger.warning(
94-
"For tar ball {}, original home is not {}", tb, ARCHIVE_rp
97+
"For tar ball {}, original home is not {}", tb, self.archive
9598
)
96-
quarantine(qdir, idxctx.logger, tb)
99+
quarantine(self.qdir, idxctx.logger, tb)
97100
continue
98101
if not Path(f"{rp}.md5").is_file():
99102
idxctx.logger.warning("Missing .md5 file for {}", tb)
100-
quarantine(qdir, idxctx.logger, tb)
103+
quarantine(self.qdir, idxctx.logger, tb)
101104
# Audit should pick up missing .md5 file in ARCHIVE directory.
102105
continue
103106
try:
104107
# get size
105108
size = rp.stat().st_size
106109
except OSError:
107110
idxctx.logger.warning("Could not fetch tar ball size for {}", tb)
108-
quarantine(qdir, idxctx.logger, tb)
111+
quarantine(self.qdir, idxctx.logger, tb)
109112
# Audit should pick up missing .md5 file in ARCHIVE directory.
110113
continue
111114
else:
@@ -126,7 +129,7 @@ def collect_tb(self, ARCHIVE_rp, qdir):
126129
idxctx.logger.info("No tar balls found that need processing")
127130
return (0, [])
128131

129-
return (0, sorted(tarballs))
132+
return (0, tarballs)
130133

131134
def process_tb(self, tarballs):
132135
"""Process Tarballs For Indexing and creates report
@@ -137,9 +140,11 @@ def process_tb(self, tarballs):
137140
# We always process the smallest tar balls first.
138141
idxctx = self.idxctx
139142

143+
tb_deque = deque(sorted(tarballs))
144+
140145
# At this point, tarballs contains a list of tar balls sorted by size
141146
# that were available as symlinks in the various 'linksrc' directories.
142-
idxctx.logger.debug("Preparing to index {:d} tar balls", len(tarballs))
147+
idxctx.logger.debug("Preparing to index {:d} tar balls", len(tb_deque))
143148

144149
try:
145150
# Now that we are ready to begin the actual indexing step, ensure we
@@ -218,13 +223,22 @@ def process_tb(self, tarballs):
218223
def sigquit_handler(*args):
219224
sigquit_interrupt[0] = True
220225

226+
sighup_interrupt = [False]
227+
228+
def sighup_handler(*args):
229+
sighup_interrupt[0] = True
230+
221231
signal.signal(signal.SIGQUIT, sigquit_handler)
232+
signal.signal(signal.SIGHUP, sighup_handler)
233+
count_processed_tb = 0
222234

223235
try:
224-
for size, controller, tb in tarballs:
236+
while len(tb_deque) > 0:
237+
size, controller, tb = tb_deque.popleft()
225238
# Sanity check source tar ball path
226239
linksrc_dir = Path(tb).parent
227240
linksrc_dirname = linksrc_dir.name
241+
count_processed_tb += 1
228242
assert linksrc_dirname == self.linksrc, (
229243
f"Logic bomb! tar ball "
230244
f"path {tb} does not contain {self.linksrc}"
@@ -425,13 +439,33 @@ def sigquit_handler(*args):
425439

426440
if sigquit_interrupt[0]:
427441
break
442+
if sighup_interrupt[0]:
443+
status, new_tb = self.collect_tb()
444+
if status == 0:
445+
if not set(new_tb).issuperset(tb_deque):
446+
idxctx.logger.info(
447+
"Tarballs supposed to be in 'TO-INDEX' are no longer present",
448+
set(tb_deque).difference(new_tb),
449+
)
450+
tb_deque = deque(sorted(new_tb))
451+
idxctx.logger.info(
452+
"SIGHUP status (Current tar ball indexed: ({}), Remaining: {}, Completed: {}, Errors_encountered: {}, Status: {})",
453+
Path(tb).name,
454+
len(tb_deque),
455+
count_processed_tb,
456+
_count_lines(erred),
457+
tb_res,
458+
)
459+
sighup_interrupt[0] = False
460+
continue
428461
except SigTermException:
429462
idxctx.logger.exception(
430463
"Indexing interrupted by SIGQUIT, stop processing tarballs"
431464
)
432465
finally:
433-
# Turn off the SIGQUIT handler when not indexing.
466+
# Turn off the SIGQUIT and SIGHUP handler when not indexing.
434467
signal.signal(signal.SIGQUIT, signal.SIG_IGN)
468+
signal.signal(signal.SIGHUP, signal.SIG_IGN)
435469
except SigTermException:
436470
# Re-raise a SIGTERM to avoid it being lumped in with general
437471
# exception handling below.

server/bin/pbench-index.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,17 @@ def main(options, name):
105105
- Raises an exception caught be a new, outer-most, try/except
106106
block that does not have a finally clause (as you don't want
107107
any code execution in the finally block).
108+
109+
4. Re-evaluate the list of tar balls to index after indexing of the
110+
current tar ball has finished
111+
- SIGHUP
112+
- Report the current state of indexing
113+
- Current tar ball being processed
114+
- Count of Remaining tarballs
115+
- Count of Completed tarballs
116+
- No. of Errors encountered
117+
- Handler Behavior:
118+
- No exception raised
108119
"""
109120

110121
_name_suf = "-tool-data" if options.index_tool_data else ""
@@ -170,11 +181,11 @@ def main(options, name):
170181

171182
idxctx.logger.debug("{}.{}: starting", name, idxctx.TS)
172183

173-
idx = Index(name, options, idxctx, INCOMING_rp)
184+
index_obj = Index(name, options, idxctx, INCOMING_rp, ARCHIVE_rp, qdir)
174185

175-
status, tarballs = idx.collect_tb(ARCHIVE_rp, qdir)
186+
status, tarballs = index_obj.collect_tb()
176187
if status == 0 and tarballs:
177-
status = idx.process_tb(tarballs)
188+
status = index_obj.process_tb(tarballs)
178189

179190
return status
180191

@@ -236,6 +247,7 @@ def main(options, name):
236247
signal.signal(signal.SIGTERM, sigterm_handler)
237248
signal.signal(signal.SIGINT, signal.SIG_IGN)
238249
signal.signal(signal.SIGQUIT, signal.SIG_IGN)
250+
signal.signal(signal.SIGHUP, signal.SIG_IGN)
239251
status = main(parsed, run_name)
240252
except SigTermException:
241253
# If a SIGTERM escapes the main indexing function, silently set our

0 commit comments

Comments
 (0)