/*
Copyright 2024 Huawei Technologies Co., Ltd.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

@author Toni Boehnlein, Benjamin Lozes, Pal Andras Papp, Raphael S. Steiner
*/

#pragma once

#include <algorithm>
#include <map>
#include <string>
#include <vector>

#include "../kl_active_schedule.hpp"
#include "lambda_container.hpp"
#include "max_comm_datastructure.hpp"

namespace osp {

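// Communication cost function for the Kernighan-Lin style BSP scheduler.
// The schedule cost it models is, per superstep, the maximum work of any processor
// plus the maximum send/receive volume of any processor scaled by the instance's
// communication cost, plus one synchronisation cost per superstep boundary
// (see compute_schedule_cost below). The per-step send and receive volumes are
// tracked by max_comm_datastructure.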
template <typename Graph_t, typename cost_t, typename MemoryConstraint_t, unsigned window_size = 1>
struct kl_bsp_comm_cost_function {

    using VertexType = vertex_idx_t<Graph_t>;
    using kl_move = kl_move_struct<cost_t, VertexType>;
    using kl_gain_update_info = kl_update_info<VertexType>;

    constexpr static unsigned window_range = 2 * window_size + 1;

    kl_active_schedule<Graph_t, cost_t, MemoryConstraint_t> *active_schedule;
    compatible_processor_range<Graph_t> *proc_range;
    const Graph_t *graph;
    const BspInstance<Graph_t> *instance;

    max_comm_datastructure<Graph_t> comm_ds;

    inline cost_t get_comm_multiplier() { return 1; }
    inline cost_t get_max_comm_weight() { return comm_ds.max_comm_weight; }
    inline cost_t get_max_comm_weight_multiplied() { return comm_ds.max_comm_weight; }
    inline const std::string name() const { return "bsp_comm"; }
    inline bool is_compatible(VertexType node, unsigned proc) { return active_schedule->getInstance().isCompatible(node, proc); }
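
    // Clamp the move window [node_step - window_size, node_step + window_size] to the admissible
    // superstep range [start_step, end_step]: start_idx returns the first valid index into the
    // window, end_idx returns one past the last valid index.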
    inline unsigned start_idx(const unsigned node_step, const unsigned start_step) { return (node_step < window_size + start_step) ? window_size - (node_step - start_step) : 0; }
    inline unsigned end_idx(const unsigned node_step, const unsigned end_step) { return (node_step + window_size <= end_step) ? window_range : window_range - (node_step + window_size - end_step); }

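    // Wire up the cost function with the active schedule and the per-node processor compatibility
    // range, and set up the communication datastructure for all supersteps.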
    void initialize(kl_active_schedule<Graph_t, cost_t, MemoryConstraint_t> &sched, compatible_processor_range<Graph_t> &p_range) {
        active_schedule = &sched;
        proc_range = &p_range;
        instance = &sched.getInstance();
        graph = &instance->getComputationalDag();

        const unsigned num_steps = active_schedule->num_steps();
        comm_ds.initialize(active_schedule->getSetSchedule(), *instance, num_steps);
        comm_ds.set_active_schedule(*active_schedule);
    }

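    // Recompute the per-step send/receive volumes from scratch over all supersteps.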
    void compute_send_receive_datastructures() {
        comm_ds.compute_comm_datastructures(0, active_schedule->num_steps() - 1);
    }

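    // Total BSP cost of the current schedule: for every superstep the maximum work of any
    // processor plus the maximum communication volume of any processor times the communication
    // cost, plus (num_steps - 1) synchronisation costs. If compute_datastructures is set, the
    // communication datastructures are rebuilt first.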
    template <bool compute_datastructures = true>
    cost_t compute_schedule_cost() {
        if constexpr (compute_datastructures) compute_send_receive_datastructures();

        cost_t total_cost = 0;
        for (unsigned step = 0; step < active_schedule->num_steps(); step++) {
            total_cost += active_schedule->get_step_max_work(step);
            total_cost += comm_ds.step_max_comm(step) * instance->communicationCosts();
        }
        total_cost += static_cast<cost_t>(active_schedule->num_steps() - 1) * instance->synchronisationCosts();
        return total_cost;
    }

    cost_t compute_schedule_cost_test() { return compute_schedule_cost<false>(); }

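    // Propagate a committed move to the communication datastructure for the affected step range.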
    void update_datastructure_after_move(const kl_move &move, const unsigned start_step, const unsigned end_step) {
        comm_ds.update_datastructure_after_move(move, start_step, end_step);
    }

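    // Accumulate into affinity_table_node, for every compatible processor p_to and every superstep
    // s_to inside the move window around node's current superstep, the communication term of the
    // gain of moving node to (p_to, s_to), scaled by the instance's communication cost. Outgoing
    // communication to children and incoming communication from parents are both considered; the
    // incoming part is an approximation (see the TODOs below).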
    template <typename affinity_table_t>
    void compute_comm_affinity(VertexType node, affinity_table_t &affinity_table_node, const cost_t &penalty,
                               const cost_t &reward, const unsigned start_step, const unsigned end_step) {

        const unsigned node_step = active_schedule->assigned_superstep(node);
        const unsigned node_proc = active_schedule->assigned_processor(node);
        const unsigned window_bound = end_idx(node_step, end_step);
        const unsigned node_start_idx = start_idx(node_step, start_step);

        const cost_t comm_w_node = graph->vertex_comm_weight(node);

        const auto &current_set_schedule = active_schedule->getSetSchedule();

        for (unsigned p_to = 0; p_to < instance->numberOfProcessors(); ++p_to) {
            if (!is_compatible(node, p_to)) continue;

            for (unsigned s_to_idx = node_start_idx; s_to_idx < window_bound; ++s_to_idx) {
                const unsigned s_to = node_step + s_to_idx - window_size;
                cost_t comm_cost_change = 0;

                const auto pre_move_data_from = comm_ds.get_pre_move_comm_data_step(node_step);
                const auto pre_move_data_to = comm_ds.get_pre_move_comm_data_step(s_to);
                // --- Outgoing communication from `node` ---
                // From: removing node from (node_step, node_proc) drops one send of comm_w_node from
                // node_proc and the corresponding receive at every processor that holds children of node.
                for (const auto [proc, count] : comm_ds.node_lambda_map.iterate_proc_entries(node)) {
                    if (proc == node_proc) continue;
                    comm_cost_change += calculate_comm_cost_change_send(node_step, node_proc, comm_w_node, -1, pre_move_data_from);
                    comm_cost_change += calculate_comm_cost_change_receive(node_step, proc, comm_w_node, -1, pre_move_data_from);
                }
                // To
                lambda_vector_container temp_lambda_map; // Use a temporary map for 'to' state
                temp_lambda_map.initialize(1, instance->numberOfProcessors());
                for (const auto &v : graph->children(node)) {
                    const unsigned v_proc = current_set_schedule.assignedProcessor(v);

                    if (p_to != v_proc) {
                        if (temp_lambda_map.increase_proc_count(0, v_proc)) {
                            comm_cost_change -= calculate_comm_cost_change_send(s_to, p_to, comm_w_node, 1, pre_move_data_to);
                            comm_cost_change -= calculate_comm_cost_change_receive(s_to, v_proc, comm_w_node, 1, pre_move_data_to);
                        }
                    }
                }

                // --- Incoming communication to `node` ---
                for (const auto &u : graph->parents(node)) {
                    const unsigned u_proc = active_schedule->assigned_processor(u);
                    const unsigned u_step = current_set_schedule.assignedSuperstep(u);
                    const cost_t comm_w_u = graph->vertex_comm_weight(u);
                    const auto pre_move_data_u = comm_ds.get_pre_move_comm_data_step(u_step);

                    // From
                    if (u_proc != node_proc) {
                        // Send part (from parent u) & receive part (at node_proc) // TODO: this is not correct, the lambda map is not updated
                        if (comm_ds.node_lambda_map.get_proc_entry(u, node_proc) == 1) { // if node is the only child on this proc
                            comm_cost_change += calculate_comm_cost_change_send(u_step, u_proc, comm_w_u, -1, pre_move_data_u);
                            comm_cost_change += calculate_comm_cost_change_receive(u_step, node_proc, comm_w_u, -1, pre_move_data_u);
                        }
                    }
                    // To
                    if (u_proc != p_to) {
                        // Send part (from parent u) & receive part (at p_to).
                        // This logic is complex for an affinity calculation.
                        // A full recompute for neighbors is a safer bet, which is what update_node_comm_affinity does. // TODO: this is not true anymore
                        // The following is an approximation.

                        // if moving node to p_to creates a new communication link for parent u
                        bool has_other_on_p_to = false;
                        for (const auto &sibling : graph->children(u)) {
                            if (sibling != node && active_schedule->assigned_processor(sibling) == p_to) { has_other_on_p_to = true; break; }
                        }
                        if (!has_other_on_p_to) {
                            comm_cost_change -= calculate_comm_cost_change_send(u_step, u_proc, comm_w_u, 1, pre_move_data_u);
                            comm_cost_change -= calculate_comm_cost_change_receive(u_step, p_to, comm_w_u, 1, pre_move_data_u);
                        }
                    }
                }
                affinity_table_node[p_to][s_to_idx] += comm_cost_change * instance->communicationCosts();
            }
        }
    }

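    // Change of the maximum communication volume of superstep `step` if processor p_send's send
    // volume changes by sign * comm_w, measured relative to the pre-move maximum of that step.
    // Uses the tracked maximum, its multiplicity, and the second maximum to avoid re-scanning
    // all processors.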
    cost_t calculate_comm_cost_change_send(unsigned step, unsigned p_send, cost_t comm_w, int sign, const pre_move_comm_data<cost_t> &pre_move_data) {
        const cost_t old_max = pre_move_data.from_step_max_comm;

        const cost_t new_send = comm_ds.step_proc_send(step, p_send) + sign * comm_w;
        cost_t new_max_send = comm_ds.step_max_send(step);
        if (new_send > new_max_send) {
            new_max_send = new_send;
        } else if (sign < 0 && comm_ds.step_proc_send(step, p_send) == new_max_send) {
            if (comm_ds.step_max_send_processor_count[step] == 1) {
                // p_send held the unique maximum: the new maximum is the larger of the runner-up
                // volume and p_send's decreased volume.
                new_max_send = std::max(comm_ds.step_second_max_send(step), new_send);
            }
            // otherwise another processor still attains the old maximum send volume
        }

        return std::max(new_max_send, comm_ds.step_max_receive(step)) - old_max;
    }

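    // Counterpart of calculate_comm_cost_change_send for a change of sign * comm_w in processor
    // p_receive's receive volume of superstep `step`.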
    cost_t calculate_comm_cost_change_receive(unsigned step, unsigned p_receive, cost_t comm_w, int sign, const pre_move_comm_data<cost_t> &pre_move_data) {
        const cost_t old_max = pre_move_data.from_step_max_comm;

        const cost_t new_receive = comm_ds.step_proc_receive(step, p_receive) + sign * comm_w;

        cost_t new_max_receive = comm_ds.step_max_receive(step);
        if (new_receive > new_max_receive) {
            new_max_receive = new_receive;
        } else if (sign < 0 && comm_ds.step_proc_receive(step, p_receive) == new_max_receive) {
            if (comm_ds.step_max_receive_processor_count[step] == 1) {
                // p_receive held the unique maximum: the new maximum is the larger of the runner-up
                // volume and p_receive's decreased volume.
                new_max_receive = std::max(comm_ds.step_second_max_receive(step), new_receive);
            }
            // otherwise another processor still attains the old maximum receive volume
        }

        return std::max(comm_ds.step_max_send(step), new_max_receive) - old_max;
    }

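    // Combined change of superstep `step`'s maximum communication volume when a send of comm_w
    // from p_send to p_receive is added (sign = 1) or removed (sign = -1). The send volume of
    // p_send is temporarily adjusted between the two calls and reverted afterwards.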
    cost_t calculate_comm_cost_change(unsigned step, unsigned p_send, unsigned p_receive, cost_t comm_w, int sign) {
        const auto pre_move_data = comm_ds.get_pre_move_comm_data_step(step);
        cost_t change = 0;
        change += calculate_comm_cost_change_send(step, p_send, comm_w, sign, pre_move_data);
        comm_ds.step_proc_send(step, p_send) += sign * comm_w;
        change += calculate_comm_cost_change_receive(step, p_receive, comm_w, sign, pre_move_data);
        comm_ds.step_proc_send(step, p_send) -= sign * comm_w; // revert for next calculation
        return change;
    }

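    // After a move has been applied, mark all unlocked neighbours of the moved node for a full
    // gain recomputation, or queue them as new candidates if they are not yet in the affinity
    // table; an incremental update of their communication affinities is not attempted here.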
    template <typename thread_data_t>
    void update_node_comm_affinity(const kl_move &move, thread_data_t &thread_data, const cost_t &penalty,
                                   const cost_t &reward, std::map<VertexType, kl_gain_update_info> &max_gain_recompute,
                                   std::vector<VertexType> &new_nodes) {
        // For simplicity and correctness, we do a full recompute for neighbors.
        // A fully incremental update is very complex for this cost function.
        auto process_neighbor = [&](VertexType neighbor) {
            if (thread_data.lock_manager.is_locked(neighbor)) return;
            if (not thread_data.affinity_table.is_selected(neighbor)) {
                new_nodes.push_back(neighbor);
                return;
            }
            if (max_gain_recompute.find(neighbor) == max_gain_recompute.end()) {
                max_gain_recompute[neighbor] = kl_gain_update_info(neighbor, true);
            } else {
                max_gain_recompute[neighbor].full_update = true;
            }
        };

        for (const auto &target : graph->children(move.node)) {
            process_neighbor(target);
        }
        for (const auto &source : graph->parents(move.node)) {
            process_neighbor(source);
        }
    }
};

} // namespace osp