Skip to content

Commit 277d5d2

Browse files
client: replace all internal Connection usage with ConnectionImpl
`Connection`s model shared pointers on `ConnectionImpl` and are intended for external usage by library consumers. Using them internally in the Connector can cause bugs. For instance, we internally store `Connection` objects in the LibevNetProvider, which adds an additional reference to them and prevents them from being deleted, even when all user objects are dead. Also we leak connections in `close`, since we do not erase them from the `m_ReadyToSend` and `m_ReadyToDecode` sets. To fix this, let's internally use `ConnectionImpl` to model weak pointers. We rely on the fact that the lifetime of these weak pointers is tied to the lifetime of the shared user objects. The ideal solution would be to remove the `Connection`<-`ConnectionImpl` constructor, but we still need to return new Connection objects in methods like waitAny, so let's just enforce as a rule that we should not use `Connection` objects internally. While we are here, let's also fix all the formating issues that the linter is reporting for the refactored code. Closes #140 Closes #147
1 parent 2184427 commit 277d5d2

File tree

5 files changed

+256
-219
lines changed

5 files changed

+256
-219
lines changed

src/Client/Connection.hpp

Lines changed: 109 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,21 @@ struct ConnectionImpl
8080
void ref();
8181
void unref();
8282

83+
typename NetProvider::Stream_t &get_strm() { return strm; }
84+
const typename NetProvider::Stream_t &get_strm() const { return strm; }
85+
86+
void setError(const std::string &msg, int errno_ = 0);
87+
88+
BUFFER &getInBuf();
89+
BUFFER &getOutBuf();
90+
91+
void prepare_auth(std::string_view user, std::string_view passwd);
92+
void commit_auth(std::string_view user, std::string_view passwd);
93+
8394
Connector<BUFFER, NetProvider> &connector;
8495
BUFFER inBuf;
96+
static constexpr size_t GC_STEP_CNT = 100;
97+
size_t gc_step = 0;
8598
BUFFER outBuf;
8699
RequestEncoder<BUFFER> enc;
87100
ResponseDecoder<BUFFER> dec;
@@ -113,7 +126,7 @@ ConnectionImpl<BUFFER, NetProvider>::~ConnectionImpl()
113126
{
114127
assert(refs == 0);
115128
if (!strm.has_status(SS_DEAD)) {
116-
connector.close(*this);
129+
connector.close(this);
117130
}
118131
}
119132

@@ -134,6 +147,42 @@ ConnectionImpl<BUFFER, NetProvider>::unref()
134147
delete this;
135148
}
136149

150+
template <class BUFFER, class NetProvider>
151+
void
152+
ConnectionImpl<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
153+
{
154+
error.emplace(msg, errno_);
155+
}
156+
157+
template <class BUFFER, class NetProvider>
158+
BUFFER &
159+
ConnectionImpl<BUFFER, NetProvider>::getInBuf()
160+
{
161+
return inBuf;
162+
}
163+
164+
template <class BUFFER, class NetProvider>
165+
BUFFER &
166+
ConnectionImpl<BUFFER, NetProvider>::getOutBuf()
167+
{
168+
return outBuf;
169+
}
170+
171+
template <class BUFFER, class NetProvider>
172+
void
173+
ConnectionImpl<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
174+
{
175+
enc.encodeAuth(user, passwd, greeting);
176+
}
177+
178+
template <class BUFFER, class NetProvider>
179+
void
180+
ConnectionImpl<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
181+
{
182+
enc.reencodeAuth(user, passwd, greeting);
183+
connector.readyToSend(this);
184+
}
185+
137186
/** Each connection is supposed to be bound to a single socket. */
138187
template<class BUFFER, class NetProvider>
139188
class Connection
@@ -213,44 +262,11 @@ class Connection
213262
BUFFER& getInBuf();
214263
BUFFER& getOutBuf();
215264

216-
template<class B, class N>
217-
friend
218-
void hasSentBytes(Connection<B, N> &conn, size_t bytes);
219-
220-
template<class B, class N>
221-
friend
222-
void hasNotRecvBytes(Connection<B, N> &conn, size_t bytes);
223-
224-
template<class B, class N>
225-
friend
226-
bool hasDataToSend(Connection<B, N> &conn);
227-
228-
template<class B, class N>
229-
friend
230-
bool hasDataToDecode(Connection<B, N> &conn);
231-
232-
template<class B, class N>
233-
friend
234-
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);
235-
236-
template<class B, class N>
237-
friend
238-
void inputBufGC(Connection<B, N> &conn);
239-
240-
template<class B, class N>
241-
friend
242-
int decodeGreeting(Connection<B, N> &conn);
243-
244-
rid_t prepare_auth(std::string_view user,
245-
std::string_view passwd);
246-
247-
rid_t commit_auth(std::string_view user,
248-
std::string_view passwd);
265+
void prepare_auth(std::string_view user, std::string_view passwd);
266+
void commit_auth(std::string_view user, std::string_view passwd);
249267

