Skip to content

Commit 53b810c

Browse files
committed
Improved buffer management in serializer
1 parent cd4141b commit 53b810c

File tree

5 files changed

+153
-113
lines changed

5 files changed

+153
-113
lines changed

include/boost/http_proto/serializer.hpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -444,10 +444,10 @@ struct serializer::stream
444444
class serializer::const_buf_gen_base
445445
{
446446
public:
447-
// Next non-empty buffer
447+
// Returns the next non-empty buffer if exists
448448
virtual
449449
buffers::const_buffer
450-
operator()() = 0;
450+
next() = 0;
451451

452452
// Size of remaining buffers
453453
virtual
@@ -487,7 +487,7 @@ class serializer::const_buf_gen
487487
}
488488

489489
const_buffer
490-
operator()() override
490+
next() override
491491
{
492492
while(current_ != buffers::end(cbs_))
493493
{

src/detail/impl/filter.hpp

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,20 +30,44 @@ process_impl(
3030
results
3131
{
3232
results rv;
33+
3334
auto it_o = buffers::begin(out);
3435
auto it_i = buffers::begin(in);
3536

36-
if( it_o == buffers::end(out) ||
37-
it_i == buffers::end(in) )
38-
return rv;
37+
auto ob = [&]() -> buffers::mutable_buffer
38+
{
39+
if( it_o != buffers::end(out) )
40+
return *it_o++;
41+
return {};
42+
}();
43+
44+
buffers::const_buffer ib;
3945

40-
auto ob = *it_o++;
41-
auto ib = *it_i++;
4246
for(;;)
4347
{
48+
while( ib.size() == 0 )
49+
{
50+
if( it_i == buffers::end(in) )
51+
{
52+
if( more )
53+
return rv;
54+
55+
// if more == false we return only
56+
// when output buffers are full.
57+
break;
58+
}
59+
else
60+
{
61+
ib = *it_i++;
62+
}
63+
}
64+
4465
// empty input buffers may be passed, and
4566
// this is intentional and valid.
46-
results rs = process_impl(ob, ib, more);
67+
results rs = process_impl(
68+
ob,
69+
ib,
70+
more || it_i != buffers::end(in));
4771

4872
rv.out_bytes += rs.out_bytes;
4973
rv.in_bytes += rs.in_bytes;
@@ -62,21 +86,6 @@ process_impl(
6286
return rv;
6387
ob = *it_o++;
6488
}
65-
66-
if( ib.size() == 0 )
67-
{
68-
if( it_i == buffers::end(in) )
69-
{
70-
// if `more == false` we return only
71-
// when `out` buffers are full.
72-
if( more )
73-
return rv;
74-
}
75-
else
76-
{
77-
ib = *it_i++;
78-
}
79-
}
8089
}
8190
}
8291

src/serializer.cpp

Lines changed: 99 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -286,19 +286,11 @@ void
286286
serializer::
287287
reset() noexcept
288288
{
289+
ws_.clear();
289290
filter_ = nullptr;
290-
291-
cb0_ = {};
292-
tmp_ = {};
293-
294-
more_input_ = false;
295291
is_done_ = false;
296292
is_header_done_ = false;
297-
is_chunked_ = false;
298-
needs_exp100_continue_ = false;
299293
filter_done_ = false;
300-
301-
ws_.clear();
302294
}
303295

304296
//------------------------------------------------
@@ -343,23 +335,23 @@ prepare() ->
343335
prepped_.slide_to_front();
344336
while(prepped_.capacity() != 0)
345337
{
346-
auto buf = buf_gen_->operator()();
338+
auto buf = buf_gen_->next();
347339
if(buf.size() != 0)
348340
{
349341
prepped_.append(buf);
350342
}
351343
else // buf_gen_ is empty
352344
{
353-
// crlf and final chunk
354-
if(tmp_.size() != 0)
345+
// append crlf and final chunk
346+
if(is_chunked_)
355347
{
356348
prepped_.append(tmp_);
357-
tmp_ = {};
349+
more_input_ = false;
358350
}
359351
break;
360352
}
361353
}
362-
if(buf_gen_->is_empty() && tmp_.size() == 0)
354+
if(buf_gen_->is_empty() && !is_chunked_)
363355
more_input_ = false;
364356
}
365357
return const_buffers_type(
@@ -377,16 +369,18 @@ prepare() ->
377369
if(apndr.is_full())
378370
break;
379371

380-
auto rs = source_->read(
372+
const auto rs = source_->read(
381373
apndr.prepare());
382374

383375
if(rs.ec.failed())
384376
{
385377
is_done_ = true;
386-
BOOST_HTTP_PROTO_RETURN_EC(rs.ec);
378+
return rs.ec;
387379
}
388380

389-
more_input_ = !rs.finished;
381+
if(rs.finished)
382+
more_input_ = false;
383+
390384
apndr.commit(rs.bytes, more_input_);
391385
break;
392386
}
@@ -400,90 +394,121 @@ prepare() ->
400394
}
401395
else // filter
402396
{
403-
if(st_ == style::empty)
397+
switch(st_)
398+
{
399+
case style::empty:
404400
return const_buffers_type(
405401
prepped_.begin(),
406402
prepped_.size());
407403

408-
auto get_input = [&]()
404+
case style::buffers:
409405
{
410-
if(st_ == style::buffers)
406+
appender apndr(cb0_, is_chunked_);
407+
while(!apndr.is_full() && !filter_done_)
411408
{
412-
// TODO: for efficiency of deflator, we might
413-
// need to return multiple buffers at once
414-
if(tmp_.size() == 0)
409+
if(more_input_ && tmp_.size() == 0)
415410
{
416-
tmp_ = buf_gen_->operator()();
417-
more_input_ = !buf_gen_->is_empty();
411+
tmp_ = buf_gen_->next();
412+
if(tmp_.size() == 0) // buf_gen_ is empty
413+
more_input_ = false;
418414
}
419-
return buffers::
420-
const_buffer_pair{ tmp_, {} };
421-
}
422415

423-
BOOST_ASSERT(
424-
st_ == style::source ||
425-
st_ == style::stream);
416+
const auto rs = filter_->process(
417+
apndr.prepare(),
418+
tmp_,
419+
more_input_);
420+
421+
if(rs.ec.failed())
422+
{
423+
is_done_ = true;
424+
return rs.ec;
425+
}
426+
427+
tmp_ = buffers::sans_prefix(tmp_, rs.in_bytes);
428+
apndr.commit(rs.out_bytes, !rs.finished);
426429

427-
if(st_ == style::source &&
428-
more_input_ &&
429-
cb1_.capacity() != 0)
430-
{
431-
// TODO: handle source error
432-
auto rs = source_->read(
433-
cb1_.prepare(cb1_.capacity()));
434430
if(rs.finished)
435-
more_input_ = false;
436-
cb1_.commit(rs.bytes);
431+
filter_done_ = true;
437432
}
433+
break;
434+
}
438435

439-
return cb1_.data();
440-
};
441-
442-
auto consume = [&](std::size_t n)
436+
case style::source:
443437
{
444-
if(st_ == style::buffers)
438+
appender apndr(cb0_, is_chunked_);
439+
while(!apndr.is_full() && !filter_done_)
445440
{
446-
tmp_ = buffers::sans_prefix(
447-
tmp_, n);
448-
return;
441+
if(more_input_ && cb1_.capacity() != 0)
442+
{
443+
const auto rs = source_->read(
444+
cb1_.prepare(cb1_.capacity()));
445+
if(rs.ec.failed())
446+
{
447+
is_done_ = true;
448+
return rs.ec;
449+
}
450+
if(rs.finished)
451+
more_input_ = false;
452+
cb1_.commit(rs.bytes);
453+
}
454+
455+
const auto rs = filter_->process(
456+
apndr.prepare(),
457+
cb1_.data(),
458+
more_input_);
459+
460+
if(rs.ec.failed())
461+
{
462+
is_done_ = true;
463+
return rs.ec;
464+
}
465+
466+
cb1_.consume(rs.in_bytes);
467+
apndr.commit(rs.out_bytes, !rs.finished);
468+
469+
if(rs.finished)
470+
filter_done_ = true;
449471
}
450-
BOOST_ASSERT(
451-
st_ == style::source ||
452-
st_ == style::stream);
453-
cb1_.consume(n);
454-
};
455-
456-
// handles chunked payloads automatically
457-
appender apndr(cb0_, is_chunked_);
458-
for(;;)
472+
break;
473+
}
474+
475+
case style::stream:
459476
{
460-
if(apndr.is_full())
477+
appender apndr(cb0_, is_chunked_);
478+
479+
if(apndr.is_full() || filter_done_)
461480
break;
462481

463-
auto cbs = get_input();
482+
// The stream object is expected to
483+
// have already populated cb1_
484+
if(more_input_ && cb1_.size() == 0)
485+
{
486+
if(!prepped_.empty())
487+
break;
464488

465-
if(more_input_ && buffers::size(cbs) == 0)
466-
break;
489+
BOOST_HTTP_PROTO_RETURN_EC(
490+
error::need_data);
491+
}
467492

468-
auto rs = filter_->process(
493+
const auto rs = filter_->process(
469494
apndr.prepare(),
470-
cbs,
495+
cb1_.data(),
471496
more_input_);
472497

473498
if(rs.ec.failed())
474499
{
475500
is_done_ = true;
476-
BOOST_HTTP_PROTO_RETURN_EC(rs.ec);
501+
return rs.ec;
477502
}
478503

479-
consume(rs.in_bytes);
504+
cb1_.consume(rs.in_bytes);
480505
apndr.commit(rs.out_bytes, !rs.finished);
481506

482507
if(rs.finished)
483-
{
484508
filter_done_ = true;
485-
break;
486-
}
509+
510+
break;
511+
}
487512
}
488513
}
489514

@@ -623,6 +648,7 @@ start_empty(
623648
}
624649

625650
prepped_[0] = { m.ph_->cbuf, m.ph_->size };
651+
more_input_ = false;
626652
}
627653

