Skip to content

Commit 14f7a2d

Browse files
authored
fix(MongoDB): Add timeout support and harden replica set connections (#5204)
* fix(MongoDB): Add timeouts to network connections to the database to prevent hangs when servers are not accessible. * enh(MongoDB): Many document consistency checks and improvements. * enh(MongoDB): Improved handling of MongoDB replica sets by using the last write reported from hello.
1 parent 3c9f663 commit 14f7a2d

25 files changed

+766
-125
lines changed

MongoDB/include/Poco/MongoDB/BSONReader.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "Poco/MongoDB/MongoDB.h"
2222
#include "Poco/BinaryReader.h"
23+
#include "Poco/Exception.h"
2324

2425

2526
namespace Poco {
@@ -72,6 +73,8 @@ inline std::string BSONReader::readCString()
7273
{
7374
if (c == 0x00) return val;
7475
else val += c;
76+
if (val.size() > static_cast<std::size_t>(BSON_MAX_DOCUMENT_SIZE))
77+
throw Poco::DataFormatException("BSON cstring exceeds maximum size");
7578
}
7679
}
7780
return val;

MongoDB/include/Poco/MongoDB/Binary.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,13 +154,19 @@ inline void BSONReader::read<Binary::Ptr>(Binary::Ptr& to)
154154
Poco::Int32 size;
155155
_reader >> size;
156156

157+
if (size < 0)
158+
throw Poco::DataFormatException("Invalid BSON binary size: " + std::to_string(size));
159+
if (size > BSON_MAX_DOCUMENT_SIZE)
160+
throw Poco::DataFormatException("BSON binary size exceeds maximum: " + std::to_string(size));
161+
157162
to->buffer().resize(size);
158163

159164
unsigned char subtype;
160165
_reader >> subtype;
161166
to->subtype(subtype);
162167

163-
_reader.readRaw(reinterpret_cast<char*>(to->buffer().begin()), size);
168+
if (size > 0)
169+
_reader.readRaw(reinterpret_cast<char*>(to->buffer().begin()), size);
164170
}
165171

166172

MongoDB/include/Poco/MongoDB/Connection.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,13 @@ class MongoDB_API Connection
136136
void connect(const Poco::Net::SocketAddress& addrs);
137137
/// Connects to the given MongoDB server.
138138

139+
void connect(const Poco::Net::SocketAddress& addrs, const Poco::Timespan& connectTimeout, const Poco::Timespan& socketTimeout = 0);
140+
/// Connects to the given MongoDB server using the specified connect and socket timeouts.
141+
/// If connectTimeout is non-zero, the connection attempt will be aborted
142+
/// after the specified time.
143+
/// If socketTimeout is non-zero, the send and receive timeouts on the socket
144+
/// will be set after a successful connection.
145+
139146
void connect(const Poco::Net::StreamSocket& socket);
140147
/// Connects using an already connected socket.
141148

MongoDB/include/Poco/MongoDB/Element.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
#include "Poco/BinaryReader.h"
2222
#include "Poco/BinaryWriter.h"
2323
#include "Poco/DateTimeFormatter.h"
24+
#include "Poco/Exception.h"
2425
#include "Poco/MongoDB/BSONReader.h"
2526
#include "Poco/MongoDB/BSONWriter.h"
2627
#include "Poco/MongoDB/MongoDB.h"
@@ -188,6 +189,10 @@ inline void BSONReader::read<std::string>(std::string& to)
188189
{
189190
Poco::Int32 size;
190191
_reader >> size;
192+
if (size < BSON_MIN_STRING_SIZE)
193+
throw Poco::DataFormatException("Invalid BSON string size: " + std::to_string(size));
194+
if (size > BSON_MAX_DOCUMENT_SIZE)
195+
throw Poco::DataFormatException("BSON string size exceeds maximum: " + std::to_string(size));
191196
_reader.readRaw(size, to);
192197
to.erase(to.end() - 1); // remove terminating 0
193198
}

MongoDB/include/Poco/MongoDB/MessageHeader.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ inline Int32 MessageHeader::getMessageLength() const
110110

111111
inline void MessageHeader::setMessageLength(Int32 length)
112112
{
113-
poco_assert (_messageLength >= 0);
113+
poco_assert (length >= 0);
114114
_messageLength = MSG_HEADER_SIZE + length;
115115
}
116116

MongoDB/include/Poco/MongoDB/MongoDB.h

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,33 @@
5151
#endif
5252

5353

