Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/unit-tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ jobs:
- "-DFLB_SANITIZE_THREAD=On"
- "-DFLB_SIMD=On"
- "-DFLB_SIMD=Off"
- "-DFLB_SQLDB=On"
- "-DFLB_SQLDB=Off"
- "-DFLB_ARROW=On"
cmake_version:
- "3.31.6"
Expand Down
22 changes: 21 additions & 1 deletion plugins/filter_checklist/checklist.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,13 @@
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_ra_key.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

#include "checklist.h"
#include <ctype.h>

#ifdef FLB_HAVE_SQLDB
static int db_init(struct checklist *ctx)
{
int ret;
Expand Down Expand Up @@ -124,6 +124,7 @@ static int db_check(struct checklist *ctx, char *buf, size_t size)

return match;
}
#endif

static int load_file_patterns(struct checklist *ctx)
{
Expand Down Expand Up @@ -175,9 +176,11 @@ static int load_file_patterns(struct checklist *ctx)
if (ctx->mode == CHECK_EXACT_MATCH) {
ret = flb_hash_table_add(ctx->ht, buf, len, "", 0);
}
#ifdef FLB_HAVE_SQLDB
else if (ctx->mode == CHECK_PARTIAL_MATCH) {
ret = db_insert(ctx, buf, len);
}
#endif

if (ret >= 0) {
flb_plg_debug(ctx->ins, "file list: line=%i adds value='%s'", line, buf);
Expand Down Expand Up @@ -210,7 +213,13 @@ static int init_config(struct checklist *ctx)
ctx->mode = CHECK_EXACT_MATCH;
}
else if (strcasecmp(tmp, "partial") == 0) {
#ifdef FLB_HAVE_SQLDB
ctx->mode = CHECK_PARTIAL_MATCH;
#else
flb_plg_error(ctx->ins,
"'mode=partial' requires FLB_HAVE_SQLDB enabled at build time");
return -1;
#endif
}
}

Expand All @@ -223,12 +232,14 @@ static int init_config(struct checklist *ctx)
return -1;
}
}
#ifdef FLB_HAVE_SQLDB
else if (ctx->mode == CHECK_PARTIAL_MATCH) {
ret = db_init(ctx);
if (ret < 0) {
return -1;
}
}
#endif

/* record accessor pattern / key name */
ctx->ra_lookup_key = flb_ra_create(ctx->lookup_key, FLB_TRUE);
Expand Down Expand Up @@ -284,6 +295,11 @@ static int cb_checklist_init(struct flb_filter_instance *ins,
}

ret = init_config(ctx);
if (ret == -1) {
flb_filter_set_context(ins, NULL);
flb_free(ctx);
return -1;
}

return 0;
}
Expand Down Expand Up @@ -504,9 +520,11 @@ static int cb_checklist_filter(const void *data, size_t bytes,
found = FLB_TRUE;
}
}
#ifdef FLB_HAVE_SQLDB
else if (ctx->mode == CHECK_PARTIAL_MATCH) {
found = db_check(ctx, cmp_buf, cmp_size);
}
#endif

if (cmp_buf && cmp_buf != (char *) rval->o.via.str.ptr) {
flb_free(cmp_buf);
Expand Down Expand Up @@ -592,11 +610,13 @@ static int cb_exit(void *data, struct flb_config *config)
flb_hash_table_destroy(ctx->ht);
}

#ifdef FLB_HAVE_SQLDB
if (ctx->db) {
sqlite3_finalize(ctx->stmt_insert);
sqlite3_finalize(ctx->stmt_check);
flb_sqldb_close(ctx->db);
}
#endif

flb_free(ctx);
return 0;
Expand Down
9 changes: 8 additions & 1 deletion plugins/filter_checklist/checklist.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,19 @@
#define FLB_FILTER_CHECK_H

#include <fluent-bit/flb_info.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_hash_table.h>
#include <fluent-bit/flb_record_accessor.h>

#ifdef FLB_HAVE_SQLDB
#include <fluent-bit/flb_sqldb.h>
#endif

#define LINE_SIZE 2048
#define CHECK_HASH_TABLE_SIZE 100000
#define CHECK_EXACT_MATCH 0 /* exact string match */
#ifdef FLB_HAVE_SQLDB
#define CHECK_PARTIAL_MATCH 1 /* partial match */
#endif

