1+ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+ /* Fluent Bit
4+ * ==========
5+ * Copyright (C) 2019 The Fluent Bit Authors
6+ * Copyright (C) 2015-2018 Treasure Data Inc.
7+ *
8+ * Licensed under the Apache License, Version 2.0 (the "License");
9+ * you may not use this file except in compliance with the License.
10+ * You may obtain a copy of the License at
11+ *
12+ * http://www.apache.org/licenses/LICENSE-2.0
13+ *
14+ * Unless required by applicable law or agreed to in writing, software
15+ * distributed under the License is distributed on an "AS IS" BASIS,
16+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+ * See the License for the specific language governing permissions and
18+ * limitations under the License.
19+ */
20+
21+ #include <fluent-bit/flb_output.h>
22+ #include <fluent-bit/flb_io.h>
23+ #include <fluent-bit/flb_log.h>
24+ #include <fluent-bit/flb_http_client.h>
25+ #include <fluent-bit/flb_pack.h>
26+ #include <fluent-bit/flb_time.h>
27+
28+ #include <msgpack.h>
29+
30+ #include "datadog.h"
31+ #include "datadog_conf.h"
32+
33+ static int cb_datadog_init (struct flb_output_instance * ins ,
34+ struct flb_config * config , void * data )
35+ {
36+ struct flb_out_datadog * ctx = NULL ;
37+ (void ) data ;
38+
39+ ctx = flb_datadog_conf_create (ins , config );
40+ if (!ctx ) {
41+ return -1 ;
42+ }
43+
44+ /* Set the plugin context */
45+ flb_output_set_context (ins , ctx );
46+ return 0 ;
47+ }
48+
49+ static int64_t timestamp_format (const struct flb_time * tms ) {
50+ int64_t timestamp = 0 ;
51+ /* Format the time, use milliseconds precision not nanoseconds */
52+ timestamp = tms -> tm .tv_sec * 1000 ;
53+ timestamp += tms -> tm .tv_nsec /1000000 ;
54+ /* round up if necessary */
55+ if (tms -> tm .tv_nsec % 1000000 >= 500000 ) {
56+ ++ timestamp ;
57+ }
58+ return timestamp ;
59+ }
60+
61+ static void dd_msgpack_pack_key_value_str (msgpack_packer * mp_pck ,
62+ const char * key , size_t key_size ,
63+ const char * val , size_t val_size )
64+ {
65+ msgpack_pack_str (mp_pck , key_size );
66+ msgpack_pack_str_body (mp_pck , key , key_size );
67+ msgpack_pack_str (mp_pck , val_size );
68+ msgpack_pack_str_body (mp_pck ,val , val_size );
69+ }
70+
71+ static int datadog_format (const void * data , size_t bytes ,
72+ const char * tag , int tag_len ,
73+ char * * out_data , size_t * out_size ,
74+ struct flb_out_datadog * ctx )
75+ {
76+ /* for msgpack global structs */
77+ size_t off = 0 ;
78+ int array_size = 0 ;
79+ msgpack_unpacked result ;
80+ msgpack_sbuffer mp_sbuf ;
81+ msgpack_packer mp_pck ;
82+ /* for sub msgpack objs */
83+ int map_size ;
84+ struct flb_time tms ;
85+ int64_t timestamp ;
86+ msgpack_object * obj ;
87+ msgpack_object map ;
88+ msgpack_object root ;
89+ msgpack_object k ;
90+ msgpack_object v ;
91+ /* output buffer */
92+ flb_sds_t out_buf ;
93+
94+ /* Count number of records */
95+ msgpack_unpacked_init (& result );
96+ while (msgpack_unpack_next (& result , data , bytes , & off ) == MSGPACK_UNPACK_SUCCESS ) {
97+ array_size ++ ;
98+ }
99+ msgpack_unpacked_destroy (& result );
100+ msgpack_unpacked_init (& result );
101+
102+ /* Create temporal msgpack buffer */
103+ msgpack_sbuffer_init (& mp_sbuf );
104+ msgpack_packer_init (& mp_pck , & mp_sbuf , msgpack_sbuffer_write );
105+
106+ /* Prepare array for all entries */
107+ msgpack_pack_array (& mp_pck , array_size );
108+
109+ off = 0 ;
110+ while (msgpack_unpack_next (& result , data , bytes , & off ) == MSGPACK_UNPACK_SUCCESS ) {
111+ root = result .data ;
112+
113+ /* Get timestamp and object */
114+ flb_time_pop_from_msgpack (& tms , & result , & obj );
115+ timestamp = timestamp_format (& tms );
116+
117+ map = root .via .array .ptr [1 ];
118+ map_size = map .via .map .size ;
119+
120+ /* build new object(map) with additional space for datadog entries */
121+ msgpack_pack_map (& mp_pck , ctx -> nb_additional_entries + map_size );
122+
123+ /* timestamp */
124+ msgpack_pack_str (& mp_pck , flb_sds_len (ctx -> json_date_key ));
125+ msgpack_pack_str_body (& mp_pck ,
126+ ctx -> json_date_key ,
127+ flb_sds_len (ctx -> json_date_key ));
128+ msgpack_pack_int64 (& mp_pck , timestamp );
129+
130+ /* include_tag_key */
131+ if (ctx -> include_tag_key == FLB_TRUE ) {
132+ dd_msgpack_pack_key_value_str (& mp_pck ,
133+ ctx -> tag_key , flb_sds_len (ctx -> tag_key ),
134+ tag , tag_len );
135+ }
136+
137+ /* dd_source */
138+ if (ctx -> dd_source != NULL ) {
139+ dd_msgpack_pack_key_value_str (& mp_pck ,
140+ FLB_DATADOG_DD_SOURCE_KEY , sizeof (FLB_DATADOG_DD_SOURCE_KEY ) - 1 ,
141+ ctx -> dd_source , flb_sds_len (ctx -> dd_source ));
142+ }
143+
144+ /* dd_service */
145+ if (ctx -> dd_service != NULL ) {
146+ dd_msgpack_pack_key_value_str (& mp_pck ,
147+ FLB_DATADOG_DD_SERVICE_KEY , sizeof (FLB_DATADOG_DD_SERVICE_KEY ) - 1 ,
148+ ctx -> dd_service , flb_sds_len (ctx -> dd_service ));
149+ }
150+
151+ /* dd_tags */
152+ if (ctx -> dd_tags != NULL ) {
153+ dd_msgpack_pack_key_value_str (& mp_pck ,
154+ FLB_DATADOG_DD_TAGS_KEY , sizeof (FLB_DATADOG_DD_TAGS_KEY ) - 1 ,
155+ ctx -> dd_tags , flb_sds_len (ctx -> dd_tags ));
156+ }
157+
158+ /* Append initial object k/v */
159+ int i = 0 ;
160+ for (i = 0 ; i < map_size ; i ++ ) {
161+ k = map .via .map .ptr [i ].key ;
162+ v = map .via .map .ptr [i ].val ;
163+
164+ msgpack_pack_object (& mp_pck , k );
165+ msgpack_pack_object (& mp_pck , v );
166+ }
167+ }
168+
169+ /* Convert from msgpack to JSON */
170+ out_buf = flb_msgpack_raw_to_json_sds (mp_sbuf .data , mp_sbuf .size );
171+ msgpack_sbuffer_destroy (& mp_sbuf );
172+
173+ if (!out_buf ) {
174+ flb_error ("[out_datadog] error formatting JSON payload" );
175+ msgpack_unpacked_destroy (& result );
176+ return -1 ;
177+ }
178+
179+ * out_data = out_buf ;
180+ * out_size = flb_sds_len (out_buf );
181+ /* Cleanup */
182+ msgpack_unpacked_destroy (& result );
183+
184+ return 0 ;
185+ }
186+
187+ static void cb_datadog_flush (const void * data , size_t bytes ,
188+ const char * tag , int tag_len ,
189+ struct flb_input_instance * i_ins ,
190+ void * out_context ,
191+ struct flb_config * config )
192+ {
193+ struct flb_out_datadog * ctx = out_context ;
194+ struct flb_upstream_conn * upstream_conn ;
195+ struct flb_http_client * client ;
196+
197+ flb_sds_t payload_buf ;
198+ size_t payload_size = 0 ;
199+ size_t b_sent ;
200+ int ret = FLB_ERROR ;
201+
202+ /* Get upstream connection */
203+ upstream_conn = flb_upstream_conn_get (ctx -> upstream );
204+ if (!upstream_conn ) {
205+ FLB_OUTPUT_RETURN (FLB_RETRY );
206+ }
207+
208+ /* Convert input data into a Datadog JSON payload */
209+ ret = datadog_format (data , bytes , tag , tag_len , & payload_buf , & payload_size , ctx );
210+ if (ret == -1 ) {
211+ flb_upstream_conn_release (upstream_conn );
212+ FLB_OUTPUT_RETURN (FLB_ERROR );
213+ }
214+
215+ /* Create HTTP client context */
216+ client = flb_http_client (upstream_conn , FLB_HTTP_POST , ctx -> uri ,
217+ payload_buf , payload_size ,
218+ ctx -> host , ctx -> port ,
219+ NULL , 0 );
220+ if (!client ) {
221+ flb_upstream_conn_release (upstream_conn );
222+ FLB_OUTPUT_RETURN (FLB_ERROR );
223+ }
224+
225+ flb_http_add_header (client , "User-Agent" , 10 , "Fluent-Bit" , 10 );
226+ flb_http_add_header (client ,
227+ FLB_DATADOG_CONTENT_TYPE , sizeof (FLB_DATADOG_CONTENT_TYPE ) - 1 ,
228+ FLB_DATADOG_MIME_JSON , sizeof (FLB_DATADOG_MIME_JSON ) - 1 );
229+ /* TODO: Append other headers if needed*/
230+
231+ /* finaly send the query */
232+ ret = flb_http_do (client , & b_sent );
233+ if (ret == 0 ) {
234+ if (client -> resp .status < 200 || client -> resp .status > 205 ) {
235+ flb_error ("[out_datadog] %s%s:%i HTTP status=%i" ,
236+ ctx -> scheme , ctx -> host , ctx -> port , client -> resp .status );
237+ ret = FLB_RETRY ;
238+ }
239+ else {
240+ if (client -> resp .payload ) {
241+ flb_info ("[out_datadog] %s%s, port=%i, HTTP status=%i payload=%s" ,
242+ ctx -> scheme , ctx -> host , ctx -> port ,
243+ client -> resp .status , client -> resp .payload );
244+ }
245+ else {
246+ flb_info ("[out_datadog] %s%s, port=%i, HTTP status=%i" ,
247+ ctx -> scheme , ctx -> host , ctx -> port ,
248+ client -> resp .status );
249+ }
250+ ret = FLB_OK ;
251+ }
252+ }
253+ else {
254+ flb_error ("[out_datadog] could not flush records to %s:%i (http_do=%i)" ,
255+ ctx -> host , ctx -> port , ret );
256+ ret = FLB_RETRY ;
257+ }
258+
259+ /* Destroy HTTP client context */
260+ flb_sds_destroy (payload_buf );
261+ flb_http_client_destroy (client );
262+ flb_upstream_conn_release (upstream_conn );
263+
264+ FLB_OUTPUT_RETURN (ret );
265+ }
266+
267+
268+ static int cb_datadog_exit (void * data , struct flb_config * config )
269+ {
270+ struct flb_out_datadog * ctx = data ;
271+
272+ flb_datadog_conf_destroy (ctx );
273+ return 0 ;
274+ }
275+
276+ struct flb_output_plugin out_datadog_plugin = {
277+ .name = "datadog" ,
278+ .description = "Send events to DataDog HTTP Event Collector" ,
279+ .cb_init = cb_datadog_init ,
280+ .cb_flush = cb_datadog_flush ,
281+ .cb_exit = cb_datadog_exit ,
282+ /* Plugin flags */
283+ .flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS ,
284+ };
0 commit comments