Skip to content

Commit c99cd63

Browse files
Robert Stephensedsiper
authored andcommitted
in_docker_events: new plugin to retrieve Docker events (#2216)
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 7d8bbe6 commit c99cd63

File tree

7 files changed

+422
-0
lines changed

7 files changed

+422
-0
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ option(FLB_IN_CPU "Enable CPU input plugin" Yes)
118118
option(FLB_IN_THERMAL "Enable Thermal plugin" Yes)
119119
option(FLB_IN_DISK "Enable Disk input plugin" Yes)
120120
option(FLB_IN_DOCKER "Enable Docker input plugin" Yes)
121+
option(FLB_IN_DOCKER_EVENTS "Enable Docker events input plugin" Yes)
121122
option(FLB_IN_EXEC "Enable Exec input plugin" Yes)
122123
option(FLB_IN_FORWARD "Enable Forward input plugin" Yes)
123124
option(FLB_IN_HEALTH "Enable Health input plugin" Yes)

plugins/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
129129
REGISTER_IN_PLUGIN("in_systemd")
130130
REGISTER_IN_PLUGIN("in_netif")
131131
REGISTER_IN_PLUGIN("in_docker")
132+
REGISTER_IN_PLUGIN("in_docker_events")
132133
endif()
133134

134135
REGISTER_IN_PLUGIN("in_emitter")
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
set(src
2+
docker_events.c
3+
docker_events_config.c)
4+
5+
FLB_PLUGIN(in_docker_events "${src}" "")
Lines changed: 232 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,232 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2020 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#include <fluent-bit/flb_input_plugin.h>
22+
#include <fluent-bit/flb_network.h>
23+
#include <fluent-bit/flb_pack.h>
24+
#include <msgpack.h>
25+
#include <sys/socket.h>
26+
#include <sys/un.h>
27+
28+
#include "docker_events.h"
29+
#include "docker_events_config.h"
30+
31+
32+
/**
33+
* Creates the connection to docker's unix socket and sends the
34+
* HTTP GET /events
35+
*
36+
* @param ctx Pointer to flb_in_de_config
37+
*
38+
* @return int 0 on success, -1 on failure
39+
*/
40+
static int de_unix_create(struct flb_in_de_config *ctx)
41+
{
42+
unsigned long len;
43+
size_t address_length;
44+
struct sockaddr_un address;
45+
char request[512];
46+
47+
ctx->fd = flb_net_socket_create(AF_UNIX, FLB_FALSE);
48+
if (ctx->fd == -1) {
49+
return -1;
50+
}
51+
52+
/* Prepare the unix socket path */
53+
len = strlen(ctx->unix_path);
54+
address.sun_family = AF_UNIX;
55+
sprintf(address.sun_path, "%s", ctx->unix_path);
56+
address_length = sizeof(address.sun_family) + len + 1;
57+
if (connect(ctx->fd, (struct sockaddr *)&address, address_length) == -1) {
58+
flb_errno();
59+
close(ctx->fd);
60+
return -1;
61+
}
62+
63+
strcpy(request, "GET /events HTTP/1.0\r\n\r\n");
64+
flb_plg_trace(ctx->ins, "writing to socket %s", request);
65+
write(ctx->fd, request, strlen(request));
66+
67+
/* Read the initial http response */
68+
read(ctx->fd, ctx->buf, ctx->buf_size - 1);
69+
70+
return 0;
71+
}
72+
73+
/**
74+
* Callback function to process events recieved on the unix
75+
* socket.
76+
*
77+
* @param ins Pointer to flb_input_instance
78+
* @param config Pointer to flb_config
79+
* @param in_context void Pointer used to cast to
80+
* flb_in_de_config
81+
*
82+
* @return int Always returns success
83+
*/
84+
static int in_de_collect(struct flb_input_instance *ins,
85+
struct flb_config *config, void *in_context)
86+
{
87+
int ret = 0;
88+
int error;
89+
size_t str_len = 0;
90+
struct flb_in_de_config *ctx = in_context;
91+
msgpack_packer mp_pck;
92+
msgpack_sbuffer mp_sbuf;
93+
94+
/* variables for parser */
95+
int parser_ret = -1;
96+
void *out_buf = NULL;
97+
size_t out_size = 0;
98+
struct flb_time out_time;
99+
100+
if ((ret = read(ctx->fd, ctx->buf, ctx->buf_size - 1)) > 0) {
101+
str_len = ret;
102+
ctx->buf[str_len] = '\0';
103+
104+
if (!ctx->parser) {
105+
/* Initialize local msgpack buffer */
106+
msgpack_sbuffer_init(&mp_sbuf);
107+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
108+
109+
msgpack_pack_array(&mp_pck, 2);
110+
flb_pack_time_now(&mp_pck);
111+
msgpack_pack_map(&mp_pck, 1);
112+
113+
msgpack_pack_str(&mp_pck, ctx->key_len);
114+
msgpack_pack_str_body(&mp_pck, ctx->key,
115+
ctx->key_len);
116+
msgpack_pack_str(&mp_pck, str_len);
117+
msgpack_pack_str_body(&mp_pck, ctx->buf, str_len);
118+
flb_input_chunk_append_raw(ins, NULL, 0, mp_sbuf.data,
119+
mp_sbuf.size);
120+
msgpack_sbuffer_destroy(&mp_sbuf);
121+
}
122+
else {
123+
flb_time_get(&out_time);
124+
parser_ret = flb_parser_do(ctx->parser, ctx->buf, str_len - 1,
125+
&out_buf, &out_size, &out_time);
126+
if (parser_ret >= 0) {
127+
if (flb_time_to_double(&out_time) == 0.0) {
128+
flb_time_get(&out_time);
129+
}
130+
131+
/* Initialize local msgpack buffer */
132+
msgpack_sbuffer_init(&mp_sbuf);
133+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
134+
135+
msgpack_pack_array(&mp_pck, 2);
136+
flb_time_append_to_msgpack(&out_time, &mp_pck, 0);
137+
msgpack_sbuffer_write(&mp_sbuf, out_buf, out_size);
138+
139+
flb_input_chunk_append_raw(ins, NULL, 0,
140+
mp_sbuf.data, mp_sbuf.size);
141+
msgpack_sbuffer_destroy(&mp_sbuf);
142+
flb_free(out_buf);
143+
}
144+
else {
145+
flb_plg_trace(ctx->ins, "tried to parse: %s", ctx->buf);
146+
flb_plg_trace(ctx->ins, "buf_size %zu", ctx->buf_size);
147+
flb_plg_error(ctx->ins, "parser returned an error: %d",
148+
parser_ret);
149+
}
150+
}
151+
}
152+
else {
153+
error = errno;
154+
flb_plg_error(ctx->ins, "read returned error: %d, %s", error,
155+
strerror(error));
156+
}
157+
158+
return 0;
159+
}
160+
161+
/**
162+
* Callback function to initialize docker events plugin
163+
*
164+
* @param ins Pointer to flb_input_instance
165+
* @param config Pointer to flb_config
166+
* @param data Unused
167+
*
168+
* @return int 0 on success, -1 on failure
169+
*/
170+
static int in_de_init(struct flb_input_instance *ins,
171+
struct flb_config *config, void *data)
172+
{
173+
struct flb_in_de_config *ctx = NULL;
174+
(void) data;
175+
176+
/* Allocate space for the configuration */
177+
ctx = de_config_init(ins, config);
178+
if (!ctx) {
179+
return -1;
180+
}
181+
ctx->ins = ins;
182+
183+
/* Set the context */
184+
flb_input_set_context(ins, ctx);
185+
186+
if (de_unix_create(ctx) != 0) {
187+
flb_plg_error(ctx->ins, "could not listen on unix://%s",
188+
ctx->unix_path);
189+
de_config_destroy(ctx);
190+
return -1;
191+
}
192+
193+
if (flb_input_set_collector_event(ins, in_de_collect,
194+
ctx->fd, config) == -1) {
195+
flb_plg_error(ctx->ins,
196+
"could not set collector for IN_DOCKER_EVENTS plugin");
197+
de_config_destroy(ctx);
198+
return -1;
199+
}
200+
201+
return 0;
202+
}
203+
204+
/**
205+
* Callback exit function to cleanup plugin
206+
*
207+
* @param data Pointer cast to flb_in_de_config
208+
* @param config Unused
209+
*
210+
* @return int Always returns 0
211+
*/
212+
static int in_de_exit(void *data, struct flb_config *config)
213+
{
214+
(void) config;
215+
struct flb_in_de_config *ctx = data;
216+
217+
de_config_destroy(ctx);
218+
219+
return 0;
220+
}
221+
222+
/* Plugin reference */
223+
struct flb_input_plugin in_docker_events_plugin = {
224+
.name = "docker_events",
225+
.description = "Docker events",
226+
.cb_init = in_de_init,
227+
.cb_pre_run = NULL,
228+
.cb_collect = in_de_collect,
229+
.cb_flush_buf = NULL,
230+
.cb_exit = in_de_exit,
231+
.flags = FLB_INPUT_NET
232+
};
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2019-2020 The Fluent Bit Authors
6+
* Copyright (C) 2015-2018 Treasure Data Inc.
7+
*
8+
* Licensed under the Apache License, Version 2.0 (the "License");
9+
* you may not use this file except in compliance with the License.
10+
* You may obtain a copy of the License at
11+
*
12+
* http://www.apache.org/licenses/LICENSE-2.0
13+
*
14+
* Unless required by applicable law or agreed to in writing, software
15+
* distributed under the License is distributed on an "AS IS" BASIS,
16+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+
* See the License for the specific language governing permissions and
18+
* limitations under the License.
19+
*/
20+
21+
#ifndef FLB_IN_DE_H
22+
#define FLB_IN_DE_H
23+
24+
#include <msgpack.h>
25+
#include <fluent-bit/flb_input.h>
26+
#include <fluent-bit/flb_parser.h>
27+
28+
#define DEFAULT_BUF_SIZE 8192
29+
#define MIN_BUF_SIZE 2048
30+
#define DEFAULT_FIELD_NAME "message"
31+
#define DEFAULT_UNIX_SOCKET_PATH "/var/run/docker.sock"
32+
33+
struct flb_in_de_config
34+
{
35+
int fd; /* File descriptor */
36+
char *unix_path; /* Unix path for socket */
37+
struct flb_parser *parser;
38+
struct flb_input_instance *ins; /* Input plugin instace */
39+
char *buf;
40+
size_t buf_size;
41+
char *key;
42+
int key_len;
43+
};
44+
45+
#endif

0 commit comments

Comments
 (0)