Skip to content

Commit 4acffc2

Browse files
Jorge NiedbalskiJorge Niedbalski
andauthored
out_calyptia: retry agent registration on flush callback (#9656)
* out_calyptia: retry registering agent on flush. if register_retry_on_flush is set (default true), agent registration is retried on each flush callback. if set to false then registration will cause to abort the plugin initialisation. Signed-off-by: Jorge Niedbalski <[email protected]> * custom_calyptia: cascade register_retry_on_flush variables. Signed-off-by: Jorge Niedbalski <[email protected]> --------- Signed-off-by: Jorge Niedbalski <[email protected]> Co-authored-by: Jorge Niedbalski <[email protected]>
1 parent 361b5b9 commit 4acffc2

File tree

6 files changed

+492
-111
lines changed

6 files changed

+492
-111
lines changed

plugins/custom_calyptia/calyptia.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config,
293293
flb_output_set_property(cloud, "match", "_calyptia_cloud");
294294
flb_output_set_property(cloud, "api_key", ctx->api_key);
295295

296+
if (ctx->register_retry_on_flush) {
297+
flb_output_set_property(cloud, "register_retry_on_flush", "true");
298+
} else {
299+
flb_output_set_property(cloud, "register_retry_on_flush", "false");
300+
}
301+
296302
if (ctx->store_path) {
297303
flb_output_set_property(cloud, "store_path", ctx->store_path);
298304
}
@@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = {
585591
"Pipeline ID for reporting to calyptia cloud."
586592
},
587593
#endif /* FLB_HAVE_CHUNK_TRACE */
588-
594+
{
595+
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
596+
0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
597+
"Retry agent registration on flush if failed on init."
598+
},
589599
/* EOF */
590600
{0}
591601
};

plugins/custom_calyptia/calyptia.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct calyptia {
5353
flb_sds_t fleet_max_http_buffer_size;
5454
flb_sds_t fleet_interval_sec;
5555
flb_sds_t fleet_interval_nsec;
56+
bool register_retry_on_flush; /* retry registration on flush if failed */
5657
};
5758

5859
int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);

plugins/out_calyptia/calyptia.c

Lines changed: 145 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
322322
int ret;
323323
size_t b_sent;
324324

325+
if( !ctx || !c ) {
326+
return FLB_ERROR;
327+
}
328+
329+
/* Ensure agent_token is not empty when required */
330+
if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) &&
331+
!ctx->agent_token) {
332+
flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type);
333+
return FLB_ERROR;
334+
}
335+
325336
/* append headers */
326337
if (type == CALYPTIA_ACTION_REGISTER) {
338+
// When registering a new agent api key is required
339+
if (!ctx->api_key) {
340+
flb_plg_error(ctx->ins, "api_key is missing");
341+
return FLB_ERROR;
342+
}
327343
flb_http_add_header(c,
328344
CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
329345
CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
@@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
721737
return NULL;
722738
}
723739

740+
ctx->metrics_endpoint = flb_sds_create_size(256);
741+
if (!ctx->metrics_endpoint) {
742+
flb_free(ctx);
743+
return NULL;
744+
}
745+
746+
#ifdef FLB_HAVE_CHUNK_TRACE
747+
ctx->trace_endpoint = flb_sds_create_size(256);
748+
if (!ctx->trace_endpoint) {
749+
flb_sds_destroy(ctx->metrics_endpoint);
750+
flb_free(ctx);
751+
return NULL;
752+
}
753+
#endif
754+
724755
/* api_key */
725756
if (!ctx->api_key) {
726757
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
@@ -771,12 +802,40 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
771802
return ctx;
772803
}
773804

774-
static int cb_calyptia_init(struct flb_output_instance *ins,
775-
struct flb_config *config, void *data)
805+
static int register_agent(struct flb_calyptia *ctx, struct flb_config *config)
776806
{
777807
int ret;
808+
809+
/* Try registration */
810+
ret = api_agent_create(config, ctx);
811+
if (ret != FLB_OK) {
812+
flb_plg_warn(ctx->ins, "agent registration failed");
813+
return FLB_ERROR;
814+
}
815+
816+
/* Update endpoints */
817+
flb_sds_len_set(ctx->metrics_endpoint, 0);
818+
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
819+
ctx->agent_id);
820+
821+
#ifdef FLB_HAVE_CHUNK_TRACE
822+
if (ctx->pipeline_id) {
823+
flb_sds_len_set(ctx->trace_endpoint, 0);
824+
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
825+
ctx->pipeline_id);
826+
}
827+
#endif
828+
829+
flb_plg_info(ctx->ins, "agent registration successful");
830+
return FLB_OK;
831+
}
832+
833+
static int cb_calyptia_init(struct flb_output_instance *ins,
834+
struct flb_config *config, void *data)
835+
{
778836
struct flb_calyptia *ctx;
779837
(void) data;
838+
int ret;
780839

781840
/* create config context */
782841
ctx = config_init(ins, config);
@@ -791,23 +850,12 @@ static int cb_calyptia_init(struct flb_output_instance *ins,
791850
*/
792851
flb_output_set_http_debug_callbacks(ins);
793852

794-
/* register/update agent */
795-
ret = api_agent_create(config, ctx);
796-
if (ret != FLB_OK) {
797-
flb_plg_error(ctx->ins, "agent registration failed");
853+
ret = register_agent(ctx, config);
854+
if (ret != FLB_OK && !ctx->register_retry_on_flush) {
855+
flb_plg_error(ins, "agent registration failed and register_retry_on_flush=false");
798856
return -1;
799857
}
800858

801-
/* metrics endpoint */
802-
ctx->metrics_endpoint = flb_sds_create_size(256);
803-
flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
804-
ctx->agent_id);
805-
806-
#ifdef FLB_HAVE_CHUNK_TRACE
807-
ctx->trace_endpoint = flb_sds_create_size(256);
808-
flb_sds_printf(&ctx->trace_endpoint, CALYPTIA_ENDPOINT_TRACE,
809-
ctx->pipeline_id);
810-
#endif /* FLB_HAVE_CHUNK_TRACE */
811859
return 0;
812860
}
813861

@@ -830,29 +878,79 @@ static void debug_payload(struct flb_calyptia *ctx, void *data, size_t bytes)
830878
cmt_destroy(cmt);
831879
}
832880

