Skip to content

Commit 34e3bf9

Browse files
committed
MINIFICPP-2709 Fix site to site receive using HTTP protocol
1 parent af585c2 commit 34e3bf9

File tree

8 files changed

+107
-94
lines changed

8 files changed

+107
-94
lines changed

core-framework/include/http/HTTPStream.h

Lines changed: 33 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -48,54 +48,49 @@ class HttpStream : public io::BaseStreamImpl {
4848
}
4949

5050
const std::shared_ptr<HTTPClient>& getClient() {
51-
http_client_future_.get();
51+
if (http_client_read_future_.valid()) {
52+
http_client_read_future_.get();
53+
}
54+
if (http_client_write_future_.valid()) {
55+
http_client_write_future_.get();
56+
}
5257
return http_client_;
5358
}
5459

5560
void forceClose() {
56-
if (started_) {
57-
// lock shouldn't be needed here as call paths currently guarantee
58-
// flow, but we should be safe anyway.
59-
std::lock_guard<std::mutex> lock(mutex_);
60-
close();
61-
http_client_->forceClose();
62-
if (http_client_future_.valid()) {
63-
http_client_future_.get();
64-
} else {
65-
logger_->log_warn("Future status already cleared for {}, continuing", http_client_->getURL());
61+
bool should_close = false;
62+
{
63+
std::lock_guard<std::mutex> read_lock(read_mutex_);
64+
std::lock_guard<std::mutex> write_lock(write_mutex_);
65+
should_close = read_started_ || write_started_;
66+
if (should_close) {
67+
close();
68+
http_client_->forceClose();
69+
read_started_ = false;
70+
write_started_ = false;
6671
}
72+
}
6773

68-
started_ = false;
74+
if (should_close) {
75+
if (http_client_read_future_.valid()) {
76+
http_client_read_future_.get();
77+
}
78+
if (http_client_write_future_.valid()) {
79+
http_client_write_future_.get();
80+
}
6981
}
7082
}
7183

72-
/**
73-
* Skip to the specified offset.
74-
* @param offset offset to which we will skip
75-
*/
7684
void seek(size_t offset) override;
7785

7886
[[nodiscard]] size_t tell() const override;
7987

80-
[[nodiscard]] size_t size() const override {
81-
return written;
82-
}
88+
[[nodiscard]] size_t size() const override;
8389

8490
using BaseStream::write;
8591
using BaseStream::read;
8692

87-
/**
88-
* Reads data and places it into buf
89-
* @param buf buffer in which we extract data
90-
* @param buflen
91-
*/
9293
size_t read(std::span<std::byte> buf) override;
93-
94-
/**
95-
* writes value to stream
96-
* @param value value to write
97-
* @param size size of value
98-
*/
9994
size_t write(const uint8_t* value, size_t size) override;
10095

10196
static bool submit_client(const std::shared_ptr<HTTPClient>& client) {
@@ -114,19 +109,16 @@ class HttpStream : public io::BaseStreamImpl {
114109
}
115110

116111
inline bool isFinished(int seconds = 0) {
117-
return http_client_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
112+
return http_client_read_future_.wait_for(std::chrono::seconds(seconds)) == std::future_status::ready
118113
&& getByteOutputReadCallback()
119114
&& getByteOutputReadCallback()->getSize() == 0
120115
&& getByteOutputReadCallback()->waitingOps();
121116
}
122117

123-
/**
124-
* Waits for more data to become available.
125-
*/
126118
bool waitForDataAvailable() {
127119
do {
128120
logger_->log_trace("Waiting for more data");
129-
} while (http_client_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready
121+
} while (http_client_read_future_.wait_for(std::chrono::seconds(0)) != std::future_status::ready
130122
&& getByteOutputReadCallback()
131123
&& getByteOutputReadCallback()->getSize() == 0);
132124

@@ -135,16 +127,15 @@ class HttpStream : public io::BaseStreamImpl {
135127
}
136128

137129
protected:
138-
std::vector<uint8_t> array;
139-
140130
std::shared_ptr<HTTPClient> http_client_;
141-
std::future<bool> http_client_future_;
142-
143-
size_t written{0};
131+
std::future<bool> http_client_read_future_;
132+
std::future<bool> http_client_write_future_;
144133

145-
std::mutex mutex_;
134+
std::mutex read_mutex_;
135+
std::mutex write_mutex_;
146136

147-
std::atomic<bool> started_{false};
137+
std::atomic<bool> read_started_{false};
138+
std::atomic<bool> write_started_{false};
148139

149140
private:
150141
utils::ByteOutputCallback* getByteOutputReadCallback() {

core-framework/src/http/HTTPStream.cpp

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -40,30 +40,28 @@ void HttpStream::close() {
4040
}
4141

4242
void HttpStream::seek(size_t /*offset*/) {
43-
// seek is an unnecessary part of this implementation
4443
throw std::logic_error{"HttpStream::seek is unimplemented"};
4544
}
4645

4746
size_t HttpStream::tell() const {
48-
// tell is an unnecessary part of this implementation
4947
throw std::logic_error{"HttpStream::tell is unimplemented"};
5048
}
5149

52-
// data stream overrides
50+
[[nodiscard]] size_t HttpStream::size() const {
51+
throw std::logic_error{"HttpStream::size is unimplemented"};
52+
}
5353

5454
size_t HttpStream::write(const uint8_t* value, size_t size) {
5555
if (size == 0) return 0;
5656
if (IsNullOrEmpty(value)) {
5757
return io::STREAM_ERROR;
5858
}
59-
if (!started_) {
60-
std::lock_guard<std::mutex> lock(mutex_);
61-
if (!started_) {
62-
auto callback = std::make_unique<HttpStreamingCallback>();
63-
http_client_->setUploadCallback(std::move(callback));
64-
http_client_future_ = std::async(std::launch::async, submit_client, http_client_);
65-
started_ = true;
66-
}
59+
std::lock_guard<std::mutex> lock(write_mutex_);
60+
if (!write_started_) {
61+
auto callback = std::make_unique<HttpStreamingCallback>();
62+
http_client_->setUploadCallback(std::move(callback));
63+
http_client_write_future_ = std::async(std::launch::async, submit_client, http_client_);
64+
write_started_ = true;
6765
}
6866
if (auto http_callback = dynamic_cast<HttpStreamingCallback*>(http_client_->getUploadCallback()))
6967
http_callback->process(value, size);
@@ -75,14 +73,12 @@ size_t HttpStream::write(const uint8_t* value, size_t size) {
7573
size_t HttpStream::read(std::span<std::byte> buf) {
7674
if (buf.empty()) { return 0; }
7775
if (!IsNullOrEmpty(buf)) {
78-
if (!started_) {
79-
std::lock_guard<std::mutex> lock(mutex_);
80-
if (!started_) {
81-
auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, true);
82-
http_client_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get());
83-
http_client_->setReadCallback(std::move(read_callback));
84-
started_ = true;
85-
}
76+
std::lock_guard<std::mutex> lock(read_mutex_);
77+
if (!read_started_) {
78+
auto read_callback = std::make_unique<HTTPReadByteOutputCallback>(66560, true);
79+
http_client_read_future_ = std::async(std::launch::async, submit_read_client, http_client_, read_callback.get());
80+
http_client_->setReadCallback(std::move(read_callback));
81+
read_started_ = true;
8682
}
8783
return gsl::not_null(getByteOutputReadCallback())->readFully(reinterpret_cast<char*>(buf.data()), buf.size());
8884
} else {

docker/test/integration/minifi/flow_serialization/Minifi_flow_json_serializer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ def serialize_node(self, connectable, root, visited):
133133
'targetUri': group.url,
134134
'communicationsTimeout': '30 sec',
135135
'yieldDuration': '3 sec',
136+
'transportProtocol': group.transport_protocol,
136137
'outputPorts': []
137138
}
138139

docker/test/integration/minifi/flow_serialization/Minifi_flow_yaml_serializer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ def serialize_node(self, connectable, res=None, visited=None):
136136
'url': group.url,
137137
'timeout': '30 sec',
138138
'yield period': '3 sec',
139+
'transport protocol': group.transport_protocol,
139140
'Output Ports': []
140141
}
141142

libminifi/include/sitetosite/HttpSiteToSiteClient.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,9 @@ class HttpSiteToSiteClient final : public SiteToSiteClient {
7777
void deleteTransaction(const utils::Identifier& transaction_id) override;
7878
void tearDown() override;
7979

80+
protected:
81+
std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) override;
82+
8083
private:
8184
void setSiteToSiteHeaders(minifi::http::HTTPClient& client);
8285
void closeTransaction(const utils::Identifier &transaction_id);

libminifi/include/sitetosite/SiteToSiteClient.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
#include <utility>
2626
#include <vector>
2727
#include <optional>
28+
#include <expected>
2829

2930
#include "Peer.h"
3031
#include "SiteToSite.h"
@@ -171,6 +172,9 @@ class SiteToSiteClient {
171172
std::atomic<std::chrono::milliseconds> batch_duration_{0s};
172173
std::atomic<std::chrono::milliseconds> timeout_{0s};
173174

175+
protected:
176+
virtual std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session);
177+
174178
private:
175179
struct ReceiveFlowFileHeaderResult {
176180
std::map<std::string, std::string> attributes;
@@ -187,9 +191,8 @@ class SiteToSiteClient {
187191
bool completeReceive(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id);
188192
bool completeSend(const std::shared_ptr<Transaction>& transaction, const utils::Identifier& transaction_id, core::ProcessContext& context);
189193

190-
bool readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result);
191-
std::optional<ReceiveFlowFileHeaderResult> receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction);
192-
std::pair<uint64_t, uint64_t> readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session);
194+
std::expected<void, std::string> readFlowFileHeaderData(io::InputStream& stream, const std::string& transaction_id, SiteToSiteClient::ReceiveFlowFileHeaderResult& result);
195+
std::expected<ReceiveFlowFileHeaderResult, std::string> receiveFlowFileHeader(io::InputStream& stream, const std::shared_ptr<Transaction>& transaction);
193196

194197
std::shared_ptr<core::logging::Logger> logger_{core::logging::LoggerFactory<SiteToSiteClient>::getLogger()};
195198
};

libminifi/src/sitetosite/HttpSiteToSiteClient.cpp

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -141,7 +141,6 @@ std::shared_ptr<Transaction> HttpSiteToSiteClient::createTransaction(TransferDir
141141
} else {
142142
transaction_client = openConnectionForReceive(transaction);
143143
transaction->setDataAvailable(true);
144-
// 201 tells us that data is available. 200 would mean that nothing is available.
145144
}
146145
gsl_Assert(transaction_client);
147146

@@ -352,13 +351,11 @@ void HttpSiteToSiteClient::closeTransaction(const utils::Identifier &transaction
352351

353352
logger_->log_debug("Received {} response code from delete", client->getResponseCode());
354353

355-
if (client->getResponseCode() == 400) {
354+
if (client->getResponseCode() >= 400) {
356355
std::string error(client->getResponseBody().data(), client->getResponseBody().size());
357356

358-
logger_->log_warn("400 received: {}", error);
359-
std::stringstream message;
360-
message << "Received " << client->getResponseCode() << " from " << uri.str();
361-
throw Exception(SITE2SITE_EXCEPTION, message.str());
357+
logger_->log_warn("{} received: {}", client->getResponseCode(), error);
358+
throw Exception(SITE2SITE_EXCEPTION, fmt::format("Received {} from {}", client->getResponseCode(), uri.str()));
362359
}
363360

364361
transaction->close();
@@ -388,4 +385,33 @@ void HttpSiteToSiteClient::setSiteToSiteHeaders(minifi::http::HTTPClient& client
388385
}
389386
}
390387

388+
std::pair<uint64_t, uint64_t> HttpSiteToSiteClient::readFlowFiles(const std::shared_ptr<Transaction>& transaction, core::ProcessSession& session) {
389+
auto http_stream = dynamic_cast<http::HttpStream*>(peer_->getStream());
390+
if (!http_stream) {
391+
throw Exception(SITE2SITE_EXCEPTION, "Reading flow files failed: stream cannot be cast to HTTP stream");
392+
}
393+
394+
try {
395+
return SiteToSiteClient::readFlowFiles(transaction, session);
396+
} catch (const Exception& e) {
397+
auto response_code = http_stream->getClientRef()->getResponseCode();
398+
399+
// 200 tells us that there is no content to read, so we should not treat it as an error.
400+
// The read fails in this case because the stream does not contain a valid response body with the expected format.
401+
// Unfortunately there is no way to get the response code before trying to read, so we have to let it fail and check the response code afterwards.
402+
if (response_code == 200) {
403+
logger_->log_debug("Response code 200, no content to read");
404+
transaction->setDataAvailable(false);
405+
transaction->setState(TransactionState::TRANSACTION_CANCELED);
406+
current_code_ = ResponseCode::CANCEL_TRANSACTION;
407+
return {0, 0};
408+
}
409+
throw;
410+
}
411+
412+
if (auto response_code = http_stream->getClientRef()->getResponseCode(); response_code >= 400) {
413+
throw Exception(SITE2SITE_EXCEPTION, fmt::format("Error received while reading flow files: {}", response_code));
414+
}
415+
}
416+
391417
} // namespace org::apache::nifi::minifi::sitetosite

0 commit comments

Comments
 (0)