Skip to content

Commit 267f8b3

Browse files
authored
Merge pull request #3523 from vkarak/bugfix/lock-filelog-append
[bugfix] Introduce optional file locking for appending to perflogs
2 parents b9ee96b + 32a6eef commit 267f8b3

File tree

10 files changed

+222
-88
lines changed

10 files changed

+222
-88
lines changed

docs/config_reference.rst

Lines changed: 70 additions & 46 deletions
Large diffs are not rendered by default.

docs/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ archspec==0.2.5
22
ClusterShell==1.9.3
33
docutils==0.18.1; python_version < '3.9'
44
docutils==0.21.2; python_version >= '3.9'
5+
fasteners==0.19
56
jinja2==3.0.3; python_version == '3.6'
67
jinja2==3.1.6; python_version >= '3.7'
78
jsonschema==3.2.0

reframe/core/logging.py

Lines changed: 76 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
# SPDX-License-Identifier: BSD-3-Clause
55

66
import abc
7+
import atexit
78
import itertools
89
import logging
910
import logging.handlers
@@ -149,7 +150,8 @@ class MultiFileHandler(logging.FileHandler):
149150
'''
150151

151152
def __init__(self, prefix, mode='a', encoding=None, fmt=None,
152-
perffmt=None, ignore_keys=None):
153+
perffmt=None, ignore_keys=None, use_locking=False,
154+
lockfile_mode=None):
153155
super().__init__(prefix, mode, encoding, delay=True)
154156

155157
# Reset FileHandler's filename
@@ -164,6 +166,9 @@ def __init__(self, prefix, mode='a', encoding=None, fmt=None,
164166
self.__perffmt = perffmt
165167
self.__attr_patt = re.compile(r'\%\((.*?)\)s(.*?(?=%|$))?')
166168
self.__ignore_keys = set(ignore_keys) if ignore_keys else set()
169+
self.__use_locking = use_locking
170+
self.__lockfile_mode = lockfile_mode
171+
self.__locks = {}
167172

168173
def __generate_header(self, record):
169174
# Generate the header from the record and fmt
@@ -208,6 +213,14 @@ def __generate_header(self, record):
208213

209214
return header
210215

216+
def __lock_file_name(self, logfile=None):
217+
if logfile is None:
218+
logfile = self.baseFilename
219+
220+
prefix = os.path.dirname(logfile)
221+
basename, _ = os.path.splitext(os.path.basename(logfile))
222+
return os.path.join(prefix, f'.{basename}.lock')
223+
211224
def _emit_header(self, record):
212225
if self.baseFilename in self.__streams:
213226
return
@@ -234,13 +247,27 @@ def _emit_header(self, record):
234247

235248
os.rename(self.baseFilename, self.baseFilename + f'.h{hcnt}')
236249
finally:
237-
# Open the file for writing and write the header
238-
fp = open(self.baseFilename,
239-
mode=self.mode, encoding=self.encoding)
240-
if record_header != header:
241-
fp.write(f'{record_header}\n')
250+
if self.__use_locking:
251+
# When using locking, we need to open, append and write to
252+
# the file at once
253+
rwlock = osext.ReadWriteFileLock(self.__lock_file_name(),
254+
self.__lockfile_mode)
255+
with rwlock.write_lock():
256+
with open(self.baseFilename, mode=self.mode,
257+
encoding=self.encoding) as fp:
258+
if record_header != header:
259+
fp.write(f'{record_header}\n')
260+
261+
self.__streams[self.baseFilename] = None
262+
self.__locks[self.baseFilename] = rwlock
263+
else:
264+
# Open the file for writing and write the header
265+
fp = open(self.baseFilename,
266+
mode=self.mode, encoding=self.encoding)
267+
if record_header != header:
268+
fp.write(f'{record_header}\n')
242269

243-
self.__streams[self.baseFilename] = fp
270+
self.__streams[self.baseFilename] = fp
244271

245272
def emit(self, record):
246273
try:
@@ -255,14 +282,22 @@ def emit(self, record):
255282
check_basename = type(record.__rfm_check__).variant_name()
256283
self.baseFilename = os.path.join(dirname, f'{check_basename}.log')
257284
self._emit_header(record)
258-
self.stream = self.__streams[self.baseFilename]
259-
super().emit(record)
285+
if self.__use_locking:
286+
with self.__locks[self.baseFilename].write_lock():
287+
with open(self.baseFilename, mode=self.mode,
288+
encoding=self.encoding) as fp:
289+
self.stream = fp
290+
super().emit(record)
291+
else:
292+
self.stream = self.__streams[self.baseFilename]
293+
super().emit(record)
260294

261295
def close(self):
262296
# Close all open streams
263-
for s in self.__streams.values():
264-
self.stream = s
265-
super().close()
297+
for stream in self.__streams.values():
298+
if stream:
299+
self.stream = stream
300+
super().close()
266301

267302

268303
def _format_time_rfc3339(timestamp, datefmt):
@@ -459,9 +494,16 @@ def _create_filelog_handler(site_config, config_prefix):
459494
format = site_config.get(f'{config_prefix}/format')
460495
format_perf = site_config.get(f'{config_prefix}/format_perfvars')
461496
ignore_keys = site_config.get(f'{config_prefix}/ignore_keys')
497+
use_locking = site_config.get(f'{config_prefix}/locking_enable')
498+
lockfile_mode = site_config.get(f'{config_prefix}/locking_file_mode')
499+
if lockfile_mode is not None:
500+
lockfile_mode = int(lockfile_mode, base=8)
501+
462502
return MultiFileHandler(filename_patt, mode='a+' if append else 'w+',
463503
fmt=format, perffmt=format_perf,
464-
ignore_keys=ignore_keys)
504+
ignore_keys=ignore_keys,
505+
use_locking=use_locking,
506+
lockfile_mode=lockfile_mode)
465507

466508

467509
@register_log_handler('syslog')
@@ -806,6 +848,13 @@ def debug(self, message, *args, **kwargs):
806848
def debug2(self, message, *args, **kwargs):
807849
self.log(DEBUG2, message, *args, **kwargs)
808850

851+
def shutdown(self):
852+
'''Shutdown logger by removing all handlers and closing any files.'''
853+
854+
for h in list(self.handlers):
855+
h.close()
856+
self.removeHandler(h)
857+
809858

810859
# This is a cache for warnings that we don't want to repeat
811860
_WARN_ONCE = set()
@@ -1027,6 +1076,8 @@ def configure_logging(site_config):
10271076
_context_logger = null_logger
10281077
return
10291078

1079+
# Shutdown the previously setup loggers and close any files
1080+
shutdown()
10301081
_logger = _create_logger(site_config, 'handlers$', 'handlers')
10311082
_perf_logger = _create_logger(site_config, 'handlers_perflog')
10321083
_context_logger = LoggerAdapter(_logger)
@@ -1079,6 +1130,17 @@ def _fn(*args, **kwargs):
10791130
return _fn
10801131

10811132

1133+
@atexit.register
1134+
def shutdown():
1135+
'''Shutdown logging'''
1136+
1137+
if _logger:
1138+
_logger.shutdown()
1139+
1140+
if _perf_logger:
1141+
_perf_logger.shutdown()
1142+
1143+
10821144
# The following is meant to be used only by the unit tests
10831145

10841146
class logging_sandbox:
@@ -1098,6 +1160,7 @@ def __enter__(self):
10981160
def __exit__(self, exc_type, exc_value, traceback):
10991161
global _logger, _perf_logger, _context_logger
11001162

1163+
shutdown()
11011164
_logger = self._logger
11021165
_perf_logger = self._perf_logger
11031166
_context_logger = self._context_logger

reframe/frontend/reporting/storage.py

Lines changed: 6 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@
99
import os
1010
import re
1111
import sqlite3
12-
import sys
13-
from filelock import FileLock
1412

1513
import reframe.utility.jsonext as jsonext
1614
import reframe.utility.osext as osext
@@ -88,6 +86,11 @@ def __init__(self):
8886
else:
8987
self.__db_file_mode = mode
9088

89+
self.__db_lock = osext.ReadWriteFileLock(
90+
os.path.join(os.path.dirname(self.__db_file), '.db.lock'),
91+
self.__db_file_mode
92+
)
93+
9194
def _db_file(self):
9295
prefix = os.path.dirname(self.__db_file)
9396
if not os.path.exists(self.__db_file):
@@ -124,22 +127,7 @@ def _db_connect(self, *args, **kwargs):
124127
return sqlite3.connect(*args, **kwargs)
125128

126129
def _db_lock(self):
127-
prefix = os.path.dirname(self.__db_file)
128-
if sys.version_info >= (3, 7):
129-
kwargs = {'mode': self.__db_file_mode}
130-
else:
131-
# Python 3.6 forces us to use an older filelock version that does
132-
# not support file modes. File modes where introduced in
133-
# filelock 3.10
134-
kwargs = {}
135-
136-
# Create parent directories of the lock file
137-
#
138-
# NOTE: This is not necessary for filelock >= 3.12.3 and Python >= 3.8
139-
# However, we do create it here, in order to support the older Python
140-
# versions.
141-
os.makedirs(prefix, exist_ok=True)
142-
return FileLock(os.path.join(prefix, '.db.lock'), **kwargs)
130+
return self.__db_lock.write_lock()
143131

144132
def _db_create(self):
145133
clsname = type(self).__name__

reframe/schemas/config.json

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@
8585
"ignore_keys": {
8686
"type": "array",
8787
"items": {"type": "string"}
88-
}
88+
},
89+
"locking_enable": {"type": "boolean"},
90+
"locking_file_mode": {"type": ["string", "null"]}
8991
},
9092
"required": ["prefix"]
9193
}
@@ -633,6 +635,8 @@
633635
"logging/handlers_perflog/filelog_append": true,
634636
"logging/handlers_perflog/filelog_basedir": "./perflogs",
635637
"logging/handlers_perflog/filelog_ignore_keys": [],
638+
"logging/handlers_perflog/filelog_locking_enable": false,
639+
"logging/handlers_perflog/filelog_locking_file_mode": null,
636640
"logging/handlers_perflog/graylog_extras": {},
637641
"logging/handlers_perflog/httpjson_extras": {},
638642
"logging/handlers_perflog/httpjson_ignore_keys": [],

reframe/utility/osext.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import collections.abc
1111
import errno
12+
import fasteners
1213
import getpass
1314
import grp
1415
import os
@@ -863,6 +864,36 @@ def unique_abs_paths(paths, prune_children=True):
863864
return list(unique_paths - children)
864865

865866

867+
class temp_umask:
868+
'''Temporarily change the umask'''
869+
def __init__(self, mask):
870+
self.__new_mask = mask
871+
self.__old_mask = None
872+
873+
def __enter__(self):
874+
self.__old_mask = os.umask(self.__new_mask)
875+
876+
def __exit__(self, exc_type, exc_val, exc_tb):
877+
os.umask(self.__old_mask)
878+
879+
880+
class ReadWriteFileLock(fasteners.InterProcessReaderWriterLock):
881+
def __init__(self, path, mode=None):
882+
super().__init__(path)
883+
self._mode = mode
884+
885+
def _do_open(self, *args, **kwargs):
886+
if self._mode is not None:
887+
# We create the directory structure ourselves, so that the mask
888+
# applies strictly to the lock file if the parent directories do
889+
# not exist
890+
os.makedirs(os.path.dirname(self.path), exist_ok=True)
891+
with temp_umask(0o0777 ^ self._mode):
892+
return super()._do_open(*args, **kwargs)
893+
else:
894+
return super()._do_open(*args, **kwargs)
895+
896+
866897
def cray_cdt_version():
867898
'''Return either the Cray Development Toolkit (CDT) version, the Cray
868899
Programming Environment (CPE) version or :class:`None` if the version

