@@ -65,7 +65,7 @@ function EventQueue.new(params)
6565 self .sc_params .params .influxdb_database = params .influxdb_database
6666 self .sc_params .params .accepted_categories = params .accepted_categories or " neb"
6767 self .sc_params .params .accepted_elements = params .accepted_elements or " host_status,service_status"
68- self .sc_params .params .max_buffer_size = params .max_buffer_size or 5000
68+ self .sc_params .params .max_buffer_size = params .max_buffer_size or 100
6969 self .sc_params .params .hard_only = params .hard_only or 0
7070 self .sc_params .params .enable_host_status_dedup = params .enable_host_status_dedup or 0
7171 self .sc_params .params .enable_service_status_dedup = params .enable_service_status_dedup or 0
@@ -167,13 +167,15 @@ end
167167---- ----------------------------------------------------------------------------
168168function EventQueue :format_metric_host (metric )
169169 self .sc_logger :debug (" [EventQueue:format_metric_host]: start format_metric host" )
170+
170171 local event = self .sc_event .event
171- local metric_key = tostring (event .host_id ) .. ' :' .. tostring ( event . cache . service . service_id ) .. ' :' .. tostring (metric .metric_name )
172- self . sc_event . event .formated_event = {
172+ local metric_key = tostring (event .host_id ) .. ' :0 :' .. tostring (metric .metric_name )
173+ event .formated_event = {
173174 metric_name = metric .metric_name ,
174175 metric_value = metric .value ,
175176 metric_key = metric_key ,
176- last_check = self .sc_event .event .last_check ,
177+ last_check = event .last_check ,
178+ status = " status value=" .. tostring (event .state ) .. " ,host_id=" .. tostring (event .host_id ) .. " " .. tostring (event .last_check )
177179 }
178180 self :add ()
179181 self .sc_logger :debug (" [EventQueue:format_metric_service]: end format_metric host" )
@@ -187,11 +189,12 @@ function EventQueue:format_metric_service(metric)
187189 self .sc_logger :debug (" [EventQueue:format_metric_service]: start format_metric service" )
188190 local event = self .sc_event .event
189191 local metric_key = tostring (event .host_id ) .. ' :' .. tostring (event .cache .service .service_id ) .. ' :' .. tostring (metric .metric_name )
190- self . sc_event . event .formated_event = {
192+ event .formated_event = {
191193 metric_name = metric .metric_name ,
192194 metric_value = metric .value ,
193195 metric_key = metric_key ,
194- last_check = self .sc_event .event .last_check ,
196+ last_check = event .last_check ,
197+ status = " status value=" .. tostring (event .state ) .. " ,host_id=" .. tostring (event .host_id ) .. " ,service_id=" .. tostring (event .cache .service .service_id ) .. " " .. tostring (event .last_check )
195198 }
196199 self :add ()
197200 self .sc_logger :debug (" [EventQueue:format_metric_service]: end format_metric service" )
201204-- EventQueue:add, add an event to the sending queue
202205---- ----------------------------------------------------------------------------
203206function EventQueue :add ()
204- broker_log :info (0 , " EventQueue:add()" )
205207 -- store event in self.events lists
206208 local category = self .sc_event .event .category
207209 local element = self .sc_event .event .element
224226-- @return payload {string} json encoded string
225227---- ----------------------------------------------------------------------------
226228function EventQueue :build_payload (payload , event )
227- broker_log :info (0 , " EventQueue:build_payload()" )
228229 if not payload then
229230 payload = {event }
230231 else
@@ -233,8 +234,9 @@ function EventQueue:build_payload(payload, event)
233234 return payload
234235end
235236
237+ local events_retry = {}
238+
236239function EventQueue :send_data (payload , queue_metadata )
237- broker_log :info (0 , " EventQueue:send_data()" )
238240 self .sc_logger :debug (" [EventQueue:send_data]: Starting to send data" )
239241 local params = self .sc_params .params
240242
@@ -248,14 +250,27 @@ function EventQueue:send_data(payload, queue_metadata)
248250 " content-type: text/plain; charset=utf-8"
249251 }
250252
251- self .sc_common :dumper (payload )
252253 local data_binary = ' '
253- for index , event in ipairs (payload ) do
254- if not metrics [event .metric_key ] then
255- broker_log :error (0 , " METRIC_ID not found for key: " .. event .metric_key )
254+ for index , payload_event in ipairs (payload ) do
255+ if not metrics [payload_event .metric_key ] then
256+ payload_event .retry = 1
257+ table.insert (events_retry , payload_event )
258+ else
259+ data_binary = data_binary .. payload_event .metric_name .. " ,metric.id=" .. metrics [payload_event .metric_key ] .. " value=" .. payload_event .metric_value .. " " .. payload_event .last_check .. " \n " .. payload_event .status .. " \n "
260+ end
261+ end
262+
263+ for index , retry_event in ipairs (events_retry ) do
264+ if not metrics [retry_event .metric_key ] then
265+ retry_event .retry = retry_event .retry + 1
266+ if retry_event .retry > 3 then
267+ self .sc_logger :debug (" Retry limit reached for key: " .. retry_event .metric_key )
268+ data_binary = data_binary .. retry_event .metric_name .. " value=" .. retry_event .metric_value .. " " .. retry_event .last_check .. " \n "
269+ table.remove (events_retry , index )
270+ end
256271 else
257- broker_log : info ( 0 , " METRIC_ID: " .. metrics [event .metric_key ])
258- data_binary = data_binary .. " \n " .. event . metric_name .. " ,metric.id= " .. metrics [ event . metric_key ] .. " value= " .. event . metric_value .. " " .. event . last_check
272+ data_binary = data_binary .. retry_event . metric_name .. " ,metric.id= " .. metrics [retry_event .metric_key ] .. " value= " .. retry_event . metric_value .. " " .. retry_event . last_check .. " \n " .. retry_event . status .. " \n "
273+ table.remove ( events_retry , index )
259274 end
260275 end
261276
@@ -267,7 +282,7 @@ function EventQueue:send_data(payload, queue_metadata)
267282 return true
268283 end
269284
270- self .sc_logger :info (" [EventQueue:send_data]: Going to send the following json " .. tostring (payload ))
285+ self .sc_logger :info (" [EventQueue:send_data]: Going to send the following data " .. tostring (data_binary ))
271286 self .sc_logger :info (" [EventQueue:send_data]: Influxdb address is: " .. tostring (url ))
272287
273288 local http_response_body = " "
@@ -336,7 +351,6 @@ local queue
336351
337352-- Fonction init()
338353function init (conf )
339- broker_log :info (0 , " EventQueue:init()" )
340354 queue = EventQueue .new (conf )
341355end
342356
346360-- @return {boolean}
347361---- ----------------------------------------------------------------------------
348362function write (event )
349- -- broker_log:info(0, "EventQueue:write()")
350363
351364 if event ._type == 196617 then
352365 local metric_key = tostring (event .host_id ) .. ' :' .. tostring (event .service_id ) .. ' :' .. tostring (event .name )
@@ -355,6 +368,7 @@ function write (event)
355368 metrics [metric_key ] = event .metric_id
356369 end
357370 end
371+ -- broker_log:info(0, "METRICS: " .. broker.json_encode(metrics))
358372
359373 -- skip event if a mandatory parameter is missing
360374 if queue .fail then
0 commit comments