Skip to content

Commit e52bbe5

Browse files
a-denoyellehaproxyFred
authored andcommitted
MAJOR: mux-quic: support pacing emission
1 parent 9c06e91 commit e52bbe5

File tree

3 files changed

+81
-14
lines changed

3 files changed

+81
-14
lines changed

include/haproxy/quic_pacing.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ static inline ullong quic_pacing_ns_pkt(const struct quic_pacer *pacer)
3737

3838
int quic_pacing_expired(const struct quic_pacer *pacer);
3939

40+
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc);
41+
4042
void quic_pacing_sent_done(struct quic_pacer *pacer, int sent);
4143

4244
#endif /* _HAPROXY_QUIC_PACING_H */

src/mux_quic.c

Lines changed: 65 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,13 @@ static void qcc_refresh_timeout(struct qcc *qcc)
390390

391391
void qcc_wakeup(struct qcc *qcc)
392392
{
393+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
394+
tasklet_wakeup(qcc->wait_event.tasklet);
395+
}
396+
397+
static void qcc_wakeup_pacing(struct qcc *qcc)
398+
{
399+
HA_ATOMIC_OR(&qcc->wait_event.tasklet->state, TASK_F_USR1);
393400
tasklet_wakeup(qcc->wait_event.tasklet);
394401
}
395402

