Skip to content

Commit 4b6637d

Browse files
author
Jorge Niedbalski
committed
custom_calyptia: cascade register_retry_on_flush variables.
Signed-off-by: Jorge Niedbalski <jorge.niedbalski@chronosphere.io>
1 parent 6cb6a04 commit 4b6637d

File tree

5 files changed

+389
-28
lines changed

5 files changed

+389
-28
lines changed

plugins/custom_calyptia/calyptia.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,12 @@ static struct flb_output_instance *setup_cloud_output(struct flb_config *config,
293293
flb_output_set_property(cloud, "match", "_calyptia_cloud");
294294
flb_output_set_property(cloud, "api_key", ctx->api_key);
295295

296+
if (ctx->register_retry_on_flush) {
297+
flb_output_set_property(cloud, "register_retry_on_flush", "true");
298+
} else {
299+
flb_output_set_property(cloud, "register_retry_on_flush", "false");
300+
}
301+
296302
if (ctx->store_path) {
297303
flb_output_set_property(cloud, "store_path", ctx->store_path);
298304
}
@@ -585,7 +591,11 @@ static struct flb_config_map config_map[] = {
585591
"Pipeline ID for reporting to calyptia cloud."
586592
},
587593
#endif /* FLB_HAVE_CHUNK_TRACE */
588-
594+
{
595+
FLB_CONFIG_MAP_BOOL, "register_retry_on_flush", "true",
596+
0, FLB_TRUE, offsetof(struct calyptia, register_retry_on_flush),
597+
"Retry agent registration on flush if failed on init."
598+
},
589599
/* EOF */
590600
{0}
591601
};

plugins/custom_calyptia/calyptia.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ struct calyptia {
5353
flb_sds_t fleet_max_http_buffer_size;
5454
flb_sds_t fleet_interval_sec;
5555
flb_sds_t fleet_interval_nsec;
56+
bool register_retry_on_flush; /* retry registration on flush if failed */
5657
};
5758

5859
int set_fleet_input_properties(struct calyptia *ctx, struct flb_input_instance *fleet);

