Skip to content

Commit ddaa801

Browse files
committed
Merge branch 'development' of https://git01.codeplex.com/casablanca into version_bump
2 parents 6ac22a5 + 61486be commit ddaa801

File tree

5 files changed

+221
-113
lines changed

5 files changed

+221
-113
lines changed

Release/include/cpprest/astreambuf.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -422,7 +422,7 @@ namespace streams
422422
}
423423

424424
// After the flush_internal task completed, "this" object may have been destroyed,
425-
// accessing the memebers is invalid, use shared_from_this to avoid access violation exception.
425+
// accessing the members is invalid, use shared_from_this to avoid access violation exception.
426426
auto this_ptr = std::static_pointer_cast<streambuf_state_manager>(this->shared_from_this());
427427

428428
if (mode & std::ios_base::out && can_write()) {

Release/include/cpprest/streams.h

Lines changed: 71 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -776,60 +776,52 @@ namespace Concurrency { namespace streams
776776

777777
std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>();
778778

779-
// We're having to create all these lambdas because VS 2010 has trouble compiling
780-
// nested lambdas.
781-
auto after_putn =
782-
[=](pplx::task<size_t> wrote) mutable -> bool
779+
auto flush = [=]() mutable
780+
{
781+
return target.putn(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable
783782
{
784-
_locals->total += wrote.get();
783+
_locals->total += wrote;
785784
_locals->write_pos = 0;
786-
target.sync().wait();
787-
return true;
788-
};
785+
return target.sync();
786+
});
787+
};
789788

790-
auto flush =
791-
[=] () mutable -> pplx::task<bool>
789+
auto update = [=](int_type ch) mutable -> pplx::task<bool>
792790
{
793-
return target.putn(_locals->outbuf, _locals->write_pos).then(after_putn);
794-
};
795-
796-
auto update = [=] (int_type ch) mutable -> bool
797-
{
798-
if ( ch == ::concurrency::streams::char_traits<CharType>::eof() ) return false;
799-
if ( ch == delim ) return false;
791+
if (ch == ::concurrency::streams::char_traits<CharType>::eof()) return pplx::task_from_result(false);
792+
if (ch == delim) return pplx::task_from_result(false);
800793

801794
_locals->outbuf[_locals->write_pos] = static_cast<CharType>(ch);
802795
_locals->write_pos += 1;
803796

804-
if ( _locals->is_full() )
797+
if (_locals->is_full())
805798
{
806-
return flush().get();
799+
return flush().then([] { return true; });
807800
}
808801

809-
return true;
802+
return pplx::task_from_result(true);
810803
};
811804

812805
auto loop = pplx::details::do_while([=]() mutable -> pplx::task<bool>
813806
{
814-
while ( buffer.in_avail() > 0 )
807+
while (buffer.in_avail() > 0)
815808
{
816809
int_type ch = buffer.sbumpc();
817810

818-
if ( ch == req_async )
811+
if (ch == req_async)
812+
{
819813
break;
820-
821-
if ( !update(ch) )
822-
return pplx::task_from_result(false);
823-
}
814+
}
824815

816+
return update(ch);
817+
}
825818
return buffer.bumpc().then(update);
826819
});
827820

828-
return loop.then([=](bool) mutable -> size_t
829-
{
830-
flush().wait();
831-
return _locals->total;
832-
});
821+
return loop.then([=](bool) mutable
822+
{
823+
return flush().then([=] { return _locals->total; });
824+
});
833825
}
834826

835827
/// <summary>
@@ -851,52 +843,48 @@ namespace Concurrency { namespace streams
851843

852844
std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>();
853845

854-
// We're having to create all these lambdas because VS 2010 has trouble compiling
855-
// nested lambdas.
856-
auto after_putn =
857-
[=](pplx::task<size_t> wrote) mutable -> bool
846+
auto flush = [=]() mutable
847+
{
848+
return target.putn(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable
858849
{
859-
_locals->total += wrote.get();
850+
_locals->total += wrote;
860851
_locals->write_pos = 0;
861-
target.sync().wait();
862-
return true;
863-
};
852+
return target.sync();
853+
});
854+
};
864855

