@@ -61,18 +61,19 @@ namespace web { namespace http
61
61
m_socket (io_service),
62
62
m_pool_timer (io_service),
63
63
m_pool_weak (pool_weak),
64
- m_is_reused (false )
64
+ m_is_reused (false ),
65
+ m_keep_alive (true )
65
66
{}
66
67
67
- std::weak_ptr<linux_connection_pool> m_pool_weak;
68
- tcp::socket m_socket;
69
- boost::asio::deadline_timer m_pool_timer;
70
- bool m_is_reused;
71
-
72
68
void handle_pool_timer (const boost::system::error_code& ec);
73
69
74
- bool close ()
70
+ bool close (bool cancel_timer )
75
71
{
72
+ if (cancel_timer)
73
+ {
74
+ m_pool_timer.cancel ();
75
+ }
76
+ m_keep_alive = false ;
76
77
boost::system::error_code error;
77
78
m_socket.shutdown (tcp::socket::shutdown_both, error);
78
79
m_socket.close (error);
@@ -85,6 +86,33 @@ namespace web { namespace http
85
86
m_socket.cancel (error);
86
87
return !error;
87
88
}
89
+
90
+ template <typename TimeoutHandler>
91
+ void start_pool_timer (int timeout_secs, TimeoutHandler handler)
92
+ {
93
+ m_pool_timer.expires_from_now (boost::posix_time::milliseconds (timeout_secs * 1000 ));
94
+ m_pool_timer.async_wait (handler);
95
+ }
96
+
97
+ void start_reuse ()
98
+ {
99
+ m_pool_timer.cancel ();
100
+ m_is_reused = true ;
101
+ }
102
+
103
+ bool is_reused () const { return m_is_reused; }
104
+
105
+ bool keep_alive () const { return m_keep_alive; }
106
+ void set_keep_alive (bool keep_alive) { m_keep_alive = keep_alive; }
107
+
108
+ tcp::socket& socket () { return m_socket; }
109
+
110
+ private:
111
+ tcp::socket m_socket;
112
+ boost::asio::deadline_timer m_pool_timer;
113
+ std::weak_ptr<linux_connection_pool> m_pool_weak;
114
+ bool m_is_reused;
115
+ bool m_keep_alive;
88
116
};
89
117
90
118
class linux_connection_pool : public std ::enable_shared_from_this<linux_connection_pool>
@@ -93,46 +121,51 @@ namespace web { namespace http
93
121
94
122
linux_connection_pool (boost::asio::io_service& io_service, utility::seconds idle_timeout) :
95
123
m_io_service (io_service),
96
- m_timeout_secs (idle_timeout)
124
+ m_timeout_secs (static_cast < int >( idle_timeout.count()) )
97
125
{}
98
126
99
127
~linux_connection_pool ()
100
128
{
101
129
std::lock_guard<std::mutex> lock (m_connections_mutex);
102
130
for (auto & connection : m_connections)
103
131
{
104
- connection->m_pool_timer .cancel ();
105
- connection->close ();
132
+ connection->close (true );
106
133
}
107
134
}
108
135
109
136
void release (std::shared_ptr<linux_connection> connection)
110
137
{
111
- std::lock_guard<std::mutex> lock (m_connections_mutex);
112
-
113
- const int secs = static_cast < int >(m_timeout_secs. count () );
114
- connection-> m_pool_timer . expires_from_now ( boost::posix_time::milliseconds (secs * 1000 ));
115
- connection->m_pool_timer . async_wait ( boost::bind (&linux_connection::handle_pool_timer, connection, boost::asio::placeholders::error));
138
+ if (connection-> keep_alive () && (m_timeout_secs > 0 ))
139
+ {
140
+ connection-> cancel ( );
141
+ // Remove idle connections from pool after timeout.
142
+ connection->start_pool_timer (m_timeout_secs, boost::bind (&linux_connection::handle_pool_timer, connection, boost::asio::placeholders::error));
116
143
117
- m_connections.insert (connection);
144
+ {
145
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
146
+ m_connections.insert (connection);
147
+ }
148
+ }
149
+ else
150
+ {
151
+ // Connection is not returned to pool => will be destroyed.
152
+ connection->close (false );
153
+ }
118
154
}
119
155
120
156
std::shared_ptr<linux_connection> obtain ()
121
157
{
122
- std::lock_guard<std::mutex> lock (m_connections_mutex);
123
-
124
- if (m_connections.empty ())
158
+ if (is_connections_empty ())
125
159
{
160
+ // No connections in pool => create new connection instance.
126
161
return std::make_shared<linux_connection>(shared_from_this (), m_io_service);
127
162
}
128
163
else
129
164
{
130
- (*m_connections.begin ())->m_pool_timer .cancel ();
131
- auto result (*m_connections.begin ());
132
- result->m_is_reused = true ;
133
- m_connections.erase (m_connections.begin ());
134
-
135
- return result;
165
+ // Reuse connection from pool.
166
+ auto connection (obtain_begin ());
167
+ connection->start_reuse ();
168
+ return connection;
136
169
}
137
170
}
138
171
@@ -143,8 +176,22 @@ namespace web { namespace http
143
176
}
144
177
145
178
private:
179
+ bool is_connections_empty ()
180
+ {
181
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
182
+ return m_connections.empty ();
183
+ }
184
+
185
+ std::shared_ptr<linux_connection> obtain_begin ()
186
+ {
187
+ std::lock_guard<std::mutex> lock (m_connections_mutex);
188
+ auto connection (*m_connections.begin ());
189
+ m_connections.erase (m_connections.begin ());
190
+ return connection;
191
+ }
192
+
146
193
boost::asio::io_service& m_io_service;
147
- const utility::seconds m_timeout_secs;
194
+ const int m_timeout_secs;
148
195
std::unordered_set<std::shared_ptr<linux_connection> > m_connections;
149
196
std::mutex m_connections_mutex;
150
197
};
@@ -216,7 +263,6 @@ namespace web { namespace http
216
263
bool m_timedout;
217
264
boost::asio::streambuf m_body_buf;
218
265
boost::asio::deadline_timer m_timeout_timer;
219
- bool m_close_socket_in_destructor;
220
266
221
267
~linux_client_request_context ();
222
268
@@ -271,7 +317,7 @@ namespace web { namespace http
271
317
{
272
318
boost::asio::ssl::context context (boost::asio::ssl::context::sslv23);
273
319
context.set_default_verify_paths ();
274
- ctx->m_ssl_stream .reset (new boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>(ctx->m_connection ->m_socket , context));
320
+ ctx->m_ssl_stream .reset (new boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>(ctx->m_connection ->socket () , context));
275
321
}
276
322
277
323
auto encoded_resource = uri_builder (m_uri).append (ctx->m_request .relative_uri ()).to_uri ().resource ().to_string ();
@@ -346,7 +392,7 @@ namespace web { namespace http
346
392
347
393
ctx->set_timer (static_cast <int >(client_config ().timeout ().count ()));
348
394
349
- if (ctx->m_connection ->m_socket .is_open ())
395
+ if (ctx->m_connection ->socket () .is_open ())
350
396
{
351
397
write_request (ctx);
352
398
}
@@ -363,8 +409,7 @@ namespace web { namespace http
363
409
ctx->m_cancellationRegistration = request_ctx->m_request ._cancellation_token ().register_callback ([ctx]()
364
410
{
365
411
ctx->m_connection ->cancel ();
366
- ctx->m_connection ->close ();
367
- ctx->m_close_socket_in_destructor = true ;
412
+ ctx->m_connection ->close (false );
368
413
});
369
414
}
370
415
}
@@ -414,7 +459,7 @@ namespace web { namespace http
414
459
ctx->m_ssl_stream ->set_verify_mode (boost::asio::ssl::context::verify_none);
415
460
}
416
461
}
417
- ctx->m_connection ->m_socket .async_connect (endpoint, boost::bind (&linux_client::handle_connect, shared_from_this (), boost::asio::placeholders::error, ++endpoints, ctx));
462
+ ctx->m_connection ->socket () .async_connect (endpoint, boost::bind (&linux_client::handle_connect, shared_from_this (), boost::asio::placeholders::error, ++endpoints, ctx));
418
463
}
419
464
}
420
465
@@ -426,7 +471,7 @@ namespace web { namespace http
426
471
}
427
472
else
428
473
{
429
- boost::asio::async_write (ctx->m_connection ->m_socket , ctx->m_body_buf , boost::bind (&linux_client::handle_write_request, shared_from_this (), boost::asio::placeholders::error, ctx));
474
+ boost::asio::async_write (ctx->m_connection ->socket () , ctx->m_body_buf , boost::bind (&linux_client::handle_write_request, shared_from_this (), boost::asio::placeholders::error, ctx));
430
475
}
431
476
}
432
477
@@ -444,15 +489,15 @@ namespace web { namespace http
444
489
{
445
490
ctx->m_timeout_timer .cancel ();
446
491
447
- ctx->m_connection ->close ();
492
+ ctx->m_connection ->close (false );
448
493
ctx->m_connection = m_pool->obtain ();
449
494
450
495
auto endpoint = *endpoints;
451
496
if (ctx->m_ssl_stream )
452
497
{
453
498
boost::asio::ssl::context context (boost::asio::ssl::context::sslv23);
454
499
context.set_default_verify_paths ();
455
- ctx->m_ssl_stream .reset (new boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>(ctx->m_connection ->m_socket , context));
500
+ ctx->m_ssl_stream .reset (new boost::asio::ssl::stream<boost::asio::ip::tcp::socket &>(ctx->m_connection ->socket () , context));
456
501
457
502
// Check to turn off server certificate verification.
458
503
if (client_config ().validate_certificates ())
@@ -466,7 +511,7 @@ namespace web { namespace http
466
511
}
467
512
}
468
513
469
- ctx->m_connection ->m_socket .async_connect (endpoint, boost::bind (&linux_client::handle_connect, shared_from_this (), boost::asio::placeholders::error, ++endpoints, ctx));
514
+ ctx->m_connection ->socket () .async_connect (endpoint, boost::bind (&linux_client::handle_connect, shared_from_this (), boost::asio::placeholders::error, ++endpoints, ctx));
470
515
}
471
516
}
472
517
@@ -540,12 +585,12 @@ namespace web { namespace http
540
585
{
541
586
if (readSize != 0 )
542
587
{
543
- boost::asio::async_write (ctx->m_connection ->m_socket , ctx->m_body_buf ,
588
+ boost::asio::async_write (ctx->m_connection ->socket () , ctx->m_body_buf ,
544
589
boost::bind (&linux_client::handle_write_chunked_body, shared_from_this (), boost::asio::placeholders::error, ctx));
545
590
}
546
591
else
547
592
{
548
- boost::asio::async_write (ctx->m_connection ->m_socket , ctx->m_body_buf ,
593
+ boost::asio::async_write (ctx->m_connection ->socket () , ctx->m_body_buf ,
549
594
boost::bind (&linux_client::handle_write_body, shared_from_this (), boost::asio::placeholders::error, ctx));
550
595
}
551
596
}
@@ -600,7 +645,7 @@ namespace web { namespace http
600
645
}
601
646
else
602
647
{
603
- boost::asio::async_write (ctx->m_connection ->m_socket , ctx->m_body_buf ,
648
+ boost::asio::async_write (ctx->m_connection ->socket () , ctx->m_body_buf ,
604
649
boost::bind (&linux_client::handle_write_large_body, shared_from_this (), boost::asio::placeholders::error, ctx));
605
650
}
606
651
});
@@ -653,7 +698,7 @@ namespace web { namespace http
653
698
}
654
699
else
655
700
{
656
- boost::asio::async_read_until (ctx->m_connection ->m_socket , ctx->m_body_buf , CRLF+CRLF,
701
+ boost::asio::async_read_until (ctx->m_connection ->socket () , ctx->m_body_buf , CRLF+CRLF,
657
702
boost::bind (&linux_client::handle_status_line, shared_from_this (), boost::asio::placeholders::error, ctx));
658
703
}
659
704
}
@@ -695,12 +740,11 @@ namespace web { namespace http
695
740
const bool socket_was_closed ((boost::asio::error::eof == ec)
696
741
|| (boost::asio::error::connection_reset == ec)
697
742
|| (boost::asio::error::connection_aborted == ec));
698
- if (socket_was_closed && ctx->m_connection ->m_is_reused && ctx->m_connection ->m_socket .is_open ())
743
+ if (socket_was_closed && ctx->m_connection ->is_reused () && ctx->m_connection ->socket () .is_open ())
699
744
{
700
745
// Connection was closed by the server for some reason during the connection was
701
746
// being pooled. We re-send the request to get a new connection.
702
- ctx->m_connection ->close ();
703
- ctx->m_close_socket_in_destructor = true ;
747
+ ctx->m_connection ->close (false );
704
748
705
749
auto new_ctx = details::linux_client_request_context::create_request_context (ctx->m_http_client , ctx->m_request );
706
750
new_ctx->m_request_completion = ctx->m_request_completion ;
@@ -740,10 +784,11 @@ namespace web { namespace http
740
784
741
785
if (boost::iequals (name, header_names::connection))
742
786
{
743
- // This assumes server uses HTTP/1.1 so that 'Keep-Alive' is the default.
744
- // We don't handle HTTP/1.0 server explicitly here. HTTP/1.0 server would need
745
- // to respond with 'Connection: Keep-Alive' to resume connection.
746
- ctx->m_close_socket_in_destructor = boost::iequals (value, U (" close" ));
787
+ // This assumes server uses HTTP/1.1 so that 'Keep-Alive' is the default,
788
+ // so connection is explicitly closed only if we get "Connection: close".
789
+ // We don't handle HTTP/1.0 server here. HTTP/1.0 server would need
790
+ // to respond using 'Connection: Keep-Alive' every time.
791
+ ctx->m_connection ->set_keep_alive (!boost::iequals (value, U (" close" )));
747
792
}
748
793
749
794
ctx->m_response .headers ().add (std::move (name), std::move (value));
@@ -792,7 +837,7 @@ namespace web { namespace http
792
837
}
793
838
else
794
839
{
795
- boost::asio::async_read_until (ctx->m_connection ->m_socket , ctx->m_body_buf , CRLF,
840
+ boost::asio::async_read_until (ctx->m_connection ->socket () , ctx->m_body_buf , CRLF,
796
841
boost::bind (&linux_client::handle_chunk_header, shared_from_this (), boost::asio::placeholders::error, ctx));
797
842
}
798
843
}
@@ -814,7 +859,7 @@ namespace web { namespace http
814
859
}
815
860
else
816
861
{
817
- boost::asio::async_read (ctx->m_connection ->m_socket , ctx->m_body_buf , boost::asio::transfer_at_least (size_to_read), handler);
862
+ boost::asio::async_read (ctx->m_connection ->socket () , ctx->m_body_buf , boost::asio::transfer_at_least (size_to_read), handler);
818
863
}
819
864
}
820
865
@@ -909,7 +954,7 @@ namespace web { namespace http
909
954
}
910
955
else
911
956
{
912
- boost::asio::async_read_until (ctx->m_connection ->m_socket , ctx->m_body_buf , CRLF,
957
+ boost::asio::async_read_until (ctx->m_connection ->socket () , ctx->m_body_buf , CRLF,
913
958
boost::bind (&linux_client::handle_chunk_header, shared_from_this (), boost::asio::placeholders::error, ctx));
914
959
}
915
960
});
@@ -1024,7 +1069,6 @@ namespace web { namespace http
1024
1069
, m_current_size(0 )
1025
1070
, m_timeout_timer(crossplat::threadpool::shared_instance().service())
1026
1071
, m_connection(connection)
1027
- , m_close_socket_in_destructor(false )
1028
1072
{}
1029
1073
1030
1074
std::shared_ptr<request_context> linux_client_request_context::create_request_context (
@@ -1038,16 +1082,7 @@ namespace web { namespace http
1038
1082
linux_client_request_context::~linux_client_request_context ()
1039
1083
{
1040
1084
m_timeout_timer.cancel ();
1041
-
1042
- if (m_close_socket_in_destructor)
1043
- {
1044
- m_connection->close ();
1045
- }
1046
- else
1047
- {
1048
- m_connection->cancel ();
1049
- std::static_pointer_cast<linux_client>(m_http_client)->m_pool ->release (m_connection);
1050
- }
1085
+ std::static_pointer_cast<linux_client>(m_http_client)->m_pool ->release (m_connection);
1051
1086
}
1052
1087
1053
1088
void linux_connection::handle_pool_timer (const boost::system::error_code& ec)
0 commit comments