Skip to content

Commit 21126a3

Browse files
committed
python: fix handling of base64 output in JobWatcher
Problem: If there is binary output, encoded in base64 in the job output eventlog, the JobWatcher class and thus flux-watch(1) throws errors about bytes not being encodable as JSON. First, ensure that base64 encoded output is decoded to utf-8 with surrogates, so that the data can be encoded back to binary without data loss. Then, in the JobWatcher class, ensure all output streams are capable of handling the surrogates (and utf-8) by reconfiguring them (in python 3.7+) or reopening them with the proper error handling. Fixes #5702
1 parent 8d24e94 commit 21126a3

File tree

2 files changed

+23
-5
lines changed

2 files changed

+23
-5
lines changed

src/bindings/python/flux/job/output.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ def __init__(self, entry, labelio=False):
113113
data = self.context["data"]
114114
if "encoding" in self.context:
115115
if self.context["encoding"] == "base64":
116-
data = base64.b64decode(data)
116+
data = base64.b64decode(data).decode(
117+
"utf-8", errors="surrogateescape"
118+
)
117119
if "repeat" in self.context:
118120
data *= self.context["repeat"]
119121
if labelio:

src/bindings/python/flux/job/watcher.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,8 @@ def __init__(
367367
self.t0 = starttime
368368
self.log_events = log_events
369369
self.log_status = log_status
370-
self.stdout = stdout
371-
self.stderr = stderr
370+
self.stdout = self._reopen(stdout)
371+
self.stderr = self._reopen(stderr)
372372
self.labelio = labelio
373373
self.exitcode = 0
374374
self.show_progress = progress
@@ -379,6 +379,22 @@ def __init__(
379379
if jobs:
380380
self.add_jobs(*jobs)
381381

382+
@staticmethod
383+
def _reopen(stream):
384+
"""reconfigure/reopen stream with correct encoding and error handling"""
385+
try:
386+
# reconfigure() only available in >=3.7
387+
stream.reconfigure(encoding="utf-8", errors="surrogateescape")
388+
return stream
389+
except AttributeError:
390+
return open(
391+
stream.fileno(),
392+
mode="w",
393+
encoding="utf-8",
394+
errors="surrogateescape",
395+
closefd=False,
396+
)
397+
382398
@staticmethod
383399
def _status_to_exitcode(status):
384400
"""Calculate exitcode from job status"""
@@ -420,9 +436,9 @@ def add_jobs(self, *jobs, stdout=None, stderr=None, wait="clean"):
420436
self.progress.add_jobs(*jobs)
421437

422438
if stdout is None:
423-
stdout = self.stdout
439+
stdout = self._reopen(self.stdout)
424440
if stderr is None:
425-
stderr = self.stderr
441+
stderr = self._reopen(self.stderr)
426442

427443
for job in jobs:
428444
if not self.t0 or job.t_submit < self.t0:

0 commit comments

Comments
 (0)