Skip to content

Commit ee2f9f0

Browse files
client: replace all Connection usage with ConnectionImpl
Closes #140
1 parent 8e009e8 commit ee2f9f0

File tree

4 files changed

+230
-215
lines changed

4 files changed

+230
-215
lines changed

src/Client/Connection.hpp

Lines changed: 107 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -79,8 +79,20 @@ struct ConnectionImpl
7979
void ref();
8080
void unref();
8181

82+
typename NetProvider::Stream_t &get_strm() { return strm; }
83+
const typename NetProvider::Stream_t &get_strm() const { return strm; }
84+
85+
void setError(const std::string &msg, int errno_ = 0);
86+
BUFFER &getInBuf();
87+
BUFFER &getOutBuf();
88+
89+
void prepare_auth(std::string_view user, std::string_view passwd);
90+
void commit_auth(std::string_view user, std::string_view passwd);
91+
8292
Connector<BUFFER, NetProvider> &connector;
8393
BUFFER inBuf;
94+
static constexpr size_t GC_STEP_CNT = 100;
95+
size_t gc_step = 0;
8496
BUFFER outBuf;
8597
RequestEncoder<BUFFER> enc;
8698
ResponseDecoder<BUFFER> dec;
@@ -133,6 +145,42 @@ ConnectionImpl<BUFFER, NetProvider>::unref()
133145
delete this;
134146
}
135147

148+
template <class BUFFER, class NetProvider>
149+
void
150+
ConnectionImpl<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
151+
{
152+
error.emplace(msg, errno_);
153+
}
154+
155+
template <class BUFFER, class NetProvider>
156+
BUFFER &
157+
ConnectionImpl<BUFFER, NetProvider>::getInBuf()
158+
{
159+
return inBuf;
160+
}
161+
162+
template <class BUFFER, class NetProvider>
163+
BUFFER &
164+
ConnectionImpl<BUFFER, NetProvider>::getOutBuf()
165+
{
166+
return outBuf;
167+
}
168+
169+
template <class BUFFER, class NetProvider>
170+
void
171+
ConnectionImpl<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
172+
{
173+
enc.encodeAuth(user, passwd, greeting);
174+
}
175+
176+
template <class BUFFER, class NetProvider>
177+
void
178+
ConnectionImpl<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
179+
{
180+
enc.reencodeAuth(user, passwd, greeting);
181+
connector.readyToSend(this);
182+
}
183+
136184
/** Each connection is supposed to be bound to a single socket. */
137185
template<class BUFFER, class NetProvider>
138186
class Connection
@@ -212,44 +260,11 @@ class Connection
212260
BUFFER& getInBuf();
213261
BUFFER& getOutBuf();
214262

215-
template<class B, class N>
216-
friend
217-
void hasSentBytes(Connection<B, N> &conn, size_t bytes);
218-
219-
template<class B, class N>
220-
friend
221-
void hasNotRecvBytes(Connection<B, N> &conn, size_t bytes);
222-
223-
template<class B, class N>
224-
friend
225-
bool hasDataToSend(Connection<B, N> &conn);
226-
227-
template<class B, class N>
228-
friend
229-
bool hasDataToDecode(Connection<B, N> &conn);
230-
231-
template<class B, class N>
232-
friend
233-
enum DecodeStatus processResponse(Connection<B, N> &conn, int req_sync, Response<B> *result);
234-
235-
template<class B, class N>
236-
friend
237-
void inputBufGC(Connection<B, N> &conn);
238-
239-
template<class B, class N>
240-
friend
241-
int decodeGreeting(Connection<B, N> &conn);
242-
243-
rid_t prepare_auth(std::string_view user,
244-
std::string_view passwd);
245-
246-
rid_t commit_auth(std::string_view user,
247-
std::string_view passwd);
263+
void prepare_auth(std::string_view user, std::string_view passwd);
264+
void commit_auth(std::string_view user, std::string_view passwd);
248265

249266
private:
250267
ConnectionImpl<BUFFER, NetProvider> *impl;
251-
static constexpr size_t GC_STEP_CNT = 100;
252-
size_t gc_step = 0;
253268

254269
template <class T>
255270
rid_t insert(const T &tuple, uint32_t space_id);
@@ -442,7 +457,7 @@ template<class BUFFER, class NetProvider>
442457
void
443458
Connection<BUFFER, NetProvider>::setError(const std::string &msg, int errno_)
444459
{
445-
impl->error.emplace(msg, errno_);
460+
impl->setError(msg, errno_);
446461
}
447462

448463
template<class BUFFER, class NetProvider>
@@ -471,73 +486,79 @@ template<class BUFFER, class NetProvider>
471486
BUFFER&
472487
Connection<BUFFER, NetProvider>::getInBuf()
473488
{
474-
return impl->inBuf;
489+
return impl->getInBuf();
475490
}
476491