628654
void
@@ -654,7 +680,7 @@ start_buffers(
654680
std::generate(
655681
prepped_.begin() + 1,
656682
prepped_.end(),
657-
std::ref(*buf_gen_));
683+
[this](){ return buf_gen_->next(); });
658684
more_input_ = !buf_gen_->is_empty();
659685
return;
660686
}
@@ -676,6 +702,7 @@ start_buffers(
676702

677703
prepped_[0] = { m.ph_->cbuf, m.ph_->size };
678704
prepped_[1] = final_chunk;
705+
more_input_ = false;
679706
return;
680707
}
681708

@@ -717,14 +744,14 @@ start_buffers(
717744
std::generate(
718745
prepped_.begin() + 2,
719746
prepped_.end() - 1,
720-
std::ref(*buf_gen_));
747+
[this](){ return buf_gen_->next(); });
721748

722749
more_input_ = !buf_gen_->is_empty();
723750
// assigning the last slot
724751
if(more_input_)
725752
{
726753
prepped_[prepped_.size() - 1] =
727-
buf_gen_->operator()();
754+
buf_gen_->next();
728755

729756
// deferred until buf_gen_ is drained
730757
tmp_ = crlf_and_final_chunk;
@@ -758,6 +785,7 @@ start_buffers(
758785
}
759786

760787
prepped_[0] = { m.ph_->cbuf, m.ph_->size };
788+
tmp_ = {};
761789
more_input_ = !buf_gen_->is_empty();
762790
}
763791

0 commit comments

Comments
 (0)