@@ -110,6 +110,9 @@ void IcingaDB::ConfigStaticInitialize()
110110 IcingaDB::VersionChangedHandler (object);
111111 });
112112
113+ DependencyGroup::OnChildRegistered.connect (&IcingaDB::DependencyGroupChildRegisteredHandler);
114+ DependencyGroup::OnChildRemoved.connect (&IcingaDB::DependencyGroupChildRemovedHandler);
115+
113116 /* downtime start */
114117 Downtime::OnDowntimeTriggered.connect (&IcingaDB::DowntimeStartedHandler);
115118 /* fixed/flexible downtime end or remove */
@@ -1149,16 +1152,20 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
11491152 * - `icinga:config:dependency:edge:state`: State information for (each) dependency edge. Multiple edges may share the
11501153 * same state.
11511154 *
1152- * For initial dumps, it shouldn't be necessary to set the `runtimeUpdates` parameter.
1155+ * If the `onlyDependencyGroup` parameter is set, only dependencies from this group are processed. This is useful
1156+ * when only a specific dependency group should be processed, e.g. during runtime updates. For initial config dumps,
1157+ * it shouldn't be necessary to set the `runtimeUpdates` and `onlyDependencyGroup` parameters.
11531158 *
11541159 * @param checkable The checkable object to extract dependencies from.
11551160 * @param hMSets The map of Redis HMSETs to insert the dependency data into.
11561161 * @param runtimeUpdates The vector of runtime updates to append the dependency data to.
1162+ * @param onlyDependencyGroup If set, only process dependency objects from this group.
11571163 */
11581164void IcingaDB::InsertCheckableDependencies (
11591165 const Checkable::Ptr& checkable,
11601166 std::map<String, RedisConnection::Query>& hMSets,
1161- std::vector<Dictionary::Ptr>* runtimeUpdates
1167+ std::vector<Dictionary::Ptr>* runtimeUpdates,
1168+ const DependencyGroup::Ptr& onlyDependencyGroup
11621169)
11631170{
11641171 // Only generate a dependency node event if the Checkable is actually part of some dependency graph.
@@ -1191,7 +1198,12 @@ void IcingaDB::InsertCheckableDependencies(
11911198 }
11921199 }
11931200
1194- for (auto & dependencyGroup : checkable->GetDependencyGroups ()) {
1201+ std::vector<DependencyGroup::Ptr> dependencyGroups{onlyDependencyGroup};
1202+ if (!onlyDependencyGroup) {
1203+ dependencyGroups = checkable->GetDependencyGroups ();
1204+ }
1205+
1206+ for (auto & dependencyGroup : dependencyGroups) {
11951207 String edgeFromNodeId (checkableId);
11961208 bool syncSharedEdgeState (false );
11971209
@@ -1451,34 +1463,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
14511463 UpdateState (checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile);
14521464 }
14531465
1454- std::vector<std::vector<String> > transaction = {{" MULTI" }};
1455-
1456- for (auto & kv : hMSets) {
1457- if (!kv.second .empty ()) {
1458- kv.second .insert (kv.second .begin (), {" HMSET" , kv.first });
1459- transaction.emplace_back (std::move (kv.second ));
1460- }
1461- }
1462-
1463- for (auto & objectAttributes : runtimeUpdates) {
1464- std::vector<String> xAdd ({" XADD" , " icinga:runtime" , " MAXLEN" , " ~" , " 1000000" , " *" });
1465- ObjectLock olock (objectAttributes);
1466-
1467- for (const Dictionary::Pair& kv : objectAttributes) {
1468- String value = IcingaToStreamValue (kv.second );
1469- if (!value.IsEmpty ()) {
1470- xAdd.emplace_back (kv.first );
1471- xAdd.emplace_back (value);
1472- }
1473- }
1474-
1475- transaction.emplace_back (std::move (xAdd));
1476- }
1477-
1478- if (transaction.size () > 1 ) {
1479- transaction.push_back ({" EXEC" });
1480- m_Rcon->FireAndForgetQueries (std::move (transaction), Prio::Config, {1 });
1481- }
1466+ ExecuteRedisTransaction (hMSets, runtimeUpdates);
14821467
14831468 if (checkable) {
14841469 SendNextUpdate (checkable);
@@ -2842,6 +2827,102 @@ void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dict
28422827 }
28432828}
28442829
2830+ void IcingaDB::SendDependencyGroupChildRegistered (const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup)
2831+ {
2832+ if (!m_Rcon || !m_Rcon->IsConnected ()) {
2833+ return ;
2834+ }
2835+
2836+ std::vector<Dictionary::Ptr> runtimeUpdates;
2837+ std::map<String, RedisConnection::Query> hMSets;
2838+ InsertCheckableDependencies (child, hMSets, &runtimeUpdates, dependencyGroup);
2839+ ExecuteRedisTransaction (hMSets, runtimeUpdates);
2840+
2841+ UpdateState (child, StateUpdate::Full);
2842+
2843+ std::set<Checkable::Ptr> parents;
2844+ dependencyGroup->LoadParents (parents);
2845+ for (const auto & parent : parents) {
2846+ // The affect{ed,s}_children might now have different outcome, so update the parent Checkable as well.
2847+ // The grandparent Checkable may still have wrong numbers of affected children, though it's not worth
2848+ // traversing the whole tree way up and sending config updates for each one of them, as the next Redis
2849+ // config dump is going to fix it anyway.
2850+ SendConfigUpdate (parent, true );
2851+ }
2852+ }
2853+
2854+ void IcingaDB::SendDependencyGroupChildRemoved (const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& members)
2855+ {
2856+ if (!m_Rcon || !m_Rcon->IsConnected () || members.empty ()) {
2857+ return ;
2858+ }
2859+
2860+ RedisConnection::Queries hdels, xAdds;
2861+ auto deleteState ([this , &hdels, &xAdds](const String& id, const String& redisKey) {
2862+ hdels.emplace_back (RedisConnection::Query{" HDEL" , m_PrefixConfigObject + redisKey, id});
2863+ xAdds.emplace_back (RedisConnection::Query{
2864+ " XADD" , " icinga:runtime:state" , " MAXLEN" , " ~" , " 1000000" , " *" , " runtime_type" , " delete" ,
2865+ " redis_key" , m_PrefixConfigObject + redisKey, " id" , id
2866+ });
2867+ });
2868+
2869+ Checkable::Ptr child;
2870+ std::set<Checkable*> detachedParents;
2871+ for (const auto & dependency : members) {
2872+ child = dependency->GetChild (); // All members have the same child.
2873+ const auto & parent (dependency->GetParent ());
2874+ if (auto [_, inserted] = detachedParents.insert (dependency->GetParent ().get ()); inserted) {
2875+ String edgeId;
2876+ if (dependencyGroup->IsRedundancyGroup ()) {
2877+ // If there are no other members in the dependency group that connect the redundancy group with
2878+ // the parent Checkable, we've to remove the edge and its state accordingly.
2879+ if (!dependencyGroup->HasMembers () || !dependencyGroup->HasIdenticalMember (dependency)) {
2880+ auto id (HashValue (new Array{dependencyGroup->GetIcingaDBIdentifier (), GetObjectIdentifier (parent)}));
2881+ deleteState (id, " dependency:edge:state" );
2882+ DeleteRelationship (id, " dependency:edge" );
2883+ }
2884+
2885+ // Remove the connection from the child Checkable to the redundancy group.
2886+ edgeId = HashValue (new Array{GetObjectIdentifier (child), dependencyGroup->GetIcingaDBIdentifier ()});
2887+ } else {
2888+ // Remove the edge between the parent and child Checkable linked through the removed dependency.
2889+ edgeId = HashValue (new Array{GetObjectIdentifier (child), GetObjectIdentifier (parent)});
2890+ }
2891+
2892+ DeleteRelationship (edgeId, " dependency:edge" );
2893+
2894+ // The affect{ed,s}_children might now have different outcome, so update the parent Checkable as well.
2895+ // The grandparent Checkable may still have wrong numbers of affected children, though it's not worth
2896+ // traversing the whole tree way up and sending config updates for each one of them, as the next Redis
2897+ // config dump is going to fix it anyway.
2898+ SendConfigUpdate (parent, true );
2899+ }
2900+ }
2901+
2902+ if (dependencyGroup->IsRedundancyGroup () && !dependencyGroup->HasMembers ()) {
2903+ String redundancyGroupId (dependencyGroup->GetIcingaDBIdentifier ());
2904+ deleteState (redundancyGroupId, " dependency:edge:state" );
2905+ deleteState (redundancyGroupId, " redundancygroup:state" );
2906+
2907+ DeleteRelationship (redundancyGroupId, " dependency:node" );
2908+ DeleteRelationship (redundancyGroupId, " redundancygroup" );
2909+ } else if (!dependencyGroup->HasMembers ()) {
2910+ // Note: The Icinga DB identifier of a non-redundant dependency group is used as the edge state ID
2911+ // and shared by all of its members. See also SerializeDependencyEdgeState() for details.
2912+ deleteState (dependencyGroup->GetIcingaDBIdentifier (), " dependency:edge:state" );
2913+ }
2914+
2915+ if (!child->HasAnyDependencies ()) {
2916+ // If the child Checkable has no parent and reverse dependencies, we can safely remove the dependency node.
2917+ DeleteRelationship (GetObjectIdentifier (child), " dependency:node" );
2918+ }
2919+
2920+ if (!hdels.empty ()) {
2921+ m_Rcon->FireAndForgetQueries (std::move (hdels), Prio::RuntimeStateSync);
2922+ m_Rcon->FireAndForgetQueries (std::move (xAdds), Prio::RuntimeStateStream, {0 , 1 });
2923+ }
2924+ }
2925+
28452926Dictionary::Ptr IcingaDB::SerializeState (const Checkable::Ptr& checkable)
28462927{
28472928 Dictionary::Ptr attrs = new Dictionary ();
@@ -3119,6 +3200,20 @@ void IcingaDB::NextCheckUpdatedHandler(const Checkable::Ptr& checkable)
31193200 }
31203201}
31213202
3203+ void IcingaDB::DependencyGroupChildRegisteredHandler (const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup)
3204+ {
3205+ for (const auto & rw : ConfigType::GetObjectsByType<IcingaDB>()) {
3206+ rw->SendDependencyGroupChildRegistered (child, dependencyGroup);
3207+ }
3208+ }
3209+
3210+ void IcingaDB::DependencyGroupChildRemovedHandler (const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& members)
3211+ {
3212+ for (const auto & rw : ConfigType::GetObjectsByType<IcingaDB>()) {
3213+ rw->SendDependencyGroupChildRemoved (dependencyGroup, members);
3214+ }
3215+ }
3216+
31223217void IcingaDB::HostProblemChangedHandler (const Service::Ptr& service) {
31233218 for (auto & rw : ConfigType::GetObjectsByType<IcingaDB>()) {
31243219 /* Host state changes affect is_handled and severity of services. */
@@ -3236,3 +3331,45 @@ void IcingaDB::DeleteRelationship(const String& id, const String& redisKeyWithou
32363331
32373332 m_Rcon->FireAndForgetQueries (queries, Prio::Config);
32383333}
3334+
3335+ /* *
3336+ * Execute the provided HMSET values and runtime updates in a single Redis transaction.
3337+ *
3338+ * The HMSETs should just contain the necessary key value pairs to be set in Redis, i.e, without the HMSET command
3339+ * itself. This function will then go through each of the map keys and prepend the HMSET command when transforming the
3340+ * map into valid Redis queries. Likewise, the runtime updates should just contain the key value pairs to be streamed
3341+ * to the icinga:runtime pipeline, and this function will generate a XADD query for each one of the vector elements.
3342+ *
3343+ * @param hMSets A map of Redis keys and their respective HMSET values.
3344+ * @param runtimeUpdates A list of dictionaries to be sent to the icinga:runtime stream.
3345+ */
3346+ void IcingaDB::ExecuteRedisTransaction (std::map<String, RedisConnection::Query>& hMSets, const std::vector<Dictionary::Ptr>& runtimeUpdates) const
3347+ {
3348+ RedisConnection::Queries transaction{{" MULTI" }};
3349+ for (auto & [redisKey, query] : hMSets) {
3350+ if (!query.empty ()) {
3351+ RedisConnection::Query hSet{" HMSET" , redisKey};
3352+ hSet.insert (hSet.end (), std::make_move_iterator (query.begin ()), std::make_move_iterator (query.end ()));
3353+ transaction.emplace_back (std::move (hSet));
3354+ }
3355+ }
3356+
3357+ for (auto & attrs : runtimeUpdates) {
3358+ RedisConnection::Query xAdd{" XADD" , " icinga:runtime" , " MAXLEN" , " ~" , " 1000000" , " *" };
3359+
3360+ ObjectLock olock (attrs);
3361+ for (auto & [key, value]: attrs) {
3362+ if (auto streamVal (IcingaToStreamValue (value)); !streamVal.IsEmpty ()) {
3363+ xAdd.emplace_back (key);
3364+ xAdd.emplace_back (std::move (streamVal));
3365+ }
3366+ }
3367+
3368+ transaction.emplace_back (std::move (xAdd));
3369+ }
3370+
3371+ if (transaction.size () > 1 ) {
3372+ transaction.emplace_back (RedisConnection::Query{" EXEC" });
3373+ m_Rcon->FireAndForgetQueries (std::move (transaction), Prio::Config, {1 });
3374+ }
3375+ }
0 commit comments