Skip to content

Commit 61795a4

Browse files
committed
Fixing a bug in http_client where we were waiting for the response stream to close in _complete and not handling any exceptions.
1 parent a2d5d7e commit 61795a4

File tree

3 files changed

+136
-15
lines changed

3 files changed

+136
-15
lines changed

Release/include/cpprest/astreambuf.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ namespace streams
422422
}
423423

424424
// After the flush_internal task completed, "this" object may have been destroyed,
425-
// accessing the memebers is invalid, use shared_from_this to avoid access violation exception.
425+
// accessing the members is invalid, use shared_from_this to avoid access violation exception.
426426
auto this_ptr = std::static_pointer_cast<streambuf_state_manager>(this->shared_from_this());
427427

428428
if (mode & std::ios_base::out && can_write()) {

Release/src/http/common/http_msg.cpp

Lines changed: 58 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -170,29 +170,73 @@ size_t http_msg_base::_get_content_length()
170170
/// </summary>
171171
void http_msg_base::_complete(utility::size64_t body_size, const std::exception_ptr &exceptionPtr)
172172
{
173-
// Close the write head
174-
if ((bool)outstream())
173+
const auto hasBody = outstream().is_valid();
174+
const auto hasException = exceptionPtr != std::exception_ptr();
175+
const auto &completionEvent = _get_data_available();
176+
auto closeTask = pplx::task_from_result();
177+
178+
if (hasBody)
175179
{
176-
if ( !(exceptionPtr == std::exception_ptr()) )
177-
outstream().close(exceptionPtr).get();
178-
else if ( m_default_outstream )
179-
outstream().close().get();
180+
if (hasException)
181+
{
182+
closeTask = outstream().close(exceptionPtr);
183+
}
184+
else if (m_default_outstream)
185+
{
186+
closeTask = outstream().close();
187+
}
180188
}
181189

182-
if(exceptionPtr == std::exception_ptr())
190+
if (hasException)
183191
{
184-
_get_data_available().set(body_size);
192+
auto setException = [completionEvent, exceptionPtr](pplx::task<void> t)
193+
{
194+
// If closing stream throws an exception ignore since we already have an error.
195+
try { t.get(); } catch (...) {}
196+
completionEvent.set_exception(exceptionPtr);
197+
pplx::create_task(completionEvent).then([](pplx::task<utility::size64_t> t)
198+
{
199+
try { t.get(); } catch (...) {}
200+
});
201+
};
202+
203+
if (closeTask.is_done())
204+
{
205+
setException(closeTask);
206+
}
207+
else
208+
{
209+
closeTask.then(setException);
210+
}
185211
}
186212
else
187213
{
188-
_get_data_available().set_exception(exceptionPtr);
189-
// The exception for body will be observed by default, because read body is not always required.
190-
pplx::create_task(_get_data_available()).then([](pplx::task<utility::size64_t> t) {
191-
try {
214+
auto setBodySize = [completionEvent, body_size](pplx::task<void> t)
215+
{
216+
try
217+
{
192218
t.get();
193-
} catch (...) {
219+
completionEvent.set(body_size);
194220
}
195-
});
221+
catch (...)
222+
{
223+
// If close throws an exception report back to user.
224+
completionEvent.set_exception(std::current_exception());
225+
pplx::create_task(completionEvent).then([](pplx::task<utility::size64_t> t)
226+
{
227+
try { t.get(); } catch (...) {}
228+
});
229+
}
230+
};
231+
232+
if (closeTask.is_done())
233+
{
234+
setBodySize(closeTask);
235+
}
236+
else
237+
{
238+
closeTask.then(setBodySize);
239+
}
196240
}
197241
}
198242

Release/tests/functional/http/client/response_stream_tests.cpp

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,83 @@ TEST_FIXTURE(uri_address, response_stream_large_file_stream)
251251
}
252252

253253
#if !defined(__cplusplus_winrt)
254+
255+
template<typename CharType>
256+
class basic_throws_buffer : public streams::details::streambuf_state_manager<CharType>
257+
{
258+
public:
259+
basic_throws_buffer() : streams::details::streambuf_state_manager<CharType>(std::ios_base::out) {}
260+
261+
bool can_seek() const override { return true; }
262+
bool has_size() const override { return false; }
263+
size_t buffer_size(std::ios_base::openmode) const override { return 0; }
264+
void set_buffer_size(size_t, std::ios_base::openmode) override {}
265+
size_t in_avail() const override { return 0; }
266+
pos_type getpos(std::ios_base::openmode) const override { return 0; }
267+
pos_type seekpos(pos_type, std::ios_base::openmode) { return 0; }
268+
pos_type seekoff(off_type, std::ios_base::seekdir, std::ios_base::openmode) override { return 0; }
269+
bool acquire(_Out_writes_(count) CharType*&, _In_ size_t&) override { return false; }
270+
void release(_Out_writes_(count) CharType *, _In_ size_t) override {}
271+
272+
protected:
273+
pplx::task<int_type> _putc(CharType) override { throw std::runtime_error("error"); }
274+
pplx::task<size_t> _putn(const CharType *, size_t) override { throw std::runtime_error("error"); }
275+
pplx::task<int_type> _bumpc() override { throw std::runtime_error("error"); }
276+
int_type _sbumpc() override { throw std::runtime_error("error"); }
277+
pplx::task<int_type> _getc() override { throw std::runtime_error("error"); }
278+
int_type _sgetc() override { throw std::runtime_error("error"); }
279+
pplx::task<int_type> _nextc() override { throw std::runtime_error("error"); }
280+
pplx::task<int_type> _ungetc() override { throw std::runtime_error("error"); }
281+
pplx::task<size_t> _getn(_Out_writes_(count) CharType *, _In_ size_t) override { throw std::runtime_error("error"); }
282+
size_t _scopy(_Out_writes_(count) CharType *, _In_ size_t) override { throw std::runtime_error("error"); }
283+
pplx::task<bool> _sync() override { throw std::runtime_error("error"); }
284+
CharType* _alloc(size_t) override { throw std::runtime_error("error"); }
285+
void _commit(size_t) override { throw std::runtime_error("error"); }
286+
287+
pplx::task<void> _close_write() override
288+
{
289+
return pplx::task_from_exception<void>(std::invalid_argument("test"));
290+
}
291+
};
292+
293+
template<typename CharType>
294+
class close_throws_buffer : public streams::streambuf<CharType>
295+
{
296+
public:
297+
close_throws_buffer() : streambuf<CharType>(std::shared_ptr<basic_throws_buffer<CharType>>(new basic_throws_buffer<CharType>())) {}
298+
};
299+
300+
// Tests if an exception occurs and close throws an exception that the close
301+
// one is ignored and doesn't bring down the process.
302+
TEST_FIXTURE(uri_address, response_stream_close_throws_with_exception)
303+
{
304+
web::http::experimental::listener::http_listener listener(m_uri);
305+
listener.open().wait();
306+
307+
streams::producer_consumer_buffer<uint8_t> buf;
308+
309+
listener.support([buf](http_request request)
310+
{
311+
http_response response(200);
312+
response.set_body(streams::istream(buf), U("text/plain"));
313+
response.headers().add(header_names::connection, U("close"));
314+
request.reply(response);
315+
});
316+
317+
http_client_config config;
318+
config.set_timeout(utility::seconds(1));
319+
http_client client(m_uri, config);
320+
321+
close_throws_buffer<uint8_t> responseBody;
322+
http_request msg(methods::GET);
323+
msg.set_response_stream(responseBody.create_ostream());
324+
http_response rsp = client.request(msg).get();
325+
VERIFY_THROWS(rsp.content_ready().get(), http_exception);
326+
327+
buf.close(std::ios_base::out).wait();
328+
listener.close().wait();
329+
}
330+
254331
TEST_FIXTURE(uri_address, content_ready)
255332
{
256333
http_client client(m_uri);

0 commit comments

Comments
 (0)