865-
auto flush =
866-
[=] () mutable -> pplx::task<bool>
856+
auto update = [=](typename concurrency::streams::char_traits<CharType>::int_type ch) mutable
867857
{
868-
return target.putn(_locals->outbuf, _locals->write_pos).then(after_putn);
869-
};
870-
871-
auto update = [=] (typename concurrency::streams::char_traits<CharType>::int_type ch) mutable -> bool
872-
{
873-
if ( ch == concurrency::streams::char_traits<CharType>::eof() ) return false;
874-
if ( ch == '\n' ) return false;
875-
if ( ch == '\r' ) { _locals->saw_CR = true; return true; }
858+
if (ch == concurrency::streams::char_traits<CharType>::eof()) return pplx::task_from_result(false);
859+
if (ch == '\n') return pplx::task_from_result(false);
860+
if (ch == '\r')
861+
{
862+
_locals->saw_CR = true;
863+
return pplx::task_from_result(true);
864+
}
876865

877866
_locals->outbuf[_locals->write_pos] = static_cast<CharType>(ch);
878867
_locals->write_pos += 1;
879868

880-
if ( _locals->is_full() )
869+
if (_locals->is_full())
881870
{
882-
return flush().get();
871+
return flush().then([] { return true; });
883872
}
884873

885-
return true;
886-
};
887-
888-
auto return_false =
889-
[](pplx::task<typename concurrency::streams::char_traits<CharType>::int_type>) -> pplx::task<bool>
890-
{
891-
return pplx::task_from_result(false);
874+
return pplx::task_from_result(true);
892875
};
893876

894-
auto update_after_cr =
895-
[=] (typename concurrency::streams::char_traits<CharType>::int_type ch) mutable -> pplx::task<bool>
877+
auto update_after_cr = [=] (typename concurrency::streams::char_traits<CharType>::int_type ch) mutable -> pplx::task<bool>
896878
{
897-
if ( ch == concurrency::streams::char_traits<CharType>::eof() ) return pplx::task_from_result(false);
898-
if ( ch == '\n' )
899-
return buffer.bumpc().then(return_false);
879+
if (ch == concurrency::streams::char_traits<CharType>::eof()) return pplx::task_from_result(false);
880+
if (ch == '\n')
881+
{
882+
return buffer.bumpc().then([](
883+
#ifndef _MS_WINDOWS // Required by GCC
884+
typename
885+
#endif
886+
concurrency::streams::char_traits<CharType>::int_type) { return false; });
887+
}
900888

901889
return pplx::task_from_result(false);
902890
};
@@ -910,35 +898,33 @@ namespace Concurrency { namespace streams
910898
#endif
911899
concurrency::streams::char_traits<CharType>::int_type ch;
912900

913-
if ( _locals->saw_CR )
901+
if (_locals->saw_CR)
914902
{
915903
ch = buffer.sgetc();
916-
if ( ch == '\n' )
904+
if (ch == '\n')
917905
buffer.sbumpc();
918906
return pplx::task_from_result(false);
919907
}
920908

921909
ch = buffer.sbumpc();
922910

923-
if ( ch == req_async )
911+
if (ch == req_async)
924912
break;
925913

926-
if ( !update(ch) )
927-
return pplx::task_from_result(false);
914+
return update(ch);
928915
}
929916

930-
if ( _locals->saw_CR )
917+
if (_locals->saw_CR)
931918
{
932919
return buffer.getc().then(update_after_cr);
933920
}
934921
return buffer.bumpc().then(update);
935922
});
936923

937-
return loop.then([=](bool) mutable -> size_t
938-
{
939-
flush().wait();
940-
return _locals->total;
941-
});
924+
return loop.then([=](bool) mutable
925+
{
926+
return flush().then([=] { return _locals->total; });
927+
});
942928
}
943929

