@@ -114,6 +114,9 @@ static int in_systemd_collect(struct flb_input_instance *i_ins,
114114 */
115115 if (ctx -> pending_records == FLB_FALSE ) {
116116 ret = sd_journal_process (ctx -> j );
117+ if (ret == SD_JOURNAL_INVALIDATE ) {
118+ flb_debug ("[in_systemd] received event on added or removed journal file" );
119+ }
117120 if (ret != SD_JOURNAL_APPEND && ret != SD_JOURNAL_NOP ) {
118121 return FLB_SYSTEMD_NONE ;
119122 }
@@ -148,7 +151,9 @@ static int in_systemd_collect(struct flb_input_instance *i_ins,
148151 /* Set time */
149152 ret = sd_journal_get_realtime_usec (ctx -> j , & usec );
150153 if (ret != 0 ) {
151- flb_error ("[in_systemd] error reading from systemd journal. sd_journal_get_realtime_usec() return value '%s'" , ret );
154+ flb_error ("[in_systemd] error reading from systemd journal. sd_journal_get_realtime_usec() return value '%i'" , ret );
155+ /* It seems the journal file was deleted (rotated). */
156+ ret_j = -1 ;
152157 break ;
153158 }
154159 sec = usec / 1000000 ;
@@ -196,7 +201,7 @@ static int in_systemd_collect(struct flb_input_instance *i_ins,
196201 entries < ctx -> max_fields ) {
197202 key = (char * ) data ;
198203 if (ctx -> strip_underscores == FLB_TRUE && key [0 ] == '_' ) {
199- key ++ ;
204+ key ++ ;
200205 length -- ;
201206 }
202207 sep = strchr (key , '=' );
@@ -206,12 +211,15 @@ static int in_systemd_collect(struct flb_input_instance *i_ins,
206211
207212 val = sep + 1 ;
208213 len = length - (sep - key ) - 1 ;
209- msgpack_pack_str (& mp_pck , len );
214+ msgpack_pack_str (& mp_pck , len );
210215 msgpack_pack_str_body (& mp_pck , val , len );
211216
212217 entries ++ ;
213218 }
214219 rows ++ ;
220+ if (entries == ctx -> max_fields ) {
221+ flb_debug ("[in_systemd] max number of fields is reached: %i; all other fields are discarded" , ctx -> max_fields );
222+ }
215223
216224 /*
217225 * The fields were packed, now we need to adjust the msgpack map size
@@ -244,12 +252,10 @@ static int in_systemd_collect(struct flb_input_instance *i_ins,
244252 msgpack_sbuffer_init (& mp_sbuf );
245253 strncpy (last_tag , tag , tag_len );
246254 last_tag_len = tag_len ;
247- ret_j = -1 ;
248255 break ;
249256 }
250257
251258 if (rows >= ctx -> max_entries ) {
252- ret_j = -1 ;
253259 break ;
254260 }
255261 }
@@ -277,14 +283,27 @@ static int in_systemd_collect(struct flb_input_instance *i_ins,
277283 ctx -> pending_records = FLB_FALSE ;
278284 return FLB_SYSTEMD_OK ;
279285 }
280-
281- /*
282- * ret_j == -1, the loop was broken due to some special condition like
283- * buffer size limit or it reach the max number of rows that it supposed to
284- * process on this call. Assume there are pending records.
285- */
286- ctx -> pending_records = FLB_TRUE ;
287- return FLB_SYSTEMD_MORE ;
286+ else if (ret_j > 0 ) {
287+ /*
288+ * ret_j == 1, but the loop was broken due to some special condition like
289+ * buffer size limit or it reach the max number of rows that it supposed to
290+ * process on this call. Assume there are pending records.
291+ */
292+ ctx -> pending_records = FLB_TRUE ;
293+ return FLB_SYSTEMD_MORE ;
294+ }
295+ else {
296+ /* Supposedly, current cursor points to a deleted file.
297+ * Re-seeking to the first journal entry.
298+ * Other failures, such as disk read error, would still lead to infinite loop there,
299+ * but at least FLB log will be full of errors. */
300+ ret = sd_journal_seek_head (ctx -> j );
301+ flb_error ("[in_systemd] sd_journal_next() returned error %i; "
302+ "journal is re-opened, unread logs are lost; "
303+ "sd_journal_seek_head() returned %i" , ret_j , ret );
304+ ctx -> pending_records = FLB_TRUE ;
305+ return FLB_SYSTEMD_ERROR ;
306+ }
288307}
289308
290309static int in_systemd_collect_archive (struct flb_input_instance * i_ins ,
0 commit comments