Skip to content

Commit a343b17

Browse files
committed
rxrpc: Only set/transmit aborts in the I/O thread
Only set the abort call completion state in the I/O thread and only transmit ABORT packets from there. rxrpc_abort_call() can then be made to actually send the packet. Further, ABORT packets should only be sent if the call has been exposed to the network (ie. at least one attempted DATA transmission has occurred for it). Signed-off-by: David Howells <[email protected]> cc: Marc Dionne <[email protected]> cc: [email protected]
1 parent 30df927 commit a343b17

File tree

7 files changed

+50
-18
lines changed

7 files changed

+50
-18
lines changed

include/trace/events/rxrpc.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* Declare tracing information enums and their string mappings for display.
1818
*/
1919
#define rxrpc_call_poke_traces \
20+
EM(rxrpc_call_poke_abort, "Abort") \
2021
EM(rxrpc_call_poke_error, "Error") \
2122
EM(rxrpc_call_poke_idle, "Idle") \
2223
EM(rxrpc_call_poke_start, "Start") \

net/rxrpc/ar-internal.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -625,7 +625,10 @@ struct rxrpc_call {
625625
unsigned long events;
626626
spinlock_t notify_lock; /* Kernel notification lock */
627627
rwlock_t state_lock; /* lock for state transition */
628-
u32 abort_code; /* Local/remote abort code */
628+
const char *send_abort_why; /* String indicating why the abort was sent */
629+
s32 send_abort; /* Abort code to be sent */
630+
short send_abort_err; /* Error to be associated with the abort */
631+
s32 abort_code; /* Local/remote abort code */
629632
int error; /* Local error incurred */
630633
enum rxrpc_call_state state; /* current state of call */
631634
enum rxrpc_call_completion completion; /* Call completion condition */
@@ -1146,6 +1149,8 @@ struct key *rxrpc_look_up_server_security(struct rxrpc_connection *,
11461149
/*
11471150
* sendmsg.c
11481151
*/
1152+
bool rxrpc_propose_abort(struct rxrpc_call *call,
1153+
u32 abort_code, int error, const char *why);
11491154
int rxrpc_do_sendmsg(struct rxrpc_sock *, struct msghdr *, size_t);
11501155

11511156
/*

net/rxrpc/call_event.c

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -270,9 +270,11 @@ static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
270270
{
271271
struct rxrpc_txbuf *txb;
272272

273-
if (rxrpc_is_client_call(call) &&
274-
!test_bit(RXRPC_CALL_EXPOSED, &call->flags))
273+
if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
274+
if (list_empty(&call->tx_sendmsg))
275+
return;
275276
rxrpc_expose_client_call(call);
277+
}
276278

277279
while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
278280
struct rxrpc_txbuf, call_link))) {
@@ -336,6 +338,7 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
336338
unsigned long now, next, t;
337339
rxrpc_serial_t ackr_serial;
338340
bool resend = false, expired = false;
341+
s32 abort_code;
339342

340343
rxrpc_see_call(call, rxrpc_call_see_input);
341344

@@ -346,6 +349,14 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
346349
if (call->state == RXRPC_CALL_COMPLETE)
347350
goto out;
348351

352+
/* Handle abort request locklessly, vs rxrpc_propose_abort(). */
353+
abort_code = smp_load_acquire(&call->send_abort);
354+
if (abort_code) {
355+
rxrpc_abort_call(call->send_abort_why, call, 0, call->send_abort,
356+
call->send_abort_err);
357+
goto out;
358+
}
359+
349360
if (skb && skb->mark == RXRPC_SKB_MARK_ERROR)
350361
goto out;
351362

@@ -433,7 +444,6 @@ void rxrpc_input_call_event(struct rxrpc_call *call, struct sk_buff *skb)
433444
} else {
434445
rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, -ETIME);
435446
}
436-
rxrpc_send_abort_packet(call);
437447
goto out;
438448
}
439449

net/rxrpc/call_object.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -430,6 +430,8 @@ void rxrpc_incoming_call(struct rxrpc_sock *rx,
430430
call->state = RXRPC_CALL_SERVER_SECURING;
431431
call->cong_tstamp = skb->tstamp;
432432

433+
__set_bit(RXRPC_CALL_EXPOSED, &call->flags);
434+
433435
spin_lock(&conn->state_lock);
434436

435437
switch (conn->state) {
@@ -590,16 +592,15 @@ void rxrpc_release_calls_on_socket(struct rxrpc_sock *rx)
590592
call = list_entry(rx->to_be_accepted.next,
591593
struct rxrpc_call, accept_link);
592594
list_del(&call->accept_link);
593-
rxrpc_abort_call("SKR", call, 0, RX_CALL_DEAD, -ECONNRESET);
595+
rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, "SKR");
594596
rxrpc_put_call(call, rxrpc_call_put_release_sock_tba);
595597
}
596598

