diff --git a/conf/out_pgsql.conf b/conf/out_pgsql.conf new file mode 100644 index 00000000000..f42a8f4a428 --- /dev/null +++ b/conf/out_pgsql.conf @@ -0,0 +1,36 @@ +[SERVICE] + Flush 5 + Daemon Off + Log_Level info + +[INPUT] + Name mem + Tag mem.usage + +# filter_stdout to show streaming data +[FILTER] + Name stdout + Match * + +# default mode +[OUTPUT] + Name pgsql + Match * + Host 127.0.0.1 + Port 5432 + User postgres + Password pwd + Database fluentbit + Table fluentbit + +# daemon mode +[OUTPUT] + Name pgsql + Match * + Host 127.0.0.1 + Port 5432 + User postgres + Password pwd + Database fluentbit + Table fluentbit + Daemon On diff --git a/plugins/out_pgsql/pgsql.c b/plugins/out_pgsql/pgsql.c index a01090c1ab0..c00af6a79c6 100644 --- a/plugins/out_pgsql/pgsql.c +++ b/plugins/out_pgsql/pgsql.c @@ -54,6 +54,7 @@ static int cb_pgsql_init(struct flb_output_instance *ins, char *temp = NULL; const char *tmp = NULL; int ret; + int daemon; /* set default network configuration */ flb_output_net_default(FLB_PGSQL_HOST, FLB_PGSQL_PORT, ins); @@ -171,8 +172,20 @@ static int cb_pgsql_init(struct flb_output_instance *ins, ctx->cockroachdb = FLB_FALSE; } + /* daemon mode so that fluenbit will not exit if pgsql error */ + tmp = flb_output_get_property("daemon", ins); + if (tmp && flb_utils_bool(tmp)) { + daemon = FLB_TRUE; + } + else { + daemon = FLB_FALSE; + } + ret = pgsql_start_connections(ctx); if (ret) { + if (daemon) { + return 0; + } return -1; } @@ -187,6 +200,9 @@ static int cb_pgsql_init(struct flb_output_instance *ins, flb_plg_error(ctx->ins, "failed to parse table name: %s", PQerrorMessage(ctx->conn_current->conn)); pgsql_conf_destroy(ctx); + if (daemon) { + return 0; + } return -1; } @@ -197,6 +213,9 @@ static int cb_pgsql_init(struct flb_output_instance *ins, if (!ctx->db_table) { flb_errno(); pgsql_conf_destroy(ctx); + if (daemon) { + return 0; + } return -1; } @@ -209,6 +228,9 @@ static int cb_pgsql_init(struct flb_output_instance *ins, if (query == NULL) { flb_errno(); pgsql_conf_destroy(ctx); + if (daemon) { + return 0; + } return -1; } @@ -227,6 +249,9 @@ static int cb_pgsql_init(struct flb_output_instance *ins, flb_plg_error(ctx->ins, "%s", PQerrorMessage(ctx->conn_current->conn)); pgsql_conf_destroy(ctx); + if (daemon) { + return 0; + } return -1; } @@ -250,6 +275,9 @@ static void cb_pgsql_flush(struct flb_event_chunk *event_chunk, flb_sds_t tag_escaped = NULL; size_t str_len; + if (ctx == NULL) { + FLB_OUTPUT_RETURN(FLB_ERROR); + } if (pgsql_next_connection(ctx) == 1) { FLB_OUTPUT_RETURN(FLB_RETRY); diff --git a/plugins/out_pgsql/pgsql_connections.c b/plugins/out_pgsql/pgsql_connections.c index 9c4ccfba203..5726710eb1c 100644 --- a/plugins/out_pgsql/pgsql_connections.c +++ b/plugins/out_pgsql/pgsql_connections.c @@ -143,6 +143,10 @@ int pgsql_next_connection(struct flb_pgsql_config *ctx) struct mk_list *head; int ret_conn = 1; + if (ctx == NULL) { + return 1; + } + if (PQconsumeInput(ctx->conn_current->conn) == 1) { if (PQisBusy(ctx->conn_current->conn) == 0) { res = PQgetResult(ctx->conn_current->conn);