Skip to content

Commit e1b7e9f

Browse files
committed
Fixing a bunch of blocking wait/get calls in streams.h
1 parent 61795a4 commit e1b7e9f

File tree

1 file changed

+67
-88
lines changed

1 file changed

+67
-88
lines changed

Release/include/cpprest/streams.h

Lines changed: 67 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -776,60 +776,52 @@ namespace Concurrency { namespace streams
776776

777777
std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>();
778778

779-
// We're having to create all these lambdas because VS 2010 has trouble compiling
780-
// nested lambdas.
781-
auto after_putn =
782-
[=](pplx::task<size_t> wrote) mutable -> bool
779+
auto flush = [=]() mutable
780+
{
781+
return target.putn(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable
783782
{
784-
_locals->total += wrote.get();
783+
_locals->total += wrote;
785784
_locals->write_pos = 0;
786-
target.sync().wait();
787-
return true;
788-
};
785+
return target.sync();
786+
});
787+
};
789788

790-
auto flush =
791-
[=] () mutable -> pplx::task<bool>
789+
auto update = [=](int_type ch) mutable -> pplx::task<bool>
792790
{
793-
return target.putn(_locals->outbuf, _locals->write_pos).then(after_putn);
794-
};
795-
796-
auto update = [=] (int_type ch) mutable -> bool
797-
{
798-
if ( ch == ::concurrency::streams::char_traits<CharType>::eof() ) return false;
799-
if ( ch == delim ) return false;
791+
if (ch == ::concurrency::streams::char_traits<CharType>::eof()) return pplx::task_from_result(false);
792+
if (ch == delim) return pplx::task_from_result(false);
800793

801794
_locals->outbuf[_locals->write_pos] = static_cast<CharType>(ch);
802795
_locals->write_pos += 1;
803796

804-
if ( _locals->is_full() )
797+
if (_locals->is_full())
805798
{
806-
return flush().get();
799+
return flush().then([] { return true; });
807800
}
808801

809-
return true;
802+
return pplx::task_from_result(true);
810803
};
811804

812805
auto loop = pplx::details::do_while([=]() mutable -> pplx::task<bool>
813806
{
814-
while ( buffer.in_avail() > 0 )
807+
while (buffer.in_avail() > 0)
815808
{
816809
int_type ch = buffer.sbumpc();
817810

818-
if ( ch == req_async )
811+
if (ch == req_async)
812+
{
819813
break;
820-
821-
if ( !update(ch) )
822-
return pplx::task_from_result(false);
823-
}
814+
}
824815

816+
return update(ch);
817+
}
825818
return buffer.bumpc().then(update);
826819
});
827820

828-
return loop.then([=](bool) mutable -> size_t
829-
{
830-
flush().wait();
831-
return _locals->total;
832-
});
821+
return loop.then([=](bool) mutable
822+
{
823+
return flush().then([=] { return _locals->total; });
824+
});
833825
}
834826

835827
/// <summary>
@@ -851,52 +843,44 @@ namespace Concurrency { namespace streams
851843

852844
std::shared_ptr<_read_helper> _locals = std::make_shared<_read_helper>();
853845

854-
// We're having to create all these lambdas because VS 2010 has trouble compiling
855-
// nested lambdas.
856-
auto after_putn =
857-
[=](pplx::task<size_t> wrote) mutable -> bool
846+
auto flush = [=]() mutable
847+
{
848+
return target.putn(_locals->outbuf, _locals->write_pos).then([=](size_t wrote) mutable
858849
{
859-
_locals->total += wrote.get();
850+
_locals->total += wrote;
860851
_locals->write_pos = 0;
861-
target.sync().wait();
862-
return true;
863-
};
852+
return target.sync();
853+
});
854+
};
864855

865-
auto flush =
866-
[=] () mutable -> pplx::task<bool>
856+
auto update = [=](typename concurrency::streams::char_traits<CharType>::int_type ch) mutable
867857
{
868-
return target.putn(_locals->outbuf, _locals->write_pos).then(after_putn);
869-
};
870-
871-
auto update = [=] (typename concurrency::streams::char_traits<CharType>::int_type ch) mutable -> bool
872-
{
873-
if ( ch == concurrency::streams::char_traits<CharType>::eof() ) return false;
874-
if ( ch == '\n' ) return false;
875-
if ( ch == '\r' ) { _locals->saw_CR = true; return true; }
858+
if (ch == concurrency::streams::char_traits<CharType>::eof()) return pplx::task_from_result(false);
859+
if (ch == '\n') return pplx::task_from_result(false);
860+
if (ch == '\r')
861+
{
862+
_locals->saw_CR = true;
863+
return pplx::task_from_result(true);
864+
}
876865

877866
_locals->outbuf[_locals->write_pos] = static_cast<CharType>(ch);
878867
_locals->write_pos += 1;
879868

880-
if ( _locals->is_full() )
869+
if (_locals->is_full())
881870
{
882-
return flush().get();
871+
return flush().then([] { return true; });
883872
}
884873

885-
return true;
886-
};
887-
888-
auto return_false =
889-
[](pplx::task<typename concurrency::streams::char_traits<CharType>::int_type>) -> pplx::task<bool>
890-
{
891-
return pplx::task_from_result(false);
874+
return pplx::task_from_result(true);
892875
};
893876

