Skip to content

Commit bf0c0a9

Browse files
committed
out_kafka: validate if aws_msk_iam is available
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 0765a92 commit bf0c0a9

File tree

3 files changed

+15
-0
lines changed

3 files changed

+15
-0
lines changed

plugins/out_kafka/kafka.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,8 @@ static struct flb_config_map config_map[] = {
676676
"If you specify a key name with this option, then only the value of "
677677
"that key will be sent to Kafka."
678678
},
679+
680+
#ifdef FLB_HAVE_AWS_MSK_IAM
679681
{
680682
FLB_CONFIG_MAP_STR, "aws_msk_iam_cluster_arn", NULL,
681683
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam_cluster_arn),
@@ -686,6 +688,8 @@ static struct flb_config_map config_map[] = {
686688
0, FLB_TRUE, offsetof(struct flb_out_kafka, aws_msk_iam),
687689
"Enable AWS MSK IAM authentication"
688690
},
691+
#endif
692+
689693
/* EOF */
690694
{0}
691695
};

plugins/out_kafka/kafka_config.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
5858
return NULL;
5959
}
6060

61+
#ifdef FLB_HAVE_AWS_MSK_IAM
6162
/*
6263
* When MSK IAM auth is enabled, default the required
6364
* security settings so users don't need to specify them.
@@ -78,12 +79,16 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
7879
}
7980
}
8081
else {
82+
#endif
8183
/* Retrieve SASL mechanism if configured */
8284
tmp = flb_output_get_property("rdkafka.sasl.mechanism", ins);
8385
if (tmp) {
8486
ctx->sasl_mechanism = flb_sds_create(tmp);
8587
}
88+
89+
#ifdef FLB_HAVE_AWS_MSK_IAM
8690
}
91+
#endif
8792

8893
/* rdkafka config context */
8994
ctx->conf = flb_kafka_conf_create(&ctx->kafka, &ins->properties, 0);
@@ -205,6 +210,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
205210
flb_kafka_opaque_set(ctx->opaque, ctx, NULL);
206211
rd_kafka_conf_set_opaque(ctx->conf, ctx->opaque);
207212

213+
#ifdef FLB_HAVE_AWS_MSK_IAM
208214
if (ctx->aws_msk_iam && ctx->aws_msk_iam_cluster_arn && ctx->sasl_mechanism &&
209215
strcasecmp(ctx->sasl_mechanism, "OAUTHBEARER") == 0) {
210216

@@ -225,6 +231,7 @@ struct flb_out_kafka *flb_out_kafka_create(struct flb_output_instance *ins,
225231
}
226232
}
227233
}
234+
#endif
228235

229236
/* Kafka Producer */
230237
ctx->kafka.rk = rd_kafka_new(RD_KAFKA_PRODUCER, ctx->conf,
@@ -315,9 +322,11 @@ int flb_out_kafka_destroy(struct flb_out_kafka *ctx)
315322
flb_sds_destroy(ctx->gelf_fields.full_message_key);
316323
flb_sds_destroy(ctx->gelf_fields.level_key);
317324

325+
#ifdef FLB_HAVE_AWS_MSK_IAM
318326
if (ctx->msk_iam) {
319327
flb_aws_msk_iam_destroy(ctx->msk_iam);
320328
}
329+
#endif
321330

322331
flb_sds_destroy(ctx->sasl_mechanism);
323332

plugins/out_kafka/kafka_config.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,10 @@ struct flb_out_kafka {
125125
struct flb_avro_fields avro_fields;
126126
#endif
127127

128+
#ifdef FLB_HAVE_AWS_MSK_IAM
128129
flb_sds_t aws_msk_iam_cluster_arn;
129130
struct flb_aws_msk_iam *msk_iam;
131+
#endif
130132

131133
int aws_msk_iam;
132134

0 commit comments

Comments
 (0)