Skip to content

Commit 79ba4bf

Browse files
committed
MAJOR: mux-quic: support pacing emission
Support pacing emission for STREAM frames at the QUIC MUX layer. This is implemented by adding a quic_pacer engine into QCC structure. The main changes have been written into qcc_io_send(). It now differentiates cases when some frames have been rejected by transport layer. This can occur as previously due to congestion or FD buffer full, which requires subscribing on transport layer. The new case is when emission has been interrupted due to pacing timing. In this case, QUIC MUX I/O tasklet is rescheduled to run with the flag TASK_F_USR1. On tasklet execution, if TASK_F_USR1 is set, all standard processing for emission and reception is skipped. Instead, a new function qcc_purge_sending() is called. Its purpose is to retry emission with the saved STREAM frames list. Either all remaining frames can now be send, subscribe is done on transport error or tasklet must be rescheduled for pacing purging. In the meantime, if tasklet is rescheduled due to other conditions, TASK_F_USR1 is reset. This will trigger a full regeneration of STREAM frames. In this case, pacing expiration must be check before calling qcc_send_frames() to ensure emission is now allowed.
1 parent f8a1372 commit 79ba4bf

File tree

2 files changed

+72
-14
lines changed

2 files changed

+72
-14
lines changed

include/haproxy/mux_quic-t.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <haproxy/ncbuf-t.h>
1616
#include <haproxy/quic_fctl-t.h>
1717
#include <haproxy/quic_frame-t.h>
18+
#include <haproxy/quic_pacing-t.h>
1819
#include <haproxy/quic_stream-t.h>
1920
#include <haproxy/stconn-t.h>
2021
#include <haproxy/time-t.h>
@@ -69,6 +70,7 @@ struct qcc {
6970
struct quic_fctl fc; /* stream flow control applied on sending */
7071
uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */
7172
struct list frms; /* list of STREAM frames ready for sent */
73+
struct quic_pacer pacer; /* engine used to pace emission */
7274
} tx;
7375

7476
uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */

src/mux_quic.c

Lines changed: 70 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <haproxy/quic_enc.h>
2020
#include <haproxy/quic_fctl.h>
2121
#include <haproxy/quic_frame.h>
22+
#include <haproxy/quic_pacing.h>
2223
#include <haproxy/quic_sock.h>
2324
#include <haproxy/quic_stream.h>
2425
#include <haproxy/quic_tp-t.h>
@@ -397,6 +398,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
397398

398399
void qcc_wakeup(struct qcc *qcc)
399400
{
401+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
402+
tasklet_wakeup(qcc->wait_event.tasklet);
403+
}
404+
405+
static void qcc_wakeup_pacing(struct qcc *qcc)
406+
{
407+
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
400408
tasklet_wakeup(qcc->wait_event.tasklet);
401409
}
402410

@@ -2083,20 +2091,26 @@ static int qcc_subscribe_send(struct qcc *qcc)
20832091
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
20842092
* connection <qcc>.
20852093
*
2086-
* Returns 0 if all data sent with success else non-zero.
2094+
* Returns 0 if all data sent with success. On fatal error, a negative error
2095+
* code is returned. A positive 1 is used if emission should be paced.
20872096
*/
2088-
static int qcc_send_frames(struct qcc *qcc, struct list *frms)
2097+
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
20892098
{
2099+
struct quic_conn *qc = qcc->conn->handle.qc;
20902100
enum quic_tx_err ret;
2101+
struct quic_pacer *pacer = NULL;
20912102

20922103
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
20932104

20942105
if (LIST_ISEMPTY(frms)) {
20952106
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
2096-
return 1;
2107+
return -1;
20972108
}
20982109

2099-
ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL);
2110+
if (stream && qc->path->cc.algo->pacing_rate)
2111+
pacer = &qcc->tx.pacer;
2112+
2113+
ret = qc_send_mux(qc, frms, pacer);
21002114
if (ret == QUIC_TX_ERR_FATAL) {
21012115
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
21022116
qcc_subscribe_send(qcc);
@@ -2106,18 +2120,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
21062120
/* If there is frames left at this stage, transport layer is blocked.
21072121
* Subscribe on it to retry later.
21082122
*/
2109-
if (!LIST_ISEMPTY(frms)) {
2123+
if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
21102124
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
21112125
qcc_subscribe_send(qcc);
21122126
goto err;
21132127
}
21142128

21152129
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
2116-
return 0;
2130+
return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;
21172131

21182132
err:
21192133
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
2120-
return 1;
2134+
return -1;
21212135
}
21222136

