Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions src/Client/Connection.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,7 +523,7 @@ static void
inputBufGC(Connection<BUFFER, NetProvider> &conn)
{
if ((conn.gc_step++ % Connection<BUFFER, NetProvider>::GC_STEP_CNT) == 0) {
LOG_DEBUG("Flushed input buffer of the connection %p", &conn);
TNT_LOG_DEBUG("Flushed input buffer of the connection %p", &conn);
conn.impl->inBuf.flush();
}
}
Expand All @@ -540,7 +540,7 @@ processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BU
Response<BUFFER> response;
response.size = conn.impl->dec.decodeResponseSize();
if (response.size < 0) {
LOG_ERROR("Failed to decode response size");
TNT_LOG_ERROR("Failed to decode response size");
//In case of corrupted response size all other data in the buffer
//is likely to be decoded in the wrong way (since we don't
// know how much bytes should be skipped). So let's simply
Expand All @@ -560,8 +560,8 @@ processResponse(Connection<BUFFER, NetProvider> &conn, int req_sync, Response<BU
conn.impl->endDecoded += response.size;
return DECODE_ERR;
}
LOG_DEBUG("Header: sync=", response.header.sync, ", code=",
response.header.code, ", schema=", response.header.schema_id);
TNT_LOG_DEBUG("Header: sync=", response.header.sync, ", code=", response.header.code,
", schema=", response.header.schema_id);
if (result != nullptr && response.header.sync == req_sync) {
*result = std::move(response);
} else {
Expand All @@ -586,7 +586,7 @@ decodeGreeting(Connection<BUFFER, NetProvider> &conn)
conn.impl->greeting) != 0)
return -1;
conn.impl->is_greeting_received = true;
LOG_DEBUG("Version: ", conn.impl->greeting.version_id);
TNT_LOG_DEBUG("Version: ", conn.impl->greeting.version_id);

#ifndef NDEBUG
//print salt in hex format.
Expand All @@ -598,7 +598,7 @@ decodeGreeting(Connection<BUFFER, NetProvider> &conn)
hex_salt[i * 2 + 1] = hex[u % 16];
}
hex_salt[conn.impl->greeting.salt_size * 2] = 0;
LOG_DEBUG("Salt: ", hex_salt);
TNT_LOG_DEBUG("Salt: ", hex_salt);
#endif
return 0;
}
Expand Down
32 changes: 14 additions & 18 deletions src/Client/Connector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,7 @@ Connector<BUFFER, NetProvider>::connect(Connection<BUFFER, NetProvider> &conn,
//Make sure that connection is not yet established.
assert(conn.get_strm().has_status(SS_DEAD));
if (m_NetProvider.connect(conn, opts) != 0) {
LOG_ERROR("Failed to connect to ",
opts.address, ':', opts.service);
TNT_LOG_ERROR("Failed to connect to ", opts.address, ':', opts.service);
return -1;
}
conn.getImpl()->is_greeting_received = false;
Expand All @@ -156,8 +155,7 @@ Connector<BUFFER, NetProvider>::connect(Connection<BUFFER, NetProvider> &conn,
// Encode auth request to reserve space in buffer.
conn.prepare_auth(opts.user, opts.passwd);
}
LOG_DEBUG("Connection to ", opts.address, ':', opts.service,
" has been established");
TNT_LOG_DEBUG("Connection to ", opts.address, ':', opts.service, " has been established");
return 0;
}

