1313#include <ucp/proto/proto_debug.h>
1414#include <ucp/proto/proto_multi.inl>
1515#include <ucp/proto/proto_init.h>
16+ #include <ucs/datastruct/callbackq.h>
1617
1718
1819enum {
@@ -32,6 +33,17 @@ typedef struct {
3233 size_t frag_proto_min_length ; /* Frag proto min length */
3334} ucp_proto_rndv_ppln_priv_t ;
3435
36+ /* A callback to reschedule a throttled ppln request, that is called from the
37+ * worker's progress queue */
38+ static unsigned ucp_proto_rndv_ppln_reschedule_progress (void * arg )
39+ {
40+ ucp_request_t * req = arg ;
41+ ucs_trace_req ("ppln reschedule progress for request %p" , req );
42+ ucp_request_send (req );
43+
44+ return 1 ;
45+ }
46+
3547static ucs_status_t
3648ucp_proto_rndv_ppln_add_overhead (ucp_proto_perf_t * ppln_perf , size_t frag_size )
3749{
@@ -209,12 +221,37 @@ ucp_proto_rndv_ppln_frag_complete(ucp_request_t *freq, int send_ack, int abort,
209221 ucp_proto_complete_cb_t complete_func ,
210222 const char * title )
211223{
212- ucp_request_t * req = ucp_request_get_super (freq );
224+ ucp_request_t * req = ucp_request_get_super (freq );
225+ ucp_worker_h worker = req -> send .ep -> worker ;
226+ ucp_context_h context = worker -> context ;
227+ int fc_enabled = context -> config .ext .rndv_ppln_worker_fc_enable ;
213228
214229 if (send_ack ) {
215230 req -> send .rndv .ppln .ack_data_size += freq -> send .state .dt_iter .length ;
216231 }
217232
233+ if (fc_enabled ) {
234+ ucs_assert (worker -> rndv_ppln_fc .active_frags > 0 );
235+ worker -> rndv_ppln_fc .active_frags -- ;
236+
237+ ucs_trace_req ("%s frag complete, worker active_frags=%zu" ,
238+ title , worker -> rndv_ppln_fc .active_frags );
239+
240+ /* Reschedule throttled requests to the progress queue, if any are
241+ * pending. */
242+ if (!ucs_queue_is_empty (& worker -> rndv_ppln_fc .pending_q )) {
243+ ucp_request_t * pending_req ;
244+ ucs_queue_elem_t * elem ;
245+
246+ elem = ucs_queue_pull (& worker -> rndv_ppln_fc .pending_q );
247+ pending_req = ucs_container_of (elem , ucp_request_t ,
248+ send .rndv .ppln .queue_elem );
249+ ucs_callbackq_add_oneshot (& worker -> uct -> progress_q , pending_req ,
250+ ucp_proto_rndv_ppln_reschedule_progress ,
251+ pending_req );
252+ }
253+ }
254+
218255 /* In case of abort we don't destroy super request until all fragments are
219256 * completed */
220257 if (!ucp_proto_rndv_frag_complete (req , freq , title )) {
@@ -252,8 +289,10 @@ void ucp_proto_rndv_ppln_recv_frag_complete(ucp_request_t *freq, int send_ack,
252289
253290static ucs_status_t ucp_proto_rndv_ppln_progress (uct_pending_req_t * uct_req )
254291{
255- ucp_request_t * req = ucs_container_of (uct_req , ucp_request_t , send .uct );
256- ucp_worker_h worker = req -> send .ep -> worker ;
292+ ucp_request_t * req = ucs_container_of (uct_req , ucp_request_t , send .uct );
293+ ucp_worker_h worker = req -> send .ep -> worker ;
294+ ucp_context_h context = worker -> context ;
295+ int fc_enabled = context -> config .ext .rndv_ppln_worker_fc_enable ;
257296 const ucp_proto_rndv_ppln_priv_t * rpriv ;
258297 ucp_datatype_iter_t next_iter ;
259298 ucs_status_t status ;
@@ -271,6 +310,22 @@ static ucs_status_t ucp_proto_rndv_ppln_progress(uct_pending_req_t *uct_req)
271310 rpriv = req -> send .proto_config -> priv ;
272311
273312 while (!ucp_datatype_iter_is_end (& req -> send .state .dt_iter )) {
313+ /* Check throttling limit */
314+ if (fc_enabled &&
315+ (worker -> rndv_ppln_fc .active_frags >=
316+ context -> config .ext .rndv_ppln_worker_max_frags )) {
317+
318+ /* Add request to the pending queue. It will be rescheduled
319+ * when other fragments complete. */
320+ ucs_queue_push (& worker -> rndv_ppln_fc .pending_q ,
321+ & req -> send .rndv .ppln .queue_elem );
322+ return UCS_OK ;
323+ }
324+
325+ if (fc_enabled ) {
326+ worker -> rndv_ppln_fc .active_frags ++ ;
327+ }
328+
274329 status = ucp_proto_rndv_frag_request_alloc (worker , req , & freq );
275330 if (status != UCS_OK ) {
276331 ucp_proto_request_abort (req , status );
0 commit comments