21232137
/* Emit a RESET_STREAM on <qcs>.
@@ -2142,7 +2156,7 @@ static int qcs_send_reset(struct qcs *qcs)
21422156
frm->reset_stream.final_size = qcs->tx.fc.off_real;
21432157

21442158
LIST_APPEND(&frms, &frm->list);
2145-
if (qcc_send_frames(qcs->qcc, &frms)) {
2159+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21462160
if (!LIST_ISEMPTY(&frms))
21472161
qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
21482162
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2193,7 +2207,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
21932207
frm->stop_sending.app_error_code = qcs->err;
21942208

21952209
LIST_APPEND(&frms, &frm->list);
2196-
if (qcc_send_frames(qcs->qcc, &frms)) {
2210+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21972211
if (!LIST_ISEMPTY(&frms))
21982212
qc_frm_free(qcc->conn->handle.qc, &frm);
21992213
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2265,7 +2279,7 @@ static int qcc_io_send(struct qcc *qcc)
22652279
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
22662280
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
22672281
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
2268-
int ret, total = 0, resent;
2282+
int ret = 0, total = 0, resent;
22692283

22702284
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
22712285

@@ -2275,6 +2289,8 @@ static int qcc_io_send(struct qcc *qcc)
22752289
* apply for STREAM frames.
22762290
*/
22772291

2292+
qcc_tx_frms_free(qcc);
2293+
22782294
/* Check for transport error. */
22792295
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
22802296
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@@ -2299,7 +2315,7 @@ static int qcc_io_send(struct qcc *qcc)
22992315
}
23002316

23012317
if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
2302-
if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
2318+
if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
23032319
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
23042320
goto out;
23052321
}
@@ -2375,10 +2391,15 @@ static int qcc_io_send(struct qcc *qcc)
23752391
}
23762392
}
23772393

2394+
if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
2395+
qcc_wakeup_pacing(qcc);
2396+
return 1;
2397+
}
2398+
23782399
/* Retry sending until no frame to send, data rejected or connection
23792400
* flow-control limit reached.
23802401
*/
2381-
while (qcc_send_frames(qcc, frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
2402+
while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
23822403
window_conn = qfctl_rcap(&qcc->tx.fc);
23832404
resent = 0;
23842405

@@ -2409,8 +2430,13 @@ static int qcc_io_send(struct qcc *qcc)
24092430
}
24102431

24112432
sent_done:
2412-
/* Deallocate frames that the transport layer has rejected. */
2413-
qcc_tx_frms_free(qcc);
2433+
if (ret == 1) {
2434+
qcc_wakeup_pacing(qcc);
2435+
}
2436+
else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
2437+
/* Deallocate frames that the transport layer has rejected. */
2438+
qcc_tx_frms_free(qcc);
2439+
}
24142440

24152441
/* Re-insert on-error QCS at the end of the send-list. */
24162442
if (!LIST_ISEMPTY(&qcs_failed)) {
@@ -2757,12 +2783,40 @@ static void qcc_release(struct qcc *qcc)
27572783
TRACE_LEAVE(QMUX_EV_QCC_END);
27582784
}
27592785

2786+
static void qcc_purge_sending(struct qcc *qcc)
2787+
{
2788+
struct quic_conn *qc = qcc->conn->handle.qc;
2789+
struct quic_pacer *pacer = &qcc->tx.pacer;
2790+
struct list *frms = &qcc->tx.frms;
2791+
enum quic_tx_err ret;
2792+
2793+
ret = quic_pacing_send(pacer, qc);
2794+
if (ret == QUIC_TX_ERR_AGAIN) {
2795+
BUG_ON(LIST_ISEMPTY(frms));
2796+
qcc_wakeup_pacing(qcc);
2797+
}
2798+
else if (ret == QUIC_TX_ERR_FATAL) {
2799+
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
2800+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
2801+
qcc_subscribe_send(qcc);
2802+
}
2803+
else {
2804+
if (!LIST_ISEMPTY(frms))
2805+
qcc_subscribe_send(qcc);
2806+
}
2807+
}
2808+
27602809
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
27612810
{
27622811
struct qcc *qcc = ctx;
27632812

27642813
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
27652814

2815+
if (status & TASK_F_USR1) {
2816+
qcc_purge_sending(qcc);
2817+
return NULL;
2818+
}
2819+
27662820
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
27672821
qcc_io_send(qcc);
27682822

@@ -2889,6 +2943,8 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
28892943
qcc->rfctl.msd_uni_l = rparams->initial_max_stream_data_uni;
28902944

28912945
qcc->tx.buf_in_flight = 0;
2946+
quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc,
2947+
&qcc->tx.frms);
28922948

28932949
if (conn_is_back(conn)) {
28942950
qcc->next_bidi_l = 0x00;

0 commit comments

Comments
 (0)