Skip to content

Commit 796446a

Browse files
committed
MAJOR: mux-quic: support pacing emission
Support pacing emission for STREAM frames at the QUIC MUX layer. This is implemented by adding a quic_pacer engine into QCC structure. The main changes have been written into qcc_io_send(). It now differentiates cases when some frames have been rejected by transport layer. This can occur as previously due to congestion or FD buffer full, which requires subscribing on transport layer. The new case is when emission has been interrupted due to pacing timing. In this case, QUIC MUX I/O tasklet is rescheduled to run with the flag TASK_F_USR1. On tasklet execution, if TASK_F_USR1 is set, all standard processing for emission and reception is skipped. Instead, a new function qcc_purge_sending() is called. Its purpose is to retry emission with the saved STREAM frames list. Either all remaining frames can now be send, subscribe is done on transport error or tasklet must be rescheduled for pacing purging. In the meantime, if tasklet is rescheduled due to other conditions, TASK_F_USR1 is reset. This will trigger a full regeneration of STREAM frames. In this case, pacing expiration must be check before calling qcc_send_frames() to ensure emission is now allowed.
1 parent ede4cd4 commit 796446a

File tree

2 files changed

+88
-14
lines changed

2 files changed

+88
-14
lines changed

include/haproxy/mux_quic-t.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <haproxy/ncbuf-t.h>
1616
#include <haproxy/quic_fctl-t.h>
1717
#include <haproxy/quic_frame-t.h>
18+
#include <haproxy/quic_pacing-t.h>
1819
#include <haproxy/quic_stream-t.h>
1920
#include <haproxy/stconn-t.h>
2021
#include <haproxy/time-t.h>
@@ -69,6 +70,7 @@ struct qcc {
6970
struct quic_fctl fc; /* stream flow control applied on sending */
7071
uint64_t buf_in_flight; /* sum of currently allocated Tx buffer sizes */
7172
struct list frms; /* list of STREAM frames ready for sent */
73+
struct quic_pacer pacer; /* engine used to pace emission */
7274
} tx;
7375

7476
uint64_t largest_bidi_r; /* largest remote bidi stream ID opened. */

src/mux_quic.c

Lines changed: 86 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#include <haproxy/quic_enc.h>
2020
#include <haproxy/quic_fctl.h>
2121
#include <haproxy/quic_frame.h>
22+
#include <haproxy/quic_pacing.h>
2223
#include <haproxy/quic_sock.h>
2324
#include <haproxy/quic_stream.h>
2425
#include <haproxy/quic_tp-t.h>
@@ -36,6 +37,13 @@ DECLARE_POOL(pool_head_qcs, "qcs", sizeof(struct qcs));
3637
static void qmux_ctrl_send(struct qc_stream_desc *, uint64_t data, uint64_t offset);
3738
static void qmux_ctrl_room(struct qc_stream_desc *, uint64_t room);
3839

40+
/* Returns true if pacing should be used for <conn> connection. */
41+
static int qcc_is_pacing_active(const struct connection *conn)
42+
{
43+
const struct quic_conn *qc = conn->handle.qc;
44+
return !!(qc->path->cc.algo->pacing_rate);
45+
}
46+
3947
/* Free <qcc> STREAM frames in Tx list. */
4048
static void qcc_tx_frms_free(struct qcc *qcc)
4149
{
@@ -398,6 +406,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
398406

399407
void qcc_wakeup(struct qcc *qcc)
400408
{
409+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
410+
tasklet_wakeup(qcc->wait_event.tasklet);
411+
}
412+
413+
static void qcc_wakeup_pacing(struct qcc *qcc)
414+
{
415+
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
401416
tasklet_wakeup(qcc->wait_event.tasklet);
402417
}
403418

@@ -2084,20 +2099,25 @@ static int qcc_subscribe_send(struct qcc *qcc)
20842099
/* Wrapper for send on transport layer. Send a list of frames <frms> for the
20852100
* connection <qcc>.
20862101
*
2087-
* Returns 0 if all data sent with success else non-zero.
2102+
* Returns 0 if all data sent with success. On fatal error, a negative error
2103+
* code is returned. A positive 1 is used if emission should be paced.
20882104
*/
2089-
static int qcc_send_frames(struct qcc *qcc, struct list *frms)
2105+
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
20902106
{
20912107
enum quic_tx_err ret;
2108+
struct quic_pacer *pacer = NULL;
20922109

20932110
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
20942111

20952112
if (LIST_ISEMPTY(frms)) {
20962113
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
2097-
return 1;
2114+
return -1;
20982115
}
20992116

2100-
ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL);
2117+
if (stream && qcc_is_pacing_active(qcc->conn))
2118+
pacer = &qcc->tx.pacer;
2119+
2120+
ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
21012121
if (ret == QUIC_TX_ERR_FATAL) {
21022122
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
21032123
qcc_subscribe_send(qcc);
@@ -2107,18 +2127,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
21072127
/* If there is frames left at this stage, transport layer is blocked.
21082128
* Subscribe on it to retry later.
21092129
*/
2110-
if (!LIST_ISEMPTY(frms)) {
2130+
if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_PACING) {
21112131
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
21122132
qcc_subscribe_send(qcc);
21132133
goto err;
21142134
}
21152135

21162136
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
2117-
return 0;
2137+
return ret == QUIC_TX_ERR_PACING ? 1 : 0;
21182138

21192139
err:
21202140
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
2121-
return 1;
2141+
return -1;
21222142
}
21232143

21242144
/* Emit a RESET_STREAM on <qcs>.
@@ -2143,7 +2163,7 @@ static int qcs_send_reset(struct qcs *qcs)
21432163
frm->reset_stream.final_size = qcs->tx.fc.off_real;
21442164

21452165
LIST_APPEND(&frms, &frm->list);
2146-
if (qcc_send_frames(qcs->qcc, &frms)) {
2166+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21472167
if (!LIST_ISEMPTY(&frms))
21482168
qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
21492169
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2194,7 +2214,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
21942214
frm->stop_sending.app_error_code = qcs->err;
21952215

21962216
LIST_APPEND(&frms, &frm->list);
2197-
if (qcc_send_frames(qcs->qcc, &frms)) {
2217+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21982218
if (!LIST_ISEMPTY(&frms))
21992219
qc_frm_free(qcc->conn->handle.qc, &frm);
22002220
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2266,7 +2286,7 @@ static int qcc_io_send(struct qcc *qcc)
22662286
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
22672287
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
22682288
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
2269-
int ret, total = 0, resent;
2289+
int ret = 0, total = 0, resent;
22702290

22712291
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
22722292

@@ -2276,6 +2296,8 @@ static int qcc_io_send(struct qcc *qcc)
22762296
* apply for STREAM frames.
22772297
*/
22782298

2299+
qcc_tx_frms_free(qcc);
2300+
22792301
/* Check for transport error. */
22802302
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
22812303
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@@ -2300,7 +2322,7 @@ static int qcc_io_send(struct qcc *qcc)
23002322
}
23012323

