Skip to content

Commit a749436

Browse files
client: add connection bookkeeping to Connector for waitAny robustness
Currently, all connection bookkeeping is done opaquely through the network providers which, in turn, also do this bookkeeping opaquely using system interfaces (e.g., libev, epoll). Because of this, we cannot handle cases when waitAny is called and there are no connections (gh-51) or when a connection has ready responses (gh-132). In order to improve `waitAny` robustness, we need to add connection bookkeeping to Connector. We should move the timer start to the beginning of the waiting loop, since the preceding checking overhead should not be accounted for the waiting time. Closes #51 Needed for #132
1 parent 97c61f4 commit a749436

File tree

3 files changed

+34
-4
lines changed

3 files changed

+34
-4
lines changed

src/Client/Connection.hpp

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ struct ConnectionImpl
8484
const typename NetProvider::Stream_t &get_strm() const { return strm; }
8585

8686
void setError(const std::string &msg, int errno_ = 0);
87+
bool hasError() const;
8788

8889
BUFFER &getInBuf();
8990
BUFFER &getOutBuf();
@@ -154,6 +155,13 @@ ConnectionImpl<BUFFER, NetProvider>::setError(const std::string &msg, int errno_
154155
error.emplace(msg, errno_);
155156
}
156157

158+
template <class BUFFER, class NetProvider>
159+
bool
160+
ConnectionImpl<BUFFER, NetProvider>::hasError() const
161+
{
162+
return error.has_value();
163+
}
164+
157165
template <class BUFFER, class NetProvider>
158166
BUFFER &
159167
ConnectionImpl<BUFFER, NetProvider>::getInBuf()
@@ -466,7 +474,7 @@ template<class BUFFER, class NetProvider>
466474
bool
467475
Connection<BUFFER, NetProvider>::hasError() const
468476
{
469-
return impl->error.has_value();
477+
return impl->hasError();
470478
}
471479

472480
template<class BUFFER, class NetProvider>

src/Client/Connector.hpp

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ class Connector
129129
* and `connectionDecodeResponses`.
130130
*/
131131
std::set<ConnectionImpl<BUFFER, NetProvider> *> m_ReadyToDecode;
132+
/**
133+
* Set of active connections owned by connector.
134+
*/
135+
std::set<ConnectionImpl<BUFFER, NetProvider> *> m_Connections;
132136
};
133137

134138
template<class BUFFER, class NetProvider>
@@ -159,6 +163,7 @@ Connector<BUFFER, NetProvider>::connect(Connection<BUFFER, NetProvider> &conn,
159163
conn.getImpl()->prepare_auth(opts.user, opts.passwd);
160164
}
161165
TNT_LOG_DEBUG("Connection to ", opts.address, ':', opts.service, " has been established");
166+
m_Connections.insert(conn.getImpl());
162167
return 0;
163168
}
164169

@@ -190,6 +195,7 @@ Connector<BUFFER, NetProvider>::close(ConnectionImpl<BUFFER, NetProvider> *conn)
190195
m_NetProvider.close(conn->get_strm());
191196
m_ReadyToSend.erase(conn);
192197
m_ReadyToDecode.erase(conn);
198+
m_Connections.erase(conn);
193199
}
194200
}
195201

@@ -353,9 +359,24 @@ template<class BUFFER, class NetProvider>
353359
std::optional<Connection<BUFFER, NetProvider>>
354360
Connector<BUFFER, NetProvider>::waitAny(int timeout)
355361
{
362+
if (m_Connections.empty()) {
363+
TNT_LOG_DEBUG("waitAny() called on connector without connections");
364+
return std::nullopt;
365+
}
356366
Timer timer{timeout};
357367
timer.start();
358368
while (m_ReadyToDecode.empty()) {
369+
bool has_alive_conn = false;
370+
for (auto *conn : m_Connections) {
371+
if (!conn->hasError()) {
372+
has_alive_conn = true;
373+
break;
374+
}
375+
}
376+
if (!has_alive_conn) {
377+
TNT_LOG_ERROR("All connections have an error");
378+
return std::nullopt;
379+
}
359380
if (m_NetProvider.wait(timer.timeLeft()) != 0) {
360381
TNT_LOG_ERROR("Failed to poll connections: ", strerror(errno));
361382
return std::nullopt;

test/ClientTest.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1118,11 +1118,9 @@ test_dead_connection_wait(void)
11181118
fail_if(client.waitCount(conn, 1) == 0);
11191119
fail_if(conn.futureIsReady(f));
11201120

1121-
/* FIXME(gh-51) */
1122-
#if 0
1121+
TEST_CASE("waitAny() correctly handles case when all connections have an error (gh-51");
11231122
fail_if(client.waitAny() != std::nullopt);
11241123
fail_if(conn.futureIsReady(f));
1125-
#endif
11261124
}
11271125

11281126
/**
@@ -1451,6 +1449,9 @@ test_wait(Connector<BUFFER, NetProvider> &client)
14511449
#endif /* __linux__ */
14521450

14531451
client.close(conn);
1452+
1453+
TEST_CASE("waitAny() correctly handles case when there are no connections (gh-51");
1454+
fail_if(client.waitAny() != std::nullopt);
14541455
}
14551456

14561457
int main()

0 commit comments

Comments
 (0)