2222#include <haproxy/quic_sock.h>
2323#include <haproxy/quic_stream.h>
2424#include <haproxy/quic_tp-t.h>
25+ #include <haproxy/quic_tx-t.h>
2526#include <haproxy/session.h>
2627#include <haproxy/ssl_sock-t.h>
2728#include <haproxy/stconn.h>
@@ -388,6 +389,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
388389
389390static void qcc_wakeup (struct qcc * qcc )
390391{
392+ HA_ATOMIC_AND (& qcc -> wait_event .tasklet -> state , ~TASK_F_USR1 );
393+ tasklet_wakeup (qcc -> wait_event .tasklet );
394+ }
395+
396+ static void qcc_wakeup_pacing (struct qcc * qcc )
397+ {
398+ HA_ATOMIC_OR (& qcc -> wait_event .tasklet -> state , TASK_F_USR1 );
391399 tasklet_wakeup (qcc -> wait_event .tasklet );
392400}
393401
@@ -2076,36 +2084,42 @@ static int qcc_subscribe_send(struct qcc *qcc)
20762084 *
20772085 * Returns 0 if all data sent with success else non-zero.
20782086 */
2079- static int qcc_send_frames (struct qcc * qcc , struct list * frms )
2087+ static int qcc_send_frames (struct qcc * qcc , struct list * frms , int strm_content )
20802088{
2089+ enum quic_tx_err ret ;
2090+ int max_burst = strm_content ? global .tune .quic_frontend_max_tx_burst : 0 ;
2091+
20812092 TRACE_ENTER (QMUX_EV_QCC_SEND , qcc -> conn );
20822093
20832094 if (LIST_ISEMPTY (frms )) {
20842095 TRACE_DEVEL ("leaving on no frame to send" , QMUX_EV_QCC_SEND , qcc -> conn );
2085- return 1 ;
2096+ return - 1 ;
20862097 }
20872098
2088- if (!qc_send_mux (qcc -> conn -> handle .qc , frms )) {
2099+ ret = qc_send_mux (qcc -> conn -> handle .qc , frms , max_burst );
2100+ if (ret == QUIC_TX_ERR_FATAL ) {
20892101 TRACE_DEVEL ("error on sending" , QMUX_EV_QCC_SEND , qcc -> conn );
20902102 qcc_subscribe_send (qcc );
20912103 goto err ;
20922104 }
20932105
2106+ BUG_ON (ret == QUIC_TX_ERR_AGAIN && !max_burst );
2107+
20942108 /* If there is frames left at this stage, transport layer is blocked.
20952109 * Subscribe on it to retry later.
20962110 */
2097- if (!LIST_ISEMPTY (frms )) {
2111+ if (!LIST_ISEMPTY (frms ) && ret != QUIC_TX_ERR_AGAIN ) {
20982112 TRACE_DEVEL ("remaining frames to send" , QMUX_EV_QCC_SEND , qcc -> conn );
20992113 qcc_subscribe_send (qcc );
21002114 goto err ;
21012115 }
21022116
21032117 TRACE_LEAVE (QMUX_EV_QCC_SEND , qcc -> conn );
2104- return 0 ;
2118+ return ret == QUIC_TX_ERR_AGAIN ? 1 : 0 ;
21052119
21062120 err :
21072121 TRACE_DEVEL ("leaving on error" , QMUX_EV_QCC_SEND , qcc -> conn );
2108- return 1 ;
2122+ return - 1 ;
21092123}
21102124
21112125/* Emit a RESET_STREAM on <qcs>.
@@ -2130,7 +2144,7 @@ static int qcs_send_reset(struct qcs *qcs)
21302144 frm -> reset_stream .final_size = qcs -> tx .fc .off_real ;
21312145
21322146 LIST_APPEND (& frms , & frm -> list );
2133- if (qcc_send_frames (qcs -> qcc , & frms )) {
2147+ if (qcc_send_frames (qcs -> qcc , & frms , 0 )) {
21342148 if (!LIST_ISEMPTY (& frms ))
21352149 qc_frm_free (qcs -> qcc -> conn -> handle .qc , & frm );
21362150 TRACE_DEVEL ("cannot send RESET_STREAM" , QMUX_EV_QCS_SEND , qcs -> qcc -> conn , qcs );
@@ -2181,7 +2195,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
21812195 frm -> stop_sending .app_error_code = qcs -> err ;
21822196
21832197 LIST_APPEND (& frms , & frm -> list );
2184- if (qcc_send_frames (qcs -> qcc , & frms )) {
2198+ if (qcc_send_frames (qcs -> qcc , & frms , 0 )) {
21852199 if (!LIST_ISEMPTY (& frms ))
21862200 qc_frm_free (qcc -> conn -> handle .qc , & frm );
21872201 TRACE_DEVEL ("cannot send STOP_SENDING" , QMUX_EV_QCS_SEND , qcs -> qcc -> conn , qcs );
@@ -2286,7 +2300,7 @@ static int qcc_io_send(struct qcc *qcc)
22862300 }
22872301
22882302 if (!LIST_ISEMPTY (& qcc -> lfctl .frms )) {
2289- if (qcc_send_frames (qcc , & qcc -> lfctl .frms )) {
2303+ if (qcc_send_frames (qcc , & qcc -> lfctl .frms , 0 )) {
22902304 TRACE_DEVEL ("flow-control frames rejected by transport, aborting send" , QMUX_EV_QCC_SEND , qcc -> conn );
22912305 goto out ;
22922306 }
@@ -2365,7 +2379,7 @@ static int qcc_io_send(struct qcc *qcc)
23652379 /* Retry sending until no frame to send, data rejected or connection
23662380 * flow-control limit reached.
23672381 */
2368- while (qcc_send_frames (qcc , & qcc -> tx .frms ) == 0 && !qfctl_rblocked (& qcc -> tx .fc )) {
2382+ while (( ret = qcc_send_frames (qcc , & qcc -> tx .frms , 1 ) ) == 0 && !qfctl_rblocked (& qcc -> tx .fc )) {
23692383 window_conn = qfctl_rcap (& qcc -> tx .fc );
23702384 resent = 0 ;
23712385
@@ -2397,7 +2411,10 @@ static int qcc_io_send(struct qcc *qcc)
23972411
23982412 sent_done :
23992413 /* Deallocate frames that the transport layer has rejected. */
2400- if (!LIST_ISEMPTY (& qcc -> tx .frms )) {
2414+ if (ret == 1 ) {
2415+ qcc_wakeup_pacing (qcc );
2416+ }
2417+ else if (!LIST_ISEMPTY (& qcc -> tx .frms )) {
24012418 struct quic_frame * frm , * frm2 ;
24022419
24032420 list_for_each_entry_safe (frm , frm2 , & qcc -> tx .frms , list )
@@ -2751,12 +2768,38 @@ static void qcc_release(struct qcc *qcc)
27512768 TRACE_LEAVE (QMUX_EV_QCC_END );
27522769}
27532770
2771+ static int qcc_purge_sending (struct qcc * qcc )
2772+ {
2773+ int ret ;
2774+
2775+ //fprintf(stderr, "%s\n", __func__);
2776+ ret = qcc_send_frames (qcc , & qcc -> tx .frms , 1 );
2777+ if (ret > 0 ) {
2778+ qcc_wakeup_pacing (qcc );
2779+ return 1 ;
2780+ }
2781+
2782+ return 0 ;
2783+ }
2784+
27542785struct task * qcc_io_cb (struct task * t , void * ctx , unsigned int status )
27552786{
27562787 struct qcc * qcc = ctx ;
27572788
27582789 TRACE_ENTER (QMUX_EV_QCC_WAKE , qcc -> conn );
27592790
2791+ if (status & TASK_F_USR1 ) {
2792+ qcc_purge_sending (qcc );
2793+ return NULL ;
2794+ }
2795+ else {
2796+ while (!LIST_ISEMPTY (& qcc -> tx .frms )) {
2797+ struct quic_frame * frm = LIST_ELEM (qcc -> tx .frms .n , struct quic_frame * , list );
2798+ qc_frm_free (qcc -> conn -> handle .qc , & frm );
2799+ }
2800+ LIST_INIT (& qcc -> tx .frms );
2801+ }
2802+
27602803 if (!(qcc -> wait_event .events & SUB_RETRY_SEND ))
27612804 qcc_io_send (qcc );
27622805
0 commit comments