/* plugin context */
struct checklist {
Expand All @@ -41,9 +46,11 @@ struct checklist {
struct mk_list *records;

/* internal */
#ifdef FLB_HAVE_SQLDB
struct flb_sqldb *db;
sqlite3_stmt *stmt_insert;
sqlite3_stmt *stmt_check;
#endif
struct flb_hash_table *ht;
struct flb_record_accessor *ra_lookup_key;
struct flb_filter_instance *ins;
Expand Down
8 changes: 7 additions & 1 deletion plugins/in_blob/blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,12 @@
#include <stdio.h>

#include "blob.h"
#include "blob_db.h"
#include "blob_file.h"

#ifdef FLB_HAVE_SQLDB
#include "blob_db.h"
#endif

#include "win32_glob.c"

/* Define missing GLOB_TILDE if not exists */
Expand Down Expand Up @@ -739,7 +742,10 @@ static int in_blob_exit(void *in_context, struct flb_config *config)
return 0;
}

#ifdef FLB_HAVE_SQLDB
blob_db_close(ctx);
#endif

blob_file_list_remove_all(ctx);
flb_log_event_encoder_destroy(ctx->log_encoder);
flb_free(ctx);
Expand Down
5 changes: 4 additions & 1 deletion plugins/in_blob/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#define FLB_IN_BLOB_H

#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>

#ifdef FLB_HAVE_SQLDB
#include <fluent-bit/flb_sqldb.h>
#endif

#define POST_UPLOAD_ACTION_NONE 0
#define POST_UPLOAD_ACTION_DELETE 1
#define POST_UPLOAD_ACTION_EMIT_LOG 2
Expand Down
25 changes: 22 additions & 3 deletions plugins/out_azure_blob/azure_blob.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
#include <fluent-bit/flb_config_map.h>
#include <fluent-bit/flb_gzip.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/flb_sqldb.h>
#include <fluent-bit/flb_input_blob.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_plugin.h>
Expand All @@ -35,14 +34,17 @@
#include <msgpack.h>

#include "azure_blob.h"
#include "azure_blob_db.h"
#include "azure_blob_uri.h"
#include "azure_blob_conf.h"
#include "azure_blob_appendblob.h"
#include "azure_blob_blockblob.h"
#include "azure_blob_http.h"
#include "azure_blob_store.h"

#ifdef FLB_HAVE_SQLDB
#include "azure_blob_db.h"
#endif

#define CREATE_BLOB 1337

/* thread_local_storage for workers */
Expand Down Expand Up @@ -248,6 +250,7 @@ static int create_blob(struct flb_azure_blob *ctx, char *name)
return FLB_OK;
}

#ifdef FLB_HAVE_SQLDB
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this function wrapped by this conditional when it only executes an HTTP call?
Also, this function wouldn't be called because cb_azb_blob_file_upload would exit prematurely due to ctx->db being set to NULL

static int delete_blob(struct flb_azure_blob *ctx, char *name)
{
int ret;
Expand Down Expand Up @@ -322,6 +325,7 @@ static int delete_blob(struct flb_azure_blob *ctx, char *name)
flb_upstream_conn_release(u_conn);
return FLB_OK;
}
#endif

static int http_send_blob(struct flb_config *config, struct flb_azure_blob *ctx,
flb_sds_t ref_name,
Expand Down Expand Up @@ -488,16 +492,23 @@ static int send_blob(struct flb_config *config,
if (!block_id) {
flb_plg_error(ctx->ins, "could not generate block id");
flb_free(generated_random_string);
cfl_sds_destroy(ref_name);
flb_sds_destroy(ref_name);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is correct.
The only thing is bundling this fix with the FLB_SQLDB patch means it could slip through the cracks and not make it to the other branch that should receive the same patch.
I'm not saying you have to create a new PR, it's more of an internal note.

return FLB_RETRY;
}
uri = azb_block_blob_uri(ctx, tag, block_id, ms, generated_random_string);
ref_name = flb_sds_printf(&ref_name, "file=%s.%" PRIu64, name, ms);
}
else if (event_type == FLB_EVENT_TYPE_BLOBS) {
#ifdef FLB_HAVE_SQLDB
block_id = azb_block_blob_id_blob(ctx, name, part_id);
uri = azb_block_blob_uri(ctx, name, block_id, 0, generated_random_string);
ref_name = flb_sds_printf(&ref_name, "file=%s:%" PRIu64, name, part_id);
#else
flb_plg_error(ctx->ins, "FLB_EVENT_TYPE_BLOBS requires FLB_HAVE_SQLDB enabled at build time");
flb_free(generated_random_string);
flb_sds_destroy(ref_name);
return FLB_ERROR;
#endif
}
}

