Skip to content

Commit 7418ab3

Browse files
committed
Interceptors for threads (start, exit)
1 parent 5ce1f8c commit 7418ab3

File tree

10 files changed

+284
-49
lines changed

10 files changed

+284
-49
lines changed

doc/KafkaConsumerQuickStart.md

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -191,15 +191,11 @@ About `Error`'s `value()`s, there are 2 cases
191191
192192
* How many threads would be created by a KafkaConsumer?
193193
194-
Excluding the user's main thread, if `enable.auto.commit` is `false`, the `KafkaConsumer` would start another (N + 2) threads in the background; otherwise, the `KafkaConsumer` would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)
195-
196194
1. Each broker (in the list of BOOTSTRAP_SERVERS) would take a seperate thread to transmit messages towards a kafka cluster server.
197195
198196
2. Another 3 threads will handle internal operations, consumer group operations, and kinds of timers, etc.
199197
200-
3. To enable the auto commit, one more thread would be create, which keeps polling/processing the offset-commit callback event.
201-
202-
E.g, if a KafkaConsumer was created with property of `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 6 threads in total (including the main thread).
198+
3. To enable the auto events-polling, one more background thread would be created, which keeps polling/processing the offset-commit callback event.
203199
204200
* Which one of these threads will handle the callbacks?
205201

doc/KafkaProducerQuickStart.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -199,19 +199,15 @@ Larger `QUEUE_BUFFERING_MAX_MESSAGES`/`QUEUE_BUFFERING_MAX_KBYTES` might help to
199199
200200
### How many threads would be created by a KafkaProducer?
201201
202-
Excluding the user's main thread, `KafkaProducer` would start (N + 3) background threads. (N means the number of BOOTSTRAP_SERVERS)
203-
204202
Most of these background threads are started internally by librdkafka.
205203
206204
Here is a brief introduction what they're used for,
207205
208206
1. Each broker (in the list of BOOTSTRAP_SERVERS) would take a separate thread to transmit messages towards a kafka cluster server.
209207
210-
2. Another 2 background threads would handle internal operations and kinds of timers, etc.
211-
212-
3. One more background thread to keep polling the delivery callback event.
208+
2. Another 2 threads would handle internal operations and kinds of timers, etc.
213209
214-
E.g, if a `KafkaProducer` was created with property of `BOOTSTRAP_SERVERS=127.0.0.1:8888,127.0.0.1:8889,127.0.0.1:8890`, it would take 7 threads in total (including the main thread).
210+
3. To enale the auto events-polling, one more background thread would be created, which keeps polling the delivery callback event.
215211
216212
### Which one of these threads will handle the callbacks
217213

include/kafka/AdminClient.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,10 @@ class AdminClient: public KafkaClient
2828
public:
2929
explicit AdminClient(const Properties& properties)
3030
: KafkaClient(ClientType::AdminClient,
31-
KafkaClient::validateAndReformProperties(properties))
31+
KafkaClient::validateAndReformProperties(properties),
32+
ConfigCallbacksRegister{},
33+
EventsPollingOption::Auto,
34+
Interceptors{})
3235
{
3336
}
3437

include/kafka/Interceptors.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#pragma once
2+
3+
#include <kafka/Project.h>
4+
5+
#include <functional>
6+
7+
8+
namespace KAFKA_API { namespace clients {
9+
10+
class Interceptors
11+
{
12+
public:
13+
using ThreadStartCallback = std::function<void(const std::string&, const std::string&)>;
14+
using ThreadExitCallback = std::function<void(const std::string&, const std::string&)>;
15+
16+
Interceptors& onThreadStart(ThreadStartCallback cb) { _valid = true; _threadStartCb = std::move(cb); return *this; }
17+
Interceptors& onThreadExit(ThreadExitCallback cb) { _valid = true; _threadExitCb = std::move(cb); return *this; }
18+
19+
ThreadStartCallback onThreadStart() const { return _threadStartCb; }
20+
ThreadExitCallback onThreadExit() const { return _threadExitCb; }
21+
22+
bool empty() const { return !_valid; }
23+
24+
private:
25+
ThreadStartCallback _threadStartCb;
26+
ThreadExitCallback _threadExitCb;
27+
bool _valid = false;
28+
};
29+
30+
} } // end of KAFKA_API::clients
31+

include/kafka/KafkaClient.h

Lines changed: 90 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
#include <kafka/BrokerMetadata.h>
66
#include <kafka/Error.h>
7+
#include <kafka/Interceptors.h>
78
#include <kafka/KafkaException.h>
89
#include <kafka/Log.h>
910
#include <kafka/Properties.h>
@@ -20,6 +21,7 @@
2021
#include <mutex>
2122
#include <string>
2223
#include <thread>
24+
#include <vector>
2325

2426

2527
namespace KAFKA_API { namespace clients {
@@ -168,10 +170,11 @@ class KafkaClient
168170

169171
using ConfigCallbacksRegister = std::function<void(rd_kafka_conf_t*)>;
170172

171-
KafkaClient(ClientType clientType,
172-
const Properties& properties,
173-
const ConfigCallbacksRegister& extraConfigRegister = ConfigCallbacksRegister{},
174-
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto);
173+
KafkaClient(ClientType clientType,
174+
const Properties& properties,
175+
const ConfigCallbacksRegister& extraConfigRegister,
176+
EventsPollingOption eventsPollingOption,
177+
Interceptors interceptors);
175178

176179
rd_kafka_t* getClientHandle() const { return _rk.get(); }
177180

@@ -221,8 +224,11 @@ class KafkaClient
221224
Logger _logger;
222225
StatsCallback _statsCb;
223226
ErrorCallback _errorCb;
224-
rd_kafka_unique_ptr _rk;
227+
225228
EventsPollingOption _eventsPollingOption;
229+
Interceptors _interceptors;
230+
231+
rd_kafka_unique_ptr _rk;
226232

227233
static std::string getClientTypeString(ClientType type)
228234
{
@@ -239,6 +245,11 @@ class KafkaClient
239245
// Error callback (for librdkafka)
240246
static void errorCallback(rd_kafka_t* rk, int err, const char* reason, void* opaque);
241247

248+
// Interceptor callback (for librdkafka)
249+
static rd_kafka_resp_err_t configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* conf, void* opaque, char* errStr, std::size_t maxErrStrSize);
250+
static rd_kafka_resp_err_t interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
251+
static rd_kafka_resp_err_t interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* opaque);
252+
242253
// Log callback (for class instance)
243254
void onLog(int level, const char* fac, const char* buf) const;
244255

@@ -248,6 +259,10 @@ class KafkaClient
248259
// Error callback (for class instance)
249260
void onError(const Error& error);
250261

262+
// Interceptor callback (for class instance)
263+
void interceptThreadStart(const std::string& threadName, const std::string& threadType);
264+
void interceptThreadExit(const std::string& threadName, const std::string& threadType);
265+
251266
static const constexpr char* BOOTSTRAP_SERVERS = "bootstrap.servers";
252267
static const constexpr char* CLIENT_ID = "client.id";
253268
static const constexpr char* LOG_LEVEL = "log_level";
@@ -275,8 +290,9 @@ class KafkaClient
275290
class PollThread
276291
{
277292
public:
278-
explicit PollThread(Pollable& pollable)
279-
: _running(true), _thread(keepPolling, std::ref(_running), std::ref(pollable))
293+
using InterceptorCb = std::function<void()>;
294+
explicit PollThread(const InterceptorCb& entryCb, const InterceptorCb& exitCb, Pollable& pollable)
295+
: _running(true), _thread(keepPolling, std::ref(_running), entryCb, exitCb, std::ref(pollable))
280296
{
281297
}
282298

@@ -288,12 +304,19 @@ class KafkaClient
288304
}
289305

290306
private:
291-
static void keepPolling(std::atomic_bool& running, Pollable& pollable)
307+
static void keepPolling(std::atomic_bool& running,
308+
const InterceptorCb& entryCb,
309+
const InterceptorCb& exitCb,
310+
Pollable& pollable)
292311
{
312+
entryCb();
313+
293314
while (running.load())
294315
{
295316
pollable.poll(CALLBACK_POLLING_INTERVAL_MS);
296317
}
318+
319+
exitCb();
297320
}
298321

299322
static constexpr int CALLBACK_POLLING_INTERVAL_MS = 10;
@@ -306,7 +329,10 @@ class KafkaClient
306329
{
307330
_pollable = std::make_unique<KafkaClient::PollableCallback>(pollableCallback);
308331

309-
if (isWithAutoEventsPolling()) _pollThread = std::make_unique<PollThread>(*_pollable);
332+
auto entryCb = [this]() { interceptThreadStart("events-polling", "background"); };
333+
auto exitCb = [this]() { interceptThreadExit("events-polling", "background"); };
334+
335+
if (isWithAutoEventsPolling()) _pollThread = std::make_unique<PollThread>(entryCb, exitCb, *_pollable);
310336
}
311337

312338
void stopBackgroundPollingIfNecessary()
@@ -331,8 +357,10 @@ inline
331357
KafkaClient::KafkaClient(ClientType clientType,
332358
const Properties& properties,
333359
const ConfigCallbacksRegister& extraConfigRegister,
334-
EventsPollingOption eventsPollingOption)
335-
: _eventsPollingOption(eventsPollingOption)
360+
EventsPollingOption eventsPollingOption,
361+
Interceptors interceptors)
362+
: _eventsPollingOption(eventsPollingOption),
363+
_interceptors(std::move(interceptors))
336364
{
337365
static const std::set<std::string> PRIVATE_PROPERTY_KEYS = { "max.poll.records" };
338366

@@ -403,6 +431,13 @@ KafkaClient::KafkaClient(ClientType clientType,
403431
// Other Callbacks
404432
if (extraConfigRegister) extraConfigRegister(rk_conf.get());
405433

434+
// Interceptor
435+
if (!_interceptors.empty())
436+
{
437+
Error result{ rd_kafka_conf_interceptor_add_on_new(rk_conf.get(), "on_new", KafkaClient::configInterceptorOnNew, nullptr) };
438+
KAFKA_THROW_IF_WITH_ERROR(result);
439+
}
440+
406441
// Set client handler
407442
_rk.reset(rd_kafka_new((clientType == ClientType::KafkaConsumer ? RD_KAFKA_CONSUMER : RD_KAFKA_PRODUCER),
408443
rk_conf.release(), // rk_conf's ownship would be transferred to rk, after the "rd_kafka_new()" call
@@ -534,6 +569,50 @@ KafkaClient::errorCallback(rd_kafka_t* rk, int err, const char* reason, void* /*
534569
kafkaClient(rk).onError(error);
535570
}
536571

572+
inline void
573+
KafkaClient::interceptThreadStart(const std::string& threadName, const std::string& threadType)
574+
{
575+
if (const auto& cb = _interceptors.onThreadStart()) cb(threadName, threadType);
576+
}
577+
578+
inline void
579+
KafkaClient::interceptThreadExit(const std::string& threadName, const std::string& threadType)
580+
{
581+
if (const auto& cb = _interceptors.onThreadExit()) cb(threadName, threadType);
582+
}
583+
584+
inline rd_kafka_resp_err_t
585+
KafkaClient::configInterceptorOnNew(rd_kafka_t* rk, const rd_kafka_conf_t* /*conf*/, void* opaque, char* /*errStr*/, std::size_t /*maxErrStrSize*/)
586+
{
587+
if (auto result = rd_kafka_interceptor_add_on_thread_start(rk, "on_thread_start", KafkaClient::interceptorOnThreadStart, opaque))
588+
{
589+
return result;
590+
}
591+
592+
if (auto result = rd_kafka_interceptor_add_on_thread_exit(rk, "on_thread_exit", KafkaClient::interceptorOnThreadExit, opaque))
593+
{
594+
return result;
595+
}
596+
597+
return RD_KAFKA_RESP_ERR_NO_ERROR;
598+
}
599+
600+
inline rd_kafka_resp_err_t
601+
KafkaClient::interceptorOnThreadStart(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/)
602+
{
603+
kafkaClient(rk).interceptThreadStart(threadName, toString(threadType));
604+
605+
return RD_KAFKA_RESP_ERR_NO_ERROR;
606+
}
607+
608+
inline rd_kafka_resp_err_t
609+
KafkaClient::interceptorOnThreadExit(rd_kafka_t* rk, rd_kafka_thread_type_t threadType, const char* threadName, void* /*opaque*/)
610+
{
611+
kafkaClient(rk).interceptThreadExit(threadName, toString(threadType));
612+
613+
return RD_KAFKA_RESP_ERR_NO_ERROR;
614+
}
615+
537616
inline Optional<BrokerMetadata>
538617
KafkaClient::fetchBrokerMetadata(const std::string& topic, std::chrono::milliseconds timeout, bool disableErrorLogging)
539618
{

include/kafka/KafkaConsumer.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ class KafkaConsumer: public KafkaClient
3939
* - RD_KAFKA_RESP_ERR__INVALID_ARG : Invalid BOOTSTRAP_SERVERS property
4040
* - RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE: Fail to create internal threads
4141
*/
42-
explicit KafkaConsumer(const Properties& properties,
43-
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto);
42+
explicit KafkaConsumer(const Properties& properties,
43+
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto,
44+
const Interceptors& interceptors = Interceptors{});
4445

4546
/**
4647
* The destructor for KafkaConsumer.
@@ -373,11 +374,14 @@ KafkaConsumer::registerConfigCallbacks(rd_kafka_conf_t* conf)
373374
}
374375

375376
inline
376-
KafkaConsumer::KafkaConsumer(const Properties &properties, EventsPollingOption eventsPollingOption)
377+
KafkaConsumer::KafkaConsumer(const Properties& properties,
378+
EventsPollingOption eventsPollingOption,
379+
const Interceptors& interceptors)
377380
: KafkaClient(ClientType::KafkaConsumer,
378381
validateAndReformProperties(properties),
379382
registerConfigCallbacks,
380-
eventsPollingOption)
383+
eventsPollingOption,
384+
interceptors)
381385
{
382386
// Pick up the "max.poll.records" property
383387
if (auto maxPollRecordsProperty = properties.getProperty(consumer::Config::MAX_POLL_RECORDS))

include/kafka/KafkaProducer.h

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ class KafkaProducer: public KafkaClient
3939
* - RD_KAFKA_RESP_ERR__INVALID_ARG : Invalid BOOTSTRAP_SERVERS property
4040
* - RD_KAFKA_RESP_ERR__CRIT_SYS_RESOURCE: Fail to create internal threads
4141
*/
42-
explicit KafkaProducer(const Properties& properties,
43-
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto);
42+
explicit KafkaProducer(const Properties& properties,
43+
EventsPollingOption eventsPollingOption = EventsPollingOption::Auto,
44+
const Interceptors& interceptors = Interceptors{});
4445

4546
/**
4647
* The destructor for KafkaProducer.
@@ -221,11 +222,14 @@ class KafkaProducer: public KafkaClient
221222
};
222223

223224
inline
224-
KafkaProducer::KafkaProducer(const Properties& properties, EventsPollingOption eventsPollingOption)
225+
KafkaProducer::KafkaProducer(const Properties& properties,
226+
EventsPollingOption eventsPollingOption,
227+
const Interceptors& interceptors)
225228
: KafkaClient(ClientType::KafkaProducer,
226229
validateAndReformProperties(properties),
227230
registerConfigCallbacks,
228-
eventsPollingOption)
231+
eventsPollingOption,
232+
interceptors)
229233
{
230234
// Start background polling (if needed)
231235
startBackgroundPollingIfNecessary([this](int timeoutMs){ pollCallbacks(timeoutMs); });

include/kafka/RdKafkaHelper.h

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
#include <librdkafka/rdkafka.h>
88

9+
#include <cassert>
910
#include <memory>
1011

1112
namespace KAFKA_API {
@@ -48,6 +49,23 @@ using rd_kafka_consumer_group_metadata_unique_ptr = std::unique_ptr<rd_kafka_con
4849
inline void RkErrorDeleter(rd_kafka_error_t* p) { rd_kafka_error_destroy(p); }
4950
using rd_kafka_error_shared_ptr = std::shared_ptr<rd_kafka_error_t>;
5051

52+
53+
inline std::string toString(rd_kafka_thread_type_t threadType)
54+
{
55+
switch (threadType)
56+
{
57+
case RD_KAFKA_THREAD_MAIN:
58+
return "main";
59+
case RD_KAFKA_THREAD_BACKGROUND:
60+
return "background";
61+
case RD_KAFKA_THREAD_BROKER:
62+
return "broker";
63+
default:
64+
assert(false);
65+
return "NA";
66+
}
67+
}
68+
5169
// Convert from rd_kafka_xxx datatypes
5270
inline TopicPartitionOffsets getTopicPartitionOffsets(const rd_kafka_topic_partition_list_t* rk_tpos)
5371
{

0 commit comments

Comments
 (0)