Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
107 changes: 95 additions & 12 deletions tests/internal/input_chunk_routes.c
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include <fluent-bit/flb_input_plugin.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_str.h>
#include <fluent-bit/flb_output.h>

/* Dummy workaround: undefine input plugin macros to avoid redefinition warnings */
Expand All @@ -27,12 +28,13 @@
#include <chunkio/chunkio.h>
#include <chunkio/cio_utils.h>
#include <string.h>
#include <stdlib.h>
#include <cmetrics/cmetrics.h>


#define TEST_STREAM_PATH "/tmp/flb-chunk-direct-test"
#define TEST_STREAM_PATH_MATCH "/tmp/flb-chunk-direct-test-match"
#define TEST_STREAM_PATH_NULL "/tmp/flb-chunk-direct-test-null"
#define TEST_STREAM_PATH "/flb-chunk-direct-test"
#define TEST_STREAM_PATH_MATCH "/flb-chunk-direct-test-match"
#define TEST_STREAM_PATH_NULL "/flb-chunk-direct-test-null"

static int write_test_log_payload(struct cio_chunk *chunk)
{
Expand Down Expand Up @@ -332,6 +334,58 @@ static int write_legacy_chunk_metadata(struct cio_chunk *chunk,
return ret;
}

static char* env_tmpdir()
{
char *env;

/* Unix */
env = getenv("TMPDIR");
if (env) {
return flb_strdup(env);
}

/* Windows */
env = getenv("TEMP");
if (env) {
return flb_strdup(env);
}
env = getenv("TMP");
if (env) {
return flb_strdup(env);
}

/* Fallback */
return flb_strdup("/tmp");
}

static char* tmpdir_cat(const char *postfix)
{
char *tmpdir;
char *ret;
size_t tmpdir_len;
size_t postfix_len;

tmpdir = env_tmpdir();
if (!tmpdir) {
return NULL;
}

tmpdir_len = strlen(tmpdir);
postfix_len = strlen(postfix);
ret = (char *) flb_malloc(tmpdir_len + postfix_len + 1);
if (!ret) {
flb_free(tmpdir);
return NULL;
}

memcpy(ret, tmpdir, tmpdir_len);
flb_free(tmpdir);
memcpy(ret + tmpdir_len, postfix, postfix_len);
ret[tmpdir_len + postfix_len] = '\0';
return ret;
}


static void test_chunk_metadata_direct_routes()
{
struct cio_options opts;
Expand All @@ -341,6 +395,7 @@ static void test_chunk_metadata_direct_routes()
struct flb_input_chunk ic;
struct flb_chunk_direct_route output_routes[2];
struct flb_chunk_direct_route *loaded_routes;
char *stream_path;
char *content_buf;
const char *tag_buf;
const char *tag_string;
Expand All @@ -353,32 +408,41 @@ static void test_chunk_metadata_direct_routes()
size_t content_size;
size_t payload_size;

stream_path = tmpdir_cat(TEST_STREAM_PATH);
TEST_CHECK(stream_path != NULL);
if (!stream_path) {
return;
}

payload_size = sizeof(payload) - 1;
tag_string = "test.tag";
expected_tag_len = strlen(tag_string);

cio_utils_recursive_delete(TEST_STREAM_PATH);
cio_utils_recursive_delete(stream_path);
memset(&opts, 0, sizeof(opts));
cio_options_init(&opts);
opts.root_path = TEST_STREAM_PATH;
opts.root_path = stream_path;
opts.flags = CIO_OPEN;
ctx = cio_create(&opts);
TEST_CHECK(ctx != NULL);
if (!ctx) {
flb_free(stream_path);
return;
}

stream = cio_stream_create(ctx, "direct", CIO_STORE_FS);
TEST_CHECK(stream != NULL);
if (!stream) {
cio_destroy(ctx);
flb_free(stream_path);
return;
}

chunk = cio_chunk_open(ctx, stream, "meta", CIO_OPEN, 1024, &err);
TEST_CHECK(chunk != NULL);
if (!chunk) {
cio_destroy(ctx);
flb_free(stream_path);
return;
}

Expand Down Expand Up @@ -478,7 +542,8 @@ static void test_chunk_metadata_direct_routes()

cio_chunk_close(chunk, CIO_TRUE);
cio_destroy(ctx);
cio_utils_recursive_delete(TEST_STREAM_PATH);
cio_utils_recursive_delete(stream_path);
flb_free(stream_path);
}

static void test_chunk_restore_alias_plugin_match_multiple()
Expand All @@ -497,12 +562,19 @@ static void test_chunk_restore_alias_plugin_match_multiple()
struct flb_output_plugin stdout_plugin;
struct flb_output_plugin http_plugin;
struct flb_chunk_direct_route route;
char *stream_path;
const char *tag_string;
int tag_len;
int ret;
int err;
int config_ready;

stream_path = tmpdir_cat(TEST_STREAM_PATH_MATCH);
TEST_CHECK(stream_path != NULL);
if (!stream_path) {
return;
}

ctx = NULL;
stream = NULL;
chunk = NULL;
Expand All @@ -511,15 +583,16 @@ static void test_chunk_restore_alias_plugin_match_multiple()
tag_string = "test.tag";
tag_len = (int) strlen(tag_string);

cio_utils_recursive_delete(TEST_STREAM_PATH_MATCH);
cio_utils_recursive_delete(stream_path);
memset(&opts, 0, sizeof(opts));
cio_options_init(&opts);
opts.root_path = TEST_STREAM_PATH_MATCH;
opts.root_path = stream_path;
opts.flags = CIO_OPEN;

ctx = cio_create(&opts);
TEST_CHECK(ctx != NULL);
if (!ctx) {
flb_free(stream_path);
return;
}

Expand Down Expand Up @@ -639,7 +712,8 @@ static void test_chunk_restore_alias_plugin_match_multiple()
cleanup:
cleanup_test_routing_scenario(ic, &stdout_one, &stdout_two, &http_out,
&in, &config, chunk, ctx, config_ready,
TEST_STREAM_PATH_MATCH);
stream_path);
flb_free(stream_path);
}