@@ -2078,18 +2085,22 @@ static int qcc_subscribe_send(struct qcc *qcc)
20782085
*
20792086
* Returns 0 if all data sent with success else non-zero.
20802087
*/
2081-
static int qcc_send_frames(struct qcc *qcc, struct list *frms)
2088+
static int qcc_send_frames(struct qcc *qcc, struct list *frms, int stream)
20822089
{
20832090
enum quic_tx_err ret;
2091+
struct quic_pacer *pacer = NULL;
20842092

20852093
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
20862094

20872095
if (LIST_ISEMPTY(frms)) {
20882096
TRACE_DEVEL("leaving on no frame to send", QMUX_EV_QCC_SEND, qcc->conn);
2089-
return 1;
2097+
return -1;
20902098
}
20912099

2092-
ret = qc_send_mux(qcc->conn->handle.qc, frms, NULL);
2100+
if (stream)
2101+
pacer = &qcc->tx.pacer;
2102+
2103+
ret = qc_send_mux(qcc->conn->handle.qc, frms, pacer);
20932104
if (ret == QUIC_TX_ERR_FATAL) {
20942105
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
20952106
qcc_subscribe_send(qcc);
@@ -2099,18 +2110,18 @@ static int qcc_send_frames(struct qcc *qcc, struct list *frms)
20992110
/* If there is frames left at this stage, transport layer is blocked.
21002111
* Subscribe on it to retry later.
21012112
*/
2102-
if (!LIST_ISEMPTY(frms)) {
2113+
if (!LIST_ISEMPTY(frms) && ret != QUIC_TX_ERR_AGAIN) {
21032114
TRACE_DEVEL("remaining frames to send", QMUX_EV_QCC_SEND, qcc->conn);
21042115
qcc_subscribe_send(qcc);
21052116
goto err;
21062117
}
21072118

21082119
TRACE_LEAVE(QMUX_EV_QCC_SEND, qcc->conn);
2109-
return 0;
2120+
return ret == QUIC_TX_ERR_AGAIN ? 1 : 0;
21102121

21112122
err:
21122123
TRACE_DEVEL("leaving on error", QMUX_EV_QCC_SEND, qcc->conn);
2113-
return 1;
2124+
return -1;
21142125
}
21152126

21162127
/* Emit a RESET_STREAM on <qcs>.
@@ -2135,7 +2146,7 @@ static int qcs_send_reset(struct qcs *qcs)
21352146
frm->reset_stream.final_size = qcs->tx.fc.off_real;
21362147

21372148
LIST_APPEND(&frms, &frm->list);
2138-
if (qcc_send_frames(qcs->qcc, &frms)) {
2149+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21392150
if (!LIST_ISEMPTY(&frms))
21402151
qc_frm_free(qcs->qcc->conn->handle.qc, &frm);
21412152
TRACE_DEVEL("cannot send RESET_STREAM", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2186,7 +2197,7 @@ static int qcs_send_stop_sending(struct qcs *qcs)
21862197
frm->stop_sending.app_error_code = qcs->err;
21872198

21882199
LIST_APPEND(&frms, &frm->list);
2189-
if (qcc_send_frames(qcs->qcc, &frms)) {
2200+
if (qcc_send_frames(qcs->qcc, &frms, 0)) {
21902201
if (!LIST_ISEMPTY(&frms))
21912202
qc_frm_free(qcc->conn->handle.qc, &frm);
21922203
TRACE_DEVEL("cannot send STOP_SENDING", QMUX_EV_QCS_SEND, qcs->qcc->conn, qcs);
@@ -2254,11 +2265,12 @@ static int qcs_send(struct qcs *qcs, struct list *frms, uint64_t window_conn)
22542265
static int qcc_io_send(struct qcc *qcc)
22552266
{
22562267
/* Temporary list for QCS on error. */
2257-
struct list *frms = quic_pacing_frms(&qcc->tx.pacer);
2268+
struct quic_pacer *pacer = &qcc->tx.pacer;
2269+
struct list *frms = quic_pacing_frms(pacer);
22582270
struct list qcs_failed = LIST_HEAD_INIT(qcs_failed);
22592271
struct qcs *qcs, *qcs_tmp, *first_qcs = NULL;
22602272
uint64_t window_conn = qfctl_rcap(&qcc->tx.fc);
2261-
int ret, total = 0, resent;
2273+
int ret = 0, total = 0, resent;
22622274

22632275
TRACE_ENTER(QMUX_EV_QCC_SEND, qcc->conn);
22642276

@@ -2268,6 +2280,8 @@ static int qcc_io_send(struct qcc *qcc)
22682280
* apply for STREAM frames.
22692281
*/
22702282

2283+
quic_pacing_reset(pacer);
2284+
22712285
/* Check for transport error. */
22722286
if (qcc->flags & QC_CF_ERR_CONN || qcc->conn->flags & CO_FL_ERROR) {
22732287
TRACE_DEVEL("connection on error", QMUX_EV_QCC_SEND, qcc->conn);
@@ -2292,7 +2306,7 @@ static int qcc_io_send(struct qcc *qcc)
22922306
}
22932307

22942308
if (!LIST_ISEMPTY(&qcc->lfctl.frms)) {
2295-
if (qcc_send_frames(qcc, &qcc->lfctl.frms)) {
2309+
if (qcc_send_frames(qcc, &qcc->lfctl.frms, 0)) {
22962310
TRACE_DEVEL("flow-control frames rejected by transport, aborting send", QMUX_EV_QCC_SEND, qcc->conn);
22972311
goto out;
22982312
}
@@ -2368,10 +2382,15 @@ static int qcc_io_send(struct qcc *qcc)
23682382
}
23692383
}
23702384

2385+
if (!LIST_ISEMPTY(frms) && !quic_pacing_expired(&qcc->tx.pacer)) {
2386+
qcc_wakeup_pacing(qcc);
2387+
return 1;
2388+
}
2389+
23712390
/* Retry sending until no frame to send, data rejected or connection
23722391
* flow-control limit reached.
23732392
*/
2374-
while ((ret = qcc_send_frames(qcc, frms)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
2393+
while ((ret = qcc_send_frames(qcc, frms, 1)) == 0 && !qfctl_rblocked(&qcc->tx.fc)) {
23752394
window_conn = qfctl_rcap(&qcc->tx.fc);
23762395
resent = 0;
23772396

@@ -2402,8 +2421,13 @@ static int qcc_io_send(struct qcc *qcc)
24022421
}
24032422

24042423
sent_done:
2405-
/* Deallocate frames that the transport layer has rejected. */
2406-
quic_pacing_reset(&qcc->tx.pacer);
2424+
if (ret == 1) {
2425+
qcc_wakeup_pacing(qcc);
2426+
}
2427+
else if (!LIST_ISEMPTY(quic_pacing_frms(pacer))) {
2428+
/* Deallocate frames that the transport layer has rejected. */
2429+
quic_pacing_reset(pacer);
2430+
}
24072431

24082432
/* Re-insert on-error QCS at the end of the send-list. */
24092433
if (!LIST_ISEMPTY(&qcs_failed)) {
@@ -2750,12 +2774,39 @@ static void qcc_release(struct qcc *qcc)
27502774
TRACE_LEAVE(QMUX_EV_QCC_END);
27512775
}
27522776

2777+
static void qcc_purge_sending(struct qcc *qcc)
2778+
{
2779+
struct quic_conn *qc = qcc->conn->handle.qc;
2780+
struct quic_pacer *pacer = &qcc->tx.pacer;
2781+
enum quic_tx_err ret;
2782+
2783+
ret = quic_pacing_send(pacer, qc);
2784+
if (ret == QUIC_TX_ERR_AGAIN) {
2785+
BUG_ON(LIST_ISEMPTY(quic_pacing_frms(pacer)));
2786+
qcc_wakeup_pacing(qcc);
2787+
}
2788+
else if (ret == QUIC_TX_ERR_FATAL) {
2789+
TRACE_DEVEL("error on sending", QMUX_EV_QCC_SEND, qcc->conn);
2790+
HA_ATOMIC_AND(&qcc->wait_event.tasklet->state, ~TASK_F_USR1);
2791+
qcc_subscribe_send(qcc);
2792+
}
2793+
else {
2794+
if (!LIST_ISEMPTY(quic_pacing_frms(pacer)))
2795+
qcc_subscribe_send(qcc);
2796+
}
2797+
}
2798+
27532799
struct task *qcc_io_cb(struct task *t, void *ctx, unsigned int status)
27542800
{
27552801
struct qcc *qcc = ctx;
27562802

27572803
TRACE_ENTER(QMUX_EV_QCC_WAKE, qcc->conn);
27582804

2805+
if (status & TASK_F_USR1) {
2806+
qcc_purge_sending(qcc);
2807+
return NULL;
2808+
}
2809+
27592810
if (!(qcc->wait_event.events & SUB_RETRY_SEND))
27602811
qcc_io_send(qcc);
27612812

src/quic_pacing.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,20 @@ int quic_pacing_expired(const struct quic_pacer *pacer)
99
return !pacer->next || pacer->next <= now_mono_time();
1010
}
1111

12+
enum quic_tx_err quic_pacing_send(struct quic_pacer *pacer, struct quic_conn *qc)
13+
{
14+
enum quic_tx_err ret;
15+
16+
if (!quic_pacing_expired(pacer))
17+
return QUIC_TX_ERR_AGAIN;
18+
19+
BUG_ON(LIST_ISEMPTY(&pacer->frms));
20+
ret = qc_send_mux(qc, &pacer->frms, pacer);
21+
22+
/* TODO handle QUIC_TX_ERR_FATAL */
23+
return ret;
24+
}
25+
1226
void quic_pacing_sent_done(struct quic_pacer *pacer, int sent)
1327
{
1428
pacer->next = now_mono_time() + quic_pacing_ns_pkt(pacer) * sent;

0 commit comments

Comments
 (0)