Skip to content

Commit 456536b

Browse files
Merge pull request ClickHouse#86875 from ClickHouse/backport/25.8/86606
Backport ClickHouse#86606 to 25.8: Better fix for ShellCommandSource
2 parents 79d44e0 + 3623236 commit 456536b

File tree

8 files changed: +50 -44 lines changed

src/Processors/Formats/IOutputFormat.h

Lines changed: 5 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -107,6 +107,11 @@ class IOutputFormat : public IProcessor
107107
progress_write_frequency_us = value;
108108
}
109109

110+
/// Derived classes can use some wrappers around out WriteBuffer
111+
/// and can override this method to return wrapper
112+
/// that should be used in its derived classes.
113+
virtual WriteBuffer * getWriteBufferPtr() { return &out; }
114+
110115
protected:
111116
friend class ParallelFormattingOutputFormat;
112117

@@ -182,11 +187,6 @@ class IOutputFormat : public IProcessor
182187
/// outputs them in finalize() method.
183188
virtual bool areTotalsAndExtremesUsedInFinalize() const { return false; }
184189

185-
/// Derived classes can use some wrappers around out WriteBuffer
186-
/// and can override this method to return wrapper
187-
/// that should be used in its derived classes.
188-
virtual WriteBuffer * getWriteBufferPtr() { return &out; }
189-
190190
WriteBuffer & out;
191191

192192
Chunk current_chunk;

src/Processors/Formats/Impl/JSONEachRowRowOutputFormat.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,5 @@
11
#pragma once
22

3-
#include <IO/PeekableWriteBuffer.h>
43
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
54
#include <Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h>
65
#include <Formats/FormatSettings.h>

src/Processors/Formats/Impl/JSONRowOutputFormat.h

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,7 +1,6 @@
11
#pragma once
22

33
#include <IO/Progress.h>
4-
#include <IO/PeekableWriteBuffer.h>
54
#include <Common/Stopwatch.h>
65
#include <Processors/Formats/OutputFormatWithUTF8ValidationAdaptor.h>
76
#include <Processors/Formats/RowOutputFormatWithExceptionHandlerAdaptor.h>

src/Processors/Sources/ShellCommandSource.cpp