833-
static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
834-
struct flb_output_flush *out_flush,
835-
struct flb_input_instance *i_ins,
836-
void *out_context,
837-
struct flb_config *config)
881+
static int cb_calyptia_exit(void *data, struct flb_config *config)
838882
{
839-
int ret = FLB_RETRY;
840-
size_t off = 0;
841-
size_t out_size = 0;
842-
char *out_buf = NULL;
883+
struct flb_calyptia *ctx = data;
884+
885+
if (!ctx) {
886+
return 0;
887+
}
888+
889+
if (ctx->u) {
890+
flb_upstream_destroy(ctx->u);
891+
}
892+
893+
if (ctx->agent_id) {
894+
flb_sds_destroy(ctx->agent_id);
895+
}
896+
897+
if (ctx->agent_token) {
898+
flb_sds_destroy(ctx->agent_token);
899+
}
900+
901+
if (ctx->env) {
902+
flb_env_destroy(ctx->env);
903+
}
904+
905+
if (ctx->metrics_endpoint) {
906+
flb_sds_destroy(ctx->metrics_endpoint);
907+
}
843908

844-
/* used to create records for reporting traces to the cloud. */
845909
#ifdef FLB_HAVE_CHUNK_TRACE
846-
flb_sds_t json;
910+
if (ctx->trace_endpoint) {
911+
flb_sds_destroy(ctx->trace_endpoint);
912+
}
847913
#endif /* FLB_HAVE_CHUNK_TRACE */
848914

915+
if (ctx->fs) {
916+
flb_fstore_destroy(ctx->fs);
917+
}
918+
919+
flb_kv_release(&ctx->kv_labels);
920+
flb_free(ctx);
921+
922+
return 0;
923+
}
924+
925+
static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
926+
struct flb_output_flush *out_flush,
927+
struct flb_input_instance *i_ins,
928+
void *out_context,
929+
struct flb_config *config)
930+
{
931+
int ret;
932+
size_t off = 0;
933+
size_t out_size = 0;
934+
char *out_buf = NULL;
849935
struct flb_connection *u_conn;
850936
struct flb_http_client *c = NULL;
851937
struct flb_calyptia *ctx = out_context;
852938
struct cmt *cmt;
939+
flb_sds_t json;
853940
(void) i_ins;
854941
(void) config;
855942

943+
if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) {
944+
flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true");
945+
if (register_agent(ctx, config) != FLB_OK) {
946+
FLB_OUTPUT_RETURN(FLB_RETRY);
947+
}
948+
}
949+
else if (!ctx->agent_id || !ctx->agent_token) {
950+
flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false");
951+
FLB_OUTPUT_RETURN(FLB_ERROR);
952+
}
953+
856954
/* Get upstream connection */
857955
u_conn = flb_upstream_conn_get(ctx->u);
858956
if (!u_conn) {
@@ -890,7 +988,7 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
890988

891989
/* Compose HTTP Client request */
892990
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->metrics_endpoint,
893-
out_buf, out_size, NULL, 0, NULL, 0);
991+
out_buf, out_size, NULL, 0, NULL, 0);
894992
if (!c) {
895993
if (out_buf != event_chunk->data) {
896994
cmt_encode_msgpack_destroy(out_buf);
@@ -899,12 +997,12 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
899997
FLB_OUTPUT_RETURN(FLB_RETRY);
900998
}
901999

902-
/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
1000+
/* perform request */
9031001
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_METRICS);
9041002
if (ret == FLB_OK) {
9051003
flb_plg_debug(ctx->ins, "metrics delivered OK");
9061004
}
907-
else if (ret == FLB_ERROR) {
1005+
else {
9081006
flb_plg_error(ctx->ins, "could not deliver metrics");
9091007
debug_payload(ctx, out_buf, out_size);
9101008
}
@@ -915,42 +1013,35 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
9151013
}
9161014

