Skip to content

Commit da2df03

Browse files
committed
add dispatchKafkaCallbacks
1 parent 0fa5b6a commit da2df03

File tree

4 files changed

+30
-54
lines changed

4 files changed

+30
-54
lines changed

src/app/contextBroker/contextBroker.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1321,7 +1321,8 @@ int main(int argC, char* argV[])
13211321
while (1)
13221322
{
13231323
// Sleep periodically during a minute
1324-
sleep(60);
1324+
sleep(1);
1325+
kafkaMgr.dispatchKafkaCallbacks();
13251326

13261327
// At the present moment MQTT max age checking is the only one periodic process we need to do
13271328
// If some other is introduced in the future, this part should be adapted.
@@ -1331,12 +1332,12 @@ int main(int argC, char* argV[])
13311332
// checking should be moved to a separate thread or, the other way arround, the cache sync
13321333
// process be included as part of this sleep loop
13331334
times++;
1334-
if (times == mqttMaxAge)
1335+
if (times == mqttMaxAge * 60)
13351336
{
13361337
times = 0;
13371338
mqttMgr.cleanup(mqttMaxAge*60);
13381339
}
1339-
if (times == kafkaMaxAge)
1340+
if (times == kafkaMaxAge * 60)
13401341
{
13411342
times = 0;
13421343
kafkaMgr.cleanup(kafkaMaxAge*60);

src/lib/kafka/KafkaConnectionManager.cpp

Lines changed: 17 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,6 @@ void KafkaConnectionManager::init(long _timeout)
9595
}
9696

9797
semInit();
98-
startPollingThread();
9998
}
10099

101100

