Skip to content

Commit 5f302de

Browse files
Merge pull request ClickHouse#86758 from ClickHouse/backport/25.8/86631
Backport ClickHouse#86631 to 25.8: Fixing keep-alive on inter-server calls.
2 parents 2006383 + 22ae61f commit 5f302de

File tree

4 files changed

+12
-7
lines changed

4 files changed

+12
-7
lines changed

src/Interpreters/InterserverIOHandler.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
#include <Common/ActionBlocker.h>
44
#include <Common/Exception.h>
55
#include <Common/SharedMutex.h>
6+
#include <IO/ReadBuffer.h>
67
#include <base/types.h>
78

89
#include <map>
@@ -34,7 +35,7 @@ class InterserverIOEndpoint
3435
{
3536
public:
3637
virtual std::string getId(const std::string & path) const = 0;
37-
virtual void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) = 0;
38+
virtual void processQuery(const HTMLForm & params, ReadBufferPtr body, WriteBuffer & out, HTTPServerResponse & response) = 0;
3839
virtual ~InterserverIOEndpoint() = default;
3940

4041
/// You need to stop the data transfer if blocker is activated.

src/Server/InterserverIOHTTPHandler.cpp

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,6 @@ void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPSer
6161
String endpoint_name = params.get("endpoint");
6262
bool compress = params.get("compress") == "true";
6363

64-
auto input_stream_with_body = request.getStream();
65-
6664
auto endpoint = server.context()->getInterserverIOHandler().getEndpoint(endpoint_name);
6765
/// Locked for read while query processing
6866
std::shared_lock lock(endpoint->rwlock);
@@ -72,13 +70,15 @@ void InterserverIOHTTPHandler::processQuery(HTTPServerRequest & request, HTTPSer
7270
if (compress)
7371
{
7472
CompressedWriteBuffer compressed_out(*output);
75-
endpoint->processQuery(params, *input_stream_with_body, compressed_out, response);
73+
endpoint->processQuery(params, request.getStream(), compressed_out, response);
7674
compressed_out.finalize();
7775
}
7876
else
7977
{
80-
endpoint->processQuery(params, *input_stream_with_body, *output, response);
78+
endpoint->processQuery(params, request.getStream(), *output, response);
8179
}
80+
/// Make sure that request stream is not used after this function.
81+
assert(request.getStream().use_count() == 2);
8282
}
8383

8484

src/Storages/MergeTree/DataPartsExchange.cpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,8 +123,11 @@ std::string Service::getId(const std::string & node_id) const
123123
return getEndpointId(node_id);
124124
}
125125

126-
void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, WriteBuffer & out, HTTPServerResponse & response)
126+
void Service::processQuery(const HTMLForm & params, ReadBufferPtr body, WriteBuffer & out, HTTPServerResponse & response)
127127
{
128+
// nothing to read from body
129+
body.reset();
130+
128131
int client_protocol_version = parse<int>(params.get("client_protocol_version", "0"));
129132

130133
String part_name = params.get("part");

src/Storages/MergeTree/DataPartsExchange.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
#include <IO/ConnectionTimeouts.h>
1010
#include <Common/Throttler.h>
1111
#include <Common/ActionBlocker.h>
12+
#include <IO/ReadBuffer.h>
1213

1314

1415
namespace zkutil
@@ -37,7 +38,7 @@ class Service final : public InterserverIOEndpoint
3738
Service & operator=(const Service &) = delete;
3839

3940
std::string getId(const std::string & node_id) const override;
40-
void processQuery(const HTMLForm & params, ReadBuffer & body, WriteBuffer & out, HTTPServerResponse & response) override;
41+
void processQuery(const HTMLForm & params, ReadBufferPtr body, WriteBuffer & out, HTTPServerResponse & response) override;
4142

4243
private:
4344
MergeTreeData::DataPartPtr findPart(const String & name);

0 commit comments

Comments
 (0)