@@ -373,7 +373,7 @@ static int check_chronicle_log_type(struct flb_chronicle *ctx, struct flb_config
373
373
{
374
374
int ret ;
375
375
size_t b_sent ;
376
- flb_sds_t token ;
376
+ flb_sds_t token = NULL ;
377
377
struct flb_connection * u_conn ;
378
378
struct flb_http_client * c ;
379
379
@@ -493,21 +493,23 @@ static int cb_chronicle_init(struct flb_output_instance *ins,
493
493
}
494
494
flb_output_upstream_set (ctx -> u , ins );
495
495
496
- /* Get or renew the OAuth2 token */
497
- token = get_google_token (ctx );
496
+ if (ins -> test_mode == FLB_FALSE ) {
497
+ /* Get or renew the OAuth2 token */
498
+ token = get_google_token (ctx );
498
499
499
- if (!token ) {
500
- flb_plg_warn (ctx -> ins , "token retrieval failed" );
501
- }
502
- else {
503
- flb_sds_destroy (token );
504
- }
500
+ if (!token ) {
501
+ flb_plg_warn (ctx -> ins , "token retrieval failed" );
502
+ }
503
+ else {
504
+ flb_sds_destroy (token );
505
+ }
505
506
506
- ret = check_chronicle_log_type (ctx , config );
507
- if (ret != 0 ) {
508
- flb_plg_error (ctx -> ins , "Validate log_type failed. '%s' is not supported. ret = %d" ,
509
- ctx -> log_type , ret );
510
- return -1 ;
507
+ ret = check_chronicle_log_type (ctx , config );
508
+ if (ret != 0 ) {
509
+ flb_plg_error (ctx -> ins , "Validate log_type failed. '%s' is not supported. ret = %d" ,
510
+ ctx -> log_type , ret );
511
+ return -1 ;
512
+ }
511
513
}
512
514
513
515
return 0 ;
@@ -607,6 +609,8 @@ static flb_sds_t flb_pack_msgpack_extract_log_key(void *out_context, uint64_t by
607
609
if (log_key_found == FLB_FALSE ) {
608
610
flb_plg_error (ctx -> ins , "Could not find log_key '%s' in record" ,
609
611
ctx -> log_key );
612
+ flb_free (val_buf );
613
+ return NULL ;
610
614
}
611
615
612
616
/* If nothing was read, destroy buffer */
@@ -684,6 +688,7 @@ static int chronicle_format(const void *data, size_t bytes,
684
688
msgpack_packer mp_pck ;
685
689
flb_sds_t log_text = NULL ;
686
690
int log_text_size ;
691
+ char * json_str ;
687
692
688
693
array_size = count_mp_with_threshold (last_offset , threshold , log_decoder , ctx );
689
694
@@ -764,11 +769,29 @@ static int chronicle_format(const void *data, size_t bytes,
764
769
msgpack_pack_str_body (& mp_pck , "log_text" , 8 );
765
770
if (ctx -> log_key != NULL ) {
766
771
log_text = flb_pack_msgpack_extract_log_key (ctx , bytes , log_event );
772
+ if (log_text == NULL ) {
773
+ flb_plg_error (ctx -> ins , "log_key extraction failed, skipping record" );
774
+ msgpack_sbuffer_destroy (& mp_sbuf );
775
+ return -1 ;
776
+ }
767
777
log_text_size = flb_sds_len (log_text );
768
778
}
769
779
else {
770
- log_text = flb_msgpack_to_json_str (alloc_size , log_event .body );
771
- log_text_size = strlen (log_text );
780
+ json_str = flb_msgpack_to_json_str (alloc_size , log_event .body );
781
+ if (json_str == NULL ) {
782
+ flb_plg_error (ctx -> ins , "Could not convert record to json string" );
783
+ msgpack_sbuffer_destroy (& mp_sbuf );
784
+ return -1 ;
785
+ }
786
+ log_text = flb_sds_create (json_str );
787
+ flb_free (json_str ); /* free the original buffer */
788
+
789
+ if (log_text == NULL ) {
790
+ flb_plg_error (ctx -> ins , "Could not create sds string for log" );
791
+ msgpack_sbuffer_destroy (& mp_sbuf );
792
+ return -1 ;
793
+ }
794
+ log_text_size = flb_sds_len (log_text );
772
795
}
773
796
774
797
if (log_text == NULL ) {
@@ -778,12 +801,7 @@ static int chronicle_format(const void *data, size_t bytes,
778
801
msgpack_pack_str (& mp_pck , log_text_size );
779
802
msgpack_pack_str_body (& mp_pck , log_text , log_text_size );
780
803
781
- if (ctx -> log_key != NULL ) {
782
- flb_sds_destroy (log_text );
783
- }
784
- else {
785
- flb_free (log_text );
786
- }
804
+ flb_sds_destroy (log_text );
787
805
/* timestamp */
788
806
msgpack_pack_str (& mp_pck , 10 );
789
807
msgpack_pack_str_body (& mp_pck , "ts_rfc3339" , 10 );
@@ -825,6 +843,36 @@ static int chronicle_format(const void *data, size_t bytes,
825
843
return 0 ;
826
844
}
827
845
846
+ static int cb_chronicle_format_test (struct flb_config * config ,
847
+ struct flb_input_instance * ins ,
848
+ void * plugin_context ,
849
+ void * flush_ctx ,
850
+ int event_type ,
851
+ const char * tag , int tag_len ,
852
+ const void * data , size_t bytes ,
853
+ void * * out_data , size_t * out_size )
854
+ {
855
+ struct flb_chronicle * ctx = plugin_context ;
856
+ struct flb_log_event_decoder log_decoder ;
857
+ int ret ;
858
+ size_t out_offset ;
859
+
860
+ ret = flb_log_event_decoder_init (& log_decoder , (char * ) data , bytes );
861
+ if (ret != FLB_EVENT_DECODER_SUCCESS ) {
862
+ flb_plg_error (ctx -> ins , "log event decoder init error" );
863
+ return -1 ;
864
+ }
865
+
866
+ ret = chronicle_format (data , bytes , tag , tag_len ,
867
+ (char * * )out_data , out_size ,
868
+ 0 , bytes , & out_offset ,
869
+ & log_decoder , ctx );
870
+
871
+ flb_log_event_decoder_destroy (& log_decoder );
872
+ return ret ;
873
+ }
874
+
875
+
828
876
static void cb_chronicle_flush (struct flb_event_chunk * event_chunk ,
829
877
struct flb_output_flush * out_flush ,
830
878
struct flb_input_instance * i_ins ,
@@ -859,13 +907,15 @@ static void cb_chronicle_flush(struct flb_event_chunk *event_chunk,
859
907
FLB_OUTPUT_RETURN (FLB_RETRY );
860
908
}
861
909
862
- /* Get or renew Token */
863
- token = get_google_token (ctx );
910
+ if (ctx -> ins -> test_mode == FLB_FALSE ) {
911
+ /* Get or renew Token */
912
+ token = get_google_token (ctx );
864
913
865
- if (!token ) {
866
- flb_plg_error (ctx -> ins , "cannot retrieve oauth2 token" );
867
- flb_upstream_conn_release (u_conn );
868
- FLB_OUTPUT_RETURN (FLB_RETRY );
914
+ if (!token ) {
915
+ flb_plg_error (ctx -> ins , "cannot retrieve oauth2 token" );
916
+ flb_upstream_conn_release (u_conn );
917
+ FLB_OUTPUT_RETURN (FLB_RETRY );
918
+ }
869
919
}
870
920
871
921
flb_plg_trace (ctx -> ins , "msgpack payload size is %zu" , event_chunk -> size );
@@ -906,7 +956,8 @@ static void cb_chronicle_flush(struct flb_event_chunk *event_chunk,
906
956
event_chunk -> tag , flb_sds_len (event_chunk -> tag ),
907
957
& payload_buf , & payload_size ,
908
958
offset , threshold , & out_offset ,
909
- & log_decoder , ctx );
959
+ & log_decoder ,
960
+ ctx );
910
961
if (ret != 0 ) {
911
962
flb_upstream_conn_release (u_conn );
912
963
flb_sds_destroy (token );
@@ -1019,6 +1070,7 @@ static int cb_chronicle_exit(void *data, struct flb_config *config)
1019
1070
if (ctx -> u ) {
1020
1071
flb_upstream_destroy (ctx -> u );
1021
1072
}
1073
+ pthread_mutex_destroy (& ctx -> token_mutex );
1022
1074
1023
1075
flb_chronicle_conf_destroy (ctx );
1024
1076
return 0 ;
@@ -1078,6 +1130,9 @@ struct flb_output_plugin out_chronicle_plugin = {
1078
1130
.cb_flush = cb_chronicle_flush ,
1079
1131
.cb_exit = cb_chronicle_exit ,
1080
1132
.config_map = config_map ,
1133
+
1134
+ /* Test*/
1135
+ .test_formatter .callback = cb_chronicle_format_test ,
1081
1136
/* Plugin flags */
1082
1137
.flags = FLB_OUTPUT_NET | FLB_IO_TLS ,
1083
1138
};
0 commit comments