@@ -110,6 +110,9 @@ void IcingaDB::ConfigStaticInitialize()
110110 IcingaDB::VersionChangedHandler (object);
111111 });
112112
113+ DependencyGroup::OnChildRegistered.connect (&IcingaDB::DependencyGroupChildRegisteredHandler);
114+ DependencyGroup::OnChildRemoved.connect (&IcingaDB::DependencyGroupChildRemovedHandler);
115+
113116 /* downtime start */
114117 Downtime::OnDowntimeTriggered.connect (&IcingaDB::DowntimeStartedHandler);
115118 /* fixed/flexible downtime end or remove */
@@ -1149,16 +1152,20 @@ void IcingaDB::InsertObjectDependencies(const ConfigObject::Ptr& object, const S
11491152 * - `icinga:config:dependency:edge:state`: State information for (each) dependency edge. Multiple edges may share the
11501153 * same state.
11511154 *
1152- * For initial dumps, it shouldn't be necessary to set the `runtimeUpdates` parameter.
1155+ * If the `onlyDependencyGroup` parameter is set, only dependencies from this group are processed. This is useful
1156+ * when only a specific dependency group should be processed, e.g. during runtime updates. For initial config dumps,
1157+ * it shouldn't be necessary to set the `runtimeUpdates` and `onlyDependencyGroup` parameters.
11531158 *
11541159 * @param checkable The checkable object to extract dependencies from.
11551160 * @param hMSets The map of Redis HMSETs to insert the dependency data into.
11561161 * @param runtimeUpdates The vector of runtime updates to append the dependency data to.
1162+ * @param onlyDependencyGroup If set, only process dependency objects from this group.
11571163 */
11581164void IcingaDB::InsertCheckableDependencies (
11591165 const Checkable::Ptr& checkable,
11601166 std::map<String, RedisConnection::Query>& hMSets,
1161- std::vector<Dictionary::Ptr>* runtimeUpdates
1167+ std::vector<Dictionary::Ptr>* runtimeUpdates,
1168+ const DependencyGroup::Ptr& onlyDependencyGroup
11621169)
11631170{
11641171 // Only generate a dependency node event if the Checkable is actually part of some dependency graph.
@@ -1226,7 +1233,12 @@ void IcingaDB::InsertCheckableDependencies(
12261233 }
12271234 }
12281235
1229- for (auto & dependencyGroup : checkable->GetDependencyGroups ()) {
1236+ std::vector<DependencyGroup::Ptr> dependencyGroups{onlyDependencyGroup};
1237+ if (!onlyDependencyGroup) {
1238+ dependencyGroups = checkable->GetDependencyGroups ();
1239+ }
1240+
1241+ for (auto & dependencyGroup : dependencyGroups) {
12301242 String edgeFromNodeId (checkableId);
12311243 bool syncSharedEdgeState (false );
12321244
@@ -1493,34 +1505,7 @@ void IcingaDB::SendConfigUpdate(const ConfigObject::Ptr& object, bool runtimeUpd
14931505 UpdateState (checkable, runtimeUpdate ? StateUpdate::Full : StateUpdate::Volatile);
14941506 }
14951507
1496- std::vector<std::vector<String> > transaction = {{" MULTI" }};
1497-
1498- for (auto & kv : hMSets) {
1499- if (!kv.second .empty ()) {
1500- kv.second .insert (kv.second .begin (), {" HMSET" , kv.first });
1501- transaction.emplace_back (std::move (kv.second ));
1502- }
1503- }
1504-
1505- for (auto & objectAttributes : runtimeUpdates) {
1506- std::vector<String> xAdd ({" XADD" , " icinga:runtime" , " MAXLEN" , " ~" , " 1000000" , " *" });
1507- ObjectLock olock (objectAttributes);
1508-
1509- for (const Dictionary::Pair& kv : objectAttributes) {
1510- String value = IcingaToStreamValue (kv.second );
1511- if (!value.IsEmpty ()) {
1512- xAdd.emplace_back (kv.first );
1513- xAdd.emplace_back (value);
1514- }
1515- }
1516-
1517- transaction.emplace_back (std::move (xAdd));
1518- }
1519-
1520- if (transaction.size () > 1 ) {
1521- transaction.push_back ({" EXEC" });
1522- m_Rcon->FireAndForgetQueries (std::move (transaction), Prio::Config, {1 });
1523- }
1508+ ExecuteRedisTransaction (hMSets, runtimeUpdates);
15241509
15251510 if (checkable) {
15261511 SendNextUpdate (checkable);
@@ -2884,6 +2869,102 @@ void IcingaDB::SendCustomVarsChanged(const ConfigObject::Ptr& object, const Dict
28842869 }
28852870}
28862871
2872+ void IcingaDB::SendDependencyGroupChildRegistered (const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup)
2873+ {
2874+ if (!m_Rcon || !m_Rcon->IsConnected ()) {
2875+ return ;
2876+ }
2877+
2878+ std::vector<Dictionary::Ptr> runtimeUpdates;
2879+ std::map<String, RedisConnection::Query> hMSets;
2880+ InsertCheckableDependencies (child, hMSets, &runtimeUpdates, dependencyGroup);
2881+ ExecuteRedisTransaction (hMSets, runtimeUpdates);
2882+
2883+ UpdateState (child, StateUpdate::Full);
2884+
2885+ std::set<Checkable::Ptr> parents;
2886+ dependencyGroup->LoadParents (parents);
2887+ for (const auto & parent : parents) {
2888+ // The affect{ed,s}_children might now have different outcome, so update the parent Checkable as well.
2889+ // The grandparent Checkable may still have wrong numbers of affected children, though it's not worth
2890+ // traversing the whole tree way up and sending config updates for each one of them, as the next Redis
2891+ // config dump is going to fix it anyway.
2892+ SendConfigUpdate (parent, true );
2893+ }
2894+ }
2895+
2896+ void IcingaDB::SendDependencyGroupChildRemoved (const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& dependencies)
2897+ {
2898+ if (!m_Rcon || !m_Rcon->IsConnected () || dependencies.empty ()) {
2899+ return ;
2900+ }
2901+
2902+ RedisConnection::Queries hdels, xAdds;
2903+ auto deleteState ([this , &hdels, &xAdds](const String& id, const String& redisKey) {
2904+ hdels.emplace_back (RedisConnection::Query{" HDEL" , m_PrefixConfigObject + redisKey, id});
2905+ xAdds.emplace_back (RedisConnection::Query{
2906+ " XADD" , " icinga:runtime:state" , " MAXLEN" , " ~" , " 1000000" , " *" , " runtime_type" , " delete" ,
2907+ " redis_key" , m_PrefixConfigObject + redisKey, " id" , id
2908+ });
2909+ });
2910+
2911+ Checkable::Ptr child;
2912+ std::set<Checkable*> detachedParents;
2913+ for (const auto & dependency : dependencies) {
2914+ child = dependency->GetChild (); // All dependencies have the same child.
2915+ const auto & parent (dependency->GetParent ());
2916+ if (auto [_, inserted] = detachedParents.insert (dependency->GetParent ().get ()); inserted) {
2917+ String edgeId;
2918+ if (dependencyGroup->IsRedundancyGroup ()) {
2919+ // If there are no other dependencies in the dependency group that connect the redundancy group with
2920+ // the parent Checkable, we've to remove the edge and its state accordingly.
2921+ if (dependencyGroup->IsEmpty () || !dependencyGroup->HasParentWithConfig (dependency)) {
2922+ auto id (HashValue (new Array{dependencyGroup->GetIcingaDBIdentifier (), GetObjectIdentifier (parent)}));
2923+ deleteState (id, " dependency:edge:state" );
2924+ DeleteRelationship (id, " dependency:edge" );
2925+ }
2926+
2927+ // Remove the connection from the child Checkable to the redundancy group.
2928+ edgeId = HashValue (new Array{GetObjectIdentifier (child), dependencyGroup->GetIcingaDBIdentifier ()});
2929+ } else {
2930+ // Remove the edge between the parent and child Checkable linked through the removed dependency.
2931+ edgeId = HashValue (new Array{GetObjectIdentifier (child), GetObjectIdentifier (parent)});
2932+ }
2933+
2934+ DeleteRelationship (edgeId, " dependency:edge" );
2935+
2936+ // The affect{ed,s}_children might now have different outcome, so update the parent Checkable as well.
2937+ // The grandparent Checkable may still have wrong numbers of affected children, though it's not worth
2938+ // traversing the whole tree way up and sending config updates for each one of them, as the next Redis
2939+ // config dump is going to fix it anyway.
2940+ SendConfigUpdate (parent, true );
2941+ }
2942+ }
2943+
2944+ if (dependencyGroup->IsRedundancyGroup () && dependencyGroup->IsEmpty ()) {
2945+ String redundancyGroupId (dependencyGroup->GetIcingaDBIdentifier ());
2946+ deleteState (redundancyGroupId, " dependency:edge:state" );
2947+ deleteState (redundancyGroupId, " redundancygroup:state" );
2948+
2949+ DeleteRelationship (redundancyGroupId, " dependency:node" );
2950+ DeleteRelationship (redundancyGroupId, " redundancygroup" );
2951+ } else if (dependencyGroup->IsEmpty ()) {
2952+ // Note: The Icinga DB identifier of a non-redundant dependency group is used as the edge state ID
2953+ // and shared by all of its dependency objects. See also SerializeDependencyEdgeState() for details.
2954+ deleteState (dependencyGroup->GetIcingaDBIdentifier (), " dependency:edge:state" );
2955+ }
2956+
2957+ if (!child->HasAnyDependencies ()) {
2958+ // If the child Checkable has no parent and reverse dependencies, we can safely remove the dependency node.
2959+ DeleteRelationship (GetObjectIdentifier (child), " dependency:node" );
2960+ }
2961+
2962+ if (!hdels.empty ()) {
2963+ m_Rcon->FireAndForgetQueries (std::move (hdels), Prio::RuntimeStateSync);
2964+ m_Rcon->FireAndForgetQueries (std::move (xAdds), Prio::RuntimeStateStream, {0 , 1 });
2965+ }
2966+ }
2967+
28872968Dictionary::Ptr IcingaDB::SerializeState (const Checkable::Ptr& checkable)
28882969{
28892970 Dictionary::Ptr attrs = new Dictionary ();
@@ -3161,6 +3242,20 @@ void IcingaDB::NextCheckUpdatedHandler(const Checkable::Ptr& checkable)
31613242 }
31623243}
31633244
3245+ void IcingaDB::DependencyGroupChildRegisteredHandler (const Checkable::Ptr& child, const DependencyGroup::Ptr& dependencyGroup)
3246+ {
3247+ for (const auto & rw : ConfigType::GetObjectsByType<IcingaDB>()) {
3248+ rw->SendDependencyGroupChildRegistered (child, dependencyGroup);
3249+ }
3250+ }
3251+
3252+ void IcingaDB::DependencyGroupChildRemovedHandler (const DependencyGroup::Ptr& dependencyGroup, const std::vector<Dependency::Ptr>& dependencies)
3253+ {
3254+ for (const auto & rw : ConfigType::GetObjectsByType<IcingaDB>()) {
3255+ rw->SendDependencyGroupChildRemoved (dependencyGroup, dependencies);
3256+ }
3257+ }
3258+
31643259void IcingaDB::HostProblemChangedHandler (const Service::Ptr& service) {
31653260 for (auto & rw : ConfigType::GetObjectsByType<IcingaDB>()) {
31663261 /* Host state changes affect is_handled and severity of services. */
@@ -3278,3 +3373,45 @@ void IcingaDB::DeleteRelationship(const String& id, const String& redisKeyWithou
32783373
32793374 m_Rcon->FireAndForgetQueries (queries, Prio::Config);
32803375}
3376+
3377+ /* *
3378+ * Execute the provided HMSET values and runtime updates in a single Redis transaction.
3379+ *
3380+ * The HMSETs should just contain the necessary key value pairs to be set in Redis, i.e, without the HMSET command
3381+ * itself. This function will then go through each of the map keys and prepend the HMSET command when transforming the
3382+ * map into valid Redis queries. Likewise, the runtime updates should just contain the key value pairs to be streamed
3383+ * to the icinga:runtime pipeline, and this function will generate a XADD query for each one of the vector elements.
3384+ *
3385+ * @param hMSets A map of Redis keys and their respective HMSET values.
3386+ * @param runtimeUpdates A list of dictionaries to be sent to the icinga:runtime stream.
3387+ */
3388+ void IcingaDB::ExecuteRedisTransaction (std::map<String, RedisConnection::Query>& hMSets, const std::vector<Dictionary::Ptr>& runtimeUpdates) const
3389+ {
3390+ RedisConnection::Queries transaction{{" MULTI" }};
3391+ for (auto & [redisKey, query] : hMSets) {
3392+ if (!query.empty ()) {
3393+ RedisConnection::Query hSet{" HMSET" , redisKey};
3394+ hSet.insert (hSet.end (), std::make_move_iterator (query.begin ()), std::make_move_iterator (query.end ()));
3395+ transaction.emplace_back (std::move (hSet));
3396+ }
3397+ }
3398+
3399+ for (auto & attrs : runtimeUpdates) {
3400+ RedisConnection::Query xAdd{" XADD" , " icinga:runtime" , " MAXLEN" , " ~" , " 1000000" , " *" };
3401+
3402+ ObjectLock olock (attrs);
3403+ for (auto & [key, value]: attrs) {
3404+ if (auto streamVal (IcingaToStreamValue (value)); !streamVal.IsEmpty ()) {
3405+ xAdd.emplace_back (key);
3406+ xAdd.emplace_back (std::move (streamVal));
3407+ }
3408+ }
3409+
3410+ transaction.emplace_back (std::move (xAdd));
3411+ }
3412+
3413+ if (transaction.size () > 1 ) {
3414+ transaction.emplace_back (RedisConnection::Query{" EXEC" });
3415+ m_Rcon->FireAndForgetQueries (std::move (transaction), Prio::Config, {1 });
3416+ }
3417+ }
0 commit comments