Skip to content

Commit 004b7d6

Browse files
committed
Fix data recieved handler for winhttp
1 parent e2ffbf9 commit 004b7d6

File tree

3 files changed

+90
-8
lines changed

3 files changed

+90
-8
lines changed

src/aws-cpp-sdk-core/include/aws/core/utils/stream/StreamBufProtectedWriter.h

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,11 @@
88

99
#include <aws/core/Core_EXPORTS.h>
1010
#include <aws/core/utils/Array.h>
11-
#include <streambuf>
11+
#include <aws/core/utils/logging/LogMacros.h>
12+
#include <aws/core/utils/memory/stl/AWSStreamFwd.h>
13+
1214
#include <functional>
15+
#include <streambuf>
1316

1417
namespace Aws
1518
{
@@ -26,8 +29,15 @@ namespace Aws
2629
StreamBufProtectedWriter() = delete;
2730

2831
using WriterFunc = std::function<bool(char* dst, uint64_t dstSz, uint64_t& read)>;
32+
using WriteCompleteCallback = std::function<void(uint64_t read)>;
33+
34+
static uint64_t WriteToBuffer(Aws::IOStream& ioStream, const WriterFunc& writerFunc) {
35+
return WriteToBuffer(ioStream, writerFunc, [](uint64_t) -> void {});
36+
}
2937

30-
static uint64_t WriteToBuffer(Aws::IOStream& ioStream, const WriterFunc& writerFunc)
38+
static uint64_t WriteToBuffer(Aws::IOStream& ioStream,
39+
const WriterFunc& writerFunc,
40+
const WriteCompleteCallback& writeCompleteCallback)
3141
{
3242
uint64_t totalRead = 0;
3343

@@ -53,6 +63,7 @@ namespace Aws
5363
{
5464
break;
5565
}
66+
writeCompleteCallback(read);
5667

5768
if (pBufferCasted && pBufferCasted->pptr() && (pBufferCasted->pptr() >= pBufferCasted->epptr()))
5869
{

src/aws-cpp-sdk-core/source/http/windows/WinSyncHttpClient.cpp

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -306,11 +306,6 @@ bool WinSyncHttpClient::BuildSuccessResponse(const std::shared_ptr<HttpRequest>&
306306
{
307307
readLimiter->ApplyAndPayForCost(read);
308308
}
309-
auto& receivedHandler = request->GetDataReceivedEventHandler();
310-
if (receivedHandler)
311-
{
312-
receivedHandler(request.get(), response.get(), (long long)read);
313-
}
314309
}
315310
if (!ContinueRequest(*request) || !IsRequestProcessingEnabled())
316311
{
@@ -319,7 +314,15 @@ bool WinSyncHttpClient::BuildSuccessResponse(const std::shared_ptr<HttpRequest>&
319314
}
320315
return connectionOpen && success && ContinueRequest(*request) && IsRequestProcessingEnabled();
321316
};
322-
uint64_t numBytesResponseReceived = Aws::Utils::Stream::StreamBufProtectedWriter::WriteToBuffer(response->GetResponseBody(), writerFunc);
317+
uint64_t numBytesResponseReceived = Aws::Utils::Stream::StreamBufProtectedWriter::WriteToBuffer(response->GetResponseBody(),
318+
writerFunc,
319+
[&request, &response](uint64_t read) -> void {
320+
auto& receivedHandler = request->GetDataReceivedEventHandler();
321+
if (receivedHandler)
322+
{
323+
receivedHandler(request.get(), response.get(), (long long)read);
324+
}
325+
});
323326

324327
if(!ContinueRequest(*request) || !IsRequestProcessingEnabled())
325328
{
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#include <aws/core/utils/stream/StreamBufProtectedWriter.h>
7+
#include <aws/core/utils/memory/stl/AWSDeque.h>
8+
#include <aws/testing/AwsCppSdkGTestSuite.h>
9+
#include <sstream>
10+
#include <deque>
11+
12+
using namespace Aws::Utils::Stream;
13+
14+
class StreamBufProtectedWriterTest : public Aws::Testing::AwsCppSdkGTestSuite {};
15+
16+
class MockedBuffer {
17+
public:
18+
MockedBuffer(std::initializer_list<Aws::String> strings) : data_(strings) {}
19+
20+
Aws::String read() {
21+
if (data_.empty()) {
22+
return {};
23+
}
24+
Aws::String result = data_.front();
25+
data_.pop_front();
26+
return result;
27+
}
28+
29+
private:
30+
Aws::Deque<Aws::String> data_;
31+
};
32+
33+
TEST_F(StreamBufProtectedWriterTest, ShouldBeAbleToAccessStreamAfterWriteFunction) {
34+
Aws::StringStream bufferedStream;
35+
Aws::StringStream output;
36+
MockedBuffer srcBuffer({ "joker",
37+
"skull",
38+
"panther",
39+
"mona"});
40+
Aws::String leftover{};
41+
StreamBufProtectedWriter::WriteToBuffer(bufferedStream,
42+
[&srcBuffer, &leftover](char* dst, uint64_t dstSz, uint64_t& read) -> bool {
43+
Aws::String data{};
44+
if (!leftover.empty()) {
45+
data = leftover;
46+
leftover.clear();
47+
} else {
48+
data = srcBuffer.read();
49+
if (data.empty()) {
50+
return false;
51+
}
52+
}
53+
if (data.size() > dstSz) {
54+
leftover = data.substr(static_cast<size_t>(dstSz));
55+
read = dstSz;
56+
memcpy(dst, data.c_str(), static_cast<size_t>(dstSz));
57+
} else {
58+
read = data.size();
59+
memcpy(dst, data.c_str(), static_cast<size_t>(read));
60+
}
61+
return true;
62+
},
63+
[&bufferedStream, &output](uint64_t read) -> void {
64+
AWS_UNREFERENCED_PARAM(read);
65+
output << bufferedStream.rdbuf();
66+
});
67+
EXPECT_EQ(output.str(), "jokerskullpanthermona");
68+
}

0 commit comments

Comments
 (0)