944930
/// <summary>
@@ -1141,30 +1127,30 @@ pplx::task<void> concurrency::streams::_type_parser_base<CharType>::_skip_whites
11411127
{
11421128
int_type req_async = concurrency::streams::char_traits<CharType>::requires_async();
11431129

1144-
auto update = [=] (int_type ch) mutable -> bool
1130+
auto update = [=] (int_type ch) mutable
11451131
{
11461132
if (isspace(ch))
11471133
{
11481134
if (buffer.sbumpc() == req_async)
1149-
buffer.nextc().wait();
1150-
return true;
1135+
{
1136+
return buffer.nextc().then([](int_type) { return true; });
1137+
}
1138+
return pplx::task_from_result(true);
11511139
}
11521140

1153-
return false;
1141+
return pplx::task_from_result(false);
11541142
};
11551143

11561144
auto loop = pplx::details::do_while([=]() mutable -> pplx::task<bool>
11571145
{
1158-
while ( buffer.in_avail() > 0 )
1146+
while (buffer.in_avail() > 0)
11591147
{
11601148
int_type ch = buffer.sgetc();
11611149

1162-
if ( ch == req_async )
1150+
if (ch == req_async)
11631151
break;
11641152

1165-
if ( !update(ch) ) {
1166-
return pplx::task_from_result(false);
1167-
}
1153+
return update(ch);
11681154
}
11691155
return buffer.getc().then(update);
11701156
});
@@ -1184,8 +1170,6 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
11841170
{
11851171
std::shared_ptr<StateType> state = std::make_shared<StateType>();
11861172

1187-
auto update_end = [=] (pplx::task<int_type> op) -> bool { op.wait(); return true; };
1188-
11891173
auto update = [=] (pplx::task<int_type> op) -> pplx::task<bool>
11901174
{
11911175
int_type ch = op.get();
@@ -1195,7 +1179,7 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
11951179
return pplx::task_from_result(false);
11961180
// We peeked earlier, so now we must advance the position.
11971181
concurrency::streams::streambuf<CharType> buf = buffer;
1198-
return buf.bumpc().then(update_end);
1182+
return buf.bumpc().then([](int_type) { return true; });
11991183
};
12001184

12011185
auto peek_char = [=]() -> pplx::task<bool>
@@ -1209,7 +1193,7 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
12091193
while (get_op.is_done())
12101194
{
12111195
auto condition = update(get_op);
1212-
if ( !condition.is_done() || !condition.get())
1196+
if (!condition.is_done() || !condition.get())
12131197
return condition;
12141198

12151199
get_op = buf.getc();
@@ -1229,7 +1213,6 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
12291213
return _skip_whitespace(buffer).then([=](pplx::task<void> op) -> pplx::task<ReturnType>
12301214
{
12311215
op.wait();
1232-
12331216
return pplx::details::do_while(peek_char).then(finish);
12341217
});
12351218
}

Release/src/http/common/http_msg.cpp

Lines changed: 63 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -165,33 +165,77 @@ size_t http_msg_base::_get_content_length()
165165
return 0;
166166
}
167167

168-
/// <summary>
169-
/// Completes this message
170-
/// </summary>
171-
void http_msg_base::_complete(utility::size64_t body_size, const std::exception_ptr &exceptionPtr)
168+
// Helper function to inline continuation if possible.
169+
struct inline_continuation
172170
{
173-
// Close the write head
174-
if ((bool)outstream())
171+
inline_continuation(pplx::task<void> &prev, const std::function<void(pplx::task<void>)> &next) : m_prev(prev), m_next(next) {}
172+
~inline_continuation()
175173
{
176-
if ( !(exceptionPtr == std::exception_ptr()) )
177-
outstream().close(exceptionPtr).get();
178-
else if ( m_default_outstream )
179-
outstream().close().get();
174+
if (m_prev.is_done())
175+
{
176+
m_next(m_prev);
177+
}
178+
else
179+
{
180+
m_prev.then(m_next);
181+
}
180182
}
183+
pplx::task<void> & m_prev;
184+
std::function<void(pplx::task<void>)> m_next;
185+
private:
186+
inline_continuation(const inline_continuation &);
187+
inline_continuation &operator=(const inline_continuation &);
188+
};
189+
190+
void http_msg_base::_complete(utility::size64_t body_size, const std::exception_ptr &exceptionPtr)
191+
{
192+
const auto &completionEvent = _get_data_available();
193+
auto closeTask = pplx::task_from_result();
181194

182-
if(exceptionPtr == std::exception_ptr())
195+
if (exceptionPtr == std::exception_ptr())
183196
{
184-
_get_data_available().set(body_size);
197+
if (m_default_outstream)
198+
{
199+
closeTask = outstream().close();
200+
}
201+
202+
inline_continuation(closeTask, [completionEvent, body_size](pplx::task<void> t)
203+
{
204+
try
205+
{
206+
t.get();
207+
completionEvent.set(body_size);
208+
}
209+
catch (...)
210+
{
211+
// If close throws an exception report back to user.
212+
completionEvent.set_exception(std::current_exception());
213+
pplx::create_task(completionEvent).then([](pplx::task<utility::size64_t> t)
214+
{
215+
try { t.get(); }
216+
catch (...) {}
217+
});
218+
}
219+
});
185220
}
186221
else
187222
{
188-
_get_data_available().set_exception(exceptionPtr);
189-
// The exception for body will be observed by default, because read body is not always required.
190-
pplx::create_task(_get_data_available()).then([](pplx::task<utility::size64_t> t) {
191-
try {
192-
t.get();
193-
} catch (...) {
194-
}
223+
if (outstream().is_valid())
224+
{
225+
closeTask = outstream().close(exceptionPtr);
226+
}
227+
228+
inline_continuation(closeTask, [completionEvent, exceptionPtr](pplx::task<void> t)
229+
{
230+
// If closing stream throws an exception ignore since we already have an error.
231+
try { t.get(); }
232+
catch (...) {}
233+
completionEvent.set_exception(exceptionPtr);
234+
pplx::create_task(completionEvent).then([](pplx::task<utility::size64_t> t)
235+
{
236+
try { t.get(); }
237+
catch (...) {}
238+
});
195239
});
196240
}
197241
}

0 commit comments

Comments
 (0)