Skip to content

Commit 51f28d3

Browse files
committed
Add ZMQ input plugin
Added a plugin which can received the message to be sent out over ZMQ channels. For each ZMQ input plugin there must be an "Endpoint" configured, which contains the name of the ZMQ socket which will be accessed to receive the messages (for example: "ipc:///var/run/zmq-channel-1"). The ZMQ socket must be of style PULL/PUSH - the ZMQ input plugin will PULL messages from the socket and the producer of the information must PUSH them to the socket. Note that the producer must to the "bind" to the ZMQ socket, as the ZMQ plugin will do the "connect". There is also optional config variable "hwm" which can be configured for the plug-in. This will cause the receive and send high-water-marks for the ZMQ socket to be set to the value. The value is in messages, and defaults to unlimited. As normal with internal MSGPACK messages, there is an outer envelope array of two, with the first element being the timestamp, and the second one being a map of keys and their values. The ZMQ message received must contain three parts. The first part is encoded as a string named "topic" in the MAP part of the MSGPACK. The second part is encoded as a string named "key" in the MAP part. The third part is encoded as a binary data with a name "payload" in the MAP part. Note: this plugin can currently only be compiled for Linux. It can be enabled on the cmake line with "-DFLB_IN_ZMQ=Yes". Note that to compile this plugin the libczmq-dev package (and its dependencies) need installed. Signed-off-by: Gavin Shearer <gavin.shearer@att.com>
1 parent e1c7c2d commit 51f28d3

File tree

7 files changed

+354
-1
lines changed

7 files changed

+354
-1
lines changed

CMakeLists.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,7 @@ option(FLB_IN_NETIF "Enable NetworkIF input plugin" Yes)
122122
option(FLB_IN_WINLOG "Enable Windows Log input plugin" No)
123123
option(FLB_IN_COLLECTD "Enable Collectd input plugin" Yes)
124124
option(FLB_IN_STORAGE_BACKLOG "Enable storage backlog input plugin" Yes)
125+
option(FLB_IN_ZMQ "Enable ZMQ input plugin" No)
125126
option(FLB_OUT_AZURE "Enable Azure output plugin" Yes)
126127
option(FLB_OUT_BIGQUERY "Enable BigQuery output plugin" Yes)
127128
option(FLB_OUT_COUNTER "Enable Counter output plugin" Yes)
@@ -175,6 +176,7 @@ if(FLB_ALL)
175176
set(FLB_IN_DUMMY 1)
176177
set(FLB_IN_NETIF 1)
177178
set(FLB_IN_EXEC 1)
179+
set(FLB_IN_ZMQ 1)
178180

179181
# Output plugins
180182
set(FLB_OUT_ES 1)

cmake/windows-setup.cmake

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ set(FLB_IN_NETIF No)
3939
set(FLB_IN_WINLOG Yes)
4040
set(FLB_IN_COLLECTD No)
4141
set(FLB_IN_STORAGE_BACKLOG No)
42+
set(FLB_IN_ZMQ No)
4243

4344
# OUTPUT plugins
4445
# ==============

debian/control

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ Source: td-agent-bit
22
Section: net
33
Priority: optional
44
Maintainer: Eduardo Silva <eduardo@treasure-data.com>
5-
Build-Depends: debhelper (>= 7.0.50~), cmake (>= 2.6)
5+
Build-Depends: debhelper (>= 7.0.50~), cmake (>= 2.6), libczmq-dev
66
Standards-Version: 3.9.1
77
Homepage: http://fluentbit.io
88
Vcs-Git: https://github.com/fluent/fluent-bit

