Skip to content

Commit 3f139df

Browse files
authored
Merge pull request #5704 from grondo/issue#5702
python: fix `flux-watch: TypeError: Object of type 'bytes' is not JSON serializable`
2 parents 8d24e94 + 3bbe90f commit 3f139df

File tree

3 files changed

+30
-5
lines changed

3 files changed

+30
-5
lines changed

src/bindings/python/flux/job/output.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,9 @@ def __init__(self, entry, labelio=False):
113113
data = self.context["data"]
114114
if "encoding" in self.context:
115115
if self.context["encoding"] == "base64":
116-
data = base64.b64decode(data)
116+
data = base64.b64decode(data).decode(
117+
"utf-8", errors="surrogateescape"
118+
)
117119
if "repeat" in self.context:
118120
data *= self.context["repeat"]
119121
if labelio:

src/bindings/python/flux/job/watcher.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,8 @@ def __init__(
367367
self.t0 = starttime
368368
self.log_events = log_events
369369
self.log_status = log_status
370-
self.stdout = stdout
371-
self.stderr = stderr
370+
self.stdout = self._reopen(stdout)
371+
self.stderr = self._reopen(stderr)
372372
self.labelio = labelio
373373
self.exitcode = 0
374374
self.show_progress = progress
@@ -379,6 +379,22 @@ def __init__(
379379
if jobs:
380380
self.add_jobs(*jobs)
381381

382+
@staticmethod
383+
def _reopen(stream):
384+
"""reconfigure/reopen stream with correct encoding and error handling"""
385+
try:
386+
# reconfigure() only available in >=3.7
387+
stream.reconfigure(encoding="utf-8", errors="surrogateescape")
388+
return stream
389+
except AttributeError:
390+
return open(
391+
stream.fileno(),
392+
mode="w",
393+
encoding="utf-8",
394+
errors="surrogateescape",
395+
closefd=False,
396+
)
397+
382398
@staticmethod
383399
def _status_to_exitcode(status):
384400
"""Calculate exitcode from job status"""
@@ -420,9 +436,9 @@ def add_jobs(self, *jobs, stdout=None, stderr=None, wait="clean"):
420436
self.progress.add_jobs(*jobs)
421437

422438
if stdout is None:
423-
stdout = self.stdout
439+
stdout = self._reopen(self.stdout)
424440
if stderr is None:
425-
stderr = self.stderr
441+
stderr = self._reopen(self.stderr)
426442

427443
for job in jobs:
428444
if not self.t0 or job.t_submit < self.t0:

t/t2813-flux-watch.t

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,4 +130,11 @@ test_expect_success 'flux-watch: --filter works' '
130130
test_debug "cat failed.out" &&
131131
grep "Watching ${nfailed} job" failed.out
132132
'
133+
test_expect_success 'flux-watch: handles binary data' '
134+
id=$(flux submit dd if=/dev/urandom count=1) &&
135+
test_debug "flux job eventlog -p guest.output -HL $id" &&
136+
flux job attach $id >binary.expected &&
137+
flux watch $id >binary.output &&
138+
test_cmp binary.expected binary.output
139+
'
133140
test_done

0 commit comments

Comments
 (0)