Skip to content

Commit 5c321ac

Browse files
committed
IcingaDB: put all queue related stuff into icingadb:task_queue namespace
1 parent bd7ab1b commit 5c321ac

File tree

3 files changed

+72
-66
lines changed

3 files changed

+72
-66
lines changed

lib/icingadb/icingadb-objects.cpp

Lines changed: 20 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ void IcingaDB::UpdateAllConfigObjects()
354354
auto checkable (dynamic_pointer_cast<Checkable>(object));
355355

356356
if (checkable && checkable->GetEnableActiveChecks()) {
357-
EnqueueConfigObject(checkable, NextUpdate);
357+
EnqueueConfigObject(checkable, icingadb::task_queue::NextUpdate);
358358
}
359359
}
360360

@@ -1339,15 +1339,15 @@ void IcingaDB::UpdateState(const Checkable::Ptr& checkable, uint32_t mode)
13391339
String checksum = HashValue(stateAttrs);
13401340

13411341
auto [redisStateKey, redisChecksumKey] = GetCheckableStateKeys(checkable->GetReflectionType());
1342-
if (mode & VolatileState) {
1342+
if (mode & icingadb::task_queue::VolatileState) {
13431343
String objectKey = GetObjectIdentifier(checkable);
13441344
m_RconWorker->FireAndForgetQueries({
13451345
{"HSET", redisStateKey, objectKey, JsonEncode(stateAttrs)},
13461346
{"HSET", redisChecksumKey, objectKey, JsonEncode(new Dictionary({{"checksum", checksum}}))},
13471347
});
13481348
}
13491349

1350-
if (mode & RuntimeState) {
1350+
if (mode & icingadb::task_queue::RuntimeState) {
13511351
ObjectLock olock(stateAttrs);
13521352

13531353
RedisConnection::Query streamadd({
@@ -1820,7 +1820,7 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
18201820

18211821
auto [configStateKey, checksumStateKey] = GetCheckableStateKeys(checkable->GetReflectionType());
18221822
EnqueueRelationsDeletion(GetObjectIdentifier(checkable), {{configStateKey, checksumStateKey}});
1823-
EnqueueConfigObject(object, ConfigDelete | NextUpdate); // Send also ZREM for next update
1823+
EnqueueConfigObject(object, icingadb::task_queue::ConfigDelete | icingadb::task_queue::NextUpdate); // Send also ZREM for next update
18241824

18251825
if (service) {
18261826
SendGroupsChanged<ServiceGroup>(checkable, service->GetGroups(), nullptr);
@@ -1831,7 +1831,7 @@ void IcingaDB::SendConfigDelete(const ConfigObject::Ptr& object)
18311831
return;
18321832
}
18331833

1834-
EnqueueConfigObject(object, ConfigDelete);
1834+
EnqueueConfigObject(object, icingadb::task_queue::ConfigDelete);
18351835

18361836
if (type == TimePeriod::TypeInstance) {
18371837
TimePeriod::Ptr timeperiod = static_pointer_cast<TimePeriod>(object);
@@ -1893,7 +1893,7 @@ void IcingaDB::SendStateChange(const ConfigObject::Ptr& object, const CheckResul
18931893

18941894
tie(host, service) = GetHostService(checkable);
18951895

1896-
EnqueueConfigObject(checkable, RuntimeState);
1896+
EnqueueConfigObject(checkable, icingadb::task_queue::RuntimeState);
18971897

18981898
int hard_state{};
18991899
if (!cr) {
@@ -2074,7 +2074,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
20742074
return;
20752075
}
20762076

2077-
EnqueueConfigObject(downtime, ConfigUpdate);
2077+
EnqueueConfigObject(downtime, icingadb::task_queue::ConfigUpdate);
20782078

20792079
auto checkable (downtime->GetCheckable());
20802080
auto triggeredBy (Downtime::GetByName(downtime->GetTriggeredBy()));
@@ -2084,7 +2084,7 @@ void IcingaDB::SendStartedDowntime(const Downtime::Ptr& downtime)
20842084
tie(host, service) = GetHostService(checkable);
20852085

20862086
/* Update checkable state as in_downtime may have changed. */
2087-
EnqueueConfigObject(checkable, FullState);
2087+
EnqueueConfigObject(checkable, icingadb::task_queue::FullState);
20882088

20892089
RedisConnection::Query xAdd ({
20902090
"XADD", "icinga:history:stream:downtime", "*",
@@ -2173,7 +2173,7 @@ void IcingaDB::SendRemovedDowntime(const Downtime::Ptr& downtime)
21732173
return;
21742174

21752175
/* Update checkable state as in_downtime may have changed. */
2176-
EnqueueConfigObject(checkable, FullState);
2176+
EnqueueConfigObject(checkable, icingadb::task_queue::FullState);
21772177

21782178
RedisConnection::Query xAdd ({
21792179
"XADD", "icinga:history:stream:downtime", "*",
@@ -2262,7 +2262,7 @@ void IcingaDB::SendAddedComment(const Comment::Ptr& comment)
22622262
tie(host, service) = GetHostService(checkable);
22632263

22642264
// Update the checkable state to so that the "last_comment_id" is correctly reflected.
2265-
EnqueueConfigObject(checkable, FullState);
2265+
EnqueueConfigObject(checkable, icingadb::task_queue::FullState);
22662266

22672267
RedisConnection::Query xAdd ({
22682268
"XADD", "icinga:history:stream:comment", "*",
@@ -2336,7 +2336,7 @@ void IcingaDB::SendRemovedComment(const Comment::Ptr& comment)
23362336
tie(host, service) = GetHostService(checkable);
23372337

23382338
// Update the checkable state to so that the "last_comment_id" is correctly reflected.
2339-
EnqueueConfigObject(checkable, FullState);
2339+
EnqueueConfigObject(checkable, icingadb::task_queue::FullState);
23402340

23412341
RedisConnection::Query xAdd ({
23422342
"XADD", "icinga:history:stream:comment", "*",
@@ -2493,7 +2493,7 @@ void IcingaDB::SendAcknowledgementSet(const Checkable::Ptr& checkable, const Str
24932493
tie(host, service) = GetHostService(checkable);
24942494

24952495
/* Update checkable state as is_acknowledged may have changed. */
2496-
EnqueueConfigObject(checkable, FullState);
2496+
EnqueueConfigObject(checkable, icingadb::task_queue::FullState);
24972497

24982498
RedisConnection::Query xAdd ({
24992499
"XADD", "icinga:history:stream:acknowledgement", "*",
@@ -2551,7 +2551,7 @@ void IcingaDB::SendAcknowledgementCleared(const Checkable::Ptr& checkable, const
25512551
tie(host, service) = GetHostService(checkable);
25522552

25532553
/* Update checkable state as is_acknowledged may have changed. */
2554-
EnqueueConfigObject(checkable, FullState);
2554+
EnqueueConfigObject(checkable, icingadb::task_queue::FullState);
25552555

25562556
RedisConnection::Query xAdd ({
25572557
"XADD", "icinga:history:stream:acknowledgement", "*",
@@ -2970,7 +2970,7 @@ void IcingaDB::ReachabilityChangeHandler(const std::set<Checkable::Ptr>& childre
29702970
{
29712971
for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
29722972
for (auto& checkable : children) {
2973-
rw->EnqueueConfigObject(checkable, FullState);
2973+
rw->EnqueueConfigObject(checkable, icingadb::task_queue::FullState);
29742974
for (const auto& dependencyGroup : checkable->GetDependencyGroups()) {
29752975
rw->EnqueueDependencyGroupStateUpdate(dependencyGroup);
29762976
}
@@ -2991,7 +2991,7 @@ void IcingaDB::VersionChangedHandler(const ConfigObject::Ptr& object)
29912991
if (object->IsActive()) {
29922992
for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
29932993
// A runtime config change triggers also a full state update as well as next update event.
2994-
rw->EnqueueConfigObject(object, ConfigUpdate | FullState | NextUpdate);
2994+
rw->EnqueueConfigObject(object, icingadb::task_queue::ConfigUpdate | icingadb::task_queue::FullState | icingadb::task_queue::NextUpdate);
29952995
}
29962996
} else if (!object->IsActive() && object->GetExtension("ConfigObjectDeleted")) { // same as in apilistener-configsync.cpp
29972997
for (const IcingaDB::Ptr& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
@@ -3055,21 +3055,21 @@ void IcingaDB::FlappingChangeHandler(const Checkable::Ptr& checkable, double cha
30553055
void IcingaDB::NewCheckResultHandler(const Checkable::Ptr& checkable)
30563056
{
30573057
for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
3058-
rw->EnqueueConfigObject(checkable, VolatileState);
3058+
rw->EnqueueConfigObject(checkable, icingadb::task_queue::VolatileState);
30593059
}
30603060
}
30613061

30623062
void IcingaDB::NextCheckChangedHandler(const Checkable::Ptr& checkable)
30633063
{
30643064
for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
3065-
rw->EnqueueConfigObject(checkable, VolatileState | NextUpdate);
3065+
rw->EnqueueConfigObject(checkable, icingadb::task_queue::VolatileState | icingadb::task_queue::NextUpdate);
30663066
}
30673067
}
30683068

30693069
void IcingaDB::DependencyGroupChildRegisteredHandler(const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup)
30703070
{
30713071
for (const auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
3072-
rw->EnqueueConfigObject(child, FullState); // Child requires a full state update.
3072+
rw->EnqueueConfigObject(child, icingadb::task_queue::FullState); // Child requires a full state update.
30733073
rw->EnqueueDependencyChildRegistered(dependencyGroup, child);
30743074
rw->EnqueueDependencyGroupStateUpdate(dependencyGroup);
30753075

@@ -3080,7 +3080,7 @@ void IcingaDB::DependencyGroupChildRegisteredHandler(const Checkable::Ptr& child
30803080
// Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's
30813081
// not worth traversing the whole tree way up and sending config updates for each one of them, as the next
30823082
// Redis config dump is going to fix it anyway.
3083-
rw->EnqueueConfigObject(parent, ConfigUpdate | FullState);
3083+
rw->EnqueueConfigObject(parent, icingadb::task_queue::ConfigUpdate | icingadb::task_queue::FullState);
30843084
}
30853085
}
30863086
}
@@ -3095,7 +3095,7 @@ void IcingaDB::DependencyGroupChildRemovedHandler(const DependencyGroup::Ptr& de
30953095
void IcingaDB::HostProblemChangedHandler(const Service::Ptr& service) {
30963096
for (auto& rw : ConfigType::GetObjectsByType<IcingaDB>()) {
30973097
/* Host state changes affect is_handled and severity of services. */
3098-
rw->EnqueueConfigObject(service, FullState);
3098+
rw->EnqueueConfigObject(service, icingadb::task_queue::FullState);
30993099
}
31003100
}
31013101

lib/icingadb/icingadb-worker.cpp

Lines changed: 27 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,22 @@
77

88
using namespace icinga;
99

10-
PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits)
10+
icingadb::task_queue::PendingConfigItem::PendingConfigItem(const ConfigObject::Ptr& obj, uint32_t bits)
1111
: Object{obj}, DirtyBits{bits & DirtyBitsAll}
1212
{
1313
}
1414

15-
PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup)
15+
icingadb::task_queue::PendingDependencyGroupStateItem::PendingDependencyGroupStateItem(const DependencyGroup::Ptr& depGroup)
1616
: DepGroup{depGroup}
1717
{
1818
}
1919

20-
PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child)
20+
icingadb::task_queue::PendingDependencyEdgeItem::PendingDependencyEdgeItem(const DependencyGroup::Ptr& depGroup, const Checkable::Ptr& child)
2121
: DepGroup{depGroup}, Child{child}
2222
{
2323
}
2424

25-
RelationsDeletionItem::RelationsDeletionItem(const String& id, const RelationsKeySet& relations)
25+
icingadb::task_queue::RelationsDeletionItem::RelationsDeletionItem(const String& id, const RelationsKeySet& relations)
2626
: ID{id}, Relations{relations}
2727
{
2828
}
@@ -93,7 +93,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
9393
auto& seqView = m_PendingItems.get<1>();
9494
for (auto it(seqView.begin()); it != seqView.end(); ++it) {
9595
if (it != seqView.begin()) {
96-
if (dynamic_cast<const RelationsDeletionItem*>(it->get())) {
96+
if (dynamic_cast<const icingadb::task_queue::RelationsDeletionItem*>(it->get())) {
9797
// We don't know whether the previous items are related to this deletion item or not,
9898
// thus we can't just process this right now when there are older items in the queue.
9999
// Otherwise, we might delete something that is going to be updated/created.
@@ -122,7 +122,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
122122
try {
123123
itemToProcess->Execute(*this);
124124
} catch (const std::exception& ex) {
125-
PendingQueueItem& itemRef = *itemToProcess; // For typeid(operand of typeid must not have any side effects).
125+
icingadb::task_queue::PendingQueueItem& itemRef = *itemToProcess; // For typeid(operand of typeid must not have any side effects).
126126
Log(LogCritical, "IcingaDB")
127127
<< "Exception while processing pending item of type '" << typeid(itemRef).name() << "': "
128128
<< DiagnosticInformation(ex, GetActive());
@@ -138,7 +138,7 @@ std::chrono::duration<double> IcingaDB::DequeueAndProcessOne(std::unique_lock<st
138138
return retryAfter;
139139
}
140140

141-
ConfigObject::Ptr PendingConfigItem::GetObjectToLock() const
141+
ConfigObject::Ptr icingadb::task_queue::PendingConfigItem::GetObjectToLock() const
142142
{
143143
return Object;
144144
}
@@ -152,7 +152,7 @@ ConfigObject::Ptr PendingConfigItem::GetObjectToLock() const
152152
*
153153
* @param icingadb The IcingaDB instance to use for executing Redis queries.
154154
*/
155-
void PendingConfigItem::Execute(IcingaDB& icingadb) const {
155+
void icingadb::task_queue::PendingConfigItem::Execute(IcingaDB& icingadb) const {
156156
if (DirtyBits & ConfigDelete) {
157157
auto redisKeyPair = icingadb.GetSyncableTypeRedisKeys(Object->GetReflectionType());
158158
icingadb.m_RconWorker->FireAndForgetQueries(
@@ -203,7 +203,7 @@ void PendingConfigItem::Execute(IcingaDB& icingadb) const {
203203
*
204204
* @param icingadb The IcingaDB instance to use for executing Redis queries.
205205
*/
206-
void PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const
206+
void icingadb::task_queue::PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const
207207
{
208208
// For dependency group state updates, we don't actually care which child triggered the update,
209209
// since all children share the same dependency group state. Thus, we can just pick any child to
@@ -221,7 +221,7 @@ void PendingDependencyGroupStateItem::Execute(IcingaDB& icingadb) const
221221
*
222222
* @param icingadb The IcingaDB instance to use for executing Redis queries.
223223
*/
224-
void PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const
224+
void icingadb::task_queue::PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const
225225
{
226226
std::vector<Dictionary::Ptr> runtimeUpdates;
227227
std::map<RedisConnection::QueryArg, RedisConnection::Query> hMSets;
@@ -238,7 +238,7 @@ void PendingDependencyEdgeItem::Execute(IcingaDB& icingadb) const
238238
*
239239
* @param icingadb The IcingaDB instance to use for executing Redis queries.
240240
*/
241-
void RelationsDeletionItem::Execute(IcingaDB& icingadb) const
241+
void icingadb::task_queue::RelationsDeletionItem::Execute(IcingaDB& icingadb) const
242242
{
243243
for (const auto& [configKey, checksumKey] : Relations) {
244244
if (icingadb.IsStateKey(configKey)) {
@@ -260,18 +260,19 @@ void IcingaDB::EnqueueConfigObject(const ConfigObject::Ptr& object, uint32_t bit
260260
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
261261
return; // No need to enqueue anything if we're not connected.
262262
}
263+
namespace queue = icingadb::task_queue;
263264

264265
{
265266
std::lock_guard lock(m_PendingItemsMutex);
266-
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<PendingConfigItem>(object, bits)); !inserted) {
267-
m_PendingItems.modify(it, [bits](const std::shared_ptr<PendingQueueItem>& item) {
268-
auto configItem = dynamic_cast<PendingConfigItem*>(item.get());
269-
if (bits & ConfigDelete) {
270-
configItem->DirtyBits &= ~(ConfigUpdate | FullState);
271-
} else if (bits & ConfigUpdate) {
272-
configItem->DirtyBits &= ~ConfigDelete;
267+
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<queue::PendingConfigItem>(object, bits)); !inserted) {
268+
m_PendingItems.modify(it, [bits](const std::shared_ptr<queue::PendingQueueItem>& item) {
269+
auto configItem = dynamic_cast<queue::PendingConfigItem*>(item.get());
270+
if (bits & queue::ConfigDelete) {
271+
configItem->DirtyBits &= ~(queue::ConfigUpdate | queue::FullState);
272+
} else if (bits & queue::ConfigUpdate) {
273+
configItem->DirtyBits &= ~queue::ConfigDelete;
273274
}
274-
configItem->DirtyBits |= bits & DirtyBitsAll;
275+
configItem->DirtyBits |= bits & queue::DirtyBitsAll;
275276
});
276277
}
277278
}
@@ -283,7 +284,7 @@ void IcingaDB::EnqueueDependencyGroupStateUpdate(const DependencyGroup::Ptr& dep
283284
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
284285
{
285286
std::lock_guard lock(m_PendingItemsMutex);
286-
m_PendingItems.insert(std::make_shared<PendingDependencyGroupStateItem>(depGroup));
287+
m_PendingItems.insert(std::make_shared<icingadb::task_queue::PendingDependencyGroupStateItem>(depGroup));
287288
}
288289
m_PendingItemsCV.notify_one();
289290
}
@@ -303,7 +304,7 @@ void IcingaDB::EnqueueDependencyChildRegistered(const DependencyGroup::Ptr& depG
303304
if (GetActive() && m_RconWorker && m_RconWorker->IsConnected()) {
304305
{
305306
std::lock_guard lock(m_PendingItemsMutex);
306-
m_PendingItems.insert(std::make_shared<PendingDependencyEdgeItem>(depGroup, child));
307+
m_PendingItems.insert(std::make_shared<icingadb::task_queue::PendingDependencyEdgeItem>(depGroup, child));
307308
}
308309
m_PendingItemsCV.notify_one();
309310
}
@@ -398,7 +399,7 @@ void IcingaDB::EnqueueDependencyChildRemoved(
398399
// Checkable as well. The grandparent Checkable may still have wrong numbers of total children, though it's
399400
// not worth traversing the whole tree way up and sending config updates for each one of them, as the next
400401
// Redis config dump is going to fix it anyway.
401-
EnqueueConfigObject(parent, ConfigUpdate);
402+
EnqueueConfigObject(parent, icingadb::task_queue::ConfigUpdate);
402403

403404
if (!parent->HasAnyDependencies()) {
404405
// If the parent Checkable isn't part of any other dependency chain anymore, drop its dependency node entry.
@@ -435,17 +436,17 @@ void IcingaDB::EnqueueDependencyChildRemoved(
435436
* @param id The ID of the relation to be deleted.
436437
* @param relations A map of Redis keys from which to delete the relation.
437438
*/
438-
void IcingaDB::EnqueueRelationsDeletion(const String& id, const RelationsDeletionItem::RelationsKeySet& relations)
439+
void IcingaDB::EnqueueRelationsDeletion(const String& id, const icingadb::task_queue::RelationsDeletionItem::RelationsKeySet& relations)
439440
{
440441
if (!GetActive() || !m_RconWorker || !m_RconWorker->IsConnected()) {
441442
return; // No need to enqueue anything if we're not connected.
442443
}
443444

444445
{
445446
std::lock_guard lock(m_PendingItemsMutex);
446-
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<RelationsDeletionItem>(id, relations)); !inserted) {
447-
m_PendingItems.modify(it, [&relations](std::shared_ptr<PendingQueueItem>& val) {
448-
auto item = dynamic_cast<RelationsDeletionItem*>(val.get());
447+
if (auto [it, inserted] = m_PendingItems.insert(std::make_shared<icingadb::task_queue::RelationsDeletionItem>(id, relations)); !inserted) {
448+
m_PendingItems.modify(it, [&relations](std::shared_ptr<icingadb::task_queue::PendingQueueItem>& val) {
449+
auto item = dynamic_cast<icingadb::task_queue::RelationsDeletionItem*>(val.get());
449450
item->Relations.insert(relations.begin(), relations.end());
450451
});
451452
}

0 commit comments

Comments
 (0)