Skip to content

Commit 5eac82a

Browse files
client: replace all internal Connection usage with ConnectionImpl
`Connection`s model shared pointers on `ConnectionImpl` and are intended for external usage by library consumers. Using them internally in the Connector can cause bugs. For instance, we internally store `Connection` objects in the LibevNetProvider, which adds an additional reference to them and prevents them from being deleted, even when all user objects are dead. Also we leak connections in `close`, since we do not erase them from the `m_ReadyToSend` and `m_ReadyToDecode` sets. To fix this, let's internally use `ConnectionImpl` to model weak pointers. We rely on the fact that the lifetime of these weak pointers is tied to the lifetime of the shared user objects. The ideal solution would be to remove the `Connection`<-`ConnectionImpl` constructor, but we still need to return new Connection objects in methods like waitAny, so let's just enforce as a rule that we should not use `Connection` objects internally. While we are here, let's also fix all the formating issues that the linter is reporting for the refactored code. Closes #140 Closes #147
1 parent 8e009e8 commit 5eac82a

File tree

5 files changed

+256
-221
lines changed

5 files changed

+256
-221
lines changed

src/Client/Connection.hpp

Lines changed: 109 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,21 @@ struct ConnectionImpl
7979
void ref();
8080
void unref();
8181

82+
typename NetProvider::Stream_t &get_strm() { return strm; }
83+
const typename NetProvider::Stream_t &get_strm() const { return strm; }
84+
85+
void setError(const std::string &msg, int errno_ = 0);
86+
87+
BUFFER &getInBuf();
88+
BUFFER &getOutBuf();
89+
90+
void prepare_auth(std::string_view user, std::string_view passwd);
91+
void commit_auth(std::string_view user, std::string_view passwd);
92+
8293
Connector<BUFFER, NetProvider> &connector;
8394
BUFFER inBuf;
95+
static constexpr size_t GC_STEP_CNT = 100;
96+
size_t gc_step = 0;
8497
BUFFER outBuf;
8598
RequestEncoder<BUFFER> enc;
8699
ResponseDecoder<BUFFER> dec;
@@ -112,7 +125,7 @@ ConnectionImpl<BUFFER, NetProvider>::~ConnectionImpl()
112125
{
113126
assert(refs == 0);
114127
if (!strm.has_status(SS_DEAD)) {
115-
connector.close(*this);
128+
connector.close(this);
116129
}
117130
}
118131

@@ -133,6 +146,42 @@ ConnectionImpl<BUFFER, NetProvider>::unref()
133146
delete this;
134147
}
135148

149+
template <class BUFFER, class NetProvider>
150+
void
151+
ConnectionImpl<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
152+
{
153+
error.emplace(msg, errno_);
154+
}
155+
156+
template <class BUFFER, class NetProvider>
157+
BUFFER &
158+
ConnectionImpl<BUFFER, NetProvider>::getInBuf()
159+
{
160+
return inBuf;
161+
}
162+
163+
template <class BUFFER, class NetProvider>
164+
BUFFER &
165+
ConnectionImpl<BUFFER, NetProvider>::getOutBuf()
166+
{
167+
return outBuf;
168+
}
169+
170+
template <class BUFFER, class NetProvider>
171+
void
172+
ConnectionImpl<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
173+
{
174+
enc.encodeAuth(user, passwd, greeting);
175+
}
176+
177+
template <class BUFFER, class NetProvider>
178+
void
179+
ConnectionImpl<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
180+
{
181+
enc.reencodeAuth(user, passwd, greeting);
182+
connector.readyToSend(this);
183+
}
184+
136185
/** Each connection is supposed to be bound to a single socket. */
137186
template<class BUFFER, class NetProvider>
138187
class Connection
@@ -212,44 +261,11 @@ class Connection
212261
BUFFER& getInBuf();
213262
BUFFER& getOutBuf();
214263

215-
template<class B, class N>
216-
friend
217-
void hasSentBytes(Connection<B, N> &conn, size_t bytes);
218-
219-
template<class B, class N>
220-
friend
221-
void hasNotRecvBytes(Connection<B, N> &conn, size_t bytes);
222-
223-
template<class B, class N>
224-
friend
225-
bool hasDataToSend(Connection<B, N> &conn);
226-
227-
template<class B, class N>
228-
friend
229-
bool hasDataToDecode(Connection<B, N> &conn);
230-
231-
template<class B, class N>
232-
friend
233-
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);
234-
235-
template<class B, class N>
236-
friend
237-
void inputBufGC(Connection<B, N> &conn);
238-
239-
template<class B, class N>
240-
friend
241-
int decodeGreeting(Connection<B, N> &conn);
242-
243-
rid_t prepare_auth(std::string_view user,
244-
std::string_view passwd);
245-
246-
rid_t commit_auth(std::string_view user,
247-
std::string_view passwd);
264+
void prepare_auth(std::string_view user, std::string_view passwd);
265+
void commit_auth(std::string_view user, std::string_view passwd);
248266

