Skip to content

Commit 45f6fa8

Browse files
authored
PCSM-167: Status response improvement (#143)
1 parent bad3a18 commit 45f6fa8

File tree

8 files changed

+113
-70
lines changed

8 files changed

+113
-70
lines changed

README.md

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -297,16 +297,19 @@ The /status endpoint provides the current state of the PCSM replication process,
297297
- `info`: provides additional information about the current state.
298298
- `error` (optional): the error message if the operation failed.
299299
300-
- `lagTime`: the current lag time in logical seconds between source and target clusters.
301-
- `eventsProcessed`: the number of events processed.
302-
- `lastReplicatedOpTime`: the last replicated operation time.
300+
- `lagTimeSeconds`: the current lag time in logical seconds between source and target clusters.
301+
- `eventsApplied`: the number of events applied to the target cluster.
302+
- `lastReplicatedOpTime`: the last replicated operation time
303+
- `lastReplicatedOpTime.ts`: op time timestamp
304+
- `lastReplicatedOpTime.isoDate`: op time ts in human-readable form
305+
303306
304307
- `initialSync.completed`: indicates if the initial sync is completed.
305-
- `initialSync.lagTime`: the lag time in logical seconds until the initial sync completed.
308+
- `initialSync.lagTimeSeconds`: the lag time in logical seconds until the initial sync completed.
306309
307310
- `initialSync.cloneCompleted`: indicates if the cloning process is completed.
308-
- `initialSync.estimatedCloneSize`: the estimated total size of the clone.
309-
- `initialSync.clonedSize`: the size of the data that has been cloned.
311+
- `initialSync.estimatedCloneSizeBytes`: the estimated total size of the clone.
312+
- `initialSync.clonedSizeBytes`: the size of the data that has been cloned.
310313
311314
Example:
312315
@@ -316,17 +319,20 @@ Example:
316319
"state": "running",
317320
"info": "Initial Sync",
318321
319-
"lagTime": 22,
320-
"eventsProcessed": 5000,
321-
"lastReplicatedOpTime": "1740335200.5",
322+
"lagTimeSeconds": 22,
323+
"eventsApplied": 5000,
324+
"lastReplicatedOpTime": {
325+
"ts": "1762241863.1",
326+
"isoDate": "2025-11-04T07:37:43Z"
327+
},
322328
323329
"initialSync": {
324330
"completed": false,
325-
"lagTime": 5,
331+
"lagTimeSeconds": 5,
326332
327333
"cloneCompleted": false,
328-
"estimatedCloneSize": 5000000000,
329-
"clonedSize": 2500000000
334+
"estimatedCloneSizeBytes": 5000000000,
335+
"clonedSizeBytes": 2500000000
330336
}
331337
}
332338
```

main.go

Lines changed: 35 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -689,22 +689,31 @@ func (s *server) handleStatus(w http.ResponseWriter, r *http.Request) {
689689
return
690690
}
691691

692-
res.EventsProcessed = status.Repl.EventsProcessed
693-
res.LagTime = status.TotalLagTime
692+
res.EventsRead = status.Repl.EventsRead
693+
res.EventsApplied = status.Repl.EventsApplied
694+
res.LagTimeSeconds = status.TotalLagTimeSeconds
694695

695696
if !status.Repl.LastReplicatedOpTime.IsZero() {
696-
res.LastReplicatedOpTime = fmt.Sprintf("%d.%d",
697+
ts := fmt.Sprintf("%d.%d",
697698
status.Repl.LastReplicatedOpTime.T,
698699
status.Repl.LastReplicatedOpTime.I)
700+
701+
isoDate := time.Unix(int64(status.Repl.LastReplicatedOpTime.T),
702+
int64(status.Repl.LastReplicatedOpTime.I)).UTC()
703+
704+
res.LastReplicatedOpTime = &lastReplicatedOpTime{
705+
TS: ts,
706+
ISODate: isoDate.Format(time.RFC3339),
707+
}
699708
}
700709

701710
res.InitialSync = &statusInitialSyncResponse{
702-
Completed: status.InitialSyncCompleted,
703-
LagTime: status.InitialSyncLagTime,
711+
Completed: status.InitialSyncCompleted,
712+
LagTimeSeconds: status.InitialSyncLagTimeSeconds,
704713

705-
CloneCompleted: status.Clone.IsFinished(),
706-
EstimatedCloneSize: status.Clone.EstimatedTotalSize,
707-
ClonedSize: status.Clone.CopiedSize,
714+
CloneCompleted: status.Clone.IsFinished(),
715+
EstimatedCloneSizeBytes: status.Clone.EstimatedTotalSizeBytes,
716+
ClonedSizeBytes: status.Clone.CopiedSizeBytes,
708717
}
709718

710719
switch {
@@ -992,26 +1001,33 @@ type statusResponse struct {
9921001
// Info provides additional information about the current state.
9931002
Info string `json:"info,omitempty"`
9941003

995-
// LagTime is the current lag time in logical seconds.
996-
LagTime int64 `json:"lagTime"`
997-
// EventsProcessed is the number of events processed.
998-
EventsProcessed int64 `json:"eventsProcessed"`
1004+
// LagTimeSeconds is the current lag time in logical seconds.
1005+
LagTimeSeconds int64 `json:"lagTimeSeconds"`
1006+
// EventsRead is the number of events read from the source. Not counting tick events.
1007+
EventsRead int64 `json:"eventsRead"`
1008+
// EventsApplied is the number of events applied.
1009+
EventsApplied int64 `json:"eventsApplied"`
9991010
// LastReplicatedOpTime is the last replicated operation time.
1000-
LastReplicatedOpTime string `json:"lastReplicatedOpTime,omitempty"`
1011+
LastReplicatedOpTime *lastReplicatedOpTime `json:"lastReplicatedOpTime,omitempty"`
10011012

10021013
// InitialSync contains the initial sync status details.
10031014
InitialSync *statusInitialSyncResponse `json:"initialSync,omitempty"`
10041015
}
10051016

1017+
type lastReplicatedOpTime struct {
1018+
TS string `json:"ts"`
1019+
ISODate string `json:"isoDate"`
1020+
}
1021+
10061022
// statusInitialSyncResponse represents the initial sync status in the /status response.
10071023
type statusInitialSyncResponse struct {
1008-
// LagTime is the lag time in logical seconds until the initial sync completed.
1009-
LagTime int64 `json:"lagTime,omitempty"`
1024+
// LagTimeSeconds is the lag time in logical seconds until the initial sync completed.
1025+
LagTimeSeconds int64 `json:"lagTimeSeconds,omitempty"`
10101026

1011-
// EstimatedCloneSize is the estimated total size of the clone.
1012-
EstimatedCloneSize uint64 `json:"estimatedCloneSize,omitempty"`
1013-
// ClonedSize is the size of the data that has been cloned.
1014-
ClonedSize uint64 `json:"clonedSize"`
1027+
// EstimatedCloneSizeBytes is the estimated total size of the clone in bytes.
1028+
EstimatedCloneSizeBytes uint64 `json:"estimatedCloneSizeBytes,omitempty"`
1029+
// ClonedSizeBytes is the size of the data that has been cloned.
1030+
ClonedSizeBytes uint64 `json:"clonedSizeBytes"`
10151031

10161032
// Completed indicates if the initial sync is completed.
10171033
Completed bool `json:"completed"`

metrics/metrics.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,16 @@ const metricNamespace = "percona_clustersync_mongodb"
1212
// Counters.
1313
var (
1414
//nolint:gochecknoglobals
15-
eventsProcessedTotal = prometheus.NewCounter(prometheus.CounterOpts{
16-
Name: "events_processed_total",
17-
Help: "Total number of events processed.",
15+
eventsReadTotal = prometheus.NewCounter(prometheus.CounterOpts{
16+
Name: "events_read_total",
17+
Help: "Total number of events read from the source.",
18+
Namespace: metricNamespace,
19+
})
20+
21+
//nolint:gochecknoglobals
22+
eventsAppliedTotal = prometheus.NewCounter(prometheus.CounterOpts{
23+
Name: "events_applied_total",
24+
Help: "Total number of events applied.",
1825
Namespace: metricNamespace,
1926
})
2027

@@ -101,7 +108,7 @@ func Init(reg prometheus.Registerer) {
101108
copyReadBatchDurationSeconds,
102109
copyInsertBatchDurationSeconds,
103110

104-
eventsProcessedTotal,
111+
eventsAppliedTotal,
105112
lagTimeSeconds,
106113
intialSyncLagTimeSeconds,
107114
)
@@ -144,9 +151,14 @@ func SetCopyInsertBatchDurationSeconds(dur time.Duration) {
144151
copyInsertBatchDurationSeconds.Set(float64(dur.Seconds()))
145152
}
146153

147-
// AddEventsProcessed increments the total number of events processed counter.
148-
func AddEventsProcessed(v int) {
149-
eventsProcessedTotal.Add(float64(v))
154+
// IncEventsRead increments the total number of events read counter.
155+
func IncEventsRead() {
156+
eventsReadTotal.Inc()
157+
}
158+
159+
// AddEventsApplied increments the total number of events applied counter.
160+
func AddEventsApplied(v int) {
161+
eventsAppliedTotal.Add(float64(v))
150162
}
151163

152164
// SetLagTimeSeconds sets the lag time in seconds gauge.

pcsm/catalog.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,6 @@ func (c *Catalog) UnlockWrite() {
134134
func (c *Catalog) Checkpoint() *catalogCheckpoint { //nolint:revive
135135
// do not call [sync.RWMutex.RLock] to avoid deadlock through recursive read-locking
136136
// that may happen during clone or change replication
137-
138137
if len(c.Databases) == 0 {
139138
return nil
140139
}

pcsm/clone.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,8 @@ type Clone struct {
4747

4848
// CloneStatus represents the status of the cloning process.
4949
type CloneStatus struct {
50-
EstimatedTotalSize uint64 // Estimated total bytes to be copied
51-
CopiedSize uint64 // Bytes copied so far
50+
EstimatedTotalSizeBytes uint64 // Estimated total bytes to be copied
51+
CopiedSizeBytes uint64 // Bytes copied so far
5252

5353
StartTS bson.Timestamp
5454
FinishTS bson.Timestamp
@@ -148,13 +148,13 @@ func (c *Clone) Status() CloneStatus {
148148
defer c.lock.Unlock()
149149

150150
return CloneStatus{
151-
EstimatedTotalSize: c.totalSize,
152-
CopiedSize: c.copiedSize.Load(),
153-
StartTS: c.startTS,
154-
FinishTS: c.finishTS,
155-
StartTime: c.startTime,
156-
FinishTime: c.finishTime,
157-
Err: c.err,
151+
EstimatedTotalSizeBytes: c.totalSize,
152+
CopiedSizeBytes: c.copiedSize.Load(),
153+
StartTS: c.startTS,
154+
FinishTS: c.finishTS,
155+
StartTime: c.startTime,
156+
FinishTime: c.finishTime,
157+
Err: c.err,
158158
}
159159
}
160160

pcsm/pcsm.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,10 +57,10 @@ type Status struct {
5757
// Error is the error message if the operation failed.
5858
Error error
5959

60-
// TotalLagTime is the current lag time in logical seconds between source and target clusters.
61-
TotalLagTime int64
62-
// InitialSyncLagTime is the lag time during the initial sync.
63-
InitialSyncLagTime int64
60+
// TotalLagTimeSeconds is the current lag time in logical seconds between source and target clusters.
61+
TotalLagTimeSeconds int64
62+
// InitialSyncLagTimeSeconds is the lag time during the initial sync.
63+
InitialSyncLagTimeSeconds int64
6464
// InitialSyncCompleted indicates if the initial sync is completed.
6565
InitialSyncCompleted bool
6666

@@ -261,15 +261,15 @@ func (ml *PCSM) Status(ctx context.Context) *Status {
261261
switch {
262262
case !s.Repl.LastReplicatedOpTime.IsZero():
263263
totalLag := int64(sourceTime.T) - int64(s.Repl.LastReplicatedOpTime.T)
264-
s.TotalLagTime = totalLag
264+
s.TotalLagTimeSeconds = totalLag
265265
case !s.Clone.StartTS.IsZero():
266266
totalLag := int64(sourceTime.T) - int64(s.Clone.StartTS.T)
267-
s.TotalLagTime = totalLag
267+
s.TotalLagTimeSeconds = totalLag
268268
}
269269
}
270270

271271
if !s.InitialSyncCompleted {
272-
s.InitialSyncLagTime = s.TotalLagTime
272+
s.InitialSyncLagTimeSeconds = s.TotalLagTimeSeconds
273273
}
274274

275275
return s

pcsm/repl.go

Lines changed: 23 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/hex"
66
"strings"
77
"sync"
8+
"sync/atomic"
89
"time"
910

1011
"go.mongodb.org/mongo-driver/v2/bson"
@@ -40,7 +41,8 @@ type Repl struct {
4041
lock sync.Mutex
4142
err error
4243

43-
eventsProcessed int64
44+
eventsRead atomic.Int64
45+
eventsApplied int64
4446

4547
startTime time.Time
4648
pauseTime time.Time
@@ -61,7 +63,8 @@ type ReplStatus struct {
6163
PauseTime time.Time
6264

6365
LastReplicatedOpTime bson.Timestamp // Last applied operation time
64-
EventsProcessed int64 // Number of events processed
66+
EventsRead int64 // Number of events read from the source
67+
EventsApplied int64 // Number of events applied
6568

6669
Err error
6770
}
@@ -95,7 +98,8 @@ func NewRepl(source, target *mongo.Client, catalog *Catalog, nsFilter sel.NSFilt
9598
type replCheckpoint struct {
9699
StartTime time.Time `bson:"startTime,omitempty"`
97100
PauseTime time.Time `bson:"pauseTime,omitempty"`
98-
EventsProcessed int64 `bson:"events,omitempty"`
101+
EventsRead int64 `bson:"eventsRead,omitempty"`
102+
EventsApplied int64 `bson:"events,omitempty"`
99103
LastReplicatedOpTime bson.Timestamp `bson:"lastOpTS,omitempty"`
100104
Error string `bson:"error,omitempty"`
101105
UseClientBulkWrite bool `bson:"clientBulk,omitempty"`
@@ -112,7 +116,8 @@ func (r *Repl) Checkpoint() *replCheckpoint { //nolint:revive
112116
cp := &replCheckpoint{
113117
StartTime: r.startTime,
114118
PauseTime: r.pauseTime,
115-
EventsProcessed: r.eventsProcessed,
119+
EventsRead: r.eventsRead.Load(),
120+
EventsApplied: r.eventsApplied,
116121
LastReplicatedOpTime: r.lastReplicatedOpTime,
117122
}
118123

@@ -145,7 +150,7 @@ func (r *Repl) Recover(cp *replCheckpoint) error {
145150

146151
r.startTime = cp.StartTime
147152
r.pauseTime = pauseTime
148-
r.eventsProcessed = cp.EventsProcessed
153+
r.eventsApplied = cp.EventsApplied
149154
r.lastReplicatedOpTime = cp.LastReplicatedOpTime
150155

151156
if cp.UseClientBulkWrite {
@@ -168,7 +173,8 @@ func (r *Repl) Status() ReplStatus {
168173

169174
return ReplStatus{
170175
LastReplicatedOpTime: r.lastReplicatedOpTime,
171-
EventsProcessed: r.eventsProcessed,
176+
EventsRead: r.eventsRead.Load(),
177+
EventsApplied: r.eventsApplied,
172178

173179
StartTime: r.startTime,
174180
PauseTime: r.pauseTime,
@@ -481,14 +487,17 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) {
481487
continue
482488
}
483489

490+
r.eventsRead.Add(1)
491+
metrics.IncEventsRead()
492+
484493
if change.Namespace.Database == config.PCSMDatabase {
485494
if r.bulkWrite.Empty() {
486495
r.lock.Lock()
487496
r.lastReplicatedOpTime = change.ClusterTime
488-
r.eventsProcessed++
497+
r.eventsApplied++
489498
r.lock.Unlock()
490499

491-
metrics.AddEventsProcessed(1)
500+
metrics.AddEventsApplied(1)
492501
}
493502

494503
continue
@@ -498,10 +507,10 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) {
498507
if r.bulkWrite.Empty() {
499508
r.lock.Lock()
500509
r.lastReplicatedOpTime = change.ClusterTime
501-
r.eventsProcessed++
510+
r.eventsApplied++
502511
r.lock.Unlock()
503512

504-
metrics.AddEventsProcessed(1)
513+
metrics.AddEventsApplied(1)
505514
}
506515

507516
continue
@@ -552,10 +561,10 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) {
552561

553562
r.lock.Lock()
554563
r.lastReplicatedOpTime = change.ClusterTime
555-
r.eventsProcessed++
564+
r.eventsApplied++
556565
r.lock.Unlock()
557566

558-
metrics.AddEventsProcessed(1)
567+
metrics.AddEventsApplied(1)
559568

560569
switch change.OperationType { //nolint:exhaustive
561570
case Create, Rename, Drop, DropDatabase:
@@ -603,10 +612,10 @@ func (r *Repl) doBulkOps(ctx context.Context) bool {
603612

604613
r.lock.Lock()
605614
r.lastReplicatedOpTime = r.bulkTS
606-
r.eventsProcessed += int64(size)
615+
r.eventsApplied += int64(size)
607616
r.lock.Unlock()
608617

609-
metrics.AddEventsProcessed(size)
618+
metrics.AddEventsApplied(size)
610619

611620
log.New("bulk:write").
612621
With(log.Int64("size", int64(size)), log.Elapsed(time.Since(r.lastBulkDoneAt))).

tests/pcsm.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -233,7 +233,8 @@ def wait_for_clone_completed(self):
233233
def last_applied_op(self):
234234
"""Get the last applied operation time."""
235235
status = self.pcsm.status()
236-
if last_replicated_op_time := status.get("lastReplicatedOpTime"):
236+
last_replicated_op_time = status.get("lastReplicatedOpTime", {}).get("ts")
237+
if last_replicated_op_time:
237238
t_s, i_s = last_replicated_op_time.split(".")
238239
return bson.Timestamp(int(t_s), int(i_s))
239240

0 commit comments

Comments
 (0)