Expand Down Expand Up @@ -258,7 +256,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
rid_t future, int timeout,
Response<BUFFER> *result)
{
LOG_DEBUG("Waiting for the future ", future, " with timeout ", timeout);
TNT_LOG_DEBUG("Waiting for the future ", future, " with timeout ", timeout);
Timer timer{timeout};
timer.start();
static constexpr int INVALID_SYNC = -1;
Expand All @@ -269,7 +267,7 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
return -1;
if (result != NULL && result->header.sync != INVALID_SYNC) {
assert(result->header.sync == req_sync);
LOG_DEBUG("Future ", future, " is ready and decoded");
TNT_LOG_DEBUG("Future ", future, " is ready and decoded");
return 0;
}
while (!conn.hasError() && !conn.futureIsReady(future)) {
Expand All @@ -282,24 +280,23 @@ Connector<BUFFER, NetProvider>::wait(Connection<BUFFER, NetProvider> &conn,
return -1;
if (result != NULL && result->header.sync != INVALID_SYNC) {
assert(result->header.sync == req_sync);
LOG_DEBUG("Future ", future, " is ready and decoded");
TNT_LOG_DEBUG("Future ", future, " is ready and decoded");
return 0;
}
if (timer.isExpired())
break;
}
if (conn.hasError()) {
LOG_ERROR("Connection got an error: ", conn.getError().msg);
TNT_LOG_ERROR("Connection got an error: ", conn.getError().msg);
return -1;
}
if (! conn.futureIsReady(future)) {
LOG_DEBUG("Connection has been timed out: future ", future,
" is not ready");
TNT_LOG_DEBUG("Connection has been timed out: future ", future, " is not ready");
return -1;
} else if (result != NULL) {
*result = std::move(conn.getResponse(future));
}
LOG_DEBUG("Feature ", future, " is ready and decoded");
TNT_LOG_DEBUG("Feature ", future, " is ready and decoded");
return 0;
}

Expand Down Expand Up @@ -331,10 +328,10 @@ Connector<BUFFER, NetProvider>::waitAll(Connection<BUFFER, NetProvider> &conn,
break;
}
if (conn.hasError()) {
LOG_ERROR("Connection got an error: ", conn.getError().msg);
TNT_LOG_ERROR("Connection got an error: ", conn.getError().msg);
return -1;
}
LOG_DEBUG("Connection has been timed out: not all futures are ready");
TNT_LOG_DEBUG("Connection has been timed out: not all futures are ready");
return -1;
}

Expand All @@ -346,14 +343,14 @@ Connector<BUFFER, NetProvider>::waitAny(int timeout)
timer.start();
while (m_ReadyToDecode.empty()) {
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
LOG_ERROR("Failed to poll connections: ", strerror(errno));
TNT_LOG_ERROR("Failed to poll connections: ", strerror(errno));
return std::nullopt;
}
if (timer.isExpired())
break;
}
if (m_ReadyToDecode.empty()) {
LOG_DEBUG("wait() has been timed out! No responses are received");
TNT_LOG_DEBUG("wait() has been timed out! No responses are received");
return std::nullopt;
}
Connection<BUFFER, NetProvider> conn = *m_ReadyToDecode.begin();
Expand Down Expand Up @@ -391,11 +388,10 @@ Connector<BUFFER, NetProvider>::waitCount(Connection<BUFFER, NetProvider> &conn,
break;
}
if (conn.hasError()) {
LOG_ERROR("Connection got an error: ", conn.getError().msg);
TNT_LOG_ERROR("Connection got an error: ", conn.getError().msg);
return -1;
}
LOG_DEBUG("Connection has been timed out: only ",
conn.getFutureCount() - ready_futures, " are ready");
TNT_LOG_DEBUG("Connection has been timed out: only ", conn.getFutureCount() - ready_futures, " are ready");
return -1;
}

Expand Down
33 changes: 13 additions & 20 deletions src/Client/EpollNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ EpollNetProvider<BUFFER, Stream>::EpollNetProvider(Connector_t &connector) :
{
m_EpollFd = epoll_create1(EPOLL_CLOEXEC);
if (m_EpollFd == -1) {
LOG_ERROR("Failed to initialize epoll: ", strerror(errno));
TNT_LOG_ERROR("Failed to initialize epoll: ", strerror(errno));
abort();
}
}
Expand All @@ -109,9 +109,7 @@ EpollNetProvider<BUFFER, Stream>::registerEpoll(Conn_t &conn)
event.data.ptr = conn.getImpl();
if (epoll_ctl(m_EpollFd, EPOLL_CTL_ADD, conn.get_strm().get_fd(),
&event) != 0) {
LOG_ERROR("Failed to add socket to epoll: "
"epoll_ctl() returned with errno: ",
strerror(errno));
TNT_LOG_ERROR("Failed to add socket to epoll: epoll_ctl() returned with errno: ", strerror(errno));
abort();
}
}
Expand All @@ -124,9 +122,7 @@ EpollNetProvider<BUFFER, Stream>::setPollSetting(Conn_t &conn, int setting) {
event.data.ptr = conn.getImpl();
if (epoll_ctl(m_EpollFd, EPOLL_CTL_MOD, conn.get_strm().get_fd(),
&event) != 0) {
LOG_ERROR("Failed to change epoll mode: "
"epoll_ctl() returned with errno: ",
strerror(errno));
TNT_LOG_ERROR("Failed to change epoll mode: epoll_ctl() returned with errno: ", strerror(errno));
abort();
}
}
Expand All @@ -142,7 +138,7 @@ EpollNetProvider<BUFFER, Stream>::connect(Conn_t &conn,
opts.address);
return -1;
}
LOG_DEBUG("Connected to ", opts.address, ", socket is ", strm.get_fd());
TNT_LOG_DEBUG("Connected to ", opts.address, ", socket is ", strm.get_fd());