597599
while (!list_empty(&rx->sock_calls)) {
598600
call = list_entry(rx->sock_calls.next,
599601
struct rxrpc_call, sock_link);
600602
rxrpc_get_call(call, rxrpc_call_get_release_sock);
601-
rxrpc_abort_call("SKT", call, 0, RX_CALL_DEAD, -ECONNRESET);
602-
rxrpc_send_abort_packet(call);
603+
rxrpc_propose_abort(call, RX_CALL_DEAD, -ECONNRESET, "SKT");
603604
rxrpc_release_call(rx, call);
604605
rxrpc_put_call(call, rxrpc_call_put_release_sock);
605606
}

net/rxrpc/input.c

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,7 @@
1212
static void rxrpc_proto_abort(const char *why,
1313
struct rxrpc_call *call, rxrpc_seq_t seq)
1414
{
15-
if (rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, -EBADMSG))
16-
rxrpc_send_abort_packet(call);
15+
rxrpc_abort_call(why, call, seq, RX_PROTOCOL_ERROR, -EBADMSG);
1716
}
1817

1918
/*
@@ -1007,8 +1006,7 @@ void rxrpc_implicit_end_call(struct rxrpc_call *call, struct sk_buff *skb)
10071006
case RXRPC_CALL_COMPLETE:
10081007
break;
10091008
default:
1010-
if (rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN))
1011-
rxrpc_send_abort_packet(call);
1009+
rxrpc_abort_call("IMP", call, 0, RX_CALL_DEAD, -ESHUTDOWN);
10121010
trace_rxrpc_improper_term(call);
10131011
break;
10141012
}

net/rxrpc/recvmsg.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,8 @@ bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
134134
write_lock(&call->state_lock);
135135
ret = __rxrpc_abort_call(why, call, seq, abort_code, error);
136136
write_unlock(&call->state_lock);
137+
if (ret && test_bit(RXRPC_CALL_EXPOSED, &call->flags))
138+
rxrpc_send_abort_packet(call);
137139
return ret;
138140
}
139141

net/rxrpc/sendmsg.c

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,26 @@
1717
#include <net/af_rxrpc.h>
1818
#include "ar-internal.h"
1919

20+
/*
21+
* Propose an abort to be made in the I/O thread.
22+
*/
23+
bool rxrpc_propose_abort(struct rxrpc_call *call,
24+
u32 abort_code, int error, const char *why)
25+
{
26+
_enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
27+
28+
if (!call->send_abort && call->state < RXRPC_CALL_COMPLETE) {
29+
call->send_abort_why = why;
30+
call->send_abort_err = error;
31+
/* Request abort locklessly vs rxrpc_input_call_event(). */
32+
smp_store_release(&call->send_abort, abort_code);
33+
rxrpc_poke_call(call, rxrpc_call_poke_abort);
34+
return true;
35+
}
36+
37+
return false;
38+
}
39+
2040
/*
2141
* Return true if there's sufficient Tx queue space.
2242
*/
@@ -663,9 +683,8 @@ int rxrpc_do_sendmsg(struct rxrpc_sock *rx, struct msghdr *msg, size_t len)
663683
/* it's too late for this call */
664684
ret = -ESHUTDOWN;
665685
} else if (p.command == RXRPC_CMD_SEND_ABORT) {
686+
rxrpc_propose_abort(call, p.abort_code, -ECONNABORTED, "CMD");
666687
ret = 0;
667-
if (rxrpc_abort_call("CMD", call, 0, p.abort_code, -ECONNABORTED))
668-
ret = rxrpc_send_abort_packet(call);
669688
} else if (p.command != RXRPC_CMD_SEND_DATA) {
670689
ret = -EINVAL;
671690
} else {
@@ -760,11 +779,7 @@ bool rxrpc_kernel_abort_call(struct socket *sock, struct rxrpc_call *call,
760779
_enter("{%d},%d,%d,%s", call->debug_id, abort_code, error, why);
761780

762781
mutex_lock(&call->user_mutex);
763-
764-
aborted = rxrpc_abort_call(why, call, 0, abort_code, error);
765-
if (aborted)
766-
rxrpc_send_abort_packet(call);
767-
782+
aborted = rxrpc_propose_abort(call, abort_code, error, why);
768783
mutex_unlock(&call->user_mutex);
769784
return aborted;
770785
}

0 commit comments

Comments
 (0)