plugins/out_calyptia/calyptia.c

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,24 @@ static int calyptia_http_do(struct flb_calyptia *ctx, struct flb_http_client *c,
322322
int ret;
323323
size_t b_sent;
324324

325+
if( !ctx || !c ) {
326+
return FLB_ERROR;
327+
}
328+
329+
/* Ensure agent_token is not empty when required */
330+
if ((type == CALYPTIA_ACTION_METRICS || type == CALYPTIA_ACTION_PATCH || type == CALYPTIA_ACTION_TRACE) &&
331+
!ctx->agent_token) {
332+
flb_plg_warn(ctx->ins, "agent_token is missing for action type %d", type);
333+
return FLB_ERROR;
334+
}
335+
325336
/* append headers */
326337
if (type == CALYPTIA_ACTION_REGISTER) {
338+
// When registering a new agent api key is required
339+
if (!ctx->api_key) {
340+
flb_plg_error(ctx->ins, "api_key is missing");
341+
return FLB_ERROR;
342+
}
327343
flb_http_add_header(c,
328344
CALYPTIA_H_CTYPE, sizeof(CALYPTIA_H_CTYPE) - 1,
329345
CALYPTIA_H_CTYPE_JSON, sizeof(CALYPTIA_H_CTYPE_JSON) - 1);
@@ -721,6 +737,21 @@ static struct flb_calyptia *config_init(struct flb_output_instance *ins,
721737
return NULL;
722738
}
723739

740+
ctx->metrics_endpoint = flb_sds_create_size(256);
741+
if (!ctx->metrics_endpoint) {
742+
flb_free(ctx);
743+
return NULL;
744+
}
745+
746+
#ifdef FLB_HAVE_CHUNK_TRACE
747+
ctx->trace_endpoint = flb_sds_create_size(256);
748+
if (!ctx->trace_endpoint) {
749+
flb_sds_destroy(ctx->metrics_endpoint);
750+
flb_free(ctx);
751+
return NULL;
752+
}
753+
#endif
754+
724755
/* api_key */
725756
if (!ctx->api_key) {
726757
flb_plg_error(ctx->ins, "configuration 'api_key' is missing");
@@ -905,17 +936,18 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
905936
struct flb_http_client *c = NULL;
906937
struct flb_calyptia *ctx = out_context;
907938
struct cmt *cmt;
939+
flb_sds_t json;
908940
(void) i_ins;
909941
(void) config;
910942

911-
if (!ctx->agent_id && ctx->register_retry_on_flush) {
912-
flb_plg_info(ctx->ins, "agent_id not found and register_retry_on_flush=true, attempting registration");
943+
if ((!ctx->agent_id || !ctx->agent_token) && ctx->register_retry_on_flush) {
944+
flb_plg_info(ctx->ins, "missing agent_id or agent_token, attempting re-registration register_retry_on_flush=true");
913945
if (register_agent(ctx, config) != FLB_OK) {
914946
FLB_OUTPUT_RETURN(FLB_RETRY);
915947
}
916948
}
917-
else if (!ctx->agent_id) {
918-
flb_plg_error(ctx->ins, "no agent_id available and register_retry_on_flush=false");
949+
else if (!ctx->agent_id || !ctx->agent_token) {
950+
flb_plg_error(ctx->ins, "missing agent_id or agent_token, and register_retry_on_flush=false");
919951
FLB_OUTPUT_RETURN(FLB_ERROR);
920952
}
921953

@@ -981,12 +1013,13 @@ static void cb_calyptia_flush(struct flb_event_chunk *event_chunk,
9811013
}
9821014

9831015
#ifdef FLB_HAVE_CHUNK_TRACE
984-
if (event_chunk->type == (FLB_EVENT_TYPE_LOGS | FLB_EVENT_TYPE_HAS_TRACE)) {
985-
flb_sds_t json = flb_pack_msgpack_to_json_format(event_chunk->data,
986-
event_chunk->size,
987-
FLB_PACK_JSON_FORMAT_STREAM,
988-
FLB_PACK_JSON_DATE_DOUBLE,
989-
NULL);
1016+
if (event_chunk->type & FLB_EVENT_TYPE_LOGS &&
1017+
event_chunk->type & FLB_EVENT_TYPE_HAS_TRACE) {
1018+
json = flb_pack_msgpack_to_json_format(event_chunk->data,
1019+
event_chunk->size,
1020+
FLB_PACK_JSON_FORMAT_STREAM,
1021+
FLB_PACK_JSON_DATE_DOUBLE,
1022+
NULL);
9901023
if (json == NULL) {
9911024
flb_upstream_conn_release(u_conn);
9921025
FLB_OUTPUT_RETURN(FLB_RETRY);

tests/runtime/CMakeLists.txt

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,29 +61,37 @@ if(FLB_OUT_LIB)
6161
endif()
6262

6363
if (FLB_CUSTOM_CALYPTIA)
64-
# Define common variables for calyptia tests
6564
set(CALYPTIA_TEST_LINK_LIBS
6665
fluent-bit-static
6766
${CMAKE_THREAD_LIBS_INIT}
6867
)
6968

70-
# Add calyptia input properties test
71-
set(TEST_TARGET "flb-rt-calyptia_input_properties")
72-
add_executable(${TEST_TARGET}
69+
set(CALYPTIA_TESTS
70+
"custom_calyptia_test.c"
71+
"custom_calyptia_registration_retry_test.c"
7372
"custom_calyptia_input_test.c"
74-
"../../plugins/custom_calyptia/calyptia.c"
7573
)
7674

77-
target_link_libraries(${TEST_TARGET}
78-
${CALYPTIA_TEST_LINK_LIBS}
79-
)
75+
foreach(TEST_SOURCE ${CALYPTIA_TESTS})
76+
get_filename_component(TEST_NAME ${TEST_SOURCE} NAME_WE)
8077

81-
add_test(NAME ${TEST_TARGET}
82-
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
83-
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)
78+
set(TEST_TARGET "flb-rt-${TEST_NAME}")
79+
add_executable(${TEST_TARGET}
80+
${TEST_SOURCE}
81+
"../../plugins/custom_calyptia/calyptia.c"
82+
)
83+
84+
target_link_libraries(${TEST_TARGET}
85+
${CALYPTIA_TEST_LINK_LIBS}
86+
)
8487

85-
set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
86-
add_dependencies(${TEST_TARGET} fluent-bit-static)
88+
add_test(NAME ${TEST_TARGET}
89+
COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/${TEST_TARGET}
90+
WORKING_DIRECTORY ${CMAKE_HOME_DIRECTORY}/build)
91+
92+
set_tests_properties(${TEST_TARGET} PROPERTIES LABELS "runtime")
93+
add_dependencies(${TEST_TARGET} fluent-bit-static)
94+
endforeach()
8795
endif()
8896

8997
if(FLB_IN_EBPF)
@@ -222,10 +230,6 @@ if(FLB_IN_LIB)
222230

223231
endif()
224232

225-
if (FLB_CUSTOM_CALYPTIA)
226-
FLB_RT_TEST(FLB_CUSTOM_CALYPTIA "custom_calyptia_test.c")
227-
endif()
228-
229233
if (FLB_PROCESSOR_METRICS_SELECTOR)
230234
FLB_RT_TEST(FLB_PROCESSOR_METRICS_SELECTOR "processor_metrics_selector.c")
231235
endif()

0 commit comments

Comments
 (0)