Expand Down Expand Up @@ -772,6 +783,7 @@ static int cb_azure_blob_init(struct flb_output_instance *ins,
return 0;
}

#ifdef FLB_HAVE_SQLDB
static int blob_chunk_register_parts(struct flb_azure_blob *ctx, uint64_t file_id, size_t total_size)
{
int ret;
Expand Down Expand Up @@ -1171,6 +1183,7 @@ static int azb_timer_create(struct flb_azure_blob *ctx)

return 0;
}
#endif

/**
* Azure Blob Storage ingestion callback function
Expand Down Expand Up @@ -1660,6 +1673,7 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk,
}
}
else if (event_chunk->type == FLB_EVENT_TYPE_BLOBS) {
#ifdef FLB_HAVE_SQLDB
/*
* For Blob types, we use the flush callback to enqueue the file, then cb_azb_blob_file_upload()
* takes care of the rest like reading the file and uploading it to Azure.
Expand All @@ -1668,6 +1682,9 @@ static void cb_azure_blob_flush(struct flb_event_chunk *event_chunk,
if (ret == -1) {
FLB_OUTPUT_RETURN(FLB_RETRY);
}
#else
ret = FLB_ERROR;
#endif
}

if (json){
Expand Down Expand Up @@ -1728,11 +1745,13 @@ static int cb_worker_init(void *data, struct flb_config *config)
FLB_TLS_SET(worker_info, info);
}

#ifdef FLB_HAVE_SQLDB
ret = azb_timer_create(ctx);
if (ret == -1) {
flb_plg_error(ctx->ins, "failed to create upload timer");
return -1;
}
#endif

return 0;
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_azure_blob/azure_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_upstream.h>
#include <fluent-bit/flb_sds.h>

#ifdef FLB_HAVE_SQLDB
#include <fluent-bit/flb_sqldb.h>
#endif

/* Content-Type */
#define AZURE_BLOB_CT "Content-Type"
Expand Down
1 change: 1 addition & 0 deletions plugins/out_azure_blob/azure_blob_blockblob.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <fluent-bit/flb_time.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_hash.h>
#include <fluent-bit/flb_utils.h>
#include <fluent-bit/flb_crypto_constants.h>

#include <math.h>
Expand Down
10 changes: 9 additions & 1 deletion plugins/out_azure_blob/azure_blob_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_base64.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_utils.h>

#include "azure_blob.h"
#include "azure_blob_conf.h"

#ifdef FLB_HAVE_SQLDB
#include "azure_blob_db.h"
#endif

#include <sys/types.h>
#include <sys/stat.h>
Expand Down Expand Up @@ -762,6 +766,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
}
}

#ifdef FLB_HAVE_SQLDB
/* database file for blob signal handling */
if (ctx->database_file) {
ctx->db = azb_db_open(ctx, ctx->database_file);
Expand All @@ -771,6 +776,7 @@ struct flb_azure_blob *flb_azure_blob_conf_create(struct flb_output_instance *in
}

pthread_mutex_init(&ctx->file_upload_commit_file_parts, NULL);
#endif
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conditional should be one line before, even though file_upload_commit_file_parts is used by a function that will exit prematurely there's no reason not to initialize it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See 6df3149, file_upload_commit_file_parts is excluded by FLB_HAVE_SQLDB. So it doesn't make sense to suddenly move it just for the purpose of initializing it.


flb_plg_info(ctx->ins,
"account_name=%s, container_name=%s, blob_type=%s, emulator_mode=%s, endpoint=%s, auth_type=%s",
Expand Down Expand Up @@ -826,7 +832,9 @@ void flb_azure_blob_conf_destroy(struct flb_azure_blob *ctx)
flb_upstream_destroy(ctx->u);
}


#ifdef FLB_HAVE_SQLDB
azb_db_close(ctx);
#endif

flb_free(ctx);
}
Loading
Loading