Skip to content

Commit 4d52eae

Browse files
committed
Merge branch 'dev/roschuma/host_based_connection_pool' into development
2 parents 21eda51 + 6cd6392 commit 4d52eae

File tree

5 files changed

+39
-155
lines changed

5 files changed

+39
-155
lines changed

Release/src/http/client/http_client.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -191,7 +191,7 @@ const uri & _http_client_communicator::base_uri() const
191191
return m_uri;
192192
}
193193

194-
_http_client_communicator::_http_client_communicator(http::uri address, http_client_config client_config)
194+
_http_client_communicator::_http_client_communicator(http::uri&& address, http_client_config&& client_config)
195195
: m_uri(std::move(address)), m_client_config(std::move(client_config)), m_opened(false), m_scheduled(0)
196196
{
197197
}
@@ -370,12 +370,12 @@ http_client::http_client(const uri &base_uri, const http_client_config &client_c
370370
uribuilder.set_scheme(_XPLATSTR("http"));
371371
uri uriWithScheme = uribuilder.to_uri();
372372
verify_uri(uriWithScheme);
373-
final_pipeline_stage = details::create_platform_final_pipeline_stage(uriWithScheme, client_config);
373+
final_pipeline_stage = details::create_platform_final_pipeline_stage(std::move(uriWithScheme), http_client_config(client_config));
374374
}
375375
else
376376
{
377377
verify_uri(base_uri);
378-
final_pipeline_stage = details::create_platform_final_pipeline_stage(base_uri, client_config);
378+
final_pipeline_stage = details::create_platform_final_pipeline_stage(uri(base_uri), http_client_config(client_config));
379379
}
380380

381381
m_pipeline = std::make_shared<http_pipeline>(std::move(final_pipeline_stage));

Release/src/http/client/http_client_asio.cpp

Lines changed: 28 additions & 144 deletions
Original file line numberDiff line numberDiff line change
@@ -235,9 +235,10 @@ class asio_connection
235235
///
236236
/// During the cleanup phase, connections are removed starting with the oldest. This
237237
/// ensures that if a high intensity workload is followed by a low intensity workload,
238-
/// the connection pool will correctly adapt to the current workload. Specifically,
239-
/// the following code will eventually result in a maximum of one pooled connection
240-
/// regardless of the initial number of pooled connections:
238+
/// the connection pool will correctly adapt to the low intensity workload.
239+
///
240+
/// Specifically, the following code will eventually result in a maximum of one pooled
241+
/// connection regardless of the initial number of pooled connections:
241242
/// <code>
242243
/// while(1)
243244
/// {
@@ -246,18 +247,11 @@ class asio_connection
246247
/// pool.release(conn);
247248
/// }
248249
/// </code>
249-
///
250-
/// Additionally, when two cleanup phases have occurred with no calls to `release()`
251-
/// between them, the internal self-reference is cleared. If there are no active
252-
/// `http_client`s keeping the pool alive, this will cause the pool to expire upon
253-
/// cleanup handler termination. Whenever a new call to `release()` arrives, the self
254-
/// reference is re-applied to keep the pool alive.
255250
/// </remarks>
256251
class asio_connection_pool : public std::enable_shared_from_this<asio_connection_pool>
257252
{
258253
public:
259-
asio_connection_pool()
260-
: m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
254+
asio_connection_pool() : m_pool_epoch_timer(crossplat::threadpool::shared_instance().service())
261255
{}
262256

263257
std::shared_ptr<asio_connection> acquire()
@@ -281,29 +275,29 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
281275
return;
282276

283277
std::lock_guard<std::mutex> lock(m_lock);
284-
if (m_self_reference == nullptr)
278+
if (!is_timer_running)
285279
{
286-
auto sptr = this->shared_from_this();
287-
m_self_reference = sptr;
288-
start_epoch_interval(sptr);
280+
start_epoch_interval(shared_from_this());
281+
is_timer_running = true;
289282
}
290283

291284
m_epoch++;
292-
m_connections.emplace_back(m_epoch, connection);
285+
m_connections.emplace_back(m_epoch, std::move(connection));
293286
}
294287