Lines changed: 9 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -566,7 +566,7 @@ namespace
566566
class SendingChunkHeaderTransform final : public ISimpleTransform
567567
{
568568
public:
569-
SendingChunkHeaderTransform(SharedHeader header, std::shared_ptr<TimeoutWriteBufferFromFileDescriptor> buffer_)
569+
SendingChunkHeaderTransform(SharedHeader header, WriteBuffer & buffer_)
570570
: ISimpleTransform(header, header, false)
571571
, buffer(buffer_)
572572
{
@@ -578,12 +578,12 @@ namespace
578578

579579
void transform(Chunk & chunk) override
580580
{
581-
writeText(chunk.getNumRows(), *buffer);
582-
writeChar('\n', *buffer);
581+
writeText(chunk.getNumRows(), buffer);
582+
writeChar('\n', buffer);
583583
}
584584

585585
private:
586-
std::shared_ptr<TimeoutWriteBufferFromFileDescriptor> buffer;
586+
WriteBuffer & buffer;
587587
};
588588

589589
}
@@ -677,17 +677,19 @@ Pipe ShellCommandSourceCoordinator::createPipe(
677677

678678
input_pipes[i].resize(1);
679679

680+
auto out = context->getOutputFormat(configuration.format, *timeout_write_buffer, materializeBlock(input_pipes[i].getHeader()));
681+
out->setAutoFlush();
682+
680683
if (configuration.send_chunk_header)
681684
{
682-
auto transform = std::make_shared<SendingChunkHeaderTransform>(input_pipes[i].getSharedHeader(), timeout_write_buffer);
685+
/// We cannot use timeout_write_buffer directly since the output format may wrap the buffer, so we need to use a wrapper
686+
auto transform = std::make_shared<SendingChunkHeaderTransform>(input_pipes[i].getSharedHeader(), *out->getWriteBufferPtr());
683687
input_pipes[i].addTransform(std::move(transform));
684688
}
685689

686690
auto num_streams = input_pipes[i].maxParallelStreams();
687691
auto pipeline = std::make_shared<QueryPipeline>(std::move(input_pipes[i]));
688692
pipeline->setNumThreads(num_streams);
689-
auto out = context->getOutputFormat(configuration.format, *timeout_write_buffer, materializeBlock(pipeline->getHeader()));
690-
out->setAutoFlush();
691693
pipeline->complete(std::move(out));
692694

693695
ShellCommandSource::SendDataTask task = [pipeline, timeout_write_buffer, write_buffer, is_executable_pool]()

tests/integration/test_executable_user_defined_function/functions/test_function_config.xml

Lines changed: 6 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -48,23 +48,25 @@
4848
<name>test_function_send_chunk_header_python</name>
4949
<return_type>String</return_type>
5050
<argument>
51+
<name>number</name>
5152
<type>UInt64</type>
5253
</argument>
53-
<format>TabSeparated</format>
54+
<format>JSONEachRow</format>
5455
<send_chunk_header>1</send_chunk_header>
55-
<command>input_chunk_header.py</command>
56+
<command>input_chunk_header_json.py</command>
5657
</function>
5758

5859
<function>
5960
<type>executable_pool</type>
6061
<name>test_function_send_chunk_header_pool_python</name>
6162
<return_type>String</return_type>
6263
<argument>
64+
<name>number</name>
6365
<type>UInt64</type>
6466
</argument>
65-
<format>TabSeparated</format>
67+
<format>JSONEachRow</format>
6668
<send_chunk_header>1</send_chunk_header>
67-
<command>input_chunk_header.py</command>
69+
<command>input_chunk_header_json.py</command>
6870
</function>
6971

7072
<function>

tests/integration/test_executable_user_defined_function/test.py

Lines changed: 16 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -81,19 +81,23 @@ def test_executable_function_python(started_cluster):
8181

8282
def test_executable_function_send_chunk_header_python(started_cluster):
8383
skip_test_msan(node)
84-
assert (
85-
node.query("SELECT test_function_send_chunk_header_python(toUInt64(1))")
86-
== "Key 1\n"
87-
)
88-
assert node.query("SELECT test_function_send_chunk_header_python(1)") == "Key 1\n"
8984

90-
assert (
91-
node.query("SELECT test_function_send_chunk_header_pool_python(toUInt64(1))")
92-
== "Key 1\n"
93-
)
94-
assert (
95-
node.query("SELECT test_function_send_chunk_header_pool_python(1)") == "Key 1\n"
96-
)
85+
for function_name in [
86+
"test_function_send_chunk_header_python",
87+
"test_function_send_chunk_header_pool_python",
88+
]:
89+
assert node.query(f"SELECT {function_name}(toUInt64(1))") == "Key 1\n"
90+
assert node.query(f"SELECT {function_name}(1)") == "Key 1\n"
91+
92+
assert node.query(f"SELECT {function_name}(toUInt64(1))") == "Key 1\n"
93+
assert node.query(f"SELECT {function_name}(1)") == "Key 1\n"
94+
95+
# Test specifically HTTP protocol
96+
# This ensures that http_write_exception_in_output_format works as expected
97+
assert node.http_query(
98+
f"SELECT {function_name}(number) FROM numbers(10)",
99+
params={"max_block_size": 3, "http_write_exception_in_output_format": True},
100+
) == "".join(f"Key {i}\n" for i in range(10))
97101

98102

99103
def test_executable_function_sum_python(started_cluster):

tests/integration/test_executable_user_defined_function/user_scripts/input_chunk_header.py

Lines changed: 0 additions & 14 deletions
This file was deleted.
tests/integration/test_executable_user_defined_function/user_scripts/input_chunk_header_json.py

Lines changed: 14 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,14 @@
1+
#!/usr/bin/python3
2+
3+
import sys
4+
import json
5+
6+
if __name__ == "__main__":
7+
for chunk_header in sys.stdin:
8+
chunk_length = int(chunk_header)
9+
10+
for _ in range(chunk_length):
11+
row = json.loads(sys.stdin.readline().strip())
12+
print(f'{{"result": "Key {row["number"]}"}}')
13+
14+
sys.stdout.flush()

0 commit comments

Comments
 (0)