Skip to content

Commit ae06ddf

Browse files
committed
shell: truncate KVS output above a threshold of 10MB
Problem: A job that writes a huge amount of data to the KVS can wreak havoc when the output eventlog is read back, since the KVS has to reassemble the eventlog into one big value to return to the user or job-info service. Limit the maximum amount of data a job can write to the KVS in the shell output plugin to add a first-level protection against this issue. An arbitrary limit of 10MB is chosen, after which output for the given stream is truncated. A warning is issued at the time of truncation and also at the end of the job, e.g.: "flux-shell[0]: WARN: output: stdout will be truncated, 10MB limit exceeded"; "flux-shell[0]: WARN: output: stdout: 5514240 bytes truncated".
1 parent 425c679 commit ae06ddf

File tree

1 file changed

+67
-6
lines changed

1 file changed

+67
-6
lines changed

src/shell/output.c

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
#include "builtins.h"
6363
#include "log.h"
6464

65+
#define OUTPUT_LIMIT_BYTES (1024*1024*10) /* parenthesized so the macro expands safely inside larger expressions */
66+
#define OUTPUT_LIMIT_STRING "10MB"
67+
6568
enum {
6669
FLUX_OUTPUT_TYPE_TERM = 1,
6770
FLUX_OUTPUT_TYPE_KVS = 2,
@@ -88,6 +91,8 @@ struct shell_output {
8891
bool stopped;
8992
int stdout_type;
9093
int stderr_type;
94+
size_t stdout_bytes;
95+
size_t stderr_bytes;
9196
struct shell_output_type_file stdout_file;
9297
struct shell_output_type_file stderr_file;
9398
zhash_t *fds;
@@ -298,36 +303,92 @@ static int shell_output_kvs_init (struct shell_output *out, json_t *header)
298303
return rc;
299304
}
300305

301-
static int entry_output_is_kvs (struct shell_output *out, json_t *entry)
306+
/* Return true if entry is a kvs destination, false otherwise.
307+
* If true, then *stdoutp, *lenp, and *eofp are set to whether the stream
308+
* is stdout, the length of data, and the EOF flag of this entry.
309+
*/
310+
static bool entry_output_is_kvs (struct shell_output *out,
311+
json_t *entry,
312+
bool *stdoutp,
313+
int *lenp,
314+
bool *eofp)
302315
{
303316
json_t *context;
304317
const char *name;
305318
const char *stream;
319+
306320
if (eventlog_entry_parse (entry, NULL, &name, &context) < 0) {
307321
shell_log_errno ("eventlog_entry_parse");
308322
return 0;
309323
}
310324
if (!strcmp (name, "data")) {
311-
if (iodecode (context, &stream, NULL, NULL, NULL, NULL) < 0) {
325+
if (iodecode (context, &stream, NULL, NULL, lenp, eofp) < 0) {
312326
shell_log_errno ("iodecode");
313327
return 0;
314328
}
315-
if (!strcmp (stream, "stdout"))
329+
if ((*stdoutp = !strcmp (stream, "stdout")))
316330
return (out->stdout_type == FLUX_OUTPUT_TYPE_KVS);
317331
else
318332
return (out->stderr_type == FLUX_OUTPUT_TYPE_KVS);
319333
}
320334
return 0;
321335
}
322336

337+
static bool check_kvs_output_limit (struct shell_output *out,
338+
bool is_stdout,
339+
int len)
340+
{
341+
const char *stream;
342+
size_t *bytesp;
343+
size_t prev;
344+
345+
if (is_stdout) {
346+
stream = "stdout";
347+
bytesp = &out->stdout_bytes;
348+
}
349+
else {
350+
stream = "stderr";
351+
bytesp = &out->stderr_bytes;
352+
}
353+
354+
prev = *bytesp;
355+
*bytesp += len;
356+
357+
if (*bytesp > OUTPUT_LIMIT_BYTES) {
358+
/* Only log an error when the threshold is reached.
359+
*/
360+
if (prev <= OUTPUT_LIMIT_BYTES)
361+
shell_warn ("%s will be truncated, %s limit exceeded",
362+
stream,
363+
OUTPUT_LIMIT_STRING);
364+
return true;
365+
}
366+
return false;
367+
}
368+
323369
static int shell_output_kvs (struct shell_output *out)
324370
{
325371
json_t *entry;
326372
size_t index;
373+
bool is_stdout;
374+
int len;
375+
bool eof;
376+
327377
json_array_foreach (out->output, index, entry) {
328-
if (entry_output_is_kvs (out, entry) &&
329-
eventlogger_append_entry (out->ev, 0, "output", entry) < 0) {
330-
return shell_log_errno ("eventlogger_append");
378+
if (entry_output_is_kvs (out, entry, &is_stdout, &len, &eof)) {
379+
bool truncate = check_kvs_output_limit (out, is_stdout, len);
380+
if (!truncate || eof) {
381+
if (eventlogger_append_entry (out->ev, 0, "output", entry) < 0)
382+
return shell_log_errno ("eventlogger_append");
383+
}
384+
if (eof && truncate) {
385+
size_t total = is_stdout ?
386+
out->stdout_bytes : out->stderr_bytes;
387+
shell_warn ("%s: %zu of %zu bytes truncated",
388+
is_stdout ? "stdout" : "stderr",
389+
total - OUTPUT_LIMIT_BYTES,
390+
total);
391+
}
331392
}
332393
}
333394
return 0;

0 commit comments

Comments
 (0)