Skip to content

Commit b39fd40

Browse files
committed
Add support fo OAUTHBEARER
1 parent 98692ff commit b39fd40

File tree

7 files changed

+303
-10
lines changed

7 files changed

+303
-10
lines changed

include/kafka/AdminClientConfig.h

Lines changed: 72 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,17 +27,86 @@ class Config: public Properties
2727
* Protocol used to communicate with brokers.
2828
* Default value: plaintext
2929
*/
30-
static const constexpr char* SECURITY_PROTOCOL = "security.protocol";
30+
static const constexpr char* SECURITY_PROTOCOL = "security.protocol";
31+
32+
/**
33+
* SASL mechanism to use for authentication.
34+
* Default value: GSSAPI
35+
*/
36+
static const constexpr char* SASL_MECHANISM = "sasl.mechanisms";
37+
38+
/**
39+
* SASL username for use with the PLAIN and SASL-SCRAM-.. mechanism.
40+
*/
41+
static const constexpr char* SASL_USERNAME = "sasl.username";
42+
43+
/**
44+
* SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism.
45+
*/
46+
static const constexpr char* SASL_PASSWORD = "sasl.password";
3147

3248
/**
3349
* Shell command to refresh or acquire the client's Kerberos ticket.
3450
*/
35-
static const constexpr char* SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
51+
static const constexpr char* SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
3652

3753
/**
3854
* The client's Kerberos principal name.
3955
*/
40-
static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
56+
static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
57+
58+
/**
59+
* Set to "default" or "oidc" to control with login method to be used.
60+
* If set to "oidc", the following properties must also be specified:
61+
* sasl.oauthbearer.client.id
62+
* sasl.oauthbearer.client.secret
63+
* sasl.oauthbearer.token.endpoint.url
64+
* Default value: default
65+
*/
66+
static const constexpr char* SASL_OAUTHBEARER_METHOD = "sasl.oauthbearer.method";
67+
68+
/**
69+
* Public identifier for the applicaition.
70+
* Only used with "sasl.oauthbearer.method=oidc".
71+
*/
72+
static const constexpr char* SASL_OAUTHBEARER_CLIENT_ID = "sasl.oauthbearer.client.id";
73+
74+
/**
75+
* Client secret only known to the application and the authorization server.
76+
* Only used with "sasl.oauthbearer.method=oidc".
77+
*/
78+
static const constexpr char* SASL_OAUTHBEARER_CLIENT_SECRET = "sasl.oauthbearer.client.secret";
79+
80+
/**
81+
* Allow additional information to be provided to the broker. Comma-separated list of key=value pairs.
82+
* Only used with "sasl.oauthbearer.method=oidc".
83+
*/
84+
static const constexpr char* SASL_OAUTHBEARER_EXTENSIONS = "sasl.oauthbearer.extensions";
85+
86+
/**
87+
* Client use this to specify the scope of the access request to the broker.
88+
* Only used with "sasl.oauthbearer.method=oidc".
89+
*/
90+
static const constexpr char* SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope";
91+
92+
/**
93+
* OAuth/OIDC issuer token endpoint HTTP(S) URI used to retreve token.
94+
* Only used with "sasl.oauthbearer.method=oidc".
95+
*/
96+
static const constexpr char* SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url";
97+
98+
/**
99+
* SASL/OAUTHBEARER configuration.
100+
* The format is implementation-dependent and must be parsed accordingly.
101+
*/
102+
static const constexpr char* SASL_OAUTHBEARER_CONFIG = "sasl.oauthbearer.config";
103+
104+
/**
105+
* Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set.
106+
* Should only be used for development or testing, and not in production.
107+
* Default value: false
108+
*/
109+
static const constexpr char* ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = "enable.sasl.oauthbearer.unsecure.jwt";
41110
};
42111

43112
} } } // end of KAFKA_API::clients::admin

include/kafka/ConsumerConfig.h

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,21 +94,92 @@ class Config: public Properties
9494
* Default value: range,roundrobin
9595
*/
9696
static const constexpr char* PARTITION_ASSIGNMENT_STRATEGY = "partition.assignment.strategy";
97+
9798
/**
9899
* Protocol used to communicate with brokers.
99100
* Default value: plaintext
100101
*/
101-
static const constexpr char* SECURITY_PROTOCOL = "security.protocol";
102+
static const constexpr char* SECURITY_PROTOCOL = "security.protocol";
103+
104+
/**
105+
* SASL mechanism to use for authentication.
106+
* Default value: GSSAPI
107+
*/
108+
static const constexpr char* SASL_MECHANISM = "sasl.mechanisms";
109+
110+
/**
111+
* SASL username for use with the PLAIN and SASL-SCRAM-.. mechanism.
112+
*/
113+
static const constexpr char* SASL_USERNAME = "sasl.username";
114+
115+
/**
116+
* SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism.
117+
*/
118+
static const constexpr char* SASL_PASSWORD = "sasl.password";
102119

