diff --git a/include/linux/skmsg.h b/include/linux/skmsg.h index 49847888c287..49069b519499 100644 --- a/include/linux/skmsg.h +++ b/include/linux/skmsg.h @@ -97,6 +97,7 @@ struct sk_psock { struct sk_buff_head ingress_skb; struct list_head ingress_msg; spinlock_t ingress_lock; + ssize_t ingress_size; unsigned long state; struct list_head link; spinlock_t link_lock; @@ -141,6 +142,8 @@ int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from, struct sk_msg *msg, u32 bytes); int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, int len, int flags); +int __sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, + int len, int flags, int *from_self_copied); bool sk_msg_is_readable(struct sock *sk); static inline void sk_msg_check_to_free(struct sk_msg *msg, u32 i, u32 bytes) @@ -319,6 +322,16 @@ static inline void sock_drop(struct sock *sk, struct sk_buff *skb) kfree_skb(skb); } +static inline ssize_t sk_psock_get_msg_size(struct sk_psock *psock) +{ + return psock->ingress_size; +} + +static inline void sk_psock_inc_msg_size(struct sk_psock *psock, ssize_t diff) +{ + psock->ingress_size += diff; +} + static inline bool sk_psock_queue_msg(struct sk_psock *psock, struct sk_msg *msg) { @@ -327,6 +340,7 @@ static inline bool sk_psock_queue_msg(struct sk_psock *psock, spin_lock_bh(&psock->ingress_lock); if (sk_psock_test_state(psock, SK_PSOCK_TX_ENABLED)) { list_add_tail(&msg->list, &psock->ingress_msg); + sk_psock_inc_msg_size(psock, msg->sg.size); ret = true; } else { sk_msg_free(psock->sk, msg); @@ -343,8 +357,10 @@ static inline struct sk_msg *sk_psock_dequeue_msg(struct sk_psock *psock) spin_lock_bh(&psock->ingress_lock); msg = list_first_entry_or_null(&psock->ingress_msg, struct sk_msg, list); - if (msg) + if (msg) { list_del(&msg->list); + sk_psock_inc_msg_size(psock, -msg->sg.size); + } spin_unlock_bh(&psock->ingress_lock); return msg; } @@ -521,6 +537,36 @@ static inline bool sk_psock_strp_enabled(struct sk_psock *psock) return !!psock->saved_data_ready; } +static inline ssize_t sk_psock_msg_inq(struct sock *sk) +{ + struct sk_psock *psock; + ssize_t inq = 0; + + psock = sk_psock_get(sk); + if (likely(psock)) { + inq = sk_psock_get_msg_size(psock); + sk_psock_put(sk, psock); + } + return inq; +} + +/* for udp */ +static inline ssize_t sk_msg_first_length(struct sock *sk) +{ + struct sk_psock *psock; + struct sk_msg *msg; + ssize_t inq = 0; + + psock = sk_psock_get(sk); + if (likely(psock)) { + msg = sk_psock_peek_msg(psock); + if (msg) + inq = msg->sg.size; + sk_psock_put(sk, psock); + } + return inq; +} + #if IS_ENABLED(CONFIG_NET_SOCK_MSG) #define BPF_F_STRPARSER (1UL << 1) diff --git a/net/core/skmsg.c b/net/core/skmsg.c index 2ac7731e1e0a..c959d52a62b2 100644 --- a/net/core/skmsg.c +++ b/net/core/skmsg.c @@ -409,14 +409,14 @@ int sk_msg_memcopy_from_iter(struct sock *sk, struct iov_iter *from, } EXPORT_SYMBOL_GPL(sk_msg_memcopy_from_iter); -/* Receive sk_msg from psock->ingress_msg to @msg. */ -int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, - int len, int flags) +int __sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, + int len, int flags, int *from_self_copied) { struct iov_iter *iter = &msg->msg_iter; int peek = flags & MSG_PEEK; struct sk_msg *msg_rx; int i, copied = 0; + bool to_self; msg_rx = sk_psock_peek_msg(psock); while (copied != len) { @@ -425,6 +425,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, if (unlikely(!msg_rx)) break; + to_self = msg_rx->sk == sk; i = msg_rx->sg.start; do { struct page *page; @@ -443,6 +444,9 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, } copied += copy; + if (to_self && from_self_copied) + *from_self_copied += copy; + if (likely(!peek)) { sge->offset += copy; sge->length -= copy; @@ -451,6 +455,7 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, atomic_sub(copy, &sk->sk_rmem_alloc); } msg_rx->sg.size -= copy; + sk_psock_inc_msg_size(psock, -copy); if (!sge->length) { sk_msg_iter_var_next(i); @@ -487,6 +492,14 @@ int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, out: return copied; } +EXPORT_SYMBOL_GPL(__sk_msg_recvmsg); + +/* Receive sk_msg from psock->ingress_msg to @msg. */ +int sk_msg_recvmsg(struct sock *sk, struct sk_psock *psock, struct msghdr *msg, + int len, int flags) +{ + return __sk_msg_recvmsg(sk, psock, msg, len, flags, NULL); +} EXPORT_SYMBOL_GPL(sk_msg_recvmsg); bool sk_msg_is_readable(struct sock *sk) @@ -616,6 +629,12 @@ static int sk_psock_skb_ingress_self(struct sk_psock *psock, struct sk_buff *skb if (unlikely(!msg)) return -EAGAIN; skb_set_owner_r(skb, sk); + + /* This is used in tcp_bpf_recvmsg_parser() to determine whether the + * data originates from the socket's own protocol stack. No need to + * refcount sk because msg's lifetime is bound to sk via the ingress_msg. + */ + msg->sk = sk; err = sk_psock_skb_ingress_enqueue(skb, off, len, psock, sk, msg, take_ref); if (err < 0) kfree(msg); @@ -801,9 +820,11 @@ static void __sk_psock_purge_ingress_msg(struct sk_psock *psock) list_del(&msg->list); if (!msg->skb) atomic_sub(msg->sg.size, &psock->sk->sk_rmem_alloc); + sk_psock_inc_msg_size(psock, -((ssize_t)msg->sg.size)); sk_msg_free(psock->sk, msg); kfree(msg); } + WARN_ON_ONCE(psock->ingress_size); } static void __sk_psock_zap_ingress(struct sk_psock *psock) @@ -909,6 +930,7 @@ int sk_psock_msg_verdict(struct sock *sk, struct sk_psock *psock, sk_msg_compute_data_pointers(msg); msg->sk = sk; ret = bpf_prog_run_pin_on_cpu(prog, msg); + msg->sk = NULL; ret = sk_psock_map_verd(ret, msg->sk_redir); psock->apply_bytes = msg->apply_bytes; if (ret == __SK_REDIRECT) { diff --git a/net/ipv4/tcp_bpf.c b/net/ipv4/tcp_bpf.c index a268e1595b22..a9c758868f13 100644 --- a/net/ipv4/tcp_bpf.c +++ b/net/ipv4/tcp_bpf.c @@ -10,6 +10,7 @@ #include #include +#include void tcp_eat_skb(struct sock *sk, struct sk_buff *skb) { @@ -226,6 +227,7 @@ static int tcp_bpf_recvmsg_parser(struct sock *sk, int peek = flags & MSG_PEEK; struct sk_psock *psock; struct tcp_sock *tcp; + int from_self_copied = 0; int copied = 0; u32 seq; @@ -262,7 +264,7 @@ static int tcp_bpf_recvmsg_parser(struct sock *sk, } msg_bytes_ready: - copied = sk_msg_recvmsg(sk, psock, msg, len, flags); + copied = __sk_msg_recvmsg(sk, psock, msg, len, flags, &from_self_copied); /* The typical case for EFAULT is the socket was gracefully * shutdown with a FIN pkt. So check here the other case is * some error on copy_page_to_iter which would be unexpected. @@ -277,7 +279,7 @@ static int tcp_bpf_recvmsg_parser(struct sock *sk, goto out; } } - seq += copied; + seq += from_self_copied; if (!copied) { long timeo; int data; @@ -331,6 +333,25 @@ static int tcp_bpf_recvmsg_parser(struct sock *sk, return copied; } +static int tcp_bpf_ioctl(struct sock *sk, int cmd, int *karg) +{ + bool slow; + + /* we only care about FIONREAD */ + if (cmd != SIOCINQ) + return tcp_ioctl(sk, cmd, karg); + + /* works similar as tcp_ioctl */ + if (sk->sk_state == TCP_LISTEN) + return -EINVAL; + + slow = lock_sock_fast(sk); + *karg = sk_psock_msg_inq(sk); + unlock_sock_fast(sk, slow); + + return 0; +} + static int tcp_bpf_recvmsg(struct sock *sk, struct msghdr *msg, size_t len, int flags, int *addr_len) { @@ -609,6 +630,7 @@ static void tcp_bpf_rebuild_protos(struct proto prot[TCP_BPF_NUM_CFGS], prot[TCP_BPF_BASE].close = sock_map_close; prot[TCP_BPF_BASE].recvmsg = tcp_bpf_recvmsg; prot[TCP_BPF_BASE].sock_is_readable = sk_msg_is_readable; + prot[TCP_BPF_BASE].ioctl = tcp_bpf_ioctl; prot[TCP_BPF_TX] = prot[TCP_BPF_BASE]; prot[TCP_BPF_TX].sendmsg = tcp_bpf_sendmsg; diff --git a/net/ipv4/udp_bpf.c b/net/ipv4/udp_bpf.c index 0735d820e413..cc1156aef14d 100644 --- a/net/ipv4/udp_bpf.c +++ b/net/ipv4/udp_bpf.c @@ -5,6 +5,7 @@ #include #include #include +#include #include "udp_impl.h" @@ -111,12 +112,28 @@ enum { static DEFINE_SPINLOCK(udpv6_prot_lock); static struct proto udp_bpf_prots[UDP_BPF_NUM_PROTS]; +static int udp_bpf_ioctl(struct sock *sk, int cmd, int *karg) +{ + /* we only care about FIONREAD */ + if (cmd != SIOCINQ) + return udp_ioctl(sk, cmd, karg); + + /* works similar as udp_ioctl. + * man udp(7): "FIONREAD (SIOCINQ): Returns the size of the next + * pending datagram in the integer in bytes, or 0 when no datagram + * is pending." + */ + *karg = sk_msg_first_length(sk); + return 0; +} + static void udp_bpf_rebuild_protos(struct proto *prot, const struct proto *base) { - *prot = *base; - prot->close = sock_map_close; - prot->recvmsg = udp_bpf_recvmsg; - prot->sock_is_readable = sk_msg_is_readable; + *prot = *base; + prot->close = sock_map_close; + prot->recvmsg = udp_bpf_recvmsg; + prot->sock_is_readable = sk_msg_is_readable; + prot->ioctl = udp_bpf_ioctl; } static void udp_bpf_check_v6_needs_rebuild(struct proto *ops) diff --git a/tools/testing/selftests/bpf/prog_tests/sockmap_basic.c b/tools/testing/selftests/bpf/prog_tests/sockmap_basic.c index 1e3e4392dcca..8828f2b2f761 100644 --- a/tools/testing/selftests/bpf/prog_tests/sockmap_basic.c +++ b/tools/testing/selftests/bpf/prog_tests/sockmap_basic.c @@ -1,7 +1,8 @@ // SPDX-License-Identifier: GPL-2.0 // Copyright (c) 2020 Cloudflare #include -#include +#include +#include #include #include "test_progs.h" @@ -22,6 +23,16 @@ #define TCP_REPAIR_ON 1 #define TCP_REPAIR_OFF_NO_WP -1 /* Turn off without window probes */ +/** + * SOL_TCP is defined in while field + * copybuf_address of tcp_zerocopy_receive is not in it + * Although glibc has merged my patch to sync headers, + * the fix will take time to propagate, hence this workaround. + */ +#ifndef SOL_TCP +#define SOL_TCP 6 +#endif + static int connected_socket_v4(void) { struct sockaddr_in addr = { @@ -536,13 +547,14 @@ static void test_sockmap_skb_verdict_shutdown(void) } -static void test_sockmap_skb_verdict_fionread(bool pass_prog) +static void do_test_sockmap_skb_verdict_fionread(int sotype, bool pass_prog) { int err, map, verdict, c0 = -1, c1 = -1, p0 = -1, p1 = -1; int expected, zero = 0, sent, recvd, avail; struct test_sockmap_pass_prog *pass = NULL; struct test_sockmap_drop_prog *drop = NULL; char buf[256] = "0123456789"; + int split_len = sizeof(buf) / 2; if (pass_prog) { pass = test_sockmap_pass_prog__open_and_load(); @@ -550,7 +562,10 @@ static void test_sockmap_skb_verdict_fionread(bool pass_prog) return; verdict = bpf_program__fd(pass->progs.prog_skb_verdict); map = bpf_map__fd(pass->maps.sock_map_rx); - expected = sizeof(buf); + if (sotype == SOCK_DGRAM) + expected = split_len; /* FIONREAD for UDP is different from TCP */ + else + expected = sizeof(buf); } else { drop = test_sockmap_drop_prog__open_and_load(); if (!ASSERT_OK_PTR(drop, "open_and_load")) @@ -566,7 +581,7 @@ static void test_sockmap_skb_verdict_fionread(bool pass_prog) if (!ASSERT_OK(err, "bpf_prog_attach")) goto out; - err = create_socket_pairs(AF_INET, SOCK_STREAM, &c0, &c1, &p0, &p1); + err = create_socket_pairs(AF_INET, sotype, &c0, &c1, &p0, &p1); if (!ASSERT_OK(err, "create_socket_pairs()")) goto out; @@ -574,8 +589,9 @@ static void test_sockmap_skb_verdict_fionread(bool pass_prog) if (!ASSERT_OK(err, "bpf_map_update_elem(c1)")) goto out_close; - sent = xsend(p1, &buf, sizeof(buf), 0); - ASSERT_EQ(sent, sizeof(buf), "xsend(p0)"); + sent = xsend(p1, &buf, split_len, 0); + sent += xsend(p1, &buf, sizeof(buf) - split_len, 0); + ASSERT_EQ(sent, sizeof(buf), "xsend(p1)"); err = ioctl(c1, FIONREAD, &avail); ASSERT_OK(err, "ioctl(FIONREAD) error"); ASSERT_EQ(avail, expected, "ioctl(FIONREAD)"); @@ -597,6 +613,12 @@ static void test_sockmap_skb_verdict_fionread(bool pass_prog) test_sockmap_drop_prog__destroy(drop); } +static void test_sockmap_skb_verdict_fionread(bool pass_prog) +{ + do_test_sockmap_skb_verdict_fionread(SOCK_STREAM, pass_prog); + do_test_sockmap_skb_verdict_fionread(SOCK_DGRAM, pass_prog); +} + static void test_sockmap_skb_verdict_change_tail(void) { struct test_sockmap_change_tail *skel; @@ -1042,6 +1064,169 @@ static void test_sockmap_vsock_unconnected(void) xclose(map); } +/* it used to reproduce WARNING */ +static void test_sockmap_zc(void) +{ + int map, err, sent, recvd, zero = 0, one = 1, on = 1; + char buf[10] = "0123456789", rcv[11], addr[100]; + struct test_sockmap_pass_prog *skel = NULL; + int c0 = -1, p0 = -1, c1 = -1, p1 = -1; + struct tcp_zerocopy_receive zc; + socklen_t zc_len = sizeof(zc); + struct bpf_program *prog; + + skel = test_sockmap_pass_prog__open_and_load(); + if (!ASSERT_OK_PTR(skel, "open_and_load")) + return; + + if (create_socket_pairs(AF_INET, SOCK_STREAM, &c0, &c1, &p0, &p1)) + goto end; + + prog = skel->progs.prog_skb_verdict_ingress; + map = bpf_map__fd(skel->maps.sock_map_rx); + + err = bpf_prog_attach(bpf_program__fd(prog), map, BPF_SK_SKB_STREAM_VERDICT, 0); + if (!ASSERT_OK(err, "bpf_prog_attach")) + goto end; + + err = bpf_map_update_elem(map, &zero, &p0, BPF_ANY); + if (!ASSERT_OK(err, "bpf_map_update_elem")) + goto end; + + err = bpf_map_update_elem(map, &one, &p1, BPF_ANY); + if (!ASSERT_OK(err, "bpf_map_update_elem")) + goto end; + + sent = xsend(c0, buf, sizeof(buf), 0); + if (!ASSERT_EQ(sent, sizeof(buf), "xsend")) + goto end; + + /* trigger tcp_bpf_recvmsg_parser and inc copied_seq of p1 */ + recvd = recv_timeout(p1, rcv, sizeof(rcv), MSG_DONTWAIT, 1); + if (!ASSERT_EQ(recvd, sent, "recv_timeout(p1)")) + goto end; + + /* uninstall sockmap of p1 */ + bpf_map_delete_elem(map, &one); + + /* trigger tcp stack and the rcv_nxt of p1 is less than copied_seq */ + sent = xsend(c1, buf, sizeof(buf) - 1, 0); + if (!ASSERT_EQ(sent, sizeof(buf) - 1, "xsend")) + goto end; + + err = setsockopt(p1, SOL_SOCKET, SO_ZEROCOPY, &on, sizeof(on)); + if (!ASSERT_OK(err, "setsockopt")) + goto end; + + memset(&zc, 0, sizeof(zc)); + zc.copybuf_address = (__u64)((unsigned long)addr); + zc.copybuf_len = sizeof(addr); + + err = getsockopt(p1, IPPROTO_TCP, TCP_ZEROCOPY_RECEIVE, &zc, &zc_len); + if (!ASSERT_OK(err, "getsockopt")) + goto end; + +end: + if (c0 >= 0) + close(c0); + if (p0 >= 0) + close(p0); + if (c1 >= 0) + close(c1); + if (p1 >= 0) + close(p1); + test_sockmap_pass_prog__destroy(skel); +} + +/* it used to check whether copied_seq of sk is correct */ +static void test_sockmap_copied_seq(bool strp) +{ + int i, map, err, sent, recvd, zero = 0, one = 1; + struct test_sockmap_pass_prog *skel = NULL; + int c0 = -1, p0 = -1, c1 = -1, p1 = -1; + char buf[10] = "0123456789", rcv[11]; + struct bpf_program *prog; + + skel = test_sockmap_pass_prog__open_and_load(); + if (!ASSERT_OK_PTR(skel, "open_and_load")) + return; + + if (create_socket_pairs(AF_INET, SOCK_STREAM, &c0, &c1, &p0, &p1)) + goto end; + + prog = skel->progs.prog_skb_verdict_ingress; + map = bpf_map__fd(skel->maps.sock_map_rx); + + err = bpf_prog_attach(bpf_program__fd(prog), map, BPF_SK_SKB_STREAM_VERDICT, 0); + if (!ASSERT_OK(err, "bpf_prog_attach verdict")) + goto end; + + if (strp) { + prog = skel->progs.prog_skb_parser; + err = bpf_prog_attach(bpf_program__fd(prog), map, BPF_SK_SKB_STREAM_PARSER, 0); + if (!ASSERT_OK(err, "bpf_prog_attach parser")) + goto end; + } + + err = bpf_map_update_elem(map, &zero, &p0, BPF_ANY); + if (!ASSERT_OK(err, "bpf_map_update_elem(p0)")) + goto end; + + err = bpf_map_update_elem(map, &one, &p1, BPF_ANY); + if (!ASSERT_OK(err, "bpf_map_update_elem(p1)")) + goto end; + + /* just trigger sockamp: data sent by c0 will be received by p1 */ + sent = xsend(c0, buf, sizeof(buf), 0); + if (!ASSERT_EQ(sent, sizeof(buf), "xsend(c0), bpf")) + goto end; + + recvd = recv_timeout(p1, rcv, sizeof(rcv), MSG_DONTWAIT, 1); + if (!ASSERT_EQ(recvd, sent, "recv_timeout(p1), bpf")) + goto end; + + /* uninstall sockmap of p1 and p0 */ + err = bpf_map_delete_elem(map, &one); + if (!ASSERT_OK(err, "bpf_map_delete_elem(1)")) + goto end; + + err = bpf_map_delete_elem(map, &zero); + if (!ASSERT_OK(err, "bpf_map_delete_elem(0)")) + goto end; + + /* now all sockets become plain socket, they should still work */ + for (i = 0; i < 5; i++) { + /* test copied_seq of p1 by running tcp native stack */ + sent = xsend(c1, buf, sizeof(buf), 0); + if (!ASSERT_EQ(sent, sizeof(buf), "xsend(c1), native")) + goto end; + + recvd = recv(p1, rcv, sizeof(rcv), MSG_DONTWAIT); + if (!ASSERT_EQ(recvd, sent, "recv_timeout(p1), native")) + goto end; + + /* p0 previously redirected skb to p1, we also check copied_seq of p0 */ + sent = xsend(c0, buf, sizeof(buf), 0); + if (!ASSERT_EQ(sent, sizeof(buf), "xsend(c0), native")) + goto end; + + recvd = recv(p0, rcv, sizeof(rcv), MSG_DONTWAIT); + if (!ASSERT_EQ(recvd, sent, "recv_timeout(p0), native")) + goto end; + } + +end: + if (c0 >= 0) + close(c0); + if (p0 >= 0) + close(p0); + if (c1 >= 0) + close(c1); + if (p1 >= 0) + close(p1); + test_sockmap_pass_prog__destroy(skel); +} + void test_sockmap_basic(void) { if (test__start_subtest("sockmap create_update_free")) @@ -1108,4 +1293,10 @@ void test_sockmap_basic(void) test_sockmap_skb_verdict_vsock_poll(); if (test__start_subtest("sockmap vsock unconnected")) test_sockmap_vsock_unconnected(); + if (test__start_subtest("sockmap with zc")) + test_sockmap_zc(); + if (test__start_subtest("sockmap recover")) + test_sockmap_copied_seq(false); + if (test__start_subtest("sockmap recover with strp")) + test_sockmap_copied_seq(true); } diff --git a/tools/testing/selftests/bpf/progs/test_sockmap_pass_prog.c b/tools/testing/selftests/bpf/progs/test_sockmap_pass_prog.c index 69aacc96db36..085d7e61e78d 100644 --- a/tools/testing/selftests/bpf/progs/test_sockmap_pass_prog.c +++ b/tools/testing/selftests/bpf/progs/test_sockmap_pass_prog.c @@ -44,4 +44,12 @@ int prog_skb_parser(struct __sk_buff *skb) return SK_PASS; } +SEC("sk_skb/stream_verdict") +int prog_skb_verdict_ingress(struct __sk_buff *skb) +{ + int one = 1; + + return bpf_sk_redirect_map(skb, &sock_map_rx, one, BPF_F_INGRESS); +} + char _license[] SEC("license") = "GPL";