295288
private:
296289
// Note: must be called under m_lock
297-
static void start_epoch_interval(const std::shared_ptr<asio_connection_pool>& pool) {
290+
static void start_epoch_interval(const std::shared_ptr<asio_connection_pool>& pool)
291+
{
298292
_ASSERTE(pool.get() != nullptr);
299-
_ASSERTE(pool->m_self_reference != nullptr);
300293

301294
auto& self = *pool;
302295
std::weak_ptr<asio_connection_pool> weak_pool = pool;
303296

304297
self.m_prev_epoch = self.m_epoch;
305298
pool->m_pool_epoch_timer.expires_from_now(boost::posix_time::seconds(30));
306-
pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec) {
299+
pool->m_pool_epoch_timer.async_wait([weak_pool](const boost::system::error_code& ec)
300+
{
307301
if (ec)
308302
return;
309303

@@ -313,11 +307,11 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
313307
auto& self = *pool;
314308

315309
std::lock_guard<std::mutex> lock(self.m_lock);
316-
_ASSERTE(self.m_self_reference != nullptr);
317310
if (self.m_prev_epoch == self.m_epoch)
318311
{
319312
self.m_connections.clear();
320-
self.m_self_reference = nullptr;
313+
self.is_timer_running = false;
314+
return;
321315
}
322316
else
323317
{
@@ -335,109 +329,23 @@ class asio_connection_pool : public std::enable_shared_from_this<asio_connection
335329
}
336330

337331
std::mutex m_lock;
338-
boost::asio::deadline_timer m_pool_epoch_timer;
339332
std::deque<std::pair<uint64_t, std::shared_ptr<asio_connection>>> m_connections;
333+
340334
uint64_t m_epoch = 0;
341335
uint64_t m_prev_epoch = 0;
342-
343-
std::shared_ptr<asio_connection_pool> m_self_reference;
344-
};
345-
346-
class asio_shared_connection_pool : public std::enable_shared_from_this<asio_shared_connection_pool>
347-
{
348-
public:
349-
std::shared_ptr<asio_connection_pool> obtain(const std::string &pool_key)
350-
{
351-
std::shared_ptr<asio_connection_pool> ret;
352-
353-
std::lock_guard<std::mutex> lock(m_lock);
354-
auto it = m_pools.find(pool_key);
355-
if (it != m_pools.end())
356-
{
357-
ret = it->second.lock();
358-
if (ret == nullptr)
359-
{
360-
// Previous pool expired
361-
ret = std::make_shared<asio_connection_pool>();
362-
it->second = ret;
363-
}
364-
}
365-
else
366-
{
367-
if (m_pools.empty())
368-
{
369-
// If transitioning from empty to having a single element, restart the timer.
370-
start_timer(shared_from_this());
371-
}
372-
ret = std::make_shared<asio_connection_pool>();
373-
m_pools.emplace(pool_key, ret);
374-
}
375-
376-
assert(ret != nullptr);
377-
return ret;
378-
}
379-
380-
static std::shared_ptr<asio_shared_connection_pool>& shared_instance()
381-
{
382-
static std::shared_ptr<asio_shared_connection_pool> s_instance = std::make_shared<asio_shared_connection_pool>();
383-
384-
return s_instance;
385-
}
386-
387-
asio_shared_connection_pool() : m_timer(crossplat::threadpool::shared_instance().service()) {}
388-
389-
private:
390-
static void start_timer(const std::shared_ptr<asio_shared_connection_pool>& self)
391-
{
392-
self->m_timer.expires_from_now(boost::posix_time::seconds(60));
393-
std::weak_ptr<asio_shared_connection_pool> weak_this = self;
394-
self->m_timer.async_wait([weak_this](const boost::system::error_code& ec)
395-
{
396-
if (ec)
397-
return;
398-
auto strong_this = weak_this.lock();
399-
if (!strong_this)
400-
return;
401-
402-
std::lock_guard<std::mutex> lock(strong_this->m_lock);
403-
auto b = strong_this->m_pools.begin();
404-
auto e = strong_this->m_pools.end();
405-
for (; b != e;)
406-
{
407-
if (b->second.expired())
408-
b = strong_this->m_pools.erase(b);
409-
else
410-
++b;
411-
}
412-
if (!strong_this->m_pools.empty())
413-
start_timer(strong_this);
414-
});
415-
}
416-
417-
boost::asio::deadline_timer m_timer;
418-
std::mutex m_lock;
419-
std::unordered_map<std::string, std::weak_ptr<asio_connection_pool>> m_pools;
336+
bool is_timer_running = false;
337+
boost::asio::deadline_timer m_pool_epoch_timer;
420338
};
421339

422340
class asio_client final : public _http_client_communicator
423341
{
424342
public:
425-
asio_client(http::uri address, http_client_config client_config)
426-
: _http_client_communicator(std::move(address), std::move(client_config))
427-
, m_resolver(crossplat::threadpool::shared_instance().service())
428-
{
429-
m_start_with_ssl = base_uri().scheme() == "https" && !this->client_config().proxy().is_specified();
430-
431-
if (this->client_config().get_ssl_context_callback())
432-
{
433-
// We will use a private connection pool because there is no better approaches to compare callback functors.
434-
m_pool = std::make_shared<asio_connection_pool>();
435-
}
436-
else
437-
{
438-
m_pool = asio_shared_connection_pool::shared_instance()->obtain(get_pool_key());
439-
}
440-
}
343+
asio_client(http::uri&& address, http_client_config&& client_config)
344+
: _http_client_communicator(std::move(address), std::move(client_config))
345+
, m_resolver(crossplat::threadpool::shared_instance().service())
346+
, m_pool(std::make_shared<asio_connection_pool>())
347+
, m_start_with_ssl(base_uri().scheme() == "https" && !this->client_config().proxy().is_specified())
348+
{}
441349

442350
void send_request(const std::shared_ptr<request_context> &request_ctx) override;
443351

@@ -464,35 +372,11 @@ class asio_client final : public _http_client_communicator
464372

465373
virtual pplx::task<http_response> propagate(http_request request) override;
466374

467-
private:
468-
std::string get_pool_key() const
469-
{
470-
auto pool_key = base_uri().to_string();
471-
472-
auto &credentials = _http_client_communicator::client_config().credentials();
473-
if (credentials.is_set())
474-
{
475-
pool_key.append(credentials.username());
476-
}
477-
478-
auto &proxy = _http_client_communicator::client_config().proxy();
479-
if (proxy.is_specified())
480-
{
481-
pool_key.append(proxy.address().to_string());
482-
if (proxy.credentials().is_set())
483-
{
484-
pool_key.append(proxy.credentials().username());
485-
}
486-
}
487-
488-
return pool_key;
489-
}
490-
491-
std::shared_ptr<asio_connection_pool> m_pool;
492375
public:
493376
tcp::resolver m_resolver;
494377
private:
495-
bool m_start_with_ssl;
378+
const std::shared_ptr<asio_connection_pool> m_pool;
379+
const bool m_start_with_ssl;
496380
};
497381

498382
class asio_context : public request_context, public std::enable_shared_from_this<asio_context>
@@ -1612,9 +1496,9 @@ class asio_context : public request_context, public std::enable_shared_from_this
16121496
};
16131497

16141498

1615-
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config)
1499+
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config)
16161500
{
1617-
return std::make_shared<asio_client>(base_uri, client_config);
1501+
return std::make_shared<asio_client>(std::move(base_uri), std::move(client_config));
16181502
}
16191503

