Skip to content

Commit 52e4457

Browse files
committed
Fix query finish time (now it does not take into account query_finish_callback)
1 parent 7ef93b5 commit 52e4457

File tree

3 files changed

+38
-13
lines changed

3 files changed

+38
-13
lines changed

src/Interpreters/executeQuery.cpp

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -569,31 +569,31 @@ static ResultProgress flushQueryProgress(const QueryPipeline & pipeline, bool pu
569569
return res;
570570
}
571571

572-
void logQueryFinish(
572+
void logQueryFinishImpl(
573573
QueryLogElement & elem,
574574
const ContextMutablePtr & context,
575575
const ASTPtr & query_ast,
576576
const QueryPipeline & query_pipeline,
577577
bool pulling_pipeline,
578578
std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
579579
QueryResultCacheUsage query_result_cache_usage,
580-
bool internal)
580+
bool internal,
581+
std::chrono::system_clock::time_point time)
581582
{
582583
const Settings & settings = context->getSettingsRef();
583584
auto log_queries = settings[Setting::log_queries] && !internal;
584585

585-
const auto time_now = std::chrono::system_clock::now();
586586
QueryStatusPtr process_list_elem = context->getProcessListElement();
587587
if (process_list_elem)
588588
{
589589
/// Update performance counters before logging to query_log
590590
CurrentThread::finalizePerformanceCounters();
591591

592592
QueryStatusInfo info = process_list_elem->getInfo(true, settings[Setting::log_profile_events]);
593-
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
593+
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time, std::make_shared<QueryStatusInfo>(info));
594594
elem.type = QueryLogElementType::QUERY_FINISH;
595595

596-
addStatusInfoToQueryLogElement(elem, info, query_ast, context, time_now);
596+
addStatusInfoToQueryLogElement(elem, info, query_ast, context, time);
597597

598598
auto result_progress = flushQueryProgress(query_pipeline, pulling_pipeline, context->getProgressCallback(), process_list_elem);
599599
elem.result_rows = result_progress.result_rows;
@@ -651,10 +651,24 @@ void logQueryFinish(
651651
query_span->addAttribute(fmt::format("clickhouse.setting.{}", change.name), convertFieldToString(change.value));
652652
}
653653
}
654-
query_span->finish(time_now);
654+
query_span->finish(time);
655655
}
656656
}
657657

658+
void logQueryFinish(
659+
QueryLogElement & elem,
660+
const ContextMutablePtr & context,
661+
const ASTPtr & query_ast,
662+
const QueryPipeline & query_pipeline,
663+
bool pulling_pipeline,
664+
std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
665+
QueryResultCacheUsage query_result_cache_usage,
666+
bool internal)
667+
{
668+
const auto time_now = std::chrono::system_clock::now();
669+
logQueryFinishImpl(elem, context, query_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal, time_now);
670+
}
671+
658672
void logQueryException(
659673
QueryLogElement & elem,
660674
const ContextMutablePtr & context,
@@ -1629,14 +1643,14 @@ static BlockIO executeQueryImpl(
16291643
execute_implicit_tcl_query,
16301644
// Need to be cached, since will be changed after complete()
16311645
pulling_pipeline = pipeline.pulling(),
1632-
query_span](QueryPipeline & query_pipeline) mutable
1646+
query_span](QueryPipeline & query_pipeline, std::chrono::system_clock::time_point finish_time) mutable
16331647
{
16341648
if (query_result_cache_usage == QueryResultCacheUsage::Write)
16351649
/// Trigger the actual write of the buffered query result into the query result cache. This is done explicitly to
16361650
/// prevent partial/garbage results in case of exceptions during query execution.
16371651
query_pipeline.finalizeWriteInQueryResultCache();
16381652

1639-
logQueryFinish(elem, context, out_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal);
1653+
logQueryFinishImpl(elem, context, out_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal, finish_time);
16401654

16411655
if (*implicit_txn_control)
16421656
execute_implicit_tcl_query(context, ASTTransactionControl::COMMIT);
@@ -1987,6 +2001,17 @@ void executeQuery(
19872001
throw;
19882002
}
19892003

2004+
/// The order is important here:
2005+
/// - first we save the finish_time that will be used for the entry in query_log/opentelemetry_span_log.finish_time_us
2006+
/// - then we flush the progress (to flush result_rows/result_bytes)
2007+
/// - then call the query_finish_callback() - right now the only purpose is to flush the data over HTTP
2008+
/// - then call onFinish() that will create entry in query_log/opentelemetry_span_log
2009+
///
2010+
/// That way we have:
2011+
/// - correct finish time of the query (regardless of how long does the query_finish_callback() takes)
2012+
/// - correct progress for HTTP (X-ClickHouse-Summary requires result_rows/result_bytes)
2013+
/// - correct NetworkSendElapsedMicroseconds/NetworkSendBytes in query_log
2014+
const auto finish_time = std::chrono::system_clock::now();
19902015
std::exception_ptr exception_ptr;
19912016
if (query_finish_callback)
19922017
{
@@ -2003,7 +2028,7 @@ void executeQuery(
20032028
}
20042029
}
20052030

2006-
streams.onFinish();
2031+
streams.onFinish(finish_time);
20072032

20082033
if (exception_ptr)
20092034
std::rethrow_exception(exception_ptr);

src/QueryPipeline/BlockIO.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,10 @@ BlockIO::~BlockIO()
4646
reset();
4747
}
4848

49-
void BlockIO::onFinish()
49+
void BlockIO::onFinish(std::chrono::system_clock::time_point finish_time)
5050
{
5151
if (finish_callback)
52-
finish_callback(pipeline);
52+
finish_callback(pipeline, finish_time);
5353

5454
pipeline.reset();
5555
}

src/QueryPipeline/BlockIO.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,13 +25,13 @@ struct BlockIO
2525
QueryPipeline pipeline;
2626

2727
/// Callbacks for query logging could be set here.
28-
std::function<void(QueryPipeline &)> finish_callback;
28+
std::function<void(QueryPipeline &, std::chrono::system_clock::time_point)> finish_callback;
2929
std::function<void(bool)> exception_callback;
3030

3131
/// When it is true, don't bother sending any non-empty blocks to the out stream
3232
bool null_format = false;
3333

34-
void onFinish();
34+
void onFinish(std::chrono::system_clock::time_point finish_time = std::chrono::system_clock::now());
3535
void onException(bool log_as_error=true);
3636
void onCancelOrConnectionLoss();
3737

0 commit comments

Comments
 (0)