249267
private:
250268
ConnectionImpl<BUFFER, NetProvider> *impl;
251-
static constexpr size_t GC_STEP_CNT = 100;
252-
size_t gc_step = 0;
253269

254270
template <class T>
255271
rid_t insert(const T &tuple, uint32_t space_id);
@@ -442,7 +458,7 @@ template<class BUFFER, class NetProvider>
442458
void
443459
Connection<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
444460
{
445-
impl->error.emplace(msg, errno_);
461+
impl->setError(msg, errno_);
446462
}
447463

448464
template<class BUFFER, class NetProvider>
@@ -471,73 +487,79 @@ template<class BUFFER, class NetProvider>
471487
BUFFER&
472488
Connection<BUFFER, NetProvider>::getInBuf()
473489
{
474-
return impl->inBuf;
490+
return impl->getInBuf();
475491
}
476492

477493
template<class BUFFER, class NetProvider>
478494
BUFFER&
479495
Connection<BUFFER, NetProvider>::getOutBuf()
480496
{
481-
return impl->outBuf;
497+
return impl->getOutBuf();
482498
}
483499

484-
template<class BUFFER, class NetProvider>
500+
template <class BUFFER, class NetProvider>
485501
void
486-
hasSentBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
502+
hasSentBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
487503
{
488504
//dropBack()/dropFront() interfaces require number of bytes be greater
489505
//than zero so let's check it first.
490506
if (bytes > 0)
491-
conn.impl->outBuf.dropFront(bytes);
507+
conn->getOutBuf().dropFront(bytes);
492508
}
493509

494-
template<class BUFFER, class NetProvider>
510+
template <class BUFFER, class NetProvider>
495511
void
496-
hasNotRecvBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
512+
hasNotRecvBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
497513
{
498514
if (bytes > 0)
499-
conn.impl->inBuf.dropBack(bytes);
515+
conn->getInBuf().dropBack(bytes);
500516
}
501517

502-
template<class BUFFER, class NetProvider>
518+
template <class BUFFER, class NetProvider>
503519
bool
504-
hasDataToSend(Connection<BUFFER, NetProvider> &conn)
520+
hasDataToSend(ConnectionImpl<BUFFER, NetProvider> *conn)
505521
{
506522
//We drop content of input buffer once it has been sent. So to detect
507523
//if there's any data to send it's enough to check buffer's emptiness.
508-
return !conn.impl->outBuf.empty();
524+
return !conn->getOutBuf().empty();
509525
}
510526

511-
template<class BUFFER, class NetProvider>
527+
template <class BUFFER, class NetProvider>
512528
bool
513529
hasDataToDecode(Connection<BUFFER, NetProvider> &conn)
514530
{
515-
assert(conn.impl->endDecoded < conn.impl->inBuf.end() ||
516-
conn.impl->endDecoded == conn.impl->inBuf.end());
517-
return conn.impl->endDecoded != conn.impl->inBuf.end();
531+
return hasDataToDecode(conn.getImpl());
518532
}
519533

