Skip to content

Commit 6d95773

Browse files
authored
Merge pull request #5155 from grondo/output-truncate
shell: truncate output to KVS after 10MB
2 parents 7083ce3 + 0318236 commit 6d95773

File tree

5 files changed

+117
-16
lines changed

5 files changed

+117
-16
lines changed

src/common/libioencode/ioencode.c

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,11 +102,16 @@ static int decode_data_base64 (char *src,
102102
{
103103
ssize_t rc;
104104
size_t size = base64_decoded_length (srclen);
105-
if (!(*datap = malloc (size)))
106-
return -1;
107-
if ((rc = base64_decode (*datap, size, src, srclen)) < 0)
108-
return -1;
109-
*lenp = rc;
105+
if (datap) {
106+
if (!(*datap = malloc (size)))
107+
return -1;
108+
if ((rc = base64_decode (*datap, size, src, srclen)) < 0)
109+
return -1;
110+
if (lenp)
111+
*lenp = rc;
112+
}
113+
else if (lenp)
114+
*lenp = size;
110115
return 0;
111116
}
112117

@@ -154,18 +159,21 @@ int iodecode (json_t *o,
154159
(*streamp) = stream;
155160
if (rankp)
156161
(*rankp) = rank;
157-
if (datap) {
158-
*datap = NULL;
162+
if (datap || lenp) {
163+
if (datap)
164+
*datap = NULL;
159165
if (data) {
160166
if (encoding && strcmp (encoding, "base64") == 0) {
161167
if (decode_data_base64 (data, len, datap, &bin_len) < 0)
162168
goto cleanup;
163169
}
164170
else {
165171
bin_len = len;
166-
if (!(*datap = malloc (bin_len)))
167-
goto cleanup;
168-
memcpy (*datap, data, bin_len);
172+
if (datap) {
173+
if (!(*datap = malloc (bin_len)))
174+
goto cleanup;
175+
memcpy (*datap, data, bin_len);
176+
}
169177
}
170178
}
171179
}

src/common/libioencode/ioencode.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ json_t *ioencode (const char *stream,
2929
* - both data and EOF can be available
3030
* - if no data available, data set to NULL and len to 0
3131
* - data must be freed after return
32+
* - data can be NULL and len non-NULL to retrieve data length
3233
*/
3334
int iodecode (json_t *o,
3435
const char **stream,

src/common/libioencode/test/ioencode.c

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,14 @@ void basic (void)
6262
&& eof == true,
6363
"iodecode returned correct info");
6464
free (data);
65+
66+
ok (iodecode (o, &stream, &rank, NULL, &len, &eof) == 0,
67+
"iodecode can be passed NULL data to query len");
68+
ok (!strcmp (stream, "stdout")
69+
&& !strcmp (rank, "[0-8]")
70+
&& len == 3
71+
&& eof == true,
72+
"iodecode returned correct info");
6573
json_decref (o);
6674

6775
ok ((o = ioencode ("stderr", "[4,5]", NULL, 0, true)) != NULL,
@@ -75,6 +83,14 @@ void basic (void)
7583
&& eof == true,
7684
"iodecode returned correct info");
7785
free (data);
86+
87+
ok (iodecode (o, &stream, &rank, NULL, &len, &eof) == 0,
88+
"iodecode can be passed NULL data to query len");
89+
ok (!strcmp (stream, "stderr")
90+
&& !strcmp (rank, "[4,5]")
91+
&& len == 0
92+
&& eof == true,
93+
"iodecode returned correct info");
7894
json_decref (o);
7995
}
8096

@@ -108,6 +124,10 @@ static void binary_data (void)
108124
ok (memcmp (data, buffer, len) == 0,
109125
"data matches");
110126
free (data);
127+
ok (iodecode (o, &stream, &rank, NULL, &len, &eof) == 0,
128+
"iodecode can be passed NULL data to query len");
129+
ok (len == sizeof (buffer),
130+
"len is correct");
111131
json_decref (o);
112132
}
113133

src/shell/output.c

Lines changed: 67 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,9 @@
6262
#include "builtins.h"
6363
#include "log.h"
6464

65+
#define OUTPUT_LIMIT_BYTES 1024*1024*10
66+
#define OUTPUT_LIMIT_STRING "10MB"
67+
6568
enum {
6669
FLUX_OUTPUT_TYPE_TERM = 1,
6770
FLUX_OUTPUT_TYPE_KVS = 2,
@@ -88,6 +91,8 @@ struct shell_output {
8891
bool stopped;
8992
int stdout_type;
9093
int stderr_type;
94+
size_t stdout_bytes;
95+
size_t stderr_bytes;
9196
struct shell_output_type_file stdout_file;
9297
struct shell_output_type_file stderr_file;
9398
zhash_t *fds;
@@ -298,36 +303,92 @@ static int shell_output_kvs_init (struct shell_output *out, json_t *header)
298303
return rc;
299304
}
300305

301-
static int entry_output_is_kvs (struct shell_output *out, json_t *entry)
306+
/* Return true if entry is a kvs destination, false otherwise.
307+
* If true, then then stream and len will be set to the stream and
308+
* length of data in this entry.
309+
*/
310+
static bool entry_output_is_kvs (struct shell_output *out,
311+
json_t *entry,
312+
bool *stdoutp,
313+
int *lenp,
314+
bool *eofp)
302315
{
303316
json_t *context;
304317
const char *name;
305318
const char *stream;
319+
306320
if (eventlog_entry_parse (entry, NULL, &name, &context) < 0) {
307321
shell_log_errno ("eventlog_entry_parse");
308322
return 0;
309323
}
310324
if (!strcmp (name, "data")) {
311-
if (iodecode (context, &stream, NULL, NULL, NULL, NULL) < 0) {
325+
if (iodecode (context, &stream, NULL, NULL, lenp, eofp) < 0) {
312326
shell_log_errno ("iodecode");
313327
return 0;
314328
}
315-
if (!strcmp (stream, "stdout"))
329+
if ((*stdoutp = !strcmp (stream, "stdout")))
316330
return (out->stdout_type == FLUX_OUTPUT_TYPE_KVS);
317331
else
318332
return (out->stderr_type == FLUX_OUTPUT_TYPE_KVS);
319333
}
320334
return 0;
321335
}
322336

337+
static bool check_kvs_output_limit (struct shell_output *out,
338+
bool is_stdout,
339+
int len)
340+
{
341+
const char *stream;
342+
size_t *bytesp;
343+
size_t prev;
344+
345+
if (is_stdout) {
346+
stream = "stdout";
347+
bytesp = &out->stdout_bytes;
348+
}
349+
else {
350+
stream = "stderr";
351+
bytesp = &out->stderr_bytes;
352+
}
353+
354+
prev = *bytesp;
355+
*bytesp += len;
356+
357+
if (*bytesp > OUTPUT_LIMIT_BYTES) {
358+
/* Only log an error when the threshold is reached.
359+
*/
360+
if (prev <= OUTPUT_LIMIT_BYTES)
361+
shell_warn ("%s will be truncated, %s limit exceeded",
362+
stream,
363+
OUTPUT_LIMIT_STRING);
364+
return true;
365+
}
366+
return false;
367+
}
368+
323369
static int shell_output_kvs (struct shell_output *out)
324370
{
325371
json_t *entry;
326372
size_t index;
373+
bool is_stdout;
374+
int len;
375+
bool eof;
376+
327377
json_array_foreach (out->output, index, entry) {
328-
if (entry_output_is_kvs (out, entry) &&
329-
eventlogger_append_entry (out->ev, 0, "output", entry) < 0) {
330-
return shell_log_errno ("eventlogger_append");
378+
if (entry_output_is_kvs (out, entry, &is_stdout, &len, &eof)) {
379+
bool truncate = check_kvs_output_limit (out, is_stdout, len);
380+
if (!truncate || eof) {
381+
if (eventlogger_append_entry (out->ev, 0, "output", entry) < 0)
382+
return shell_log_errno ("eventlogger_append");
383+
}
384+
if (eof && truncate) {
385+
size_t total = is_stdout ?
386+
out->stdout_bytes : out->stderr_bytes;
387+
shell_warn ("%s: %zu of %zu bytes truncated",
388+
is_stdout ? "stdout" : "stderr",
389+
total - OUTPUT_LIMIT_BYTES,
390+
total);
391+
}
331392
}
332393
}
333394
return 0;

t/t2606-job-shell-output-redirection.t

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,4 +348,15 @@ test_expect_success 'job-shell: shell errors are captured in error file' '
348348
test_expect_code 127 flux run --error=test.err nosuchcommand &&
349349
grep "nosuchcommand: No such file or directory" test.err
350350
'
351+
test_expect_success LONGTEST 'job-shell: output to kvs is truncated at 10MB' '
352+
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
353+
flux run cat expected >output 2>truncate.error &&
354+
test_debug "cat truncate.error" &&
355+
grep "stdout.*truncated" truncate.error
356+
'
357+
test_expect_success LONGTEST 'job-shell: stderr to kvs is truncated at 10MB' '
358+
dd if=/dev/urandom bs=10240 count=800 | base64 --wrap 79 >expected &&
359+
flux run sh -c "cat expected >&2" >truncate2.error 2>&1 &&
360+
grep "stderr.*truncated" truncate2.error
361+
'
351362
test_done

0 commit comments

Comments
 (0)