registerEpoll(conn);
return 0;
Expand All @@ -168,8 +164,7 @@ EpollNetProvider<BUFFER, Stream>::close(Stream_t& strm)
struct sockaddr_un *sa_un = (struct sockaddr_un *) &sa;
snprintf(addr, 120, "%s", sa_un->sun_path);
}
LOG_DEBUG("Closed connection to socket ", was_fd,
" corresponding to address ", addr);
TNT_LOG_DEBUG("Closed connection to socket ", was_fd, " corresponding to address ", addr);
}
#endif
/*
Expand Down Expand Up @@ -209,12 +204,12 @@ EpollNetProvider<BUFFER, Stream>::recv(Conn_t &conn)
if ((size_t) rcvd < Iproto::GREETING_SIZE)
return 0;
/* Receive and decode greetings. */
LOG_DEBUG("Greetings are received, read bytes ", rcvd);
TNT_LOG_DEBUG("Greetings are received, read bytes ", rcvd);
if (decodeGreeting(conn) != 0) {
conn.setError("Failed to decode greetings");
return -1;
}
LOG_DEBUG("Greetings are decoded");
TNT_LOG_DEBUG("Greetings are decoded");
rcvd -= Iproto::GREETING_SIZE;
if (conn.getImpl()->is_auth_required) {
// Finalize auth request in buffer.
Expand Down Expand Up @@ -261,7 +256,7 @@ EpollNetProvider<BUFFER, Stream>::wait(int timeout)
assert(timeout >= -1);
if (timeout == -1)
timeout = TIMEOUT_INFINITY;
LOG_DEBUG("Network engine wait for ", timeout, " milliseconds");
TNT_LOG_DEBUG("Network engine wait for ", timeout, " milliseconds");
/* Send pending requests. */
for (auto conn = m_Connector.m_ReadyToSend.begin();
conn != m_Connector.m_ReadyToSend.end();) {
Expand All @@ -276,15 +271,14 @@ EpollNetProvider<BUFFER, Stream>::wait(int timeout)
if (event_cnt < 0) {
//Poll error doesn't belong to any connection so just global
//log it.
LOG_ERROR("Poll failed: ", strerror(errno));
TNT_LOG_ERROR("Poll failed: ", strerror(errno));
return -1;
}
for (int i = 0; i < event_cnt; ++i) {
Conn_t conn((typename Conn_t::Impl_t *)events[i].data.ptr);
if ((events[i].events & EPOLLIN) != 0) {
LOG_DEBUG("Registered poll event ", i, ": ",
conn.get_strm().get_fd(),
" socket is ready to read");
TNT_LOG_DEBUG("Registered poll event ", i, ": ", conn.get_strm().get_fd(),
" socket is ready to read");
if (conn.get_strm().has_status(SS_NEED_READ_EVENT_FOR_WRITE)) {
int rc = send(conn);
if (rc < 0)
Expand All @@ -302,9 +296,8 @@ EpollNetProvider<BUFFER, Stream>::wait(int timeout)
}

if ((events[i].events & EPOLLOUT) != 0) {
LOG_DEBUG("Registered poll event ", i, ": ",
conn.get_strm().get_fd(),
" socket is ready to write");
TNT_LOG_DEBUG("Registered poll event ", i, ": ", conn.get_strm().get_fd(),
" socket is ready to write");
if (conn.get_strm().has_status(SS_NEED_WRITE_EVENT_FOR_READ)) {
int rc = recv(conn);
if (rc < 0)
Expand Down
12 changes: 6 additions & 6 deletions src/Client/LibevNetProvider.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,12 +142,12 @@ connectionReceive(Connection<BUFFER, LibevNetProvider<BUFFER, Stream>> &conn)
if ((size_t) rcvd < Iproto::GREETING_SIZE)
return 0;
/* Receive and decode greetings. */
LOG_DEBUG("Greetings are received, read bytes ", rcvd);
TNT_LOG_DEBUG("Greetings are received, read bytes ", rcvd);
if (decodeGreeting(conn) != 0) {
conn.setError("Failed to decode greetings");
return -1;
}
LOG_DEBUG("Greetings are decoded");
TNT_LOG_DEBUG("Greetings are decoded");
rcvd -= Iproto::GREETING_SIZE;
if (conn.getImpl()->is_auth_required) {
// Finalize auth request in buffer.
Expand Down Expand Up @@ -247,7 +247,7 @@ send_cb(struct ev_loop *loop, struct ev_io *watcher, int /* revents */)
}
if (rc > 0) {
/* Send is not complete, setting the write watcher. */
LOG_DEBUG("Send is not complete, setting the write watcher");
TNT_LOG_DEBUG("Send is not complete, setting the write watcher");
if (conn.get_strm().has_status(SS_NEED_WRITE_EVENT_FOR_WRITE))
if (!ev_is_active(&waitWatcher->out))
ev_io_start(loop, &waitWatcher->out);
Expand Down Expand Up @@ -302,7 +302,7 @@ LibevNetProvider<BUFFER, Stream>::registerWatchers(Conn_t &conn, int fd)
new (std::nothrow) WaitWatcher<BUFFER, Stream>(&m_Connector,
conn, &m_TimeoutWatcher);
if (watcher == nullptr) {
LOG_ERROR("Failed to allocate memory for WaitWatcher");
TNT_LOG_ERROR("Failed to allocate memory for WaitWatcher");
abort();
}

Expand All @@ -325,7 +325,7 @@ LibevNetProvider<BUFFER, Stream>::connect(Conn_t &conn,
opts.address);
return -1;
}
LOG_DEBUG("Connected to ", opts.address, ", socket is ", strm.get_fd());
TNT_LOG_DEBUG("Connected to ", opts.address, ", socket is ", strm.get_fd());

registerWatchers(conn, strm.get_fd());
return 0;
Expand Down Expand Up @@ -354,7 +354,7 @@ void
LibevNetProvider<BUFFER, Stream>::timeout_cb(EV_P_ ev_timer *w, int /* revents */)
{
(void) w;
LOG_DEBUG("Libev timed out!");
TNT_LOG_DEBUG("Libev timed out!");
/* Stop external loop */
ev_break(EV_A_ EVBREAK_ONE);
}
Expand Down
4 changes: 2 additions & 2 deletions src/Client/ResponseDecoder.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,11 @@ ResponseDecoder<BUFFER>::decodeResponse(Response<BUFFER> &response)
{
/* Decode header and body separately to get more detailed error. */
if (!mpp::decode(it, response.header)) {
LOG_ERROR("Failed to decode header");
TNT_LOG_ERROR("Failed to decode header");
return -1;
}
if (!mpp::decode(it, response.body)) {
LOG_ERROR("Failed to decode body");
TNT_LOG_ERROR("Failed to decode body");
return -1;
}
return 0;
Expand Down
7 changes: 6 additions & 1 deletion src/Client/Stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,12 @@ struct ConnectOptions {
class Stream {
public:
/* Disabled copy, enabled move. */
Stream() noexcept = default;
/*
* Sic: toolchain of RedHat 7 doesn't have `noexcept` qualifier for `std::string`
* default constructor despite standard demands it. That's why `= default` syntax
* wouldn't work there, so let's define default constructor explicitly.
*/
Stream() noexcept {}
~Stream() noexcept = default;
Stream(const Stream &) = delete;
Stream &operator=(const Stream &) = delete;
Expand Down
12 changes: 5 additions & 7 deletions src/Client/UnixStream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,8 @@ class UnixStream : public Stream {

protected:
/** Log helpers. */
template <class ...MSG>
void log_wise(LogLevel level, const char *file, int line,
const char *msg, MSG &&...more);
template <class... MSG>
void log_wise(tnt::LogLevel level, const char *file, int line, const char *msg, MSG &&...more);
template <class ...MSG>
int die(const char *file, int line,
const char *msg, MSG &&...more);
Expand Down Expand Up @@ -109,8 +108,7 @@ class UnixStream : public Stream {

template <class... MSG>
void
UnixStream::log_wise(LogLevel level, const char *file, int line,
const char *msg, MSG&& ...more)
UnixStream::log_wise(tnt::LogLevel level, const char *file, int line, const char *msg, MSG &&...more)
{
if (sizeof...(MSG) == 0 && fd < 0)
log(level, file, line, msg);
Expand All @@ -127,7 +125,7 @@ template <class ...MSG>
int
UnixStream::die(const char *file, int line, const char *msg, MSG&& ...more)
{
log_wise(ERROR, file, line, msg, std::forward<MSG>(more)...);
log_wise(tnt::ERROR, file, line, msg, std::forward<MSG>(more)...);
set_status(SS_DEAD);
return -1;
}
Expand All @@ -137,7 +135,7 @@ int
UnixStream::tell(StreamStatus st, const char *file, int line,
const char *msg, MSG&& ...more)
{
log_wise(INFO, file, line, msg, std::forward<MSG>(more)...);
log_wise(tnt::INFO, file, line, msg, std::forward<MSG>(more)...);
set_status(st);
return 0;
}
Expand Down
Loading