250268
private:
251269
ConnectionImpl<BUFFER, NetProvider> *impl;
252-
static constexpr size_t GC_STEP_CNT = 100;
253-
size_t gc_step = 0;
254270

255271
template <class T>
256272
rid_t insert(const T &tuple, uint32_t space_id);
@@ -443,7 +459,7 @@ template<class BUFFER, class NetProvider>
443459
void
444460
Connection<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
445461
{
446-
impl->error.emplace(msg, errno_);
462+
impl->setError(msg, errno_);
447463
}
448464

449465
template<class BUFFER, class NetProvider>
@@ -472,73 +488,79 @@ template<class BUFFER, class NetProvider>
472488
BUFFER&
473489
Connection<BUFFER, NetProvider>::getInBuf()
474490
{
475-
return impl->inBuf;
491+
return impl->getInBuf();
476492
}
477493

478494
template<class BUFFER, class NetProvider>
479495
BUFFER&
480496
Connection<BUFFER, NetProvider>::getOutBuf()
481497
{
482-
return impl->outBuf;
498+
return impl->getOutBuf();
483499
}
484500

485-
template<class BUFFER, class NetProvider>
501+
template <class BUFFER, class NetProvider>
486502
void
487-
hasSentBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
503+
hasSentBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
488504
{
489505
//dropBack()/dropFront() interfaces require number of bytes be greater
490506
//than zero so let's check it first.
491507
if (bytes > 0)
492-
conn.impl->outBuf.dropFront(bytes);
508+
conn->getOutBuf().dropFront(bytes);
493509
}
494510

495-
template<class BUFFER, class NetProvider>
511+
template <class BUFFER, class NetProvider>
496512
void
497-
hasNotRecvBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
513+
hasNotRecvBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
498514
{
499515
if (bytes > 0)
500-
conn.impl->inBuf.dropBack(bytes);
516+
conn->getInBuf().dropBack(bytes);
501517
}
502518

503-
template<class BUFFER, class NetProvider>
519+
template <class BUFFER, class NetProvider>
504520
bool
505-
hasDataToSend(Connection<BUFFER, NetProvider> &conn)
521+
hasDataToSend(ConnectionImpl<BUFFER, NetProvider> *conn)
506522
{
507523
//We drop content of input buffer once it has been sent. So to detect
508524
//if there's any data to send it's enough to check buffer's emptiness.
509-
return !conn.impl->outBuf.empty();
525+
return !conn->getOutBuf().empty();
510526
}
511527

512-
template<class BUFFER, class NetProvider>
528+
template <class BUFFER, class NetProvider>
513529
bool
514530
hasDataToDecode(Connection<BUFFER, NetProvider> &conn)
515531
{
516-
assert(conn.impl->endDecoded < conn.impl->inBuf.end() ||
517-
conn.impl->endDecoded == conn.impl->inBuf.end());
518-
return conn.impl->endDecoded != conn.impl->inBuf.end();
532+
return hasDataToDecode(conn.getImpl());
519533
}
520534

