Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 55 additions & 38 deletions plugins/out_kafka/kafka.c
Original file line number Diff line number Diff line change
Expand Up @@ -153,47 +153,58 @@ int produce_message(struct flb_time *tm, msgpack_object *map,
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

if (ctx->format == FLB_KAFKA_FMT_JSON || ctx->format == FLB_KAFKA_FMT_MSGP) {
/* Make room for the timestamp */
size = map->via.map.size + 1;
if (ctx->add_timestamp) {
/* Make room for the timestamp */
size = map->via.map.size + 1;
}
else {
size = map->via.map.size;
}
msgpack_pack_map(&mp_pck, size);

/* Pack timestamp */
msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
msgpack_pack_str_body(&mp_pck,
ctx->timestamp_key, ctx->timestamp_key_len);
switch (ctx->timestamp_format) {
case FLB_JSON_DATE_DOUBLE:
msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
break;

case FLB_JSON_DATE_ISO8601:
case FLB_JSON_DATE_ISO8601_NS:
{
size_t date_len;
int len;
struct tm _tm;
char time_formatted[36];

/* Format the time; use microsecond precision (not nanoseconds). */
gmtime_r(&tm->tm.tv_sec, &_tm);
date_len = strftime(time_formatted, sizeof(time_formatted) - 1,
FLB_JSON_DATE_ISO8601_FMT, &_tm);

if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) {
len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
".%06" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec / 1000);
}
else {
/* FLB_JSON_DATE_ISO8601_NS */
len = snprintf(time_formatted + date_len, sizeof(time_formatted) - 1 - date_len,
".%09" PRIu64 "Z", (uint64_t) tm->tm.tv_nsec);
}
date_len += len;
if (ctx->add_timestamp) {
/* Pack timestamp */
msgpack_pack_str(&mp_pck, ctx->timestamp_key_len);
msgpack_pack_str_body(&mp_pck,
ctx->timestamp_key, ctx->timestamp_key_len);
switch (ctx->timestamp_format) {
case FLB_JSON_DATE_DOUBLE:
msgpack_pack_double(&mp_pck, flb_time_to_double(tm));
break;

case FLB_JSON_DATE_ISO8601:
case FLB_JSON_DATE_ISO8601_NS:
{
size_t date_len;
int len;
struct tm _tm;
char time_formatted[36];

/* Format the time; use microsecond precision (not nanoseconds). */
gmtime_r(&tm->tm.tv_sec, &_tm);
date_len = strftime(time_formatted, sizeof(time_formatted) - 1,
FLB_JSON_DATE_ISO8601_FMT, &_tm);

if (ctx->timestamp_format == FLB_JSON_DATE_ISO8601) {
len = snprintf(time_formatted + date_len,
sizeof(time_formatted) - 1 - date_len,
".%06" PRIu64 "Z",
(uint64_t) tm->tm.tv_nsec / 1000);
}
else {
/* FLB_JSON_DATE_ISO8601_NS */
len = snprintf(time_formatted + date_len,
sizeof(time_formatted) - 1 - date_len,
".%09" PRIu64 "Z",
(uint64_t) tm->tm.tv_nsec);
}
date_len += len;

msgpack_pack_str(&mp_pck, date_len);
msgpack_pack_str_body(&mp_pck, time_formatted, date_len);
}
break;
msgpack_pack_str(&mp_pck, date_len);
msgpack_pack_str_body(&mp_pck, time_formatted, date_len);
}
break;
}
}
}
else {
Expand Down Expand Up @@ -602,6 +613,12 @@ static struct flb_config_map config_map[] = {
0, FLB_TRUE, offsetof(struct flb_out_kafka, timestamp_format_str),
"Set the format the timestamp is in."
},
{
FLB_CONFIG_MAP_BOOL, "add_timestamp", "true",
0, FLB_TRUE, offsetof(struct flb_out_kafka, add_timestamp),
"When enabled, a timestamp field (timestamp_key) is appended to the record. "
"Set to false to suppress the timestamp field entirely."
},
{
FLB_CONFIG_MAP_INT, "queue_full_retries", FLB_KAFKA_QUEUE_FULL_RETRIES,
0, FLB_TRUE, offsetof(struct flb_out_kafka, queue_full_retries),
Expand Down
1 change: 1 addition & 0 deletions plugins/out_kafka/kafka_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ struct flb_out_kafka {
char *timestamp_key;
int timestamp_format;
flb_sds_t timestamp_format_str;
int add_timestamp;

int message_key_len;
char *message_key;
Expand Down
Loading