477492
template<class BUFFER, class NetProvider>
478493
BUFFER&
479494
Connection<BUFFER, NetProvider>::getOutBuf()
480495
{
481-
return impl->outBuf;
496+
return impl->getOutBuf();
482497
}
483498

484-
template<class BUFFER, class NetProvider>
499+
template <class BUFFER, class NetProvider>
485500
void
486-
hasSentBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
501+
hasSentBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
487502
{
488503
//dropBack()/dropFront() interfaces require number of bytes be greater
489504
//than zero so let's check it first.
490505
if (bytes > 0)
491-
conn.impl->outBuf.dropFront(bytes);
506+
conn->getOutBuf().dropFront(bytes);
492507
}
493508

494-
template<class BUFFER, class NetProvider>
509+
template <class BUFFER, class NetProvider>
495510
void
496-
hasNotRecvBytes(Connection<BUFFER, NetProvider> &conn, size_t bytes)
511+
hasNotRecvBytes(ConnectionImpl<BUFFER, NetProvider> *conn, size_t bytes)
497512
{
498513
if (bytes > 0)
499-
conn.impl->inBuf.dropBack(bytes);
514+
conn->getInBuf().dropBack(bytes);
500515
}
501516

502-
template<class BUFFER, class NetProvider>
517+
template <class BUFFER, class NetProvider>
503518
bool
504-
hasDataToSend(Connection<BUFFER, NetProvider> &conn)
519+
hasDataToSend(ConnectionImpl<BUFFER, NetProvider> *conn)
505520
{
506521
//We drop content of input buffer once it has been sent. So to detect
507522
//if there's any data to send it's enough to check buffer's emptiness.
508-
return !conn.impl->outBuf.empty();
523+
return !conn->getOutBuf().empty();
509524
}
510525

511-
template<class BUFFER, class NetProvider>
526+
template <class BUFFER, class NetProvider>
512527
bool
513528
hasDataToDecode(Connection<BUFFER, NetProvider> &conn)
514529
{
515-
assert(conn.impl->endDecoded < conn.impl->inBuf.end() ||
516-
conn.impl->endDecoded == conn.impl->inBuf.end());
517-
return conn.impl->endDecoded != conn.impl->inBuf.end();
530+
return hasDataToDecode(conn.getImpl());
518531
}
519532

