Skip to content

Commit 80f33f1

Browse files
author
fangxiang
committed
add StreamReader cancel read steam
1 parent 44f796b commit 80f33f1

File tree

5 files changed

+48
-10
lines changed

5 files changed

+48
-10
lines changed

examples/async_stream/RequestStreamExampleCtrl.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ class RequestStreamExampleCtrl : public HttpController<RequestStreamExampleCtrl>
9898
[files](const char *data, size_t length) {
9999
if (files->back().tmpName.empty())
100100
{
101-
return;
101+
return false;
102102
}
103103
auto &currentFile = files->back().file;
104104
if (length == 0)
@@ -109,18 +109,20 @@ class RequestStreamExampleCtrl : public HttpController<RequestStreamExampleCtrl>
109109
currentFile.flush();
110110
currentFile.close();
111111
}
112-
return;
112+
return true;
113113
}
114114
LOG_INFO << "data[" << length << "]: ";
115115
if (currentFile.is_open())
116116
{
117117
LOG_INFO << "write file";
118118
currentFile.write(data, length);
119+
return true;
119120
}
120121
else
121122
{
122123
LOG_ERROR << "file not open";
123124
}
125+
return false;
124126
},
125127
[files, callback = std::move(callback)](std::exception_ptr ex) {
126128
if (ex)

lib/src/MultipartStreamParser.cc

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,23 +252,37 @@ void drogon::MultipartStreamParser::parse(
252252
}
253253
std::string_view v = buffer_.view();
254254
auto pos = v.find(crlfDashBoundary_);
255+
256+
const auto callDataCB = [&dataCb, this](const char * _data, std::size_t _len) {
257+
if(!dataCb(_data, _len)) {
258+
isValid_ = false;
259+
isFinished_ = true;
260+
exception_type_ = ExceptionType::kServerCancel;
261+
}
262+
};
263+
255264
if (pos == std::string::npos)
256265
{
257266
// boundary not found, leave potential partial boundary
258-
size_t len = v.size() - crlfDashBoundary_.size();
267+
const size_t len = v.size() - crlfDashBoundary_.size();
259268
if (len > 0)
260269
{
261-
dataCb(v.data(), len);
270+
callDataCB(v.data(), len);
262271
buffer_.eraseFront(len);
263272
}
264273
return;
265274
}
266275
// found boundary
267-
dataCb(v.data(), pos);
268-
if (pos > 0)
269-
{
270-
dataCb(v.data() + pos, 0); // notify end of file
276+
if(!isFinished() && isValid()) {
277+
callDataCB(v.data(), pos);
278+
279+
if (pos > 0)
280+
{
281+
// notify end of file
282+
callDataCB(v.data() + pos, 0);
283+
}
271284
}
285+
272286
buffer_.eraseFront(pos + crlfDashBoundary_.size());
273287
status_ = Status::kExpectEndOrNewEntry;
274288
continue;

lib/src/MultipartStreamParser.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ namespace drogon
2121
{
2222
class DROGON_EXPORT MultipartStreamParser
2323
{
24+
public:
25+
enum class ExceptionType
26+
{
27+
kNoException = 0,
28+
kServerCancel = 1,
29+
};
30+
2431
public:
2532
MultipartStreamParser(const std::string &contentType);
2633

@@ -39,6 +46,10 @@ class DROGON_EXPORT MultipartStreamParser
3946
return isValid_;
4047
}
4148

49+
ExceptionType exceptionType() const {
50+
return exception_type_;
51+
}
52+
4253
private:
4354
const std::string dash_ = "--";
4455
const std::string crlf_ = "\r\n";
@@ -70,6 +81,8 @@ class DROGON_EXPORT MultipartStreamParser
7081
kExpectEndOrNewEntry = 4,
7182
} status_{Status::kExpectFirstBoundary};
7283

84+
ExceptionType exception_type_{ExceptionType::kNoException};
85+
7386
MultipartHeader currentHeader_;
7487
bool isValid_{true};
7588
bool isFinished_{false};

lib/src/RequestStream.cc

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -161,9 +161,17 @@ class MultipartStreamReader : public RequestStreamReader
161161
parser_.parse(data, length, headerCb_, dataCb_);
162162
if (!parser_.isValid())
163163
{
164+
std::exception_ptr exception = nullptr;
164165
// TODO: should we mix stream error and user error?
165-
finishCb_(std::make_exception_ptr(
166-
std::runtime_error("invalid multipart data")));
166+
switch (parser_.exceptionType()) {
167+
case MultipartStreamParser::ExceptionType::kServerCancel:
168+
exception = std::make_exception_ptr(std::runtime_error("server cancelled"));
169+
break;
170+
default:
171+
exception = std::make_exception_ptr(std::runtime_error("invalid multipart data"));
172+
break;
173+
}
174+
finishCb_(exception);
167175
}
168176
else if (parser_.isFinished())
169177
{

lib/tests/integration_test/server/RequestStreamTestCtrl.cc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,7 @@ class RequestStreamTestCtrl : public HttpController<RequestStreamTestCtrl>
118118
{
119119
ctx->firstFileContent.append(data, length);
120120
}
121+
return true;
121122
},
122123
[ctx, callback = std::move(callback)](std::exception_ptr ex) {
123124
auto resp = HttpResponse::newHttpResponse();

0 commit comments

Comments
 (0)