521-
template<class BUFFER, class NetProvider>
535+
template <class BUFFER, class NetProvider>
536+
bool
537+
hasDataToDecode(ConnectionImpl<BUFFER, NetProvider> *conn)
538+
{
539+
assert(conn->endDecoded < conn->getInBuf().end() || conn->endDecoded == conn->getInBuf().end());
540+
return conn->endDecoded != conn->getInBuf().end();
541+
}
542+
543+
template <class BUFFER, class NetProvider>
522544
static void
523-
inputBufGC(Connection<BUFFER, NetProvider> &conn)
545+
inputBufGC(ConnectionImpl<BUFFER, NetProvider> *conn)
524546
{
525-
if ((conn.gc_step++ % Connection<BUFFER, NetProvider>::GC_STEP_CNT) == 0) {
526-
TNT_LOG_DEBUG("Flushed input buffer of the connection %p", &conn);
527-
conn.impl->inBuf.flush();
547+
if (conn->gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
548+
TNT_LOG_DEBUG("Flushed input buffer of the connection %p", conn);
549+
conn->getInBuf().flush();
528550
}
529551
}
530552

531-
template<class BUFFER, class NetProvider>
553+
template <class BUFFER, class NetProvider>
532554
DecodeStatus
533-
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
555+
processResponse(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync, Response<BUFFER> *result)
534556
{
535557
//Decode response. In case of success - fill in feature map
536558
//and adjust end-of-decoded data pointer. Call GC if needed.
537-
if (! conn.impl->inBuf.has(conn.impl->endDecoded, MP_RESPONSE_SIZE))
559+
if (!conn->getInBuf().has(conn->endDecoded, MP_RESPONSE_SIZE))
538560
return DECODE_NEEDMORE;
539561

540562
Response<BUFFER> response;
541-
response.size = conn.impl->dec.decodeResponseSize();
563+
response.size = conn->dec.decodeResponseSize();
542564
if (response.size < 0) {
543565
TNT_LOG_ERROR("Failed to decode response size");
544566
//In case of corrupted response size all other data in the buffer
@@ -549,55 +571,53 @@ processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BU
549571

550572
}
551573
response.size += MP_RESPONSE_SIZE;
552-
if (! conn.impl->inBuf.has(conn.impl->endDecoded, response.size)) {
574+
if (!conn->getInBuf().has(conn->endDecoded, response.size)) {
553575
//Response was received only partially. Reset decoder position
554576
//to the start of response to make this function re-entered.
555-
conn.impl->dec.reset(conn.impl->endDecoded);
577+
conn->dec.reset(conn->endDecoded);
556578
return DECODE_NEEDMORE;
557579
}
558-
if (conn.impl->dec.decodeResponse(response) != 0) {
559-
conn.setError("Failed to decode response, skipping bytes..");
560-
conn.impl->endDecoded += response.size;
580+
if (conn->dec.decodeResponse(response) != 0) {
581+
conn->setError("Failed to decode response, skipping bytes..");
582+
conn->endDecoded += response.size;
561583
return DECODE_ERR;
562584
}
563585
TNT_LOG_DEBUG("Header: sync=", response.header.sync, ", code=", response.header.code,
564586
", schema=", response.header.schema_id);
565587
if (result != nullptr && response.header.sync == req_sync) {
566588
*result = std::move(response);
567589
} else {
568-
conn.impl->futures.insert({response.header.sync,
569-
std::move(response)});
590+
conn->futures.insert({response.header.sync, std::move(response)});
570591
}
571-
conn.impl->endDecoded += response.size;
592+
conn->endDecoded += response.size;
572593
inputBufGC(conn);
573594
return DECODE_SUCC;
574595
}
575596

576-
template<class BUFFER, class NetProvider>
597+
template <class BUFFER, class NetProvider>
577598
int
578-
decodeGreeting(Connection<BUFFER, NetProvider> &conn)
599+
decodeGreeting(ConnectionImpl<BUFFER, NetProvider> *conn)
579600
{
580601
//TODO: that's not zero-copy, should be rewritten in that pattern.
581-
assert(conn.getInBuf().has(conn.impl->endDecoded, Iproto::GREETING_SIZE));
602+
assert(conn->getInBuf().has(conn->endDecoded, Iproto::GREETING_SIZE));
582603
char greeting_buf[Iproto::GREETING_SIZE];
583-
conn.impl->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
584-
conn.impl->dec.reset(conn.impl->endDecoded);
585-
if (parseGreeting(std::string_view{greeting_buf, Iproto::GREETING_SIZE},
586-
conn.impl->greeting) != 0)
604+
conn->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
605+
conn->dec.reset(conn->endDecoded);
606+
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, conn->greeting) != 0)
587607
return -1;
588-
conn.impl->is_greeting_received = true;
589-
TNT_LOG_DEBUG("Version: ", conn.impl->greeting.version_id);
608+
conn->is_greeting_received = true;
609+
TNT_LOG_DEBUG("Version: ", conn->greeting.version_id);
590610

591611
#ifndef NDEBUG
592612
//print salt in hex format.
593613
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
594614
const char *hex = "0123456789abcdef";
595-
for (size_t i = 0; i < conn.impl->greeting.salt_size; i++) {
596-
uint8_t u = conn.impl->greeting.salt[i];
615+
for (size_t i = 0; i < conn->greeting.salt_size; i++) {
616+
uint8_t u = conn->greeting.salt[i];
597617
hex_salt[i * 2] = hex[u / 16];
598618
hex_salt[i * 2 + 1] = hex[u % 16];
599619
}
600-
hex_salt[conn.impl->greeting.salt_size * 2] = 0;
620+
hex_salt[conn->greeting.salt_size * 2] = 0;
601621
TNT_LOG_DEBUG("Salt: ", hex_salt);
602622
#endif
603623
return 0;
@@ -718,21 +738,16 @@ Connection<BUFFER, NetProvider>::select(const T &key, uint32_t space_id,
718738
return impl->enc.getSync();
719739
}
720740

721-
template<class BUFFER, class NetProvider>
722-
rid_t
723-
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user,
724-
std::string_view passwd)
741+
template <class BUFFER, class NetProvider>
742+
void
743+
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
725744
{
726-
impl->enc.encodeAuth(user, passwd, impl->greeting);
727-
return 0;
745+
impl->prepare_auth(user, passwd);
728746
}
729747

730-
template<class BUFFER, class NetProvider>
731-
rid_t
732-
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user,
733-
std::string_view passwd)
748+
template <class BUFFER, class NetProvider>
749+
void
750+
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
734751
{
735-
impl->enc.reencodeAuth(user, passwd, impl->greeting);;
736-
impl->connector.readyToSend(*this);
737-
return 0;
752+
impl->commit_auth(user, passwd);
738753
}

0 commit comments

Comments
 (0)