103120
/**
104121
* Shell command to refresh or acquire the client's Kerberos ticket.
105122
*/
106-
static const constexpr char* SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
123+
static const constexpr char* SASL_KERBEROS_KINIT_CMD = "sasl.kerberos.kinit.cmd";
107124

108125
/**
109126
* The client's Kerberos principal name.
110127
*/
111-
static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
128+
static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
129+
130+
/**
131+
* Set to "default" or "oidc" to control with login method to be used.
132+
* If set to "oidc", the following properties must also be specified:
133+
* sasl.oauthbearer.client.id
134+
* sasl.oauthbearer.client.secret
135+
* sasl.oauthbearer.token.endpoint.url
136+
* Default value: default
137+
*/
138+
static const constexpr char* SASL_OAUTHBEARER_METHOD = "sasl.oauthbearer.method";
139+
140+
/**
141+
* Public identifier for the applicaition.
142+
* Only used with "sasl.oauthbearer.method=oidc".
143+
*/
144+
static const constexpr char* SASL_OAUTHBEARER_CLIENT_ID = "sasl.oauthbearer.client.id";
145+
146+
/**
147+
* Client secret only known to the application and the authorization server.
148+
* Only used with "sasl.oauthbearer.method=oidc".
149+
*/
150+
static const constexpr char* SASL_OAUTHBEARER_CLIENT_SECRET = "sasl.oauthbearer.client.secret";
151+
152+
/**
153+
* Allow additional information to be provided to the broker. Comma-separated list of key=value pairs.
154+
* Only used with "sasl.oauthbearer.method=oidc".
155+
*/
156+
static const constexpr char* SASL_OAUTHBEARER_EXTENSIONS = "sasl.oauthbearer.extensions";
157+
158+
/**
159+
* Client use this to specify the scope of the access request to the broker.
160+
* Only used with "sasl.oauthbearer.method=oidc".
161+
*/
162+
static const constexpr char* SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope";
163+
164+
/**
165+
* OAuth/OIDC issuer token endpoint HTTP(S) URI used to retreve token.
166+
* Only used with "sasl.oauthbearer.method=oidc".
167+
*/
168+
static const constexpr char* SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url";
169+
170+
/**
171+
* SASL/OAUTHBEARER configuration.
172+
* The format is implementation-dependent and must be parsed accordingly.
173+
*/
174+
static const constexpr char* SASL_OAUTHBEARER_CONFIG = "sasl.oauthbearer.config";
175+
176+
/**
177+
* Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set.
178+
* Should only be used for development or testing, and not in production.
179+
* Default value: false
180+
*/
181+
static const constexpr char* ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = "enable.sasl.oauthbearer.unsecure.jwt";
182+
112183
};
113184

114185
} } } // end of KAFKA_API::clients::consumer

include/kafka/KafkaClient.h

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,16 @@ class KafkaClient
9090
*/
9191
void setErrorCallback(ErrorCallback cb) { _errorCb = std::move(cb); }
9292

93+
/**
94+
* Callback type for OAUTHBEARER token refresh.
95+
*/
96+
using OauthbearerTokenRefreshCallback = std::function<SaslOauthbearerToken(const std::string&)>;
97+
98+
/**
99+
* Set callback for OAUTHBEARER token refresh.
100+
*/
101+
void setOauthbearerTokernRefreshCallback(OauthbearerTokenRefreshCallback cb) { _oauthbearerTokenRefreshCb = std::move(cb); }
102+
93103
/**
94104
* Return the properties which took effect.
95105
*/
@@ -220,10 +230,13 @@ class KafkaClient
220230
private:
221231
std::string _clientId;
222232
std::string _clientName;
233+
223234
std::atomic<int> _logLevel = {Log::Level::Notice};
224235
Logger _logger;
225-
StatsCallback _statsCb;
226-
ErrorCallback _errorCb;
236+
237+
StatsCallback _statsCb;
238+
ErrorCallback _errorCb;
239+
OauthbearerTokenRefreshCallback _oauthbearerTokenRefreshCb;
227240

