@@ -68,6 +68,8 @@ type node struct {
6868 opsLock sync.Mutex
6969 cdcTracker * CDC
7070 canCampaign bool
71+
72+ startTsKey map [uint64 ]uint64
7173}
7274
7375type op int
@@ -876,6 +878,12 @@ func (n *node) processApplyCh() {
876878 glog .V (3 ).Infof ("handling element in applyCh with #entries %v" , len (entries ))
877879 defer glog .V (3 ).Infof ("done handling element in applyCh" )
878880
881+ _ , spanHandler := otel .Tracer ("applyCh" ).Start (context .Background (), "Alpha.processApplyCh" )
882+ defer spanHandler .End ()
883+
884+ spanHandler .AddEvent ("handling element in applyCh with #entries %v" , trace .WithAttributes (
885+ attribute .Int64 ("numEntries" , int64 (len (entries )))))
886+
879887 var totalSize int64
880888 for _ , entry := range entries {
881889 x .AssertTrue (len (entry .Data ) > 0 )
@@ -909,7 +917,7 @@ func (n *node) processApplyCh() {
909917 p := & P {err : perr , size : psz , seen : time .Now ()}
910918 previous [key ] = p
911919 }
912- span := trace .SpanFromContext (n .ctx )
920+ span := trace .SpanFromContext (n .Ctx ( key ) )
913921 if perr != nil {
914922 glog .Errorf ("Applying proposal. Error: %v. Proposal: %q." , perr , & proposal )
915923 span .AddEvent (fmt .Sprintf ("Applying proposal failed. Error: %v Proposal: %q" , perr , & proposal ))
@@ -919,11 +927,18 @@ func (n *node) processApplyCh() {
919927 attribute .Int64 ("key" , int64 (key )),
920928 attribute .Int64 ("index" , int64 (proposal .Index )),
921929 ))
930+ spanHandler .AddEvent ("Applied proposal with key: %d, index: %d. Err: %v" ,
931+ trace .WithAttributes (
932+ attribute .Int64 ("key" , int64 (key )),
933+ attribute .Int64 ("index" , int64 (proposal .Index )),
934+ ))
922935
923936 var tags []tag.Mutator
924937 switch {
925938 case proposal .Mutations != nil :
926939 tags = append (tags , tag .Upsert (x .KeyMethod , "apply.Mutations" ))
940+ span .SetAttributes (attribute .Int64 ("start_ts" , int64 (proposal .Mutations .StartTs )))
941+ n .startTsKey [proposal .Mutations .StartTs ] = key
927942 case proposal .Delta != nil :
928943 tags = append (tags , tag .Upsert (x .KeyMethod , "apply.Delta" ))
929944 }
@@ -966,6 +981,9 @@ func (n *node) processApplyCh() {
966981
967982// TODO(Anurag - 4 May 2020): Are we using pkey? Remove if unused.
968983func (n * node ) commitOrAbort (pkey uint64 , delta * pb.OracleDelta ) error {
984+ _ , span := otel .Tracer ("alpha.CommitLoop" ).Start (context .Background (), "commitOrAbort" )
985+ defer span .End ()
986+
969987 x .PrintOracleDelta (delta )
970988 // First let's commit all mutations to disk.
971989 writer := posting .NewTxnWriter (pstore )
@@ -995,6 +1013,18 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
9951013 start , commit , err )
9961014 panic (err )
9971015 }
1016+
1017+ span .AddEvent ("Committed txn with start_ts: %d, commit_ts: %d" , trace .WithAttributes (
1018+ attribute .Int64 ("start_ts" , int64 (start )),
1019+ attribute .Int64 ("commit_ts" , int64 (commit )),
1020+ ))
1021+
1022+ spani := trace .SpanFromContext (n .Ctx (n .startTsKey [start ]))
1023+ fmt .Println ("FIRST" , spani )
1024+ spani .AddEvent ("Committed txn with start_ts: %d, commit_ts: %d" , trace .WithAttributes (
1025+ attribute .Int64 ("start_ts" , int64 (start )),
1026+ attribute .Int64 ("commit_ts" , int64 (commit )),
1027+ ))
9981028 }
9991029
10001030 for _ , status := range delta .Txns {
@@ -1004,6 +1034,16 @@ func (n *node) commitOrAbort(pkey uint64, delta *pb.OracleDelta) error {
10041034 return errors .Wrapf (err , "while flushing to disk" )
10051035 }
10061036
1037+ span .AddEvent ("Flushed to disk" )
1038+ for _ , status := range delta .Txns {
1039+ spani := trace .SpanFromContext (n .Ctx (n .startTsKey [status .StartTs ]))
1040+ fmt .Println ("Second" , spani )
1041+ spani .AddEvent ("Flushed txn with start_ts: %d, commit_ts: %d" , trace .WithAttributes (
1042+ attribute .Int64 ("start_ts" , int64 (status .StartTs )),
1043+ attribute .Int64 ("commit_ts" , int64 (status .CommitTs )),
1044+ ))
1045+ }
1046+
10071047 if x .WorkerConfig .HardSync {
10081048 if err := pstore .Sync (); err != nil {
10091049 glog .Errorf ("Error while calling Sync while commitOrAbort: %v" , err )
0 commit comments