520-
template<class BUFFER, class NetProvider>
533+
template <class BUFFER, class NetProvider>
534+
bool
535+
hasDataToDecode(ConnectionImpl<BUFFER, NetProvider> *conn)
536+
{
537+
assert(conn->endDecoded < conn->getInBuf().end() || conn->endDecoded == conn->getInBuf().end());
538+
return conn->endDecoded != conn->getInBuf().end();
539+
}
540+
541+
template <class BUFFER, class NetProvider>
521542
static void
522-
inputBufGC(Connection<BUFFER, NetProvider> &conn)
543+
inputBufGC(ConnectionImpl<BUFFER, NetProvider> *conn)
523544
{
524-
if ((conn.gc_step++ % Connection<BUFFER, NetProvider>::GC_STEP_CNT) == 0) {
525-
LOG_DEBUG("Flushed input buffer of the connection %p", &conn);
526-
conn.impl->inBuf.flush();
545+
if (conn->gc_step++ % ConnectionImpl<BUFFER, NetProvider>::GC_STEP_CNT == 0) {
546+
LOG_DEBUG("Flushed input buffer of the connection %p", conn);
547+
conn->getInBuf().flush();
527548
}
528549
}
529550

530-
template<class BUFFER, class NetProvider>
551+
template <class BUFFER, class NetProvider>
531552
DecodeStatus
532-
processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BUFFER> *result)
553+
processResponse(ConnectionImpl<BUFFER, NetProvider> *conn, int req_sync, Response<BUFFER> *result)
533554
{
534555
//Decode response. In case of success - fill in feature map
535556
//and adjust end-of-decoded data pointer. Call GC if needed.
536-
if (! conn.impl->inBuf.has(conn.impl->endDecoded, MP_RESPONSE_SIZE))
557+
if (!conn->getInBuf().has(conn->endDecoded, MP_RESPONSE_SIZE))
537558
return DECODE_NEEDMORE;
538559

539560
Response<BUFFER> response;
540-
response.size = conn.impl->dec.decodeResponseSize();
561+
response.size = conn->dec.decodeResponseSize();
541562
if (response.size < 0) {
542563
LOG_ERROR("Failed to decode response size");
543564
//In case of corrupted response size all other data in the buffer
@@ -548,55 +569,53 @@ processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BU
548569

549570
}
550571
response.size += MP_RESPONSE_SIZE;
551-
if (! conn.impl->inBuf.has(conn.impl->endDecoded, response.size)) {
572+
if (!conn->getInBuf().has(conn->endDecoded, response.size)) {
552573
//Response was received only partially. Reset decoder position
553574
//to the start of response to make this function re-entered.
554-
conn.impl->dec.reset(conn.impl->endDecoded);
575+
conn->dec.reset(conn->endDecoded);
555576
return DECODE_NEEDMORE;
556577
}
557-
if (conn.impl->dec.decodeResponse(response) != 0) {
558-
conn.setError("Failed to decode response, skipping bytes..");
559-
conn.impl->endDecoded += response.size;
578+
if (conn->dec.decodeResponse(response) != 0) {
579+
conn->setError("Failed to decode response, skipping bytes..");
580+
conn->endDecoded += response.size;
560581
return DECODE_ERR;
561582
}
562583
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
563584
response.header.code, ", schema=", response.header.schema_id);
564585
if (result != nullptr && response.header.sync == req_sync) {
565586
*result = std::move(response);
566587
} else {
567-
conn.impl->futures.insert({response.header.sync,
568-
std::move(response)});
588+
conn->futures.insert({response.header.sync, std::move(response)});
569589
}
570-
conn.impl->endDecoded += response.size;
590+
conn->endDecoded += response.size;
571591
inputBufGC(conn);
572592
return DECODE_SUCC;
573593
}
574594

575-
template<class BUFFER, class NetProvider>
595+
template <class BUFFER, class NetProvider>
576596
int
577-
decodeGreeting(Connection<BUFFER, NetProvider> &conn)
597+
decodeGreeting(ConnectionImpl<BUFFER, NetProvider> *conn)
578598
{
579599
//TODO: that's not zero-copy, should be rewritten in that pattern.
580-
assert(conn.getInBuf().has(conn.impl->endDecoded, Iproto::GREETING_SIZE));
600+
assert(conn->getInBuf().has(conn->endDecoded, Iproto::GREETING_SIZE));
581601
char greeting_buf[Iproto::GREETING_SIZE];
582-
conn.impl->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
583-
conn.impl->dec.reset(conn.impl->endDecoded);
584-
if (parseGreeting(std::string_view{greeting_buf, Iproto::GREETING_SIZE},
585-
conn.impl->greeting) != 0)
602+
conn->endDecoded.read({greeting_buf, sizeof(greeting_buf)});
603+
conn->dec.reset(conn->endDecoded);
604+
if (parseGreeting(std::string_view {greeting_buf, Iproto::GREETING_SIZE}, conn->greeting) != 0)
586605
return -1;
587-
conn.impl->is_greeting_received = true;
588-
LOG_DEBUG("Version: ", conn.impl->greeting.version_id);
606+
conn->is_greeting_received = true;
607+
LOG_DEBUG("Version: ", conn->greeting.version_id);
589608

590609
#ifndef NDEBUG
591610
//print salt in hex format.
592611
char hex_salt[Iproto::MAX_SALT_SIZE * 2 + 1];
593612
const char *hex = "0123456789abcdef";
594-
for (size_t i = 0; i < conn.impl->greeting.salt_size; i++) {
595-
uint8_t u = conn.impl->greeting.salt[i];
613+
for (size_t i = 0; i < conn->greeting.salt_size; i++) {
614+
uint8_t u = conn->greeting.salt[i];
596615
hex_salt[i * 2] = hex[u / 16];
597616
hex_salt[i * 2 + 1] = hex[u % 16];
598617
}
599-
hex_salt[conn.impl->greeting.salt_size * 2] = 0;
618+
hex_salt[conn->greeting.salt_size * 2] = 0;
600619
LOG_DEBUG("Salt: ", hex_salt);
601620
#endif
602621
return 0;
@@ -717,21 +736,16 @@ Connection<BUFFER, NetProvider>::select(const T &key, uint32_t space_id,
717736
return impl->enc.getSync();
718737
}
719738

720-
template<class BUFFER, class NetProvider>
721-
rid_t
722-
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user,
723-
std::string_view passwd)
739+
template <class BUFFER, class NetProvider>
740+
void
741+
Connection<BUFFER, NetProvider>::prepare_auth(std::string_view user, std::string_view passwd)
724742
{
725-
impl->enc.encodeAuth(user, passwd, impl->greeting);
726-
return 0;
743+
impl->prepare_auth(user, passwd);
727744
}
728745

729-
template<class BUFFER, class NetProvider>
730-
rid_t
731-
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user,
732-
std::string_view passwd)
746+
template <class BUFFER, class NetProvider>
747+
void
748+
Connection<BUFFER, NetProvider>::commit_auth(std::string_view user, std::string_view passwd)
733749
{
734-
impl->enc.reencodeAuth(user, passwd, impl->greeting);;
735-
impl->connector.readyToSend(*this);
736-
return 0;
750+
impl->commit_auth(user, passwd);
737751
}

0 commit comments

Comments
 (0)