requirements.txt

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,7 @@ archspec==0.2.5
22
argcomplete==3.1.2; python_version < '3.8'
33
argcomplete==3.6.1; python_version >= '3.8'
44
ClusterShell==1.9.3
5-
filelock==3.4.1; python_version == '3.6'
6-
filelock==3.12.2; python_version == '3.7'
7-
filelock==3.16.1; python_version == '3.8'
8-
filelock==3.18.0; python_version > '3.8'
5+
fasteners==0.19
96
importlib_metadata==4.0.1; python_version < '3.8'
107
jinja2==3.0.3; python_version == '3.6'
118
jinja2==3.1.6; python_version >= '3.7'

setup.cfg

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,7 @@ install_requires =
3131
argcomplete
3232
argcomplete <= 3.1.2; python_version < '3.8'
3333
ClusterShell
34-
filelock
35-
filelock<=3.16.1; python_version == '3.8'
36-
filelock<=3.12.2; python_version == '3.7'
37-
filelock<=3.4.1; python_version == '3.6'
34+
fasteners
3835
jinja2==3.0.3; python_version == '3.6'
3936
jinja2
4037
jsonschema

unittests/test_logging.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ def handler(logfile, rfc3339formatter):
6969
def logger(handler):
7070
logger = rlog.Logger('reframe')
7171
logger.addHandler(handler)
72-
return logger
72+
yield logger
73+
logger.shutdown()
7374