520-
template<class BUFFER, class NetProvider>
534+
template <class BUFFER, class NetProvider>
535+
bool
536+
hasDataToDecode(ConnectionImpl<BUFFER, NetProvider> *conn)
537+
{
538+
assert(conn->endDecoded < conn->getInBuf().end() || conn->endDecoded == conn->getInBuf().end());
539+
return conn->endDecoded != conn->getInBuf().end();
540+
}
541+
542+
template <class BUFFER, class NetProvider>
521543
static void
522-
inputBufGC(Connection<BUFFER, NetProvider> &conn)
544+
inputBufGC(ConnectionImpl<BUFFER, NetProvider> *conn)
523545
{
524-
if ((conn.gc_step++ % Connection<BUFFER, NetProvider>::GC_STEP_CNT) == 0) {
525-
LOG_DEBUG("Flushed input buffer of the connection %p", &conn);
526-
conn.impl->inBuf.flush();
546+
if (conn->gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
547+
LOG_DEBUG("Flushed input buffer of the connection %p", conn);
548+
conn->getInBuf().flush();
527549
}
528550
}
529551

530-
template<class BUFFER, class NetProvider>
552+
template <class BUFFER, class NetProvider>
531553
DecodeStatus
532-
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
554+
processResponse(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync, Response<BUFFER> *result)
533555
{
534556
//Decode response. In case of success - fill in feature map
535557
//and adjust end-of-decoded data pointer. Call GC if needed.
536-
if (! conn.impl->inBuf.has(conn.impl->endDecoded, MP_RESPONSE_SIZE))
558+
if (!conn->getInBuf().has(conn->endDecoded, MP_RESPONSE_SIZE))
537559
return DECODE_NEEDMORE;
538560

539561
Response<BUFFER> response;
540-
response.size = conn.impl->dec.decodeResponseSize();
562+
response.size = conn->dec.decodeResponseSize();
541563
if (response.size < 0) {
542564
LOG_ERROR("Failed to decode response size");
543565
//In case of corrupted response size all other data in the buffer
@@ -548,55 +570,53 @@ processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BU
548570

549571
}
550572
response.size += MP_RESPONSE_SIZE;
551-
if (! conn.impl->inBuf.has(conn.impl->endDecoded, response.size)) {
573+
if (!conn->getInBuf().has(conn->endDecoded, response.size)) {
552574
//Response was received only partially. Reset decoder position
553575
//to the start of response to make this function re-entered.
554-
conn.impl->dec.reset(conn.impl->endDecoded);
576+
conn->dec.reset(conn->endDecoded);
555577
return DECODE_NEEDMORE;
556578
}
557-
if (conn.impl->dec.decodeResponse(response) != 0) {
558-
conn.setError("Failed to decode response, skipping bytes..");
559-
conn.impl->endDecoded += response.size;
579+
if (conn->dec.decodeResponse(response) != 0) {
580+
conn->setError("Failed to decode response, skipping bytes..");
581+
conn->endDecoded += response.size;
560582
return DECODE_ERR;
561583
}
562584
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
563585
response.header.code, ", schema=", response.header.schema_id);
564586
if (result != nullptr && response.header.sync == req_sync) {
565587
*result = std::move(response);
566588
} else {
567-
conn.impl->futures.insert({response.header.sync,
568-
std::move(response)});
589+
conn->futures.insert({response.header.sync, std::move(response)});
569590
}
570-
conn.impl->endDecoded += response.size;
591+
conn->endDecoded += response.size;
571592
inputBufGC(conn);
572593
return DECODE_SUCC;
573594
}
574595

575-
template<class BUFFER, class NetProvider>
596+
template <class BUFFER, class NetProvider>
576597
int
577-
decodeGreeting(Connection<BUFFER, NetProvider> &conn)
598+
decodeGreeting(ConnectionImpl<BUFFER, NetProvider> *conn)
578599
{
579600
//TODO: that's not zero-copy, should be rewritten in that pattern.
580-
assert(conn.getInBuf().has(conn.impl->endDecoded, Iproto::GREETING_SIZE));
601+
assert(conn->getInBuf().has(conn->endDecoded, Iproto::GREETING_SIZE));
581602
char greeting_buf[Iproto::GREETING_SIZE];
582-
conn.impl->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
583-
conn.impl->dec.reset(conn.impl->endDecoded);
584-
if (parseGreeting(std::string_view{greeting_buf, Iproto::GREETING_SIZE},
585-
conn.impl->greeting) != 0)
603+
conn->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
604+
conn->dec.reset(conn->endDecoded);
605+
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, conn->greeting) != 0)
586606
return -1;
587-
conn.impl->is_greeting_received = true;
588-
LOG_DEBUG("Version: ", conn.impl->greeting.version_id);
607+
conn->is_greeting_received = true;
608+
LOG_DEBUG("Version: ", conn->greeting.version_id);
589609

590610
#ifndef NDEBUG
591611
//print salt in hex format.
592612
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
593613
const char *hex = "0123456789abcdef";
594-
for (size_t i = 0; i < conn.impl->greeting.salt_size; i++) {
595-
uint8_t u = conn.impl->greeting.salt[i];
614+
for (size_t i = 0; i < conn->greeting.salt_size; i++) {
615+
uint8_t u = conn->greeting.salt[i];
596616
hex_salt[i * 2] = hex[u / 16];
597617
hex_salt[i * 2 + 1] = hex[u % 16];
598618
}
599-
hex_salt[conn.impl->greeting.salt_size * 2] = 0;
619+
hex_salt[conn->greeting.salt_size * 2] = 0;
600620
LOG_DEBUG("Salt: ", hex_salt);
601621
#endif
602622
return 0;
@@ -717,21 +737,16 @@ Connection<BUFFER, NetProvider>::select(const T &key, uint32_t space_id,
717737
return impl->enc.getSync();
718738
}
719739

720-
template<class BUFFER, class NetProvider>
721-
rid_t
722-
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user,
723-
std::string_view passwd)
740+
template <class BUFFER, class NetProvider>
741+
void
742+
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
724743
{
725-
impl->enc.encodeAuth(user, passwd, impl->greeting);
726-
return 0;
744+
impl->prepare_auth(user, passwd);
727745
}
728746

729-
template<class BUFFER, class NetProvider>
730-
rid_t
731-
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user,
732-
std::string_view passwd)
747+
template <class BUFFER, class NetProvider>
748+
void
749+
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
733750
{
734-
impl->enc.reencodeAuth(user, passwd, impl->greeting);;
735-
impl->connector.readyToSend(*this);
736-
return 0;
751+
impl->commit_auth(user, passwd);
737752
}

0 commit comments

Comments
 (0)