plugins/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,7 @@ if(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
9898
REGISTER_IN_PLUGIN("in_systemd")
9999
REGISTER_IN_PLUGIN("in_netif")
100100
REGISTER_IN_PLUGIN("in_docker")
101+
REGISTER_IN_PLUGIN("in_zmq")
101102
endif()
102103

103104
REGISTER_IN_PLUGIN("in_tail")

plugins/in_zmq/CMakeLists.txt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
if (FLB_IN_ZMQ)
2+
find_package(PkgConfig REQUIRED)
3+
pkg_check_modules(CZMQ libczmq)
4+
if (NOT CZMQ_FOUND)
5+
message(FATAL_ERROR "Could not find libczmq libraries (need libczmq-dev package)")
6+
else (NOT CZMQ_FOUND)
7+
message(STATUS "Found libczmq libraries")
8+
endif (NOT CZMQ_FOUND)
9+
endif (FLB_IN_ZMQ)
10+
11+
set(src
12+
in_zmq.c)
13+
14+
FLB_PLUGIN(in_zmq "${src}" "${CZMQ_LIBRARIES}")

plugins/in_zmq/in_zmq.c

Lines changed: 296 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,296 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* ZMQ input plugin for Fluent Bit
4+
* ===============================
5+
* Copyright (C) 2019 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#include <fluent-bit/flb_info.h>
21+
#include <fluent-bit/flb_input.h>
22+
#include <fluent-bit/flb_utils.h>
23+
#include <fluent-bit/flb_engine.h>
24+
#include <fluent-bit/flb_pack.h>
25+
#include <fluent-bit/flb_error.h>
26+
#include <msgpack.h>
27+
28+
#include <stdio.h>
29+
#include <stdlib.h>
30+
#include <string.h>
31+
#include <limits.h>
32+
#include <errno.h>
33+
#include <inttypes.h>
34+
#include <czmq.h>
35+
36+
#include "in_zmq.h"
37+
38+
#ifdef __GNUC__
39+
#define likely(x) __builtin_expect(!!(x), 1)
40+
#define unlikely(x) __builtin_expect(!!(x), 0)
41+
#else
42+
#define likely(x) (x)
43+
#define unlikely(x) (x)
44+
#endif
45+
46+
static int zmq_config_read(struct flb_in_zmq_ctx *ctx,
47+
struct flb_input_instance *i_ins)
48+
{
49+
const char *hwm_str;
50+
51+
/* Get input properties */
52+
ctx->zmq_endpoint = flb_input_get_property("endpoint", i_ins);
53+
54+
if (ctx->zmq_endpoint == NULL) {
55+
flb_error("[in_zmq] error reading 'endpoint' from configuration");
56+
return -1;
57+
}
58+
59+
hwm_str = flb_input_get_property("hwm", i_ins);
60+
if (hwm_str == NULL)
61+
ctx->zmq_hwm = 0; /* default of unlimited */
62+
else {
63+
ctx->zmq_hwm = atoi(hwm_str);
64+
if (ctx->zmq_hwm < 0) {
65+
flb_error("[in_zmq] invalid config value for 'hwm' (%s)", hwm_str);
66+
return -1;
67+
}
68+
}
69+
70+
ctx->zmq_pull_socket = NULL;
71+
ctx->ul_fd = -1;
72+
73+
flb_debug("[in_zmq] endpoint='%s', hwm=%d", ctx->zmq_endpoint,
74+
ctx->zmq_hwm);
75+
76+
return 0;
77+
}
78+
79+
/* Callback triggered when some zmq msgs are available */
80+
static int in_zmq_collect(struct flb_input_instance *in,
81+
struct flb_config *config, void *in_context)
82+
{
83+
int ret;
84+
struct flb_in_zmq_ctx *ctx = in_context;
85+
msgpack_packer mp_pck;
86+
msgpack_sbuffer mp_sbuf;
87+
int zevents = zsock_events(ctx->zmq_pull_socket);
88+
zmsg_t *zmsg;
89+
size_t num_frames;
90+
zframe_t *frame;
91+
92+
if ((zevents & ZMQ_POLLIN) == 0) /* nothing to read */
93+
return 0;
94+
95+
/* Note that all messages need read, as ZMQ events are edge-triggered */
96+
while (zevents & ZMQ_POLLIN) {
97+
zmsg = zmsg_recv(ctx->zmq_pull_socket);
98+
if (zmsg == NULL)
99+
continue;
100+
101+
/* There should be 3 frames - topic, key, and payload */
102+
num_frames = zmsg_size(zmsg);
103+
if (num_frames != 3) {
104+
flb_warn("[in_zmq] dropping message with wrong number of frames "
105+
"(%d)", num_frames);
106+
zmsg_destroy(&zmsg);
107+
continue;
108+
}
109+
110+
/* Initialize local msgpack buffer */
111+
msgpack_sbuffer_init(&mp_sbuf);
112+
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
113+
114+
/*
115+
* Msgpack format is an array of two items: the first item is the
116+
* time, and the second is a MAP of keys with their values.
117+
*/
118+
msgpack_pack_array(&mp_pck, 2);
119+
flb_pack_time_now(&mp_pck);
120+
121+
/* We then store the 3 parts - "topic", "key", and "payload". */
122+
msgpack_pack_map(&mp_pck, 3);
123+
124+
frame = zmsg_pop(zmsg);
125+
if (unlikely(frame == NULL)) {
126+
flb_warn("[in_zmq] dropping message with missing frame 1 "
127+
"(%d)", num_frames);
128+
msgpack_sbuffer_destroy(&mp_sbuf);
129+
zmsg_destroy(&zmsg);
130+
continue;
131+
}
132+
msgpack_pack_str(&mp_pck, 5);
133+
msgpack_pack_str_body(&mp_pck, "topic", 5);
134+
msgpack_pack_str(&mp_pck, zframe_size(frame));
135+
msgpack_pack_str_body(&mp_pck, zframe_data(frame), zframe_size(frame));
136+
zframe_destroy(&frame);
137+
138+
frame = zmsg_pop(zmsg);
139+
if (unlikely(frame == NULL)) {
140+
flb_warn("[in_zmq] dropping message with missing frame 2 "
141+
"(%d)", num_frames);
142+
msgpack_sbuffer_destroy(&mp_sbuf);
143+
zmsg_destroy(&zmsg);
144+
continue;
145+
}
146+
msgpack_pack_str(&mp_pck, 3);
147+
msgpack_pack_str_body(&mp_pck, "key", 3);
148+
msgpack_pack_str(&mp_pck, zframe_size(frame));
149+
msgpack_pack_str_body(&mp_pck, zframe_data(frame), zframe_size(frame));
150+
zframe_destroy(&frame);
151+
152+
frame = zmsg_pop(zmsg);
153+
if (unlikely(frame == NULL)) {
154+
flb_warn("[in_zmq] dropping message with missing frame 3 "
155+
"(%d)", num_frames);
156+
msgpack_sbuffer_destroy(&mp_sbuf);
157+
zmsg_destroy(&zmsg);
158+
continue;
159+
}
160+
msgpack_pack_str(&mp_pck, 7);
161+
msgpack_pack_str_body(&mp_pck, "payload", 7);
162+
msgpack_pack_bin(&mp_pck, zframe_size(frame));
163+
msgpack_pack_bin_body(&mp_pck, zframe_data(frame), zframe_size(frame));
164+
zframe_destroy(&frame);
165+
166+
ret = flb_input_chunk_append_raw(in, NULL, 0, mp_sbuf.data,
167+
mp_sbuf.size);
168+
if (unlikely(ret < 0))
169+
flb_warn("[in_zmq] flb_input_chunk_append_raw failed of size %u",
170+
mp_sbuf.size);
171+
msgpack_sbuffer_destroy(&mp_sbuf);
172+
173+
zevents = zsock_events(ctx->zmq_pull_socket);
174+
}
175+
176+
return 0;
177+
}
178+
179+
static void in_zmq_pause(void *data, struct flb_config *config)
180+
{
181+
struct flb_in_zmq_ctx *ctx = data;
182+
flb_debug("[in_zmq] pausing endpoint %s on fd %d", ctx->zmq_endpoint,
183+
ctx->ul_fd);
184+
flb_input_collector_pause(ctx->ul_fd, ctx->i_ins);
185+
}
186+
187+
static void in_zmq_resume(void *data, struct flb_config *config)
188+
{
189+
struct flb_in_zmq_ctx *ctx = data;
190+
flb_debug("[in_zmq] resuming endpoint %s on fd %d", ctx->zmq_endpoint,
191+
ctx->ul_fd);
192+
flb_input_collector_resume(ctx->ul_fd, ctx->i_ins);
193+
}
194+
195+
/* Cleanup zmq input */
196+
int in_zmq_exit(void *in_context, struct flb_config *config)
197+
{
198+
struct flb_in_zmq_ctx *ctx = in_context;
199+
200+
flb_debug("[in_zmq] exiting '%s'", ctx->zmq_endpoint);
201+
202+
if (ctx->zmq_pull_socket)
203+
zsock_destroy(&(ctx->zmq_pull_socket));
204+
205+
flb_free(ctx);
206+
207+
return 0;
208+
}
209+
210+
/* Init zmq input */
211+
int in_zmq_init(struct flb_input_instance *in,
212+
struct flb_config *config, void *data)
213+
{
214+
int ret;
215+
struct flb_in_zmq_ctx *ctx = NULL;
216+
(void) data;
217+
218+
/*
219+
* Disable czmq from overriding fluent-bits SIGINT/SIGTERM signal
220+
* handling, as prevents application from existing.
221+
*/
222+
setenv("ZSYS_SIGHANDLER", "false", 1);
223+
224+
ctx = flb_calloc(1, sizeof(struct flb_in_zmq_ctx));
225+
if (!ctx) {
226+
flb_error("[in_zmq] flb_calloc failed: %s", strerror(errno));
227+
goto error;
228+
}
229+
230+
if (zmq_config_read(ctx, in) < 0) {
231+
flb_error("[in_zmq] zmq_config_read failed");
232+
goto error;
233+
}
234+
235+
ctx->zmq_pull_socket = zsock_new(ZMQ_PULL);
236+
if (ctx->zmq_pull_socket == NULL) {
237+
flb_error("[in_zmq] zsock_new failed: %s", strerror(errno));
238+
goto error;
239+
}
240+
241+
/* NB: HWMs need set before zsock_connect() */
242+
zsock_set_sndhwm(ctx->zmq_pull_socket, ctx->zmq_hwm);
243+
zsock_set_rcvhwm(ctx->zmq_pull_socket, ctx->zmq_hwm);
244+
245+
ret = zsock_connect(ctx->zmq_pull_socket, "%s", ctx->zmq_endpoint);
246+
if (ret < 0) {
247+
flb_error("[in_zmq] zsock_connect(%s) failed: %s", ctx->zmq_endpoint,
248+
strerror(errno));
249+
goto error;
250+
}
251+
252+
ctx->ul_fd = zsock_fd(ctx->zmq_pull_socket);
253+
254+
if (ctx->ul_fd < 0) {
255+
flb_error("[in_zmq] zsock_fd failed: %s", strerror(errno));
256+
goto error;
257+
}
258+
259+
/* Set our collector based on an fd event using underlying fd */
260+
ret = flb_input_set_collector_event(in, in_zmq_collect, ctx->ul_fd, config);
261+
262+
if (ret < 0) {
263+
flb_error("[in_zmq] flb_input_set_collector_event failed: %s",
264+
strerror(errno));
265+
goto error;
266+
}
267+
268+
ctx->i_ins = in;
269+
270+
flb_input_set_context(in, ctx);
271+
272+
return 0;
273+
274+
error:
275+
if (ctx) {
276+
if (ctx->zmq_pull_socket)
277+
zsock_destroy(&(ctx->zmq_pull_socket));
278+
279+
flb_free(ctx);
280+
}
281+
282+
return -1;
283+
}
284+
285+
/* Plugin reference */
286+
struct flb_input_plugin in_zmq_plugin = {
287+
.name = "zmq",
288+
.description = "Process logs in zmq msgs",
289+
.cb_init = in_zmq_init,
290+
.cb_pre_run = NULL,
291+
.cb_collect = in_zmq_collect,
292+
.cb_flush_buf = NULL,
293+
.cb_pause = in_zmq_pause,
294+
.cb_resume = in_zmq_resume,
295+
.cb_exit = in_zmq_exit
296+
};

plugins/in_zmq/in_zmq.h

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* ZMQ input plugin for Fluent Bit
4+
* ===============================
5+
* Copyright (C) 2019 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLB_IN_ZMQ_H
21+
#define FLB_IN_ZMQ_H
22+
23+
#include <czmq.h>
24+
25+
struct flb_input_instance;
26+
27+
struct flb_in_zmq_ctx {
28+
/* config */
29+
const char *zmq_endpoint;
30+
int zmq_hwm;
31+
32+
zsock_t *zmq_pull_socket;
33+
int ul_fd; /* underlying fd for the zmq socket */
34+
35+
/* Input instance reference */
36+
struct flb_input_instance *i_ins;
37+
};
38+
39+
#endif /* FLB_IN_ZMQ_H */

0 commit comments

Comments
 (0)