static void test_chunk_restore_alias_plugin_null_matches_all()
Expand All @@ -658,12 +732,19 @@ static void test_chunk_restore_alias_plugin_null_matches_all()
struct flb_output_plugin stdout_plugin;
struct flb_output_plugin http_plugin;
struct flb_chunk_direct_route route;
char *stream_path;
const char *tag_string;
int tag_len;
int ret;
int err;
int config_ready;

stream_path = tmpdir_cat(TEST_STREAM_PATH_NULL);
TEST_CHECK(stream_path != NULL);
if (!stream_path) {
return;
}

ctx = NULL;
stream = NULL;
chunk = NULL;
Expand All @@ -672,15 +753,16 @@ static void test_chunk_restore_alias_plugin_null_matches_all()
tag_string = "test.tag";
tag_len = (int) strlen(tag_string);

cio_utils_recursive_delete(TEST_STREAM_PATH_NULL);
cio_utils_recursive_delete(stream_path);
memset(&opts, 0, sizeof(opts));
cio_options_init(&opts);
opts.root_path = TEST_STREAM_PATH_NULL;
opts.root_path = stream_path;
opts.flags = CIO_OPEN;

ctx = cio_create(&opts);
TEST_CHECK(ctx != NULL);
if (!ctx) {
flb_free(stream_path);
return;
}

Expand Down Expand Up @@ -800,7 +882,8 @@ static void test_chunk_restore_alias_plugin_null_matches_all()
cleanup:
cleanup_test_routing_scenario(ic, &stdout_one, &stdout_two, &http_out,
&in, &config, chunk, ctx, config_ready,
TEST_STREAM_PATH_NULL);
stream_path);
flb_free(stream_path);
}

TEST_LIST = {
Expand Down
Loading