Skip to content

Commit bb9b2f3

Browse files
committed
Implement proposed k8s-stream-file format
Instead of parsing the contents of the data read from the `stdout` and `stderr` pipes, this commit adds support for a "stream" format, named `k8s-stream-file`, which simply records what is read from a pipe to disk. It significantly reduces the CPU time spent processing each buffer read, uses only 2 I/O vectors per write, and never touches the memory read from the pipe. This is an updated implementation of cri-o/cri-o#1605.
1 parent 80b5835 commit bb9b2f3

File tree

1 file changed

+136
-5
lines changed

1 file changed

+136
-5
lines changed

src/ctr_logging.c

Lines changed: 136 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,10 +28,12 @@ static inline int sd_journal_sendv(G_GNUC_UNUSED const struct iovec *iov, G_GNUC
2828
/* Different types of container logging */
2929
static gboolean use_journald_logging = FALSE;
3030
static gboolean use_k8s_logging = FALSE;
31+
static gboolean use_k8s_stream_logging = FALSE;
3132
static gboolean use_logging_passthrough = FALSE;
3233

3334
/* Value the user must input for each log driver */
3435
static const char *const K8S_FILE_STRING = "k8s-file";
36+
static const char *const K8S_STREAM_FILE_STRING = "k8s-stream-file";
3537
static const char *const JOURNALD_FILE_STRING = "journald";
3638

3739
/* Max log size for any log file types */
@@ -86,6 +88,7 @@ static void parse_log_path(char *log_config);
8688
static const char *stdpipe_name(stdpipe_t pipe);
8789
static int write_journald(int pipe, char *buf, ssize_t num_read);
8890
static int write_k8s_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
91+
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen);
8992
static bool get_line_len(ptrdiff_t *line_len, const char *buf, ssize_t buflen);
9093
static ssize_t writev_buffer_append_segment(int fd, writev_buffer_t *buf, const void *data, ssize_t len);
9194
static ssize_t writev_buffer_append_segment_no_flush(writev_buffer_t *buf, const void *data, ssize_t len);
@@ -239,9 +242,10 @@ void configure_log_drivers(gchar **log_drivers, int64_t log_size_max_, int64_t l
239242
* parse_log_path branches on log driver type the user inputted.
240243
* log_config will either be a ':' delimited string containing:
241244
* <DRIVER_NAME>:<PATH_NAME> or <PATH_NAME>
242-
* in the case of no colon, the driver will be kubernetes-log-file,
245+
* in the case the log driver is 'k8s-stream-file', the <PATH_NAME> must be present.
246+
* in the case of no colon, the driver will be k8s-file,
243247
* in the case the log driver is 'journald', the <PATH_NAME> is ignored.
244-
* exits with error if <DRIVER_NAME> isn't 'journald' or 'kubernetes-log-file'
248+
* exits with error if <DRIVER_NAME> isn't 'journald', 'k8s-file', or 'k8s-stream-file'.
245249
*/
246250
static void parse_log_path(char *log_config)
247251
{
@@ -275,6 +279,17 @@ static void parse_log_path(char *log_config)
275279
return;
276280
}
277281

282+
// Driver is k8s-file, k8s-stream-file, or empty
283+
if (!strcmp(driver, K8S_STREAM_FILE_STRING)) {
284+
if (path == NULL) {
285+
nexitf("k8s-stream-file requires a filename");
286+
}
287+
use_k8s_logging = TRUE;
288+
use_k8s_stream_logging = TRUE;
289+
k8s_log_path = path;
290+
return;
291+
}
292+
278293
// Driver is k8s-file or empty
279294
if (!strcmp(driver, K8S_FILE_STRING)) {
280295
if (path == NULL) {
@@ -298,9 +313,18 @@ static void parse_log_path(char *log_config)
298313
/* write container output to all logs the user defined */
299314
bool write_to_logs(stdpipe_t pipe, char *buf, ssize_t num_read)
300315
{
301-
if (use_k8s_logging && write_k8s_log(pipe, buf, num_read) < 0) {
302-
nwarn("write_k8s_log failed");
303-
return G_SOURCE_CONTINUE;
316+
/*
 * Both file-based formats share use_k8s_logging (and the k8s log path);
 * use_k8s_stream_logging selects which writer handles the buffer. The
 * stream flag must route to write_k8s_stream_log(), the plain k8s-file
 * driver to write_k8s_log().
 */
if (use_k8s_logging) {
	if (use_k8s_stream_logging) {
		if (write_k8s_stream_log(pipe, buf, num_read) < 0) {
			nwarn("write_k8s_stream_log failed");
			return G_SOURCE_CONTINUE;
		}
	} else {
		if (write_k8s_log(pipe, buf, num_read) < 0) {
			nwarn("write_k8s_log failed");
			return G_SOURCE_CONTINUE;
		}
	}
}
305329
if (use_journald_logging && write_journald(pipe, buf, num_read) < 0) {
306330
nwarn("write_journald failed");
@@ -1248,3 +1272,110 @@ void sync_logs(void)
12481272
if (fsync(k8s_log_fd) < 0)
12491273
nwarnf("Failed to sync log file before exit: %m");
12501274
}
1275+
1276+
1277+
/* Upper bound for the record header; comfortably larger than strlen("1997-03-25T13:20:42.999999999+01:00 stdout 9999999999 999999999 ") + 1 */
1278+
#define TSSTREAMBUFLEN 128
1279+
1280+
/*
 * PROPOSED: CRI Stream Format, variable length file format
 *
 * Format the header that precedes each buffer in a k8s-stream-file log:
 *
 *   <local timestamp, nanoseconds, +HH:MM offset> <pipename> <offset> <buflen><space>
 *
 *   buf, bufsiz  destination buffer and its size in bytes
 *   tsbuflen     out: length of the header stored in buf (without the NUL)
 *   pipename     stream name to record in the header
 *   offset       offset value to record in the header
 *   buflen       number of payload bytes that will follow the header
 *   btbw         out: total bytes to be written, i.e. header plus payload
 *
 * Returns 0 on success. Returns -1, leaving the out parameters untouched,
 * when the current time cannot be obtained, formatting fails, or the header
 * does not fit in buf.
 */
static int set_k8s_stream_timestamp(char *buf, ssize_t bufsiz, ssize_t *tsbuflen, const char *pipename, uint64_t offset, ssize_t buflen,
				    ssize_t *btbw)
{
	struct timespec ts;
	if (clock_gettime(CLOCK_REALTIME, &ts) < 0) {
		if (errno != EINVAL)
			return -1;
		/*
		 * CLOCK_REALTIME is not supported: fall back to second
		 * resolution so tv_sec is never read uninitialized.
		 */
		ts.tv_sec = time(NULL);
		ts.tv_nsec = 0;
	}

	struct tm current_tm;
	if (localtime_r(&ts.tv_sec, &current_tm) == NULL)
		return -1;

	/* tm_gmtoff is in seconds east of UTC; print it as sign + HH:MM. */
	char off_sign = '+';
	int off = (int)current_tm.tm_gmtoff;
	if (off < 0) {
		off_sign = '-';
		off = -off;
	}

	/*
	 * Explicit casts / %zd keep the conversions matched to the argument
	 * types regardless of how wide long is on the target.
	 */
	int len = snprintf(buf, bufsiz, "%d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %s %llu %zd ", current_tm.tm_year + 1900,
			   current_tm.tm_mon + 1, current_tm.tm_mday, current_tm.tm_hour, current_tm.tm_min, current_tm.tm_sec,
			   (long)ts.tv_nsec, off_sign, off / 3600, (off % 3600) / 60, pipename, (unsigned long long)offset, buflen);

	/* A negative result is an output error; len >= bufsiz means truncation. */
	if (len < 0 || len >= bufsiz)
		return -1;

	*tsbuflen = len;
	*btbw = len + buflen;
	return 0;
}
1320+
1321+
1322+
/*
1323+
* PROPOSED: CRI Stream Format, variable length file format
1324+
*
1325+
* %d-%02d-%02dT%02d:%02d:%02d.%09ld%c%02d:%02d %(stream)s %(offset)lud %(buflen)ld %(buf)s
1326+
*
1327+
* The CRI stream fromat requires us to write each buffer read with a
1328+
* timestamp, stream, length (human readable ascii), and the buffer contents
1329+
* read (with a space character separating the buffer length string from the
1330+
* buffer.
1331+
*/
1332+
static int write_k8s_stream_log(stdpipe_t pipe, const char *buf, ssize_t buflen)
1333+
{
1334+
writev_buffer_t bufv = {0};
1335+
char tsbuf[TSSTREAMBUFLEN];
1336+
static ssize_t bytes_written = 0;
1337+
static uint64_t offset = 0;
1338+
ssize_t bytes_to_be_written = 0;
1339+
ssize_t tsbuflen = 0;
1340+
1341+
/*
1342+
* Use the same timestamp for every line of the log in this buffer.
1343+
* There is no practical difference in the output since write(2) is
1344+
* fast.
1345+
*/
1346+
if (set_k8s_stream_timestamp(tsbuf, sizeof tsbuf, &tsbuflen, stdpipe_name(pipe), buflen, offset, &bytes_to_be_written))
1347+
/* TODO: We should handle failures much more cleanly than this. */
1348+
return -1;
1349+
1350+
/*
1351+
* We re-open the log file if writing out the bytes will exceed the max
1352+
* log size. We also reset the state so that the new file is started with
1353+
* a timestamp.
1354+
*/
1355+
if ((opt_log_size_max > 0) && (bytes_written + bytes_to_be_written) > opt_log_size_max) {
1356+
bytes_written = 0;
1357+
1358+
reopen_k8s_file();
1359+
}
1360+
1361+
/* Output the timestamp, stream, and length */
1362+
if (writev_buffer_append_segment(k8s_log_fd, &bufv, tsbuf, tsbuflen) < 0) {
1363+
nwarn("failed to write (timestamp, stream) to log");
1364+
goto stream_next;
1365+
}
1366+
1367+
/* Output the actual contents. */
1368+
if (writev_buffer_append_segment(k8s_log_fd, &bufv, buf, buflen) < 0) {
1369+
nwarn("failed to write buffer to log");
1370+
}
1371+
1372+
stream_next:
1373+
bytes_written += bytes_to_be_written;
1374+
offset += (uint64_t)bytes_to_be_written;
1375+
1376+
if (writev_buffer_flush(k8s_log_fd, &bufv) < 0) {
1377+
nwarn("failed to flush buffer to log");
1378+
}
1379+
1380+
return 0;
1381+
}

0 commit comments

Comments
 (0)