23022324
if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
2303-
if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
2325+
if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
23042326
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
23052327
goto out;
23062328
}
@@ -2376,10 +2398,17 @@ static int qcc_io_send(struct qcc *qcc)
23762398
}
23772399
}
23782400

2401+
if (qcc_is_pacing_active(qcc->conn)) {
2402+
if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
2403+
qcc_wakeup_pacing(qcc);
2404+
return 1;
2405+
}
2406+
}
2407+
23792408
/* Retry sending until no frame to send, data rejected or connection
23802409
* flow-control limit reached.
23812410
*/
2382-
while (qcc_send_frames(qcc, frms) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
2411+
while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
23832412
window_conn = qfctl_rcap(&qcc->tx.fc);
23842413
resent = 0;
23852414

@@ -2410,8 +2439,15 @@ static int qcc_io_send(struct qcc *qcc)
24102439
}
24112440

24122441
sent_done:
2413-
/* Deallocate frames that the transport layer has rejected. */
2414-
qcc_tx_frms_free(qcc);
2442+
if (ret == 1) {
2443+
/* qcc_send_frames cannot return 1 if pacing not used. */
2444+
BUG_ON(!qcc_is_pacing_active(qcc->conn));
2445+
qcc_wakeup_pacing(qcc);
2446+
}
2447+
else if (!LIST_ISEMPTY(&qcc->tx.frms)) {
2448+
/* Deallocate frames that the transport layer has rejected. */
2449+
qcc_tx_frms_free(qcc);
2450+
}
24152451

24162452
/* Re-insert on-error QCS at the end of the send-list. */
24172453
if (!LIST_ISEMPTY(&qcs_failed)) {
@@ -2758,12 +2794,45 @@ static void qcc_release(struct qcc *qcc)
27582794
TRACE_LEAVE(QMUX_EV_QCC_END);
27592795
}
27602796

2797+
static void qcc_purge_sending(struct qcc *qcc)
2798+
{
2799+
struct quic_pacer *pacer = &qcc->tx.pacer;
2800+
struct list *frms = &qcc->tx.frms;
2801+
enum quic_tx_err ret = QUIC_TX_ERR_PACING;
2802+
2803+
/* This function is reserved for pacing usage. */
2804+
BUG_ON(!qcc_is_pacing_active(qcc->conn));
2805+
2806+
/* Only restart emission if pacing delay is reached. */
2807+
if (quic_pacing_expired(pacer))
2808+
ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
2809+
2810+
if (ret == QUIC_TX_ERR_PACING) {
2811+
BUG_ON(LIST_ISEMPTY(frms));
2812+
qcc_wakeup_pacing(qcc);
2813+
}
2814+
else if (ret == QUIC_TX_ERR_FATAL) {
2815+
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
2816+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
2817+
qcc_subscribe_send(qcc);
2818+
}
2819+
else {
2820+
if (!LIST_ISEMPTY(frms))
2821+
qcc_subscribe_send(qcc);
2822+
}
2823+
}
2824+
27612825
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
27622826
{
27632827
struct qcc *qcc = ctx;
27642828

27652829
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
27662830

2831+
if (status & TASK_F_USR1) {
2832+
qcc_purge_sending(qcc);
2833+
return NULL;
2834+
}
2835+
27672836
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
27682837
qcc_io_send(qcc);
27692838

@@ -2891,6 +2960,9 @@ static int qmux_init(struct connection *conn, struct proxy *prx,
28912960

28922961
qcc->tx.buf_in_flight = 0;
28932962

2963+
if (qcc_is_pacing_active(conn))
2964+
quic_pacing_init(&qcc->tx.pacer, &conn->handle.qc->path->cc);
2965+
28942966
if (conn_is_back(conn)) {
28952967
qcc->next_bidi_l = 0x00;
28962968
qcc->largest_bidi_r = 0x01;

0 commit comments

Comments
 (0)