9171015
#ifdef FLB_HAVE_CHUNK_TRACE
918-
if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
1016+
if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
1017+
event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
9191018
json = flb_pack_msgpack_to_json_format(event_chunk->data,
920-
event_chunk->size,
921-
FLB_PACK_JSON_FORMAT_STREAM,
922-
FLB_PACK_JSON_DATE_DOUBLE,
923-
NULL);
1019+
event_chunk->size,
1020+
FLB_PACK_JSON_FORMAT_STREAM,
1021+
FLB_PACK_JSON_DATE_DOUBLE,
1022+
NULL);
9241023
if (json == NULL) {
9251024
flb_upstream_conn_release(u_conn);
9261025
FLB_OUTPUT_RETURN(FLB_RETRY);
9271026
}
928-
out_buf = (char *)json;
929-
out_size = flb_sds_len(json);
9301027

931-
if (flb_sds_printf(&ctx->metrics_endpoint, CALYPTIA_ENDPOINT_METRICS,
932-
ctx->agent_id) == NULL) {
933-
flb_upstream_conn_release(u_conn);
934-
flb_sds_destroy(json);
935-
FLB_OUTPUT_RETURN(FLB_RETRY);
936-
}
9371028
c = flb_http_client(u_conn, FLB_HTTP_POST, ctx->trace_endpoint,
938-
out_buf, out_size, NULL, 0, NULL, 0);
1029+
(char *) json, flb_sds_len(json),
1030+
NULL, 0, NULL, 0);
1031+
9391032
if (!c) {
9401033
flb_upstream_conn_release(u_conn);
9411034
flb_sds_destroy(json);
942-
flb_sds_destroy(ctx->metrics_endpoint);
9431035
FLB_OUTPUT_RETURN(FLB_RETRY);
9441036
}
9451037

946-
/* perform request: 'ret' might be FLB_OK, FLB_ERROR or FLB_RETRY */
9471038
ret = calyptia_http_do(ctx, c, CALYPTIA_ACTION_TRACE);
9481039
if (ret == FLB_OK) {
9491040
flb_plg_debug(ctx->ins, "trace delivered OK");
9501041
}
951-
else if (ret == FLB_ERROR) {
1042+
else {
9521043
flb_plg_error(ctx->ins, "could not deliver trace");
953-
debug_payload(ctx, out_buf, out_size);
1044+
debug_payload(ctx, (char *) json, flb_sds_len(json));
9541045
}
9551046
flb_sds_destroy(json);
9561047
}
@@ -961,51 +1052,8 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
9611052
if (c) {
9621053
flb_http_client_destroy(c);
9631054
}
964-
FLB_OUTPUT_RETURN(ret);
965-
}
966-
967-
static int cb_calyptia_exit(void *data, struct flb_config *config)
968-
{
969-
struct flb_calyptia *ctx = data;
970-
971-
if (!ctx) {
972-
return 0;
973-
}
974-
975-
if (ctx->u) {
976-
flb_upstream_destroy(ctx->u);
977-
}
978-
979-
if (ctx->agent_id) {
980-
flb_sds_destroy(ctx->agent_id);
981-
}
982-
983-
if (ctx->agent_token) {
984-
flb_sds_destroy(ctx->agent_token);
985-
}
986-
987-
if (ctx->env) {
988-
flb_env_destroy(ctx->env);
989-
}
990-
991-
if (ctx->metrics_endpoint) {
992-
flb_sds_destroy(ctx->metrics_endpoint);
993-
}
994-
995-
#ifdef FLB_HAVE_CHUNK_TRACE
996-
if (ctx->trace_endpoint) {
997-
flb_sds_destroy(ctx->trace_endpoint);
998-
}
999-
#endif /* FLB_HAVE_CHUNK_TRACE */
1000-
1001-
if (ctx->fs) {
1002-
flb_fstore_destroy(ctx->fs);
1003-
}
1004-
1005-
flb_kv_release(&ctx->kv_labels);
1006-
flb_free(ctx);
10071055

1008-
return 0;
1056+
FLB_OUTPUT_RETURN(ret);
10091057
}
10101058

10111059
/* Configuration properties map */
@@ -1057,7 +1105,11 @@ static struct flb_config_map config_map[] = {
10571105
"Pipeline ID for calyptia core traces."
10581106
},
10591107
#endif
1060-
1108+
{
1109+
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
1110+
0, FLB_TRUE, offsetof(struct flb_calyptia, register_retry_on_flush),
1111+
"Retry agent registration on flush if failed on init."
1112+
},
10611113
/* EOF */
10621114
{0}
10631115
};

plugins/out_calyptia/calyptia.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ struct flb_calyptia {
8080
flb_sds_t trace_endpoint;
8181
flb_sds_t pipeline_id;
8282
#endif /* FLB_HAVE_CHUNK_TRACE */
83+
bool register_retry_on_flush; /* retry registration on flush if failed */
8384
};
8485

8586
#endif

0 commit comments

Comments
 (0)