Skip to content

Commit 2b27352

Browse files
committed
CCBC-1672: handle case when packet gets discarded before IO calls completion handler
When the operation is getting retried, its buffers are getting relocated to new pipeline, and removed from the old one. But if the write operation still in the kernel and completion-based IO (e.g. windows IOCP) is being used, the state of the buffer manager of the old pipeline have to be adjusted. Change-Id: I950c519546d7d29a4ce05d02eb57c14e06b2e0e2 Reviewed-on: https://review.couchbase.org/c/libcouchbase/+/233237 Reviewed-by: Jared Casey <jared.casey@couchbase.com> Tested-by: Build Bot <build@couchbase.com>
1 parent 8d7e643 commit 2b27352

File tree

8 files changed

+167
-13
lines changed

8 files changed

+167
-13
lines changed

plugins/io/iocp/iocp_iops.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,8 @@ static void iops_dtor(lcb_io_opt_t iobase)
370370
{
371371
iocp_sockdata_t *sd = LCB_LIST_ITEM(cur, iocp_sockdata_t, list);
372372

373-
IOCP_LOG(IOCP_WARN, "Leak detected in socket %p (%lu). Refcount=%d", sd, sd->sSocket, sd->refcount);
373+
IOCP_LOG(IOCP_WARN, "Leak detected in socket %p (%lu). Refcount=%d", sd, (unsigned long)sd->sSocket,
374+
sd->refcount);
374375
if (sd->sSocket != INVALID_SOCKET) {
375376
closesocket(sd->sSocket);
376377
sd->sSocket = INVALID_SOCKET;

plugins/io/iocp/iocp_timer.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ iocp_timer_t *iocp_tmq_pop(lcb_list_t *list, lcb_uint64_t now)
6767

6868
void iocp_tmq_add(lcb_list_t *list, iocp_timer_t *timer)
6969
{
70-
IOCP_LOG(IOCP_TRACE, "Adding timer %p with ms %lu", timer, timer->ms);
70+
IOCP_LOG(IOCP_TRACE, "Adding timer %p with ms %lu", timer, (unsigned long)timer->ms);
7171
lcb_list_add_sorted(list, &timer->list, iocp_timer_cmp_asc);
7272
}
7373

src/mc/mcreq-flush-inl.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -89,11 +89,13 @@ static nb_SIZE mcreq__pktflush_callback(void *p, nb_SIZE hint, void *arg)
8989
*/
9090
static void mcreq_flush_done_ex(mc_PIPELINE *pl, unsigned nflushed, unsigned expected, lcb_U64 now)
9191
{
92+
unsigned nflushed_extra = 0; // number of bytes flushed by IO, but discarded already
9293
if (nflushed) {
9394
mc__FLUSHINFO info = {pl, now};
94-
netbuf_end_flush2(&pl->nbmgr, nflushed, mcreq__pktflush_callback, offsetof(mc_PACKET, sl_flushq), &info);
95+
nflushed_extra =
96+
netbuf_end_flush2(&pl->nbmgr, nflushed, mcreq__pktflush_callback, offsetof(mc_PACKET, sl_flushq), &info);
9597
}
96-
if (nflushed < expected) {
98+
if (nflushed < expected || nflushed_extra) {
9799
netbuf_reset_flush(&pl->nbmgr);
98100
}
99101
}

src/mc/mcreq.c

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,15 +366,16 @@ void mcreq_wipe_packet(mc_PIPELINE *pipeline, mc_PACKET *packet)
366366
{
367367
if (!(packet->flags & MCREQ_F_KEY_NOCOPY)) {
368368
if ((packet->flags & MCREQ_F_DETACHED)) {
369-
if (pipeline) {
370-
netbuf_cleanup_packet(&pipeline->nbmgr, packet);
371-
}
372369
free(SPAN_BUFFER(&packet->kh_span));
373370
} else {
374371
netbuf_mblock_release(&pipeline->nbmgr, &packet->kh_span);
375372
}
376373
}
377374

375+
if (pipeline) {
376+
netbuf_cleanup_packet(&pipeline->nbmgr, packet);
377+
}
378+
378379
if (!(packet->flags & MCREQ_F_HASVALUE)) {
379380
return;
380381
}

src/mcserver/mcserver.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1151,6 +1151,9 @@ void Server::handle_connected(lcbio_SOCKET *sock, lcb_STATUS err, lcbio_OSERR sy
11511151
lcbio_CTXPROCS procs{};
11521152
procs.cb_err = on_error;
11531153
procs.cb_read = on_read;
1154+
1155+
netbuf_reset_flush(&nbmgr);
1156+
11541157
procs.cb_flush_done = on_flush_done;
11551158
procs.cb_flush_ready = on_flush_ready;
11561159
connctx = lcbio_ctx_new(sock, this, &procs, "memcached");

src/netbuf/netbuf.c

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -605,7 +605,7 @@ nb_SIZE netbuf_start_flush(nb_MGR *mgr, nb_IOV *iovs, int niov, int *nused)
605605
return ret;
606606
}
607607

608-
void netbuf_end_flush(nb_MGR *mgr, unsigned int nflushed)
608+
unsigned int netbuf_end_flush(nb_MGR *mgr, unsigned int nflushed)
609609
{
610610
nb_SENDQ *q = &mgr->sendq;
611611
sllist_iterator iter;
@@ -635,7 +635,7 @@ void netbuf_end_flush(nb_MGR *mgr, unsigned int nflushed)
635635
break;
636636
}
637637
}
638-
lcb_assert(!nflushed);
638+
return nflushed;
639639
}
640640

641641
void netbuf_pdu_enqueue(nb_MGR *mgr, void *pdu, nb_SIZE lloff)
@@ -644,11 +644,17 @@ void netbuf_pdu_enqueue(nb_MGR *mgr, void *pdu, nb_SIZE lloff)
644644
sllist_append(&q->pdus, (sllist_node *)(void *)((char *)pdu + lloff));
645645
}
646646

647-
void netbuf_end_flush2(nb_MGR *mgr, unsigned int nflushed, nb_getsize_fn callback, nb_SIZE lloff, void *arg)
647+
unsigned netbuf_end_flush2(nb_MGR *mgr, unsigned int nflushed, nb_getsize_fn callback, nb_SIZE lloff, void *arg)
648648
{
649649
sllist_iterator iter;
650650
nb_SENDQ *q = &mgr->sendq;
651-
netbuf_end_flush(mgr, nflushed);
651+
652+
unsigned nflushed_extra = netbuf_end_flush(mgr, nflushed);
653+
654+
/* some operations has been discarded while waiting for IO,
655+
* adjusting nflushed to account for the extra */
656+
lcb_assert(nflushed_extra <= nflushed);
657+
nflushed -= nflushed_extra;
652658

653659
/** Add to the nflushed overflow from last call */
654660
nflushed += q->pdu_offset;
@@ -672,6 +678,8 @@ void netbuf_end_flush2(nb_MGR *mgr, unsigned int nflushed, nb_getsize_fn callbac
672678

673679
/** Store the remainder of data that wasn't processed for next call */
674680
q->pdu_offset = nflushed;
681+
682+
return nflushed_extra;
675683
}
676684

677685
/******************************************************************************
@@ -741,6 +749,9 @@ void netbuf_cleanup_packet(nb_MGR *mgr, const void *packet)
741749
if (e->parent == packet) {
742750
sllist_iter_remove(&mgr->sendq.pending, &iter);
743751
mblock_release_ptr(&mgr->sendq.elempool, (char *)e, sizeof(*e));
752+
if (e == mgr->sendq.last_requested) {
753+
netbuf_reset_flush(mgr);
754+
}
744755
}
745756
}
746757
}

src/netbuf/netbuf.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -280,7 +280,7 @@ nb_SIZE netbuf_start_flush(nb_MGR *mgr, nb_IOV *iovs, int niov, int *nused);
280280
* @param mgr the manager object
281281
* @param nflushed how much data in bytes was flushed to the network.
282282
*/
283-
void netbuf_end_flush(nb_MGR *mgr, nb_SIZE nflushed);
283+
unsigned netbuf_end_flush(nb_MGR *mgr, nb_SIZE nflushed);
284284

285285
/**
286286
* Reset the flush context for the buffer. This should be called only when the
@@ -405,7 +405,7 @@ void netbuf_pdu_enqueue(nb_MGR *mgr, void *pdu, nb_SIZE lloff);
405405
*/
406406
typedef nb_SIZE (*nb_getsize_fn)(void *pdu, nb_SIZE remaining, void *arg);
407407

408-
void netbuf_end_flush2(nb_MGR *mgr, unsigned int nflushed, nb_getsize_fn callback, nb_SIZE lloff, void *arg);
408+
unsigned netbuf_end_flush2(nb_MGR *mgr, unsigned int nflushed, nb_getsize_fn callback, nb_SIZE lloff, void *arg);
409409

410410
/**
411411
* Ensures that the given internal structures of the manager are not allocated

tests/basic/t_netbuf.cc

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,11 @@
2424
#endif
2525
#include "netbuf/netbuf.h"
2626

27+
#include "sllist.h"
28+
#include "sllist-inl.h"
29+
30+
#include <array>
31+
2732
#define BIG_BUF_SIZE 5000
2833
#define SMALL_BUF_SIZE 50
2934

@@ -456,3 +461,134 @@ TEST_F(NetbufTest, testOutOfOrder)
456461

457462
clean_check(&mgr);
458463
}
464+
465+
struct my_PACKET {
466+
sllist_node slnode{nullptr};
467+
nb_SPAN key_{};
468+
nb_SPAN value_{};
469+
bool is_flushed_{false};
470+
471+
my_PACKET(nb_MGR *mgr, std::string key, std::string value)
472+
{
473+
key_.size = key.size();
474+
netbuf_mblock_reserve(mgr, &key_);
475+
476+
value_.size = value.size();
477+
netbuf_mblock_reserve(mgr, &value_);
478+
}
479+
480+
void remove_pdu_from(nb_MGR *mgr)
481+
{
482+
sllist_iterator iter;
483+
SLLIST_ITERFOR(&mgr->sendq.pdus, &iter)
484+
{
485+
my_PACKET *el = SLLIST_ITEM(iter.cur, my_PACKET, slnode);
486+
if (el == this) {
487+
sllist_iter_remove(&mgr->sendq.pdus, &iter);
488+
}
489+
}
490+
}
491+
492+
[[nodiscard]] auto key() -> nb_SPAN *
493+
{
494+
return &key_;
495+
}
496+
497+
[[nodiscard]] auto value() -> nb_SPAN *
498+
{
499+
return &value_;
500+
}
501+
502+
[[nodiscard]] auto is_flushed() const -> bool
503+
{
504+
return is_flushed_;
505+
}
506+
507+
void mark_as_flushed()
508+
{
509+
is_flushed_ = true;
510+
}
511+
512+
[[nodiscard]] auto size() const -> std::size_t
513+
{
514+
return key_.size + value_.size;
515+
}
516+
};
517+
518+
static nb_SIZE packet_flush_callback(void *pdu, nb_SIZE hint, void * /* arg */)
519+
{
520+
my_PACKET *packet = (my_PACKET *)pdu;
521+
if (hint >= packet->size()) {
522+
packet->mark_as_flushed();
523+
}
524+
return packet->size();
525+
}
526+
527+
TEST_F(NetbufTest, testPacketCleanup)
528+
{
529+
nb_MGR mgr;
530+
nb_SETTINGS settings;
531+
netbuf_default_settings(&settings);
532+
settings.data_basealloc = 1;
533+
netbuf_init(&mgr, &settings);
534+
535+
std::array<my_PACKET, 3> packets{
536+
my_PACKET{&mgr, "key_1", "value_1"},
537+
my_PACKET{&mgr, "key_2", "value_2"},
538+
my_PACKET{&mgr, "key_3", "value_3"},
539+
};
540+
541+
/* enqueue first two packets */
542+
for (std::size_t i = 0; i < 2; ++i) {
543+
netbuf_enqueue_span(&mgr, packets[i].key(), &packets[i]);
544+
netbuf_enqueue_span(&mgr, packets[i].value(), &packets[i]);
545+
netbuf_pdu_enqueue(&mgr, &packets[i], offsetof(my_PACKET, slnode));
546+
}
547+
548+
nb_IOV iov[10];
549+
nb_SIZE to_flush;
550+
551+
/* start flushing first two packets */
552+
to_flush = netbuf_start_flush(&mgr, iov, 4, NULL);
553+
ASSERT_EQ(packets[0].size() + packets[1].size(), to_flush);
554+
555+
/* discard second packet
556+
*
557+
* this simulates network failure and relocation of the packet
558+
* to some other pipeline while the IO still being processed by
559+
* the OS kernel
560+
*/
561+
packets[1].remove_pdu_from(&mgr);
562+
netbuf_cleanup_packet(&mgr, &packets[1]);
563+
564+
/*
565+
* OS kernel returned and completed flushing
566+
*/
567+
netbuf_end_flush2(&mgr, to_flush, packet_flush_callback, offsetof(my_PACKET, slnode), NULL);
568+
ASSERT_TRUE(packets[0].is_flushed());
569+
ASSERT_FALSE(packets[1].is_flushed());
570+
ASSERT_FALSE(packets[2].is_flushed());
571+
572+
/*
573+
* enqueue last packet
574+
*/
575+
netbuf_enqueue_span(&mgr, packets[2].key(), &packets[2]);
576+
netbuf_enqueue_span(&mgr, packets[2].value(), &packets[2]);
577+
netbuf_pdu_enqueue(&mgr, &packets[2], offsetof(my_PACKET, slnode));
578+
579+
/* start flushing last packet */
580+
to_flush = netbuf_start_flush(&mgr, iov, 10, NULL);
581+
ASSERT_EQ(packets[2].size(), to_flush);
582+
583+
netbuf_end_flush2(&mgr, to_flush, packet_flush_callback, 0, NULL);
584+
ASSERT_TRUE(packets[0].is_flushed());
585+
ASSERT_FALSE(packets[1].is_flushed());
586+
ASSERT_TRUE(packets[2].is_flushed());
587+
588+
for (auto &packet : packets) {
589+
netbuf_mblock_release(&mgr, packet.key());
590+
netbuf_mblock_release(&mgr, packet.value());
591+
}
592+
593+
clean_check(&mgr);
594+
}

0 commit comments

Comments
 (0)