54+
// MongoDB wire protocol and BSON specification limits.
55+
// These constants are defined centrally so that all validation
56+
// code in the library uses the same values.
57+
58+
namespace Poco {
59+
namespace MongoDB {
60+
61+
/// Maximum BSON document size (16 MiB = 16 * 1024 * 1024 bytes) per MongoDB specification.
62+
/// Applies to documents, strings, binary data, and cstrings.
63+
static constexpr Poco::Int32 BSON_MAX_DOCUMENT_SIZE = 16 * 1024 * 1024;
64+
65+
/// Minimum BSON document size (5 bytes): 4-byte size field + 1-byte null terminator.
66+
static constexpr Poco::Int32 BSON_MIN_DOCUMENT_SIZE = 5;
67+
68+
/// Minimum BSON string size (1 byte for the null terminator).
69+
static constexpr Poco::Int32 BSON_MIN_STRING_SIZE = 1;
70+
71+
/// Maximum OP_MSG message size (48 MiB = 48 * 1024 * 1024 bytes) per MongoDB specification.
72+
static constexpr Poco::Int32 OP_MSG_MAX_SIZE = 48 * 1024 * 1024;
73+
74+
/// Default local threshold for "nearest" read preference (15 ms = 15000 µs).
75+
/// Servers within this threshold of the minimum RTT are eligible for selection.
76+
static constexpr Poco::Int64 DEFAULT_LOCAL_THRESHOLD_US = 15000;
77+
78+
} } // namespace Poco::MongoDB
79+
80+
5481
//
5582
// Automatically link MongoDB library.
5683
//

MongoDB/include/Poco/MongoDB/ObjectId.h

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,11 @@ class MongoDB_API ObjectId
9292
//
9393
inline Timestamp ObjectId::timestamp() const noexcept
9494
{
95-
int time;
96-
char* T = reinterpret_cast<char*>(&time);
97-
T[0] = _id[3];
98-
T[1] = _id[2];
99-
T[2] = _id[1];
100-
T[3] = _id[0];
95+
const Poco::Int32 time =
96+
(static_cast<Poco::Int32>(_id[0]) << 24) |
97+
(static_cast<Poco::Int32>(_id[1]) << 16) |
98+
(static_cast<Poco::Int32>(_id[2]) << 8) |
99+
static_cast<Poco::Int32>(_id[3]);
101100
return Timestamp::fromEpochTime(static_cast<time_t>(time));
102101
}
103102

MongoDB/include/Poco/MongoDB/PoolableConnectionFactory.h

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include "Poco/MongoDB/Connection.h"
2222
#include "Poco/ObjectPool.h"
23+
#include "Poco/Timespan.h"
2324

2425

