Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 41 additions & 17 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,27 @@ static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx)
return 0;
}

/* Create a new oauth2 context and get a oauth2 token */
static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx)
static int azure_kusto_get_workload_identity_token(struct flb_azure_kusto *ctx)
{
int ret;
char *token;

ret = flb_azure_workload_identity_token_get(ctx->o,
ctx->workload_identity_token_file,
ctx->client_id,
ctx->tenant_id);
if (ret == -1) {
flb_plg_error(ctx->ins, "error retrieving workload identity token");
return -1;
}

flb_plg_debug(ctx->ins, "Workload identity token retrieved successfully");
return 0;
}

static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
{
int ret;

/* Clear any previous oauth2 payload content */
flb_oauth2_payload_clear(ctx->o);

Expand Down Expand Up @@ -86,7 +101,7 @@ static int azure_kusto_get_oauth2_token(struct flb_azure_kusto *ctx)
}

/* Retrieve access token */
token = flb_oauth2_token_get(ctx->o);
char *token = flb_oauth2_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
return -1;
Expand All @@ -107,11 +122,18 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
}

if (flb_oauth2_token_expired(ctx->o) == FLB_TRUE) {
if (ctx->managed_identity_client_id != NULL) {
ret = azure_kusto_get_msi_token(ctx);
}
else {
ret = azure_kusto_get_oauth2_token(ctx);
switch (ctx->auth_type) {
case FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY:
ret = azure_kusto_get_workload_identity_token(ctx);
break;
case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM:
case FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER:
ret = azure_kusto_get_msi_token(ctx);
break;
case FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL:
default:
ret = azure_kusto_get_service_principal_token(ctx);
break;
}
}

Expand Down Expand Up @@ -205,7 +227,7 @@ flb_sds_t execute_ingest_csl_command(struct flb_azure_kusto *ctx, const char *cs
ctx->ins,
"Kusto ingestion command request http_do=%i, HTTP Status: %i",
ret, c->resp.status);
flb_plg_debug(ctx->ins, "Kusto ingestion command HTTP request payload: %.*s", (int)c->resp.payload_size, c->resp.payload);
flb_plg_debug(ctx->ins, "Kusto ingestion command HTTP response payload: %.*s", (int)c->resp.payload_size, c->resp.payload);

if (ret == 0) {
if (c->resp.status == 200) {
Expand Down Expand Up @@ -1391,7 +1413,7 @@ static void cb_azure_kusto_flush(struct flb_event_chunk *event_chunk,
/* Error handling and cleanup */
if (json) {
flb_sds_destroy(json);
}
}
if (is_compressed && final_payload) {
flb_free(final_payload);
}
Expand Down Expand Up @@ -1472,16 +1494,18 @@ static struct flb_config_map config_map[] = {
"Set the tenant ID of the AAD application used for authentication"},
{FLB_CONFIG_MAP_STR, "client_id", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, client_id),
"Set the client ID (Application ID) of the AAD application used for authentication"},
"Set the client ID (Application ID) of the AAD application or the user-assigned managed identity's client ID when using managed identity authentication"},
{FLB_CONFIG_MAP_STR, "client_secret", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, client_secret),
"Set the client secret (Application Password) of the AAD application used for "
"authentication"},
{FLB_CONFIG_MAP_STR, "managed_identity_client_id", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, managed_identity_client_id),
"A managed identity client id to authenticate with. "
"Set to 'system' for system-assigned managed identity. "
"Set the MI client ID (GUID) for user-assigned managed identity."},
{FLB_CONFIG_MAP_STR, "workload_identity_token_file", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, workload_identity_token_file),
"Set the token file path for workload identity authentication"},
{FLB_CONFIG_MAP_STR, "auth_type", "service_principal", 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, auth_type_str),
"Set the authentication type: 'service_principal', 'managed_identity', or 'workload_identity'. "
"For managed_identity, use 'system' as client_id for system-assigned identity, or specify the managed identity's client ID"},
{FLB_CONFIG_MAP_STR, "ingestion_endpoint", (char *)NULL, 0, FLB_TRUE,
offsetof(struct flb_azure_kusto, ingestion_endpoint),
"Set the Kusto cluster's ingestion endpoint URL (e.g. "
Expand Down
13 changes: 13 additions & 0 deletions plugins/out_azure_kusto/azure_kusto.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@
/* refresh token every 50 minutes */
#define FLB_AZURE_KUSTO_TOKEN_REFRESH 3000

/* Authentication types */
typedef enum {
FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL = 0, /* Client ID + Client Secret */
FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM, /* System-assigned managed identity */
FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER, /* User-assigned managed identity */
FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY /* Workload Identity */
} flb_azure_kusto_auth_type;

/* Kusto streaming inserts oauth scope */
#define FLB_AZURE_KUSTO_SCOPE "https://help.kusto.windows.net/.default"

Expand Down Expand Up @@ -92,6 +100,11 @@ struct flb_azure_kusto {
int ingestion_endpoint_connect_timeout;
int io_timeout;

/* Authentication */
int auth_type;
char *auth_type_str;
char *workload_identity_token_file;

/* compress payload */
int compression_enabled;

Expand Down
105 changes: 63 additions & 42 deletions plugins/out_azure_kusto/azure_kusto_conf.c
Original file line number Diff line number Diff line change
Expand Up @@ -722,8 +722,55 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

if (ctx->tenant_id == NULL && ctx->client_id == NULL && ctx->client_secret == NULL && ctx->managed_identity_client_id == NULL) {
flb_plg_error(ctx->ins, "Service Principal or Managed Identity is not defined");
/* Auth method validation and setup */
if (strcasecmp(ctx->auth_type_str, "service_principal") == 0) {
ctx->auth_type = FLB_AZURE_KUSTO_AUTH_SERVICE_PRINCIPAL;

/* Verify required parameters for Service Principal auth */
if (!ctx->tenant_id || !ctx->client_id || !ctx->client_secret) {
flb_plg_error(ins, "When using service_principal auth, tenant_id, client_id, and client_secret are required");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
}
else if (strcasecmp(ctx->auth_type_str, "managed_identity") == 0) {
/* Check if client_id indicates system-assigned or user-assigned managed identity */
if (!ctx->client_id) {
flb_plg_error(ins, "When using managed_identity auth, client_id must be set to 'system' for system-assigned or the managed identity client ID");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

if (strcasecmp(ctx->client_id, "system") == 0) {
ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM;
} else {
ctx->auth_type = FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER;
}
}
else if (strcasecmp(ctx->auth_type_str, "workload_identity") == 0) {
ctx->auth_type = FLB_AZURE_KUSTO_AUTH_WORKLOAD_IDENTITY;

/* Verify required parameters for Workload Identity auth */
if (!ctx->tenant_id || !ctx->client_id) {
flb_plg_error(ins, "When using workload_identity auth, tenant_id and client_id are required");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* Set default token file path if not specified */
if (!ctx->workload_identity_token_file) {
ctx->workload_identity_token_file = flb_strdup("/var/run/secrets/azure/tokens/azure-identity-token");
if (!ctx->workload_identity_token_file) {
flb_errno();
flb_plg_error(ins, "Could not allocate default workload identity token path");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
}
}
else {
flb_plg_error(ins, "Invalid auth_type '%s'. Valid options are: 'service_principal', 'managed_identity', or 'workload_identity'",
ctx->auth_type_str);
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}
Expand All @@ -749,60 +796,35 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}

if (ctx->managed_identity_client_id != NULL) {
/* system assigned managed identity */
if (strcasecmp(ctx->managed_identity_client_id, "system") == 0) {
/* Create oauth2 context */
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM ||
ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_USER) {
/* MSI auth */
/* Construct the URL template with or without client_id for managed identity */
if (ctx->auth_type == FLB_AZURE_KUSTO_AUTH_MANAGED_IDENTITY_SYSTEM) {
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1);

if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "", "");

}
else {
/* user assigned managed identity */
} else {
/* User-assigned managed identity */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_AZURE_MSIAUTH_URL_TEMPLATE) - 1 +
sizeof("&client_id=") - 1 +
flb_sds_len(ctx->managed_identity_client_id));

sizeof("&client_id=") - 1 +
flb_sds_len(ctx->client_id));
if (!ctx->oauth_url) {
flb_errno();
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->managed_identity_client_id);
}
}
else {
/* config: 'tenant_id' */
if (ctx->tenant_id == NULL) {
flb_plg_error(ctx->ins, "property 'tenant_id' is not defined.");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
FLB_AZURE_MSIAUTH_URL_TEMPLATE, "&client_id=", ctx->client_id);
}

/* config: 'client_id' */
if (ctx->client_id == NULL) {
flb_plg_error(ctx->ins, "property 'client_id' is not defined");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* config: 'client_secret' */
if (ctx->client_secret == NULL) {
flb_plg_error(ctx->ins, "property 'client_secret' is not defined");
flb_azure_kusto_conf_destroy(ctx);
return NULL;
}

/* Create the auth URL */
} else {
/* Standard OAuth2 for service principal or workload identity */
ctx->oauth_url = flb_sds_create_size(sizeof(FLB_MSAL_AUTH_URL_TEMPLATE) - 1 +
flb_sds_len(ctx->tenant_id));
if (!ctx->oauth_url) {
Expand All @@ -811,10 +833,9 @@ struct flb_azure_kusto *flb_azure_kusto_conf_create(struct flb_output_instance *
return NULL;
}
flb_sds_snprintf(&ctx->oauth_url, flb_sds_alloc(ctx->oauth_url),
FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);
FLB_MSAL_AUTH_URL_TEMPLATE, ctx->tenant_id);
}


ctx->resources = flb_calloc(1, sizeof(struct flb_azure_kusto_resources));
if (!ctx->resources) {
flb_errno();
Expand Down
Loading
Loading