228241
EventsPollingOption _eventsPollingOption;
229242
Interceptors _interceptors;
@@ -245,6 +258,9 @@ class KafkaClient
245258
// Error callback (for librdkafka)
246259
static void errorCallback(rd_kafka_t* rk, int err, const char* reason, void* opaque);
247260

261+
// OAUTHBEARER Toker Refresh Callback (for librdkafka)
262+
static void oauthbearerTokenRefreshCallback(rd_kafka_t* rk, const char* oauthbearerConfig, void* /*opaque*/);
263+
248264
// Interceptor callback (for librdkafka)
249265
static rd_kafka_resp_err_t configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* conf, void* opaque, char* errStr, std::size_t maxErrStrSize);
250266
static rd_kafka_resp_err_t interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
@@ -259,6 +275,9 @@ class KafkaClient
259275
// Error callback (for class instance)
260276
void onError(const Error& error);
261277

278+
// OAUTHBEARER Toker Refresh Callback (for class instance)
279+
SaslOauthbearerToken onOauthbearerTokenRefresh(const std::string& oauthbearerConfig);
280+
262281
// Interceptor callback (for class instance)
263282
void interceptThreadStart(const std::string& threadName, const std::string& threadType);
264283
void interceptThreadExit(const std::string& threadName, const std::string& threadType);
@@ -432,6 +451,9 @@ KafkaClient::KafkaClient(ClientType clientType,
432451
// Error Callback
433452
rd_kafka_conf_set_error_cb(rk_conf.get(), KafkaClient::errorCallback);
434453

454+
// OAUTHBEARER Toker Refresh Callback
455+
rd_kafka_conf_set_oauthbearer_token_refresh_cb(rk_conf.get(), KafkaClient::oauthbearerTokenRefreshCallback);
456+
435457
// Other Callbacks
436458
if (extraConfigRegister) extraConfigRegister(rk_conf.get());
437459

@@ -442,6 +464,7 @@ KafkaClient::KafkaClient(ClientType clientType,
442464
KAFKA_THROW_IF_WITH_ERROR(result);
443465
}
444466

467+
445468
// Set client handler
446469
_rk.reset(rd_kafka_new((clientType == ClientType::KafkaConsumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER),
447470
rk_conf.release(), // rk_conf's ownship would be transferred to rk, after the "rd_kafka_new()" call
@@ -553,6 +576,17 @@ KafkaClient::onError(const Error& error)
553576
if (_errorCb) _errorCb(error);
554577
}
555578

579+
inline SaslOauthbearerToken
580+
KafkaClient::onOauthbearerTokenRefresh(const std::string& oauthbearerConfig)
581+
{
582+
if (!_oauthbearerTokenRefreshCb)
583+
{
584+
throw std::runtime_error("No OAUTHBEARER token refresh callback configured!");
585+
}
586+
587+
return _oauthbearerTokenRefreshCb(oauthbearerConfig);
588+
}
589+
556590
inline void
557591
KafkaClient::errorCallback(rd_kafka_t* rk, int err, const char* reason, void* /*opaque*/)
558592
{
@@ -573,6 +607,41 @@ KafkaClient::errorCallback(rd_kafka_t* rk, int err, const char* reason, void* /*
573607
kafkaClient(rk).onError(error);
574608
}
575609

610+
inline void
611+
KafkaClient::oauthbearerTokenRefreshCallback(rd_kafka_t* rk, const char* oauthbearerConfig, void* /* opaque */)
612+
{
613+
SaslOauthbearerToken oauthbearerToken;
614+
615+
try
616+
{
617+
oauthbearerToken = kafkaClient(rk).onOauthbearerTokenRefresh(oauthbearerConfig);
618+
}
619+
catch (const std::exception& e)
620+
{
621+
rd_kafka_oauthbearer_set_token_failure(rk, e.what());
622+
}
623+
624+
LogBuffer<LOG_BUFFER_SIZE> errInfo;
625+
626+
std::vector<const char*> extensions;
627+
extensions.reserve(oauthbearerToken.extensions.size() * 2);
628+
for (const auto& kv: oauthbearerToken.extensions)
629+
{
630+
extensions.push_back(kv.first.c_str());
631+
extensions.push_back(kv.second.c_str());
632+
}
633+
634+
if (rd_kafka_oauthbearer_set_token(rk,
635+
oauthbearerToken.value.c_str(),
636+
oauthbearerToken.mdLifetime.count(),
637+
oauthbearerToken.mdPrincipalName.c_str(),
638+
extensions.data(), extensions.size(),
639+
errInfo.str(), errInfo.capacity()) != RD_KAFKA_RESP_ERR_NO_ERROR)
640+
{
641+
rd_kafka_oauthbearer_set_token_failure(rk, errInfo.c_str());
642+
}
643+
}
644+
576645
inline void
577646
KafkaClient::interceptThreadStart(const std::string& threadName, const std::string& threadType)
578647
{

include/kafka/ProducerConfig.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,22 @@ class Config: public Properties
135135
*/
136136
static const constexpr char* SECURITY_PROTOCOL = "security.protocol";
137137

138+
/**
139+
* SASL mechanism to use for authentication.
140+
* Default value: GSSAPI
141+
*/
142+
static const constexpr char* SASL_MECHANISM = "sasl.mechanisms";
143+
144+
/**
145+
* SASL username for use with the PLAIN and SASL-SCRAM-.. mechanism.
146+
*/
147+
static const constexpr char* SASL_USERNAME = "sasl.username";
148+
149+
/**
150+
* SASL password for use with the PLAIN and SASL-SCRAM-.. mechanism.
151+
*/
152+
static const constexpr char* SASL_PASSWORD = "sasl.password";
153+
138154
/**
139155
* Shell command to refresh or acquire the client's Kerberos ticket.
140156
*/
@@ -144,6 +160,59 @@ class Config: public Properties
144160
* The client's Kerberos principal name.
145161
*/
146162
static const constexpr char* SASL_KERBEROS_SERVICE_NAME = "sasl.kerberos.service.name";
163+
164+
/**
165+
* Set to "default" or "oidc" to control with login method to be used.
166+
* If set to "oidc", the following properties must also be specified:
167+
* sasl.oauthbearer.client.id
168+
* sasl.oauthbearer.client.secret
169+
* sasl.oauthbearer.token.endpoint.url
170+
* Default value: default
171+
*/
172+
static const constexpr char* SASL_OAUTHBEARER_METHOD = "sasl.oauthbearer.method";
173+
174+
/**
175+
* Public identifier for the applicaition.
176+
* Only used with "sasl.oauthbearer.method=oidc".
177+
*/
178+
static const constexpr char* SASL_OAUTHBEARER_CLIENT_ID = "sasl.oauthbearer.client.id";
179+
180+
/**
181+
* Client secret only known to the application and the authorization server.
182+
* Only used with "sasl.oauthbearer.method=oidc".
183+
*/
184+
static const constexpr char* SASL_OAUTHBEARER_CLIENT_SECRET = "sasl.oauthbearer.client.secret";
185+
186+
/**
187+
* Allow additional information to be provided to the broker. Comma-separated list of key=value pairs.
188+
* Only used with "sasl.oauthbearer.method=oidc".
189+
*/
190+
static const constexpr char* SASL_OAUTHBEARER_EXTENSIONS = "sasl.oauthbearer.extensions";
191+
192+
/**
193+
* Client use this to specify the scope of the access request to the broker.
194+
* Only used with "sasl.oauthbearer.method=oidc".
195+
*/
196+
static const constexpr char* SASL_OAUTHBEARER_SCOPE = "sasl.oauthbearer.scope";
197+
198+
/**
199+
* OAuth/OIDC issuer token endpoint HTTP(S) URI used to retreve token.
200+
* Only used with "sasl.oauthbearer.method=oidc".
201+
*/
202+
static const constexpr char* SASL_OAUTHBEARER_TOKEN_ENDPOINT_URL = "sasl.oauthbearer.token.endpoint.url";
203+
204+
/**
205+
* SASL/OAUTHBEARER configuration.
206+
* The format is implementation-dependent and must be parsed accordingly.
207+
*/
208+
static const constexpr char* SASL_OAUTHBEARER_CONFIG = "sasl.oauthbearer.config";
209+
210+
/**
211+
* Enable the builtin unsecure JWT OAUTHBEARER token handler if no oauthbearer_refresh_cb has been set.
212+
* Should only be used for development or testing, and not in production.
213+
* Default value: false
214+
*/
215+
static const constexpr char* ENABLE_SASL_OAUTHBEARER_UNSECURE_JWT = "enable.sasl.oauthbearer.unsecure.jwt";
147216
};
148217

149218
} } } // end of KAFKA_API::clients::producer

0 commit comments

Comments
 (0)