Commit 5ce1364

Merge pull request ClickHouse#79540 from ClickHouse/chesema-fix-02789
fix test 02789_reading_from_s3_with_connection_pool
2 parents: 95abb81 + f7213de

9 files changed: +52 -45 lines changed

src/Interpreters/AsynchronousInsertQueue.cpp

Lines changed: 5 additions & 5 deletions

```diff
@@ -946,7 +946,7 @@ try
         throw;
     }

-    auto finish_entries = [&](size_t num_rows, size_t num_bytes)
+    auto finish_entries = [&](QueryPipeline && pileline_, size_t num_rows, size_t num_bytes)
     {
         for (const auto & entry : data->entries)
         {
@@ -963,7 +963,7 @@ try
         LOG_DEBUG(log, "Flushed {} rows, {} bytes for query '{}'", num_rows, num_bytes, key.query_str);
         queue_shard_flush_time_history.updateWithCurrentTime();

-        logQueryFinish(query_log_elem, insert_context, key.query, pipeline, /*pulling_pipeline=*/false, query_span, QueryResultCacheUsage::None, internal);
+        logQueryFinish(query_log_elem, insert_context, key.query, std::move(pileline_), /*pulling_pipeline=*/false, query_span, QueryResultCacheUsage::None, internal);
     };

     try
@@ -980,8 +980,8 @@ try

         if (chunk.getNumRows() == 0)
         {
-            finish_entries(/*num_rows=*/ 0, /*num_bytes=*/ 0);
-            pipeline.cancel();
+            pipeline.cancel(); // this just cancels the processors
+            finish_entries(std::move(pipeline), /*num_rows=*/ 0, /*num_bytes=*/ 0);
             return;
         }
@@ -994,7 +994,7 @@ try
         CompletedPipelineExecutor completed_executor(pipeline);
         completed_executor.execute();

-        finish_entries(num_rows, num_bytes);
+        finish_entries(std::move(pipeline), num_rows, num_bytes);
     }
     catch (...)
     {
```
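Why this reshuffle: finish_entries now takes ownership of the pipeline, so that logQueryFinish can tear it down before profile counters are snapshotted for the query_log. A minimal sketch of the ownership-transfer pattern, using a stand-in Pipeline type rather than the real QueryPipeline:

```cpp
#include <cstddef>
#include <cstdio>
#include <utility>

// Stand-in for QueryPipeline: owns resources (HTTP sessions, buffers) whose
// profile counters become final only once the pipeline is destroyed.
struct Pipeline
{
    bool alive = true;
    Pipeline() = default;
    Pipeline(Pipeline && other) noexcept : alive(other.alive) { other.alive = false; }
    ~Pipeline() { if (alive) std::puts("pipeline destroyed, counters final"); }
    void cancel() { std::puts("processors cancelled"); }
};

int main()
{
    Pipeline pipeline;

    // Taking the pipeline by rvalue reference forces the caller to hand over
    // ownership, so logging can run after the pipeline has been torn down.
    auto finish_entries = [](Pipeline && pipeline_, std::size_t num_rows, std::size_t num_bytes)
    {
        { Pipeline consumed = std::move(pipeline_); } // tear down before logging
        std::printf("logged %zu rows, %zu bytes\n", num_rows, num_bytes);
    };

    pipeline.cancel(); // this just cancels the processors
    finish_entries(std::move(pipeline), /*num_rows=*/ 0, /*num_bytes=*/ 0);
}
```

With a const reference, the callee could not destroy the pipeline; the rvalue-reference signature makes the hand-off explicit at every call site via std::move.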

src/Interpreters/executeQuery.cpp

Lines changed: 14 additions & 7 deletions

```diff
@@ -573,7 +573,7 @@ void logQueryFinishImpl(
     QueryLogElement & elem,
     const ContextMutablePtr & context,
     const ASTPtr & query_ast,
-    const QueryPipeline & query_pipeline,
+    QueryPipeline && query_pipeline,
     bool pulling_pipeline,
     std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
     QueryResultCacheUsage query_result_cache_usage,
@@ -586,16 +586,24 @@ void logQueryFinishImpl(
     QueryStatusPtr process_list_elem = context->getProcessListElement();
     if (process_list_elem)
     {
+
+        logProcessorProfile(context, query_pipeline.getProcessors());
+
+        auto result_progress = flushQueryProgress(query_pipeline, pulling_pipeline, context->getProgressCallback(), process_list_elem);
+
+        /// Reset pipeline before fetching profile counters
+        query_pipeline.reset();
+
         /// Update performance counters before logging to query_log
         CurrentThread::finalizePerformanceCounters();

         QueryStatusInfo info = process_list_elem->getInfo(true, settings[Setting::log_profile_events]);
         logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time, std::make_shared<QueryStatusInfo>(info));
+
         elem.type = QueryLogElementType::QUERY_FINISH;

         addStatusInfoToQueryLogElement(elem, info, query_ast, context, time);

-        auto result_progress = flushQueryProgress(query_pipeline, pulling_pipeline, context->getProgressCallback(), process_list_elem);
         elem.result_rows = result_progress.result_rows;
         elem.result_bytes = result_progress.result_bytes;

@@ -622,7 +630,6 @@ void logQueryFinishImpl(
             query_log->add(elem);
         }

-        logProcessorProfile(context, query_pipeline.getProcessors());
     }

     if (query_span)
@@ -659,14 +666,14 @@ void logQueryFinish(
     QueryLogElement & elem,
     const ContextMutablePtr & context,
     const ASTPtr & query_ast,
-    const QueryPipeline & query_pipeline,
+    QueryPipeline && query_pipeline,
     bool pulling_pipeline,
     std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
     QueryResultCacheUsage query_result_cache_usage,
     bool internal)
 {
     const auto time_now = std::chrono::system_clock::now();
-    logQueryFinishImpl(elem, context, query_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal, time_now);
+    logQueryFinishImpl(elem, context, query_ast, std::move(query_pipeline), pulling_pipeline, query_span, query_result_cache_usage, internal, time_now);
 }

 void logQueryException(
@@ -1643,14 +1650,14 @@ static BlockIO executeQueryImpl(
         execute_implicit_tcl_query,
         // Need to be cached, since will be changed after complete()
         pulling_pipeline = pipeline.pulling(),
-        query_span](QueryPipeline & query_pipeline, std::chrono::system_clock::time_point finish_time) mutable
+        query_span](QueryPipeline && query_pipeline, std::chrono::system_clock::time_point finish_time) mutable
     {
         if (query_result_cache_usage == QueryResultCacheUsage::Write)
             /// Trigger the actual write of the buffered query result into the query result cache. This is done explicitly to
             /// prevent partial/garbage results in case of exceptions during query execution.
             query_pipeline.finalizeWriteInQueryResultCache();

-        logQueryFinishImpl(elem, context, out_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal, finish_time);
+        logQueryFinishImpl(elem, context, out_ast, std::move(query_pipeline), pulling_pipeline, query_span, query_result_cache_usage, internal, finish_time);

         if (*implicit_txn_control)
             execute_implicit_tcl_query(context, ASTTransactionControl::COMMIT);
```
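The substance of this change is ordering: processor profiles and result progress are now harvested from the pipeline first, the pipeline is reset, and only then are performance counters finalized, so that counters incremented during pipeline teardown (such as connection-pool events) land in the logged QueryStatusInfo. A hedged sketch of that ordering, with illustrative names rather than the real ClickHouse API:

```cpp
#include <cstdio>
#include <memory>

// Illustrative stand-ins; the real types are QueryPipeline and QueryStatusInfo.
struct PipelineHandle
{
    std::shared_ptr<int> resources = std::make_shared<int>(0);
    // In the real code, reset() releases sessions and buffers, and events such
    // as StorageConnectionsPreserved may still be incremented at that point.
    void reset() { resources.reset(); std::puts("2) pipeline reset: sessions released"); }
};

static void finalizePerformanceCounters() { std::puts("3) counters finalized"); }

static void logQueryFinishSketch(PipelineHandle && pipeline)
{
    std::puts("1) flush processor profiles and result progress"); // still needs the pipeline
    pipeline.reset();               // release resources while counters can still accumulate
    finalizePerformanceCounters();  // snapshot counters only after the reset
    std::puts("4) write query_log entry");
}

int main()
{
    logQueryFinishSketch(PipelineHandle{});
}
```

Before this commit the snapshot was taken while the pipeline was still alive, so events fired in its destructor were lost to the query_log, which is what made the connection-pool test flaky.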

src/Interpreters/executeQuery.h

Lines changed: 1 addition & 1 deletion

```diff
@@ -93,7 +93,7 @@ void logQueryFinish(
     QueryLogElement & elem,
     const ContextMutablePtr & context,
     const ASTPtr & query_ast,
-    const QueryPipeline & query_pipeline,
+    QueryPipeline && query_pipeline,
     bool pulling_pipeline,
     std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
     QueryResultCacheUsage query_result_cache_usage,
```

src/QueryPipeline/BlockIO.cpp

Lines changed: 3 additions & 3 deletions

```diff
@@ -49,9 +49,9 @@ BlockIO::~BlockIO()
 void BlockIO::onFinish(std::chrono::system_clock::time_point finish_time)
 {
     if (finish_callback)
-        finish_callback(pipeline, finish_time);
-
-    pipeline.reset();
+        finish_callback(std::move(pipeline), finish_time);
+    else
+        pipeline.reset();
 }

 void BlockIO::onException(bool log_as_error)
```

src/QueryPipeline/BlockIO.h

Lines changed: 1 addition & 1 deletion

```diff
@@ -25,7 +25,7 @@ struct BlockIO
     QueryPipeline pipeline;

     /// Callbacks for query logging could be set here.
-    std::function<void(QueryPipeline &, std::chrono::system_clock::time_point)> finish_callback;
+    std::function<void(QueryPipeline &&, std::chrono::system_clock::time_point)> finish_callback;
     std::function<void(bool)> exception_callback;

     /// When it is true, don't bother sending any non-empty blocks to the out stream
```
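Together, these two BlockIO changes establish an either/or ownership contract in onFinish: if a finish_callback is set, the pipeline is moved into it and the callee becomes responsible for resetting it; otherwise BlockIO resets it in place. A small self-contained sketch of that contract, using stand-in types:

```cpp
#include <chrono>
#include <cstdio>
#include <functional>
#include <utility>

struct Pipeline { void reset() { std::puts("pipeline reset"); } }; // stand-in for QueryPipeline

struct BlockIOSketch
{
    Pipeline pipeline;
    // The callback now receives the pipeline by rvalue reference and owns it.
    std::function<void(Pipeline &&, std::chrono::system_clock::time_point)> finish_callback;

    void onFinish(std::chrono::system_clock::time_point finish_time)
    {
        if (finish_callback)
            finish_callback(std::move(pipeline), finish_time); // callee resets it
        else
            pipeline.reset();                                  // no callback: reset here
    }
};

int main()
{
    BlockIOSketch io;
    io.finish_callback = [](Pipeline && p, auto /*time*/)
    {
        Pipeline local = std::move(p); // take ownership, then release
        local.reset();
    };
    io.onFinish(std::chrono::system_clock::now());
}
```

The previous code reset the pipeline unconditionally after the callback returned, which is exactly the point where destructor-time profile events arrived too late to be logged.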

src/Storages/MaterializedView/RefreshTask.cpp

Lines changed: 1 addition & 1 deletion

```diff
@@ -655,7 +655,7 @@ std::optional<UUID> RefreshTask::executeRefreshUnlocked(bool append, int32_t roo
     if (execution.interrupt_execution.load())
         throw Exception(ErrorCodes::QUERY_WAS_CANCELLED, "Refresh for view {} cancelled", view_storage_id.getFullTableName());

-    logQueryFinish(*query_log_elem, refresh_context, refresh_query, pipeline, /*pulling_pipeline=*/false, query_span, QueryResultCacheUsage::None, /*internal=*/false);
+    logQueryFinish(*query_log_elem, refresh_context, refresh_query, std::move(pipeline), /*pulling_pipeline=*/false, query_span, QueryResultCacheUsage::None, /*internal=*/false);
     query_log_elem = std::nullopt;
     query_span = nullptr;
     process_list_entry.reset(); // otherwise it needs to be alive for logQueryException
```

src/Storages/StorageURL.cpp

Lines changed: 1 addition & 1 deletion

```diff
@@ -421,7 +421,7 @@ StorageURLSource::StorageURLSource(
     if (need_only_count)
         input_format->needOnlyCount();

-    builder.init(Pipe(input_format));
+    builder.init(Pipe(input_format));

     if (columns_description.hasDefaults())
     {
```
tests/queries/0_stateless/02789_reading_from_s3_with_connection_pool.reference

Lines changed: 2 additions & 2 deletions

```diff
@@ -1,2 +1,2 @@
-1
-1
+DiskConnectionsPreserved 1
+StorageConnectionsPreserved 1
```
tests/queries/0_stateless/02789_reading_from_s3_with_connection_pool.sh

Lines changed: 24 additions & 24 deletions

```diff
@@ -1,5 +1,8 @@
 #!/usr/bin/env bash
-# Tags: no-fasttest, no-random-settings, no-replicated-database, no-distributed-cache
+# Tags: no-fasttest, no-random-settings, no-replicated-database, no-distributed-cache, no-parallel-replicas
+
+# no-fasttest -- test uses s3_dick
+# no-parallel-replicas -- do not run url functions as StorageURLCluster

 CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 # shellcheck source=../shell_config.sh
@@ -12,14 +15,14 @@ CREATE TABLE test_s3 (a UInt64, b UInt64)
 ENGINE = MergeTree ORDER BY a
 SETTINGS disk = 's3_disk', min_bytes_for_wide_part = 0;

-INSERT INTO test_s3 SELECT number, number FROM numbers_mt(1e7);
+INSERT INTO test_s3 SELECT number, number FROM numbers_mt(1);
 "

 # This (reusing connections from the pool) is not guaranteed to always happen,
 # (due to random time difference between the queries and random activity in parallel)
 # but should happen most of the time.

-while true
+for _ in {0..9}
 do
     query="SELECT a, b FROM test_s3"
     query_id=$(${CLICKHOUSE_CLIENT} --query "select queryID() from ($query) limit 1" 2>&1)
@@ -33,34 +36,31 @@ do
         AND query_id='$query_id';
     ")

-    [[ $RES -eq 1 ]] && echo "$RES" && break;
+    [[ $RES -eq 1 ]] && echo "DiskConnectionsPreserved $RES" && break;
 done


 # Test connection pool in ReadWriteBufferFromHTTP

-while true
+# LIMIT 1 here is important part
+# processor StorageURLSource releases the HTTP session either when all data is fully read in the last generate call or at d-tor of the processor instance
+# with LIMIT 1 the HTTP session is released at pipeline d-tor because not all the data is read from HTTP connection inside StorageURLSource processor
+# this tests covers the case when profile events have to be gathered and logged to the query_log only after pipeline is destroyed
+
+for _ in {0..9}
 do
     query_id=$(${CLICKHOUSE_CLIENT} -q "
-        create table if not exists mut (n int, m int, k int) engine=ReplicatedMergeTree('/test/02441/{database}/mut', '1') order by n;
-        set insert_keeper_fault_injection_probability=0;
-        set parallel_replicas_for_cluster_engines=0;
-        insert into mut values (1, 2, 3), (10, 20, 30);
-
-        system stop merges mut;
-        alter table mut delete where n = 10;
-
-        select queryID() from(
-            -- a funny way to wait for a MUTATE_PART to be assigned
-            select sleepEachRow(2) from url('http://localhost:8123/?param_tries={1..10}&query=' || encodeURLComponent(
-                'select 1 where ''MUTATE_PART'' not in (select type from system.replication_queue where database=''' || currentDatabase() || ''' and table=''mut'')'
-            ), 'LineAsString', 's String')
-            -- queryID() will be returned for each row, since the query above doesn't return anything we need to return a fake row
-            union all
-            select 1
-        ) limit 1 settings max_threads=1, http_make_head_request=0;
-    " 2>&1)
+        SELECT queryID() FROM(
+            SELECT *
+            FROM url(
+                'http://localhost:8123/?query=' || encodeURLComponent('select 1'),
+                'LineAsString',
+                's String')
+        ) LIMIT 1 SETTINGS max_threads=1, http_make_head_request=0;
+    ")
+
     ${CLICKHOUSE_CLIENT} --query "SYSTEM FLUSH LOGS query_log"
+
     RES=$(${CLICKHOUSE_CLIENT} -m --query "
         SELECT ProfileEvents['StorageConnectionsPreserved'] > 0
         FROM system.query_log
@@ -69,5 +69,5 @@ do
         AND query_id='$query_id';
     ")

-    [[ $RES -eq 1 ]] && echo "StorageConnectionsPreserved $RES" && break;
 done
```
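The new comments in the test pin down the mechanism under test: StorageURLSource returns its HTTP session to the pool either when it reads the response to the end or in its destructor, and with LIMIT 1 only the destructor path fires, so the StorageConnectionsPreserved event can be counted only after the pipeline is destroyed. A minimal RAII sketch of that release behavior (all types here are illustrative stand-ins, not ClickHouse classes):

```cpp
#include <cstdio>

// Stand-in for a pooled HTTP session: "preserved" means returned to the
// pool for reuse, which is what StorageConnectionsPreserved counts.
struct PooledSession
{
    bool released = false;
    void release() { if (!released) { released = true; std::puts("session preserved in pool"); } }
    ~PooledSession() { release(); } // the destructor is the fallback release point
};

struct UrlSource // stand-in for StorageURLSource
{
    PooledSession session;
    bool exhausted = false;
    void generate()
    {
        if (exhausted)
            session.release(); // fast path: all data read, release eagerly
        // With LIMIT 1 the pipeline stops early, this branch is never taken,
        // and the session is released only in ~PooledSession().
    }
};

int main()
{
    {
        UrlSource source;
        source.generate(); // LIMIT 1: source not exhausted, nothing released yet
    }                      // pipeline (source) destroyed here -> session preserved
    std::puts("query_log entry written after pipeline destruction");
}
```

Replacing the unbounded `while true` loops with `for _ in {0..9}` also caps the retries, so a run where connection reuse never happens fails cleanly instead of hanging.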
