@@ -970,6 +970,15 @@ int flb_input_instance_init(struct flb_input_instance *ins,
970970 1 , (char * []) {"name" });
971971 cmt_counter_set (ins -> cmt_records , ts , 0 , 1 , (char * []) {name });
972972
973+ /* fluentbit_input_ingestion_paused */
974+ ins -> cmt_ingestion_paused = \
975+ cmt_gauge_create (ins -> cmt ,
976+ "fluentbit" , "input" ,
977+ "ingestion_paused" ,
978+ "Is the input paused or not?" ,
979+ 1 , (char * []) {"name" });
980+ cmt_gauge_set (ins -> cmt_ingestion_paused , ts , 0 , 1 , (char * []) {name });
981+
973982 /* Storage Metrics */
974983 if (ctx -> storage_metrics == FLB_TRUE ) {
975984 /* fluentbit_input_storage_overlimit */
@@ -1670,6 +1679,24 @@ int flb_input_test_pause_resume(struct flb_input_instance *ins, int sleep_second
16701679 return 0 ;
16711680}
16721681
1682+ static void flb_input_ingestion_paused (struct flb_input_instance * ins )
1683+ {
1684+ if (ins -> cmt_ingestion_paused != NULL ) {
1685+ /* cmetrics */
1686+ cmt_gauge_set (ins -> cmt_ingestion_paused , cfl_time_now (), 1 ,
1687+ 1 , (char * []) {flb_input_name (ins )});
1688+ }
1689+ }
1690+
1691+ static void flb_input_ingestion_resumed (struct flb_input_instance * ins )
1692+ {
1693+ if (ins -> cmt_ingestion_paused != NULL ) {
1694+ /* cmetrics */
1695+ cmt_gauge_set (ins -> cmt_ingestion_paused , cfl_time_now (), 0 ,
1696+ 1 , (char * []) {flb_input_name (ins )});
1697+ }
1698+ }
1699+
16731700int flb_input_pause (struct flb_input_instance * ins )
16741701{
16751702 /* if the instance is already paused, just return */
@@ -1689,6 +1716,8 @@ int flb_input_pause(struct flb_input_instance *ins)
16891716 }
16901717 }
16911718
1719+ flb_input_ingestion_paused (ins );
1720+
16921721 return 0 ;
16931722}
16941723
@@ -1704,6 +1733,8 @@ int flb_input_resume(struct flb_input_instance *ins)
17041733 }
17051734 }
17061735
1736+ flb_input_ingestion_resumed (ins );
1737+
17071738 return 0 ;
17081739}
17091740
0 commit comments