@@ -108,8 +107,6 @@ void KafkaConnectionManager::teardown(void)
108107
{
109108
LM_T(LmtKafkaNotif, ("Teardown KAFKA connections"));
110109

111-
stopPollingThread();
112-
113110
for (std::map<std::string, KafkaConnection*>::iterator iter = connections.begin(); iter != connections.end(); ++iter)
114111
{
115112
std::string endpoint = iter->first;
@@ -198,46 +195,30 @@ void KafkaConnectionManager::semGive(void)
198195

199196
/* ****************************************************************************
200197
*
201-
* KafkaConnectionManager::startPollingThread -
198+
* KafkaConnectionManager::dispatchKafkaCallbacks -
202199
*/
203-
void KafkaConnectionManager::startPollingThread() {
204-
stopPolling = false;
205-
pollingThread = std::thread([this]() {
206-
while (!stopPolling.load()) {
207-
// Aquí llamamos a poll con un timeout de, digamos, 100 ms
208-
semTake();
209-
// Copiar referencias para minimizar tiempo de bloqueo
210-
std::vector<rd_kafka_t*> producers;
211-
for (auto& kv : connections) {
212-
producers.push_back(kv.second->producer);
213-
}
214-
semGive();
215-
216-
// Hacer poll fuera del bloque sincronizado
217-
for (auto* producer : producers) {
218-
rd_kafka_poll(producer, 0);
219-
}
220-
// Pequeña pausa para no consumir CPU innecesariamente
221-
std::this_thread::sleep_for(std::chrono::milliseconds(100));
222-
}
223-
});
224-
}
225-
200+
void KafkaConnectionManager::dispatchKafkaCallbacks()
201+
{
202+
// We copy the producers under the semaphore to minimize critical section time.
203+
std::vector<rd_kafka_t*> producers;
204+
producers.reserve(connections.size());
226205

206+
semTake();
207+
for (auto& kv : connections) {
208+
KafkaConnection* k = kv.second;
209+
if (k && k->producer)
210+
producers.push_back(k->producer);
211+
}
212+
semGive();
227213

228-
/* ****************************************************************************
229-
*
230-
* KafkaConnectionManager::stopPollingThread -
231-
*/
232-
void KafkaConnectionManager::stopPollingThread() {
233-
stopPolling = true;
234-
if (pollingThread.joinable()) {
235-
pollingThread.join();
214+
// We fire callbacks outside the critical section
215+
for (auto* p : producers) {
216+
// 0 ms: does not block; if you call frequently, this is sufficient
217+
rd_kafka_poll(p, 0);
236218
}
237219
}
238220

239221

240-
241222
/* ****************************************************************************
242223
*
243224
* KafkaConnectionManager::disconnect -

src/lib/kafka/KafkaConnectionManager.h

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,6 @@
3232
#include <string>
3333
#include <map>
3434
#include <rdkafka.h>
35-
#include <thread>
36-
#include <atomic>
3735

3836

3937
/* ****************************************************************************
@@ -79,18 +77,14 @@ class KafkaConnectionManager
7977
);
8078

8179
void cleanup(double maxAge); // Cleaning inactive connections
82-
83-
void stopPollingThread();
84-
void startPollingThread();
80+
void dispatchKafkaCallbacks();
8581

8682
private:
8783
void disconnect(rd_kafka_t* producer, const std::string& endpoint);
8884
void semInit(void);
8985
void semTake(void);
9086
void semGive(void);
9187
KafkaConnection* getConnection(const std::string& endpoint);
92-
std::thread pollingThread;
93-
std::atomic<bool> stopPolling{false};
9488
};
9589

9690
#endif // SRC_LIB_KAFKA_KAFKACONNECTIONMANAGER_H_

test/functionalTest/cases/4666_kafka/kafka_subscription_multibroker.test

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,9 @@ accumulatorStart --pretty-print --bootstrapServers "${KAFKA_BOOTSTRAP_A}" --kafk
4545
# 04. Create KAFKA sub4 at localhost:9095 (2)
4646
# 05. Create entity E9092:9094 to stimulate connections to localhost:9092,localhost:9094 (one created, one reused)
4747
# 06. Sleep 60 seconds, then create entity E1884 to stimulate connections to localhost:9095 (one created, one reused)
48-
# 07. Sleep 60 seconds, get logs and see localhost:9092,localhost:9094 connection went away
48+
# 07. Sleep 70 seconds, get logs and see localhost:9092,localhost:9094 connection went away
4949
# 08. Update entity E9092:9094 to re-create connections to localhost:9092,localhost:9094
50-
# 09. Sleep 60 seconds, get logs and see localhost:9095 connection went away
50+
# 09. Sleep 70 seconds, get logs and see localhost:9095 connection went away
5151
# 10. Dump accumulator for localhost:9092,localhost:9094 and see 4 notifications
5252
#
5353

@@ -178,9 +178,9 @@ echo
178178
echo
179179

180180

181-
echo "07. Sleep 60 seconds, get logs and see localhost:9092,localhost:9094 connection went away"
181+
echo "07. Sleep 70 seconds, get logs and see localhost:9092,localhost:9094 connection went away"
182182
echo "========================================================================================="
183-
sleep 60s
183+
sleep 70s
184184
cat /tmp/contextBroker.log | grep 'DEBUG\|ERROR\|WARN' | grep -v 'start command line' | awk -F 'msg=' '{print $2}'
185185
echo
186186
echo
@@ -198,9 +198,9 @@ orionCurl --url /v2/entities/E9092:9094/attrs --payload "$payload"
198198
echo
199199
echo
200200

201-
echo "09. Sleep 60 seconds, get logs and see localhost:9095 connection went away"
201+
echo "09. Sleep 70 seconds, get logs and see localhost:9095 connection went away"
202202
echo "=========================================================================="
203-
sleep 60s
203+
sleep 70s
204204
cat /tmp/contextBroker.log | grep 'DEBUG\|ERROR\|WARN' | grep -v 'start command line' | awk -F 'msg=' '{print $2}'
205205
echo
206206
echo
@@ -280,7 +280,7 @@ Content-Length: 0
280280

281281

282282

283-
07. Sleep 60 seconds, get logs and see localhost:9092,localhost:9094 connection went away
283+
07. Sleep 70 seconds, get logs and see localhost:9092,localhost:9094 connection went away
284284
=========================================================================================
285285
#SORT_START
286286
Initializing KAFKA library
@@ -312,7 +312,7 @@ Fiware-Correlator: REGEX([0-9a-f\-]{36})
312312

313313

314314

315-
09. Sleep 60 seconds, get logs and see localhost:9095 connection went away
315+
09. Sleep 70 seconds, get logs and see localhost:9095 connection went away
316316
==========================================================================
317317
#SORT_START
318318
Initializing KAFKA library

0 commit comments

Comments
 (0)