@@ -74,6 +74,13 @@ var DrainTime = prometheus.NewHistogramVec(prometheus.HistogramOpts{
7474 Buckets : []float64 {.005 , .01 , .025 , .05 , .1 , .25 , .5 , 1 , 2.5 , 5 , 10 , 100 , 500 , 1000 , 5000 , 10000 },
7575}, []string {"type" })
7676
77+ // Size of diff sync in bytes
78+ var DiffSyncSize = prometheus .NewHistogramVec (prometheus.HistogramOpts {
79+ Namespace : "chotki" ,
80+ Name : "diff_sync_size" ,
81+ Buckets : []float64 {1 , 10 , 50 , 100 , 500 , 1000 , 10000 , 100000 , 1000000 , 10000000 , 100000000 , 1000000000 },
82+ }, []string {"id" })
83+
7784type Options struct {
7885 pebble.Options
7986
@@ -730,7 +737,19 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error)
730737 case 'H' : // handshake
731738 d := cho .db .NewBatch ()
732739 // we also create a new sync point, pebble batch that we will write diffs to
740+ activeSyncs := make ([]rdx.ID , 0 )
741+ cho .syncs .Range (func (key rdx.ID , value * syncPoint ) bool {
742+ if key .Src () == cho .src {
743+ activeSyncs = append (activeSyncs , key )
744+ }
745+ return true
746+ })
747+ for _ , sync := range activeSyncs {
748+ cho .syncs .Delete (sync )
749+ cho .log .InfoCtx (ctx , "deleted active sync" , "id" , sync .String ())
750+ }
733751 cho .syncs .Store (id , & syncPoint {batch : d , start : time .Now ()})
752+ cho .log .InfoCtx (ctx , "created new sync point" , "id" , id .String ())
734753 err = cho .ApplyH (id , ref , body , d )
735754
736755 case 'D' : // diff packet
@@ -749,6 +768,7 @@ func (cho *Chotki) drain(ctx context.Context, recs protocol.Records) (err error)
749768 if ! ok {
750769 return ErrSyncUnknown
751770 }
771+ DiffSyncSize .WithLabelValues (id .String ()).Observe (float64 (s .batch .Len ()))
752772 // update blocks version vectors
753773 err = cho .ApplyV (id , ref , body , s .batch )
754774 if err == nil {
0 commit comments