7475

7576
@pytest.fixture
@@ -545,7 +546,7 @@ def url_scheme(request):
545546

546547
def test_httpjson_handler_no_port(make_exec_ctx, config_file,
547548
url_scheme, logging_sandbox):
548-
ctx = make_exec_ctx(
549+
make_exec_ctx(
549550
config_file({
550551
'level': 'info',
551552
'handlers_perflog': [{

unittests/test_perflogging.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -465,3 +465,31 @@ def validate(self):
465465

466466
assert len(lines) == 2
467467
assert lines[1] == 'fail|sanity error: no way|None|None\n'
468+
469+
470+
def test_perf_logging_locking(make_runner, make_exec_ctx,
471+
config_perflog, perf_test, tmp_path):
472+
make_exec_ctx(config_perflog(
473+
fmt='',
474+
logging_opts={
475+
'handlers_perflog': [{
476+
'type': 'filelog',
477+
'use_locking': True,
478+
'prefix': '%(check_system)s/%(check_partition)s',
479+
'level': 'info',
480+
'format': (
481+
'%(check_job_completion_time)s,%(version)s,'
482+
'%(check_display_name)s,%(check_system)s,'
483+
'%(check_partition)s,%(check_environ)s,'
484+
'%(check_jobid)s,%(check_result)s,%(check_perfvalues)s'
485+
)
486+
}]
487+
}
488+
))
489+
logging.configure_logging(rt.runtime().site_config)
490+
runner = make_runner()
491+
testcases = executors.generate_testcases([perf_test])
492+
_assert_no_logging_error(runner.runall, testcases)
493+
logfile = tmp_path / 'perflogs' / 'generic' / 'default' / '_MyPerfTest.log'
494+
assert os.path.exists(logfile)
495+
assert _count_lines(logfile) == 2

0 commit comments

Comments
 (0)