2526
namespace Poco {
@@ -32,6 +33,9 @@ class PoolableObjectFactory<MongoDB::Connection, MongoDB::Connection::Ptr>
3233
///
3334
/// If a Connection::SocketFactory is given, it must live for the entire
3435
/// lifetime of the PoolableObjectFactory.
36+
///
37+
/// It is strongly recommended to use one of the timeout-aware constructors
38+
/// to avoid indefinite hangs when the server is unreachable.
3539
{
3640
public:
3741
PoolableObjectFactory(Net::SocketAddress& address):
@@ -46,6 +50,22 @@ class PoolableObjectFactory<MongoDB::Connection, MongoDB::Connection::Ptr>
4650
{
4751
}
4852

53+
PoolableObjectFactory(Net::SocketAddress& address, Poco::Timespan connectTimeout, Poco::Timespan socketTimeout = 0):
54+
_address(address),
55+
_pSocketFactory(nullptr),
56+
_connectTimeout(connectTimeout),
57+
_socketTimeout(socketTimeout)
58+
{
59+
}
60+
61+
PoolableObjectFactory(const std::string& address, Poco::Timespan connectTimeout, Poco::Timespan socketTimeout = 0):
62+
_address(address),
63+
_pSocketFactory(nullptr),
64+
_connectTimeout(connectTimeout),
65+
_socketTimeout(socketTimeout)
66+
{
67+
}
68+
4969
PoolableObjectFactory(const std::string& uri, MongoDB::Connection::SocketFactory& socketFactory):
5070
_uri(uri),
5171
_pSocketFactory(&socketFactory)
@@ -55,9 +75,15 @@ class PoolableObjectFactory<MongoDB::Connection, MongoDB::Connection::Ptr>
5575
MongoDB::Connection::Ptr createObject()
5676
{
5777
if (_pSocketFactory)
78+
{
5879
return new MongoDB::Connection(_uri, *_pSocketFactory);
80+
}
5981
else
60-
return new MongoDB::Connection(_address);
82+
{
83+
MongoDB::Connection::Ptr conn = new MongoDB::Connection();
84+
conn->connect(_address, _connectTimeout, _socketTimeout);
85+
return conn;
86+
}
6187
}
6288

6389
bool validateObject(MongoDB::Connection::Ptr pObject)
@@ -80,7 +106,9 @@ class PoolableObjectFactory<MongoDB::Connection, MongoDB::Connection::Ptr>
80106
private:
81107
Net::SocketAddress _address;
82108
std::string _uri;
83-
MongoDB::Connection::SocketFactory* _pSocketFactory;
109+
MongoDB::Connection::SocketFactory* _pSocketFactory = nullptr;
110+
Poco::Timespan _connectTimeout;
111+
Poco::Timespan _socketTimeout;
84112
};
85113

86114

MongoDB/include/Poco/MongoDB/ReadPreference.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -111,8 +111,14 @@ class MongoDB_API ReadPreference
111111
[[nodiscard]] Poco::Int64 maxStalenessSeconds() const;
112112
/// Returns the max staleness in seconds, or NO_MAX_STALENESS if not set.
113113

114-
[[nodiscard]] std::vector<ServerDescription> selectServers(const TopologyDescription& topology) const;
114+
[[nodiscard]] std::vector<ServerDescription> selectServers(
115+
const TopologyDescription& topology,
116+
Poco::Int64 heartbeatFrequencyUs = 10000000) const;
115117
/// Selects eligible servers from the topology based on this read preference.
118+
/// heartbeatFrequencyUs is the topology monitoring interval in microseconds,
119+
/// used in the maxStalenessSeconds calculation per the MongoDB Server Selection spec.
120+
/// See: https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.md
121+
/// Defaults to 10 seconds (10000000 us) per the MongoDB specification.
116122
/// Returns a vector of eligible servers.
117123
/// If no servers match, returns an empty vector.
118124

@@ -137,7 +143,8 @@ class MongoDB_API ReadPreference
137143
private:
138144
bool matchesTags(const ServerDescription& server) const;
139145
std::vector<ServerDescription> filterByTags(const std::vector<ServerDescription>& servers) const;
140-
std::vector<ServerDescription> filterByMaxStaleness(const std::vector<ServerDescription>& servers, const ServerDescription& primary) const;
146+
std::vector<ServerDescription> filterByMaxStaleness(const std::vector<ServerDescription>& servers,
147+
const ServerDescription& primary, Poco::Int64 heartbeatFrequencyUs) const;
141148
std::vector<ServerDescription> selectByNearest(const std::vector<ServerDescription>& servers) const;
142149

143150
Mode _mode{Primary};

MongoDB/include/Poco/MongoDB/ReplicaSet.h

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,10 @@
2323
#include "Poco/MongoDB/ReadPreference.h"
2424
#include "Poco/MongoDB/TopologyDescription.h"
2525
#include "Poco/Net/SocketAddress.h"
26+
#include "Poco/Random.h"
2627
#include <atomic>
2728
#include <chrono>
29+
#include <condition_variable>
2830
#include <cstddef>
2931
#include <mutex>
3032
#include <string>
@@ -93,22 +95,20 @@ class MongoDB_API ReplicaSet
9395
/// Default read preference for this replica set.
9496

9597
unsigned int connectTimeoutSeconds{10};
96-
/// Connection timeout in seconds (default: 10)
98+
/// Connection timeout in seconds (default: 10).
9799
///
98-
/// NOTE: This value is currently unused by ReplicaSet itself. It is intended
99-
/// for use by custom SocketFactory implementations. Custom factories can
100-
/// access this value via ReplicaSet::configuration() and use it when creating
101-
/// sockets. Use ReplicaSet::setSocketFactory() to set a custom factory that
102-
/// utilizes this timeout value.
100+
/// Applied when connecting to MongoDB servers during topology monitoring
101+
/// and when creating connections via getConnection()/getPrimaryConnection()/
102+
/// getSecondaryConnection(). When using a custom SocketFactory, the factory
103+
/// is responsible for applying its own connect timeout.
103104

104105
unsigned int socketTimeoutSeconds{30};
105-
/// Socket send/receive timeout in seconds (default: 30)
106+
/// Socket send/receive timeout in seconds (default: 30).
106107
///
107-
/// NOTE: This value is currently unused by ReplicaSet itself. It is intended
108-
/// for use by custom SocketFactory implementations. Custom factories can
109-
/// access this value via ReplicaSet::configuration() and use it when creating
110-
/// sockets. Use ReplicaSet::setSocketFactory() to set a custom factory that
111-
/// utilizes this timeout value.
108+
/// Applied as the send and receive timeout on sockets after successful
109+
/// connection. This affects how long sendRequest()/readResponse() will
110+
/// wait before timing out. When using a custom SocketFactory, the factory
111+
/// is responsible for applying its own socket timeout.
112112

113113
unsigned int heartbeatFrequencySeconds{10};
114114
/// Topology monitoring interval in seconds (default: 10)
@@ -183,16 +183,33 @@ class MongoDB_API ReplicaSet
183183

184184
Connection::Ptr getConnection(const ReadPreference& readPref);
185185
/// Returns a connection to a server matching the read preference.
186+
/// Uses timeouts from Config.
187+
/// Returns null if no suitable server is available.
188+
189+
Connection::Ptr getConnection(const ReadPreference& readPref,
190+
const Poco::Timespan& connectTimeout, const Poco::Timespan& socketTimeout);
191+
/// Returns a connection to a server matching the read preference,
192+
/// using the specified connect and socket timeouts instead of
193+
/// the values from Config.
186194
/// Returns null if no suitable server is available.
187195

188196
Connection::Ptr waitForServerAvailability(const ReadPreference& readPref);
189197
/// Waits for a server to become available for the given read preference.
198+
/// Uses timeouts from Config.
190199
/// This method coordinates waiting between multiple threads - only one thread
191200
/// performs the actual sleep and topology refresh, while others benefit from
192201
/// the refresh done by the first thread.
193202
/// Returns a connection if a server becomes available, or null if still unavailable.
194203
/// Thread-safe: uses internal synchronization to prevent redundant refresh attempts.
195204

205+
Connection::Ptr waitForServerAvailability(const ReadPreference& readPref,
206+
const Poco::Timespan& connectTimeout, const Poco::Timespan& socketTimeout);
207+
/// Waits for a server to become available for the given read preference,
208+
/// using the specified connect and socket timeouts instead of
209+
/// the values from Config.
210+
/// Returns a connection if a server becomes available, or null if still unavailable.
211+
/// Thread-safe: uses internal synchronization to prevent redundant refresh attempts.
212+
196213
Connection::Ptr getPrimaryConnection();
197214
/// Returns a connection to the primary server.
198215
/// Returns null if no primary is available.
@@ -202,7 +219,7 @@ class MongoDB_API ReplicaSet
202219
/// Returns null if no secondary is available.
203220

204221
[[nodiscard]] Config configuration() const;
205-
// Returns a copy of replica set configuration.
222+
/// Returns a copy of replica set configuration.
206223

207224
[[nodiscard]] TopologyDescription topology() const;
208225
/// Returns a copy of the current topology description.
@@ -243,13 +260,22 @@ class MongoDB_API ReplicaSet
243260
Connection::Ptr selectServer(const ReadPreference& readPref);
244261
/// Selects a server based on read preference and creates a connection.
245262

263+
Connection::Ptr selectServer(const ReadPreference& readPref,
264+
const Poco::Timespan& connectTimeout, const Poco::Timespan& socketTimeout);
265+
/// Selects a server based on read preference and creates a connection
266+
/// using the specified timeouts.
267+
246268
Connection::Ptr createConnection(const Net::SocketAddress& address);
247269
/// Creates a new connection to the specified address.
248270

249-
void updateTopologyFromHello(const Net::SocketAddress& address) noexcept;
271+
Connection::Ptr createConnection(const Net::SocketAddress& address,
272+
const Poco::Timespan& connectTimeout, const Poco::Timespan& socketTimeout);
273+
/// Creates a new connection to the specified address using the given timeouts.
274+
275+
void updateTopologyFromHello(const Net::SocketAddress& address);
250276
/// Queries a server with 'hello' command and updates topology.
251277

252-
void updateTopologyFromAllServers() noexcept;
278+
void updateTopologyFromAllServers();
253279
/// Queries all known servers and updates topology.
254280

255281
void parseURI(const std::string& uri);
@@ -264,9 +290,13 @@ class MongoDB_API ReplicaSet
264290
std::thread _monitorThread;
265291
std::atomic<bool> _stopMonitoring{false};
266292
std::atomic<bool> _monitoringActive{false};
293+
std::mutex _monitorMutex;
294+
std::condition_variable _monitorCV;
295+
296+
Poco::Random _random;
267297

268298
std::mutex _serverAvailabilityRetryMutex;
269-
std::chrono::steady_clock::time_point _topologyRefreshTime;
299+
std::atomic<std::chrono::steady_clock::time_point> _topologyRefreshTime;
270300
};
271301

272302

0 commit comments

Comments
 (0)