16201504
void asio_client::send_request(const std::shared_ptr<request_context> &request_ctx)

Release/src/http/client/http_client_impl.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ class _http_client_communicator : public http_pipeline_stage
100100
{
101101
public:
102102

103-
virtual ~_http_client_communicator() {}
103+
virtual ~_http_client_communicator() override = default;
104104

105105
// Asynchronously send a HTTP request and process the response.
106106
void async_send_request(const std::shared_ptr<request_context> &request);
@@ -112,7 +112,7 @@ class _http_client_communicator : public http_pipeline_stage
112112
const uri & base_uri() const;
113113

114114
protected:
115-
_http_client_communicator(http::uri address, http_client_config client_config);
115+
_http_client_communicator(http::uri&& address, http_client_config&& client_config);
116116

117117
// Method to open client.
118118
virtual unsigned long open() = 0;
@@ -146,6 +146,6 @@ class _http_client_communicator : public http_pipeline_stage
146146
/// <summary>
147147
/// Factory function implemented by the separate platforms to construct their subclasses of _http_client_communicator
148148
/// </summary>
149-
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config);
149+
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config);
150150

151151
}}}}

Release/src/http/client/http_client_winhttp.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1294,9 +1294,9 @@ class winhttp_client : public _http_client_communicator
12941294
bool m_secure;
12951295
};
12961296

1297-
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config)
1297+
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config)
12981298
{
1299-
return std::make_shared<details::winhttp_client>(std::move(base_uri), client_config);
1299+
return std::make_shared<details::winhttp_client>(std::move(base_uri), std::move(client_config));
13001300
}
13011301

13021302
}}}}

Release/src/http/client/http_client_winrt.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ class IResponseStream
356356
class winrt_client : public _http_client_communicator
357357
{
358358
public:
359-
winrt_client(http::uri address, http_client_config client_config)
359+
winrt_client(http::uri&& address, http_client_config&& client_config)
360360
: _http_client_communicator(std::move(address), std::move(client_config)) { }
361361

362362
winrt_client(const winrt_client&) = delete;
@@ -560,9 +560,9 @@ class winrt_client : public _http_client_communicator
560560
}
561561
};
562562

563-
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri base_uri, const http_client_config& client_config)
563+
std::shared_ptr<_http_client_communicator> create_platform_final_pipeline_stage(uri&& base_uri, http_client_config&& client_config)
564564
{
565-
return std::make_shared<details::winrt_client>(std::move(base_uri), client_config);
565+
return std::make_shared<details::winrt_client>(std::move(base_uri), std::move(client_config));
566566
}
567567

568568
}}}}

0 commit comments

Comments
 (0)