8080
8181#include < Poco/Net/SocketAddress.h>
8282
83+ #include < exception>
8384#include < memory>
8485#include < random>
8586
@@ -328,12 +329,11 @@ addPrivilegesInfoToQueryLogElement(QueryLogElement & element, const ContextPtr c
328329}
329330
330331static void
331- addStatusInfoToQueryLogElement (QueryLogElement & element, const QueryStatusInfo & info, const ASTPtr query_ast, const ContextPtr context_ptr)
332+ addStatusInfoToQueryLogElement (QueryLogElement & element, const QueryStatusInfo & info, const ASTPtr query_ast, const ContextPtr context_ptr, std::chrono::system_clock::time_point time )
332333{
333- const auto time_now = std::chrono::system_clock::now ();
334334 UInt64 elapsed_microseconds = info.elapsed_microseconds ;
335- element.event_time = timeInSeconds (time_now );
336- element.event_time_microseconds = timeInMicroseconds (time_now );
335+ element.event_time = timeInSeconds (time );
336+ element.event_time_microseconds = timeInMicroseconds (time );
337337 element.query_duration_ms = elapsed_microseconds / 1000 ;
338338
339339 ProfileEvents::increment (ProfileEvents::QueryTimeMicroseconds, elapsed_microseconds);
@@ -544,15 +544,41 @@ void logQueryMetricLogFinish(ContextPtr context, bool internal, String query_id,
544544 }
545545}
546546
547- void logQueryFinish (
547+ static ResultProgress flushQueryProgress (const QueryPipeline & pipeline, bool pulling_pipeline, const ProgressCallback & progress_callback, QueryStatusPtr process_list_elem)
548+ {
549+ ResultProgress res (0 , 0 );
550+
551+ if (pulling_pipeline)
552+ {
553+ pipeline.tryGetResultRowsAndBytes (res.result_rows , res.result_bytes );
554+ }
555+ else if (process_list_elem) // / will be used only for ordinary INSERT queries
556+ {
557+ auto progress_out = process_list_elem->getProgressOut ();
558+ res.result_rows = progress_out.written_rows ;
559+ res.result_bytes = progress_out.written_bytes ;
560+ }
561+
562+ if (progress_callback)
563+ {
564+ Progress p;
565+ p.incrementPiecewiseAtomically (Progress{res});
566+ progress_callback (p);
567+ }
568+
569+ return res;
570+ }
571+
572+ void logQueryFinishImpl (
548573 QueryLogElement & elem,
549574 const ContextMutablePtr & context,
550575 const ASTPtr & query_ast,
551576 const QueryPipeline & query_pipeline,
552577 bool pulling_pipeline,
553578 std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
554579 QueryResultCacheUsage query_result_cache_usage,
555- bool internal)
580+ bool internal,
581+ std::chrono::system_clock::time_point time)
556582{
557583 const Settings & settings = context->getSettingsRef ();
558584 auto log_queries = settings[Setting::log_queries] && !internal;
@@ -563,31 +589,15 @@ void logQueryFinish(
563589 // / Update performance counters before logging to query_log
564590 CurrentThread::finalizePerformanceCounters ();
565591
566- auto time_now = std::chrono::system_clock::now ();
567592 QueryStatusInfo info = process_list_elem->getInfo (true , settings[Setting::log_profile_events]);
568- logQueryMetricLogFinish (context, internal, elem.client_info .current_query_id , time_now , std::make_shared<QueryStatusInfo>(info));
593+ logQueryMetricLogFinish (context, internal, elem.client_info .current_query_id , time , std::make_shared<QueryStatusInfo>(info));
569594 elem.type = QueryLogElementType::QUERY_FINISH;
570595
571- addStatusInfoToQueryLogElement (elem, info, query_ast, context);
572-
573- if (pulling_pipeline)
574- {
575- query_pipeline.tryGetResultRowsAndBytes (elem.result_rows , elem.result_bytes );
576- }
577- else // / will be used only for ordinary INSERT queries
578- {
579- auto progress_out = process_list_elem->getProgressOut ();
580- elem.result_rows = progress_out.written_rows ;
581- elem.result_bytes = progress_out.written_bytes ;
582- }
596+ addStatusInfoToQueryLogElement (elem, info, query_ast, context, time);
583597
584- auto progress_callback = context->getProgressCallback ();
585- if (progress_callback)
586- {
587- Progress p;
588- p.incrementPiecewiseAtomically (Progress{ResultProgress{elem.result_rows , elem.result_bytes }});
589- progress_callback (p);
590- }
598+ auto result_progress = flushQueryProgress (query_pipeline, pulling_pipeline, context->getProgressCallback (), process_list_elem);
599+ elem.result_rows = result_progress.result_rows ;
600+ elem.result_bytes = result_progress.result_bytes ;
591601
592602 if (elem.read_rows != 0 )
593603 {
@@ -641,10 +651,24 @@ void logQueryFinish(
641651 query_span->addAttribute (fmt::format (" clickhouse.setting.{}" , change.name ), convertFieldToString (change.value ));
642652 }
643653 }
644- query_span->finish ();
654+ query_span->finish (time );
645655 }
646656}
647657
658+ void logQueryFinish (
659+ QueryLogElement & elem,
660+ const ContextMutablePtr & context,
661+ const ASTPtr & query_ast,
662+ const QueryPipeline & query_pipeline,
663+ bool pulling_pipeline,
664+ std::shared_ptr<OpenTelemetry::SpanHolder> query_span,
665+ QueryResultCacheUsage query_result_cache_usage,
666+ bool internal)
667+ {
668+ const auto time_now = std::chrono::system_clock::now ();
669+ logQueryFinishImpl (elem, context, query_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal, time_now);
670+ }
671+
648672void logQueryException (
649673 QueryLogElement & elem,
650674 const ContextMutablePtr & context,
@@ -676,7 +700,7 @@ void logQueryException(
676700 if (process_list_elem)
677701 {
678702 info = std::make_shared<QueryStatusInfo>(process_list_elem->getInfo (true , settings[Setting::log_profile_events], false ));
679- addStatusInfoToQueryLogElement (elem, *info, query_ast, context);
703+ addStatusInfoToQueryLogElement (elem, *info, query_ast, context, time_now );
680704 }
681705 else
682706 {
@@ -710,7 +734,7 @@ void logQueryException(
710734 query_span->addAttribute (" clickhouse.query_id" , elem.client_info .current_query_id );
711735 query_span->addAttribute (" clickhouse.exception" , elem.exception );
712736 query_span->addAttribute (" clickhouse.exception_code" , elem.exception_code );
713- query_span->finish ();
737+ query_span->finish (time_now );
714738 }
715739}
716740
@@ -832,7 +856,7 @@ void logExceptionBeforeStart(
832856 query_span->addAttribute (" clickhouse.exception" , elem.exception );
833857 query_span->addAttribute (" db.statement" , elem.query );
834858 query_span->addAttribute (" clickhouse.query_id" , elem.client_info .current_query_id );
835- query_span->finish ();
859+ query_span->finish (query_end_time );
836860 }
837861
838862 ProfileEvents::increment (ProfileEvents::FailedQuery);
@@ -1617,15 +1641,16 @@ static BlockIO executeQueryImpl(
16171641 internal,
16181642 implicit_txn_control,
16191643 execute_implicit_tcl_query,
1644+ // Needs to be cached, since the value will change after complete()
16201645 pulling_pipeline = pipeline.pulling (),
1621- query_span](QueryPipeline & query_pipeline) mutable
1646+ query_span](QueryPipeline & query_pipeline, std::chrono::system_clock::time_point finish_time ) mutable
16221647 {
16231648 if (query_result_cache_usage == QueryResultCacheUsage::Write)
16241649 // / Trigger the actual write of the buffered query result into the query result cache. This is done explicitly to
16251650 // / prevent partial/garbage results in case of exceptions during query execution.
16261651 query_pipeline.finalizeWriteInQueryResultCache ();
16271652
1628- logQueryFinish (elem, context, out_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal);
1653+ logQueryFinishImpl (elem, context, out_ast, query_pipeline, pulling_pipeline, query_span, query_result_cache_usage, internal, finish_time );
16291654
16301655 if (*implicit_txn_control)
16311656 execute_implicit_tcl_query (context, ASTTransactionControl::COMMIT);
@@ -1702,7 +1727,8 @@ void executeQuery(
17021727 SetResultDetailsFunc set_result_details,
17031728 QueryFlags flags,
17041729 const std::optional<FormatSettings> & output_format_settings,
1705- HandleExceptionInOutputFormatFunc handle_exception_in_output_format)
1730+ HandleExceptionInOutputFormatFunc handle_exception_in_output_format,
1731+ QueryFinishCallback query_finish_callback)
17061732{
17071733 PODArray<char > parse_buf;
17081734 const char * begin;
@@ -1869,6 +1895,7 @@ void executeQuery(
18691895 }
18701896
18711897 auto & pipeline = streams.pipeline ;
1898+ bool pulling_pipeline = pipeline.pulling ();
18721899
18731900 std::unique_ptr<WriteBuffer> compressed_buffer;
18741901 try
@@ -1974,7 +2001,37 @@ void executeQuery(
19742001 throw ;
19752002 }
19762003
1977- streams.onFinish ();
2004+ // / The order is important here:
2005+ // / - first we save the finish_time that will be used for the entry in query_log/opentelemetry_span_log.finish_time_us
2006+ // / - then we flush the progress (to flush result_rows/result_bytes)
2007+ // / - then call the query_finish_callback() - right now the only purpose is to flush the data over HTTP
2008+ // / - then call onFinish() that will create entry in query_log/opentelemetry_span_log
2009+ // /
2010+ // / That way we have:
2011+ // / - correct finish time of the query (regardless of how long the query_finish_callback() takes)
2012+ // / - correct progress for HTTP (X-ClickHouse-Summary requires result_rows/result_bytes)
2013+ // / - correct NetworkSendElapsedMicroseconds/NetworkSendBytes in query_log
2014+ const auto finish_time = std::chrono::system_clock::now ();
2015+ std::exception_ptr exception_ptr;
2016+ if (query_finish_callback)
2017+ {
2018+ // / Dump result_rows/result_bytes, since query_finish_callback() will send final http header.
2019+ flushQueryProgress (pipeline, pulling_pipeline, context->getProgressCallback (), context->getProcessListElement ());
2020+
2021+ try
2022+ {
2023+ query_finish_callback ();
2024+ }
2025+ catch (...)
2026+ {
2027+ exception_ptr = std::current_exception ();
2028+ }
2029+ }
2030+
2031+ streams.onFinish (finish_time);
2032+
2033+ if (exception_ptr)
2034+ std::rethrow_exception (exception_ptr);
19782035}
19792036
19802037void executeTrivialBlockIO (BlockIO & streams, ContextPtr context)
0 commit comments