@@ -438,6 +438,202 @@ func internalTopic(l Logger, d trace.Detailer) (t trace.Topic) {
438438 )
439439 }
440440
441+ t .OnReaderPopBatchTx = func (
442+ startInfo trace.TopicReaderPopBatchTxStartInfo ,
443+ ) func (trace.TopicReaderPopBatchTxDoneInfo ) {
444+ if d .Details ()& trace .TopicReaderCustomerEvents == 0 {
445+ return nil
446+ }
447+
448+ start := time .Now ()
449+ ctx := with (* startInfo .Context , TRACE , "ydb" , "topic" , "reader" , "customer" , "popbatchtx" )
450+ l .Log (WithLevel (ctx , TRACE ), "starting pop batch tx" ,
451+ Int64 ("reader_id" , startInfo .ReaderID ),
452+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
453+ String ("transaction_id" , startInfo .TransactionID ),
454+ )
455+
456+ return func (doneInfo trace.TopicReaderPopBatchTxDoneInfo ) {
457+ if doneInfo .Error == nil {
458+ l .Log (
459+ WithLevel (ctx , DEBUG ), "pop batch done" ,
460+ Int64 ("reader_id" , startInfo .ReaderID ),
461+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
462+ String ("transaction_id" , startInfo .TransactionID ),
463+ Int ("messaged_count" , doneInfo .MessagesCount ),
464+ Int64 ("start_offset" , doneInfo .StartOffset ),
465+ Int64 ("end_offset" , doneInfo .EndOffset ),
466+ latencyField (start ),
467+ versionField (),
468+ )
469+ } else {
470+ l .Log (
471+ WithLevel (ctx , WARN ), "pop batch failed" ,
472+ Int64 ("reader_id" , startInfo .ReaderID ),
473+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
474+ String ("transaction_id" , startInfo .TransactionID ),
475+ Error (doneInfo .Error ),
476+ latencyField (start ),
477+ versionField (),
478+ )
479+ }
480+ }
481+ }
482+
483+ t .OnReaderStreamPopBatchTx = func (
484+ startInfo trace.TopicReaderStreamPopBatchTxStartInfo ,
485+ ) func (
486+ trace.TopicReaderStreamPopBatchTxDoneInfo ,
487+ ) {
488+ if d .Details ()& trace .TopicReaderTransactionEvents == 0 {
489+ return nil
490+ }
491+
492+ start := time .Now ()
493+ ctx := with (* startInfo .Context , TRACE , "ydb" , "topic" , "reader" , "transaction" , "popbatchtx_on_stream" )
494+ l .Log (WithLevel (ctx , TRACE ), "starting pop batch tx" ,
495+ Int64 ("reader_id" , startInfo .ReaderID ),
496+ String ("reader_connection_id" , startInfo .ReaderConnectionID ),
497+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
498+ String ("transaction_id" , startInfo .TransactionID ),
499+ versionField (),
500+ )
501+
502+ return func (doneInfo trace.TopicReaderStreamPopBatchTxDoneInfo ) {
503+ if doneInfo .Error == nil {
504+ l .Log (
505+ WithLevel (ctx , DEBUG ), "pop batch on stream done" ,
506+ Int64 ("reader_id" , startInfo .ReaderID ),
507+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
508+ String ("transaction_id" , startInfo .TransactionID ),
509+ latencyField (start ),
510+ versionField (),
511+ )
512+ } else {
513+ l .Log (
514+ WithLevel (ctx , WARN ), "pop batch on stream failed" ,
515+ Int64 ("reader_id" , startInfo .ReaderID ),
516+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
517+ String ("transaction_id" , startInfo .TransactionID ),
518+ Error (doneInfo .Error ),
519+ latencyField (start ),
520+ versionField (),
521+ )
522+ }
523+ }
524+ }
525+
526+ t .OnReaderUpdateOffsetsInTransaction = func (
527+ startInfo trace.TopicReaderOnUpdateOffsetsInTransactionStartInfo ,
528+ ) func (
529+ trace.TopicReaderOnUpdateOffsetsInTransactionDoneInfo ,
530+ ) {
531+ if d .Details ()& trace .TopicReaderTransactionEvents == 0 {
532+ return nil
533+ }
534+
535+ start := time .Now ()
536+ ctx := with (* startInfo .Context , TRACE , "ydb" , "topic" , "reader" , "transaction" , "update_offsets" )
537+ l .Log (WithLevel (ctx , TRACE ), "starting update offsets in transaction" ,
538+ Int64 ("reader_id" , startInfo .ReaderID ),
539+ String ("reader_connection_id" , startInfo .ReaderConnectionID ),
540+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
541+ String ("transaction_id" , startInfo .TransactionID ),
542+ versionField (),
543+ )
544+
545+ return func (doneInfo trace.TopicReaderOnUpdateOffsetsInTransactionDoneInfo ) {
546+ if doneInfo .Error == nil {
547+ l .Log (
548+ WithLevel (ctx , DEBUG ), "pop batch on stream done" ,
549+ Int64 ("reader_id" , startInfo .ReaderID ),
550+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
551+ String ("transaction_id" , startInfo .TransactionID ),
552+ latencyField (start ),
553+ versionField (),
554+ )
555+ } else {
556+ l .Log (
557+ WithLevel (ctx , WARN ), "pop batch on stream failed" ,
558+ Int64 ("reader_id" , startInfo .ReaderID ),
559+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
560+ String ("transaction_id" , startInfo .TransactionID ),
561+ Error (doneInfo .Error ),
562+ latencyField (start ),
563+ versionField (),
564+ )
565+ }
566+ }
567+ }
568+
569+ t .OnReaderTransactionRollback = func (
570+ startInfo trace.TopicReaderTransactionRollbackStartInfo ,
571+ ) func (
572+ trace.TopicReaderTransactionRollbackDoneInfo ,
573+ ) {
574+ if d .Details ()& trace .TopicReaderTransactionEvents == 0 {
575+ return nil
576+ }
577+
578+ start := time .Now ()
579+ ctx := with (* startInfo .Context , TRACE , "ydb" , "topic" , "reader" , "transaction" , "update_offsets" )
580+ l .Log (WithLevel (ctx , TRACE ), "starting update offsets in transaction" ,
581+ Int64 ("reader_id" , startInfo .ReaderID ),
582+ String ("reader_connection_id" , startInfo .ReaderConnectionID ),
583+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
584+ String ("transaction_id" , startInfo .TransactionID ),
585+ versionField (),
586+ )
587+
588+ return func (doneInfo trace.TopicReaderTransactionRollbackDoneInfo ) {
589+ if doneInfo .RollbackError == nil {
590+ l .Log (
591+ WithLevel (ctx , DEBUG ), "pop batch on stream done" ,
592+ Int64 ("reader_id" , startInfo .ReaderID ),
593+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
594+ String ("transaction_id" , startInfo .TransactionID ),
595+ latencyField (start ),
596+ versionField (),
597+ )
598+ } else {
599+ l .Log (
600+ WithLevel (ctx , WARN ), "pop batch on stream failed" ,
601+ Int64 ("reader_id" , startInfo .ReaderID ),
602+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
603+ String ("transaction_id" , startInfo .TransactionID ),
604+ Error (doneInfo .RollbackError ),
605+ latencyField (start ),
606+ versionField (),
607+ )
608+ }
609+ }
610+ }
611+
612+ t .OnReaderTransactionCompleted = func (
613+ startInfo trace.TopicReaderTransactionCompletedStartInfo ,
614+ ) func (
615+ trace.TopicReaderTransactionCompletedDoneInfo ,
616+ ) {
617+ if d .Details ()& trace .TopicReaderTransactionEvents == 0 {
618+ return nil
619+ }
620+
621+ // expected as very short in memory operation without errors, no need log start separately
622+ start := time .Now ()
623+
624+ return func (doneInfo trace.TopicReaderTransactionCompletedDoneInfo ) {
625+ ctx := with (* startInfo .Context , TRACE , "ydb" , "topic" , "reader" , "transaction" , "update_offsets" )
626+ l .Log (WithLevel (ctx , TRACE ), "starting update offsets in transaction" ,
627+ Int64 ("reader_id" , startInfo .ReaderID ),
628+ String ("reader_connection_id" , startInfo .ReaderConnectionID ),
629+ String ("transaction_session_id" , startInfo .TransactionSessionID ),
630+ String ("transaction_id" , startInfo .TransactionID ),
631+ latencyField (start ),
632+ versionField (),
633+ )
634+ }
635+ }
636+
441637 ///
442638 /// Topic writer
443639 ///
0 commit comments