894-
auto update_after_cr =
895-
[=] (typename concurrency::streams::char_traits<CharType>::int_type ch) mutable -> pplx::task<bool>
877+
auto update_after_cr = [=] (typename concurrency::streams::char_traits<CharType>::int_type ch) mutable -> pplx::task<bool>
896878
{
897-
if ( ch == concurrency::streams::char_traits<CharType>::eof() ) return pplx::task_from_result(false);
898-
if ( ch == '\n' )
899-
return buffer.bumpc().then(return_false);
879+
if (ch == concurrency::streams::char_traits<CharType>::eof()) return pplx::task_from_result(false);
880+
if (ch == '\n')
881+
{
882+
return buffer.bumpc().then([](concurrency::streams::char_traits<CharType>::int_type) { return false; });
883+
}
900884

901885
return pplx::task_from_result(false);
902886
};
@@ -910,35 +894,33 @@ namespace Concurrency { namespace streams
910894
#endif
911895
concurrency::streams::char_traits<CharType>::int_type ch;
912896

913-
if ( _locals->saw_CR )
897+
if (_locals->saw_CR)
914898
{
915899
ch = buffer.sgetc();
916-
if ( ch == '\n' )
900+
if (ch == '\n')
917901
buffer.sbumpc();
918902
return pplx::task_from_result(false);
919903
}
920904

921905
ch = buffer.sbumpc();
922906

923-
if ( ch == req_async )
907+
if (ch == req_async)
924908
break;
925909

926-
if ( !update(ch) )
927-
return pplx::task_from_result(false);
910+
return update(ch);
928911
}
929912

930-
if ( _locals->saw_CR )
913+
if (_locals->saw_CR)
931914
{
932915
return buffer.getc().then(update_after_cr);
933916
}
934917
return buffer.bumpc().then(update);
935918
});
936919

937-
return loop.then([=](bool) mutable -> size_t
938-
{
939-
flush().wait();
940-
return _locals->total;
941-
});
920+
return loop.then([=](bool) mutable
921+
{
922+
return flush().then([=] { return _locals->total; });
923+
});
942924
}
943925

944926
/// <summary>
@@ -1141,30 +1123,30 @@ pplx::task<void> concurrency::streams::_type_parser_base<CharType>::_skip_whites
11411123
{
11421124
int_type req_async = concurrency::streams::char_traits<CharType>::requires_async();
11431125

1144-
auto update = [=] (int_type ch) mutable -> bool
1126+
auto update = [=] (int_type ch) mutable
11451127
{
11461128
if (isspace(ch))
11471129
{
11481130
if (buffer.sbumpc() == req_async)
1149-
buffer.nextc().wait();
1150-
return true;
1131+
{
1132+
return buffer.nextc().then([](int_type) { return true; });
1133+
}
1134+
return pplx::task_from_result(true);
11511135
}
11521136

1153-
return false;
1137+
return pplx::task_from_result(false);
11541138
};
11551139

11561140
auto loop = pplx::details::do_while([=]() mutable -> pplx::task<bool>
11571141
{
1158-
while ( buffer.in_avail() > 0 )
1142+
while (buffer.in_avail() > 0)
11591143
{
11601144
int_type ch = buffer.sgetc();
11611145

1162-
if ( ch == req_async )
1146+
if (ch == req_async)
11631147
break;
11641148

1165-
if ( !update(ch) ) {
1166-
return pplx::task_from_result(false);
1167-
}
1149+
return update(ch);
11681150
}
11691151
return buffer.getc().then(update);
11701152
});
@@ -1184,8 +1166,6 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
11841166
{
11851167
std::shared_ptr<StateType> state = std::make_shared<StateType>();
11861168

1187-
auto update_end = [=] (pplx::task<int_type> op) -> bool { op.wait(); return true; };
1188-
11891169
auto update = [=] (pplx::task<int_type> op) -> pplx::task<bool>
11901170
{
11911171
int_type ch = op.get();
@@ -1195,7 +1175,7 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
11951175
return pplx::task_from_result(false);
11961176
// We peeked earlier, so now we must advance the position.
11971177
concurrency::streams::streambuf<CharType> buf = buffer;
1198-
return buf.bumpc().then(update_end);
1178+
return buf.bumpc().then([](int_type) { return true; });
11991179
};
12001180

12011181
auto peek_char = [=]() -> pplx::task<bool>
@@ -1209,7 +1189,7 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
12091189
while (get_op.is_done())
12101190
{
12111191
auto condition = update(get_op);
1212-
if ( !condition.is_done() || !condition.get())
1192+
if (!condition.is_done() || !condition.get())
12131193
return condition;
12141194

12151195
get_op = buf.getc();
@@ -1229,7 +1209,6 @@ pplx::task<ReturnType> concurrency::streams::_type_parser_base<CharType>::_parse
12291209
return _skip_whitespace(buffer).then([=](pplx::task<void> op) -> pplx::task<ReturnType>
12301210
{
12311211
op.wait();
1232-
12331212
return pplx::details::do_while(peek_char).then(finish);
12341213
});
12351214
}

0 commit comments

Comments
 (0)