Skip to content

Commit 7a76f21

Browse files
authored
[PML-54] allow to pause/resume change replication (#31)
1 parent 28ac8f7 commit 7a76f21

30 files changed

+1815
-1314
lines changed

.github/workflows/go.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ jobs:
1010
runs-on: ubuntu-latest
1111
steps:
1212
- uses: actions/checkout@v4
13-
- uses: actions/setup-go@v4
13+
- uses: actions/setup-go@v5
1414
- run: go test -v ./...
1515

1616
gofmt:
@@ -21,8 +21,10 @@ jobs:
2121
- uses: actions/setup-go@v5
2222
- run: go install golang.org/x/tools/cmd/goimports@latest
2323
- run: go install mvdan.cc/gofumpt@latest
24+
- run: go install github.com/segmentio/golines@latest
2425
- run: goimports -w -local "github.com/percona" .
2526
- run: gofumpt -w -extra .
27+
- run: golines -w --shorten-comments --chain-split-dots -m 100 .
2628
- uses: reviewdog/action-suggester@v1
2729
with:
2830
tool_name: gofmt
@@ -33,8 +35,6 @@ jobs:
3335
steps:
3436
- uses: actions/checkout@v4
3537
- uses: actions/setup-go@v5
36-
with:
37-
go-version: "1.23"
3838
- name: golangci-lint
3939
uses: reviewdog/action-golangci-lint@v2
4040
with:

.golangci.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ linters:
1212
- dupl
1313
- funlen
1414
- tenv
15+
- wsl
1516

1617
linters-settings:
1718
govet:

.vscode/settings.json

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
"go.useLanguageServer": true,
2929
"gopls": {
3030
"analyses": {
31-
"composites": false,
32-
"fieldalignment": false
31+
"composites": false
3332
},
3433
"formatting.gofumpt": true,
3534
"formatting.local": "github.com/percona",

README.md

Lines changed: 74 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ Percona MongoLink is a tool for replicating data from a source MongoDB cluster t
88
- **Real-Time Replication**: Tail the oplog to keep your target cluster up to date.
99
- **Namespace Filtering**: Specify which databases and collections to include or exclude.
1010
- **Automatic Index Management**: Ensure necessary indexes are created on the target.
11-
- **HTTP API**: Start, finalize, and check replication status via REST endpoints.
11+
- **HTTP API**: Start, finalize, pause, resume, and check replication status via REST endpoints.
1212

1313
## Setup
1414

@@ -87,6 +87,38 @@ bin/mongolink finalize
8787
curl -X POST http://localhost:2242/finalize
8888
```
8989

90+
### Pausing the Replication
91+
92+
To pause the replication process, you can either use the command-line interface or send a POST request to the `/pause` endpoint:
93+
94+
#### Using Command-Line Interface
95+
96+
```sh
97+
bin/mongolink pause
98+
```
99+
100+
#### Using HTTP API
101+
102+
```sh
103+
curl -X POST http://localhost:2242/pause
104+
```
105+
106+
### Resuming the Replication
107+
108+
To resume the replication process, you can either use the command-line interface or send a POST request to the `/resume` endpoint:
109+
110+
#### Using Command-Line Interface
111+
112+
```sh
113+
bin/mongolink resume
114+
```
115+
116+
#### Using HTTP API
117+
118+
```sh
119+
curl -X POST http://localhost:2242/resume
120+
```
121+
90122
### Checking the Status
91123

92124
To check the current status of the replication process, you can either use the command-line interface or send a GET request to the `/status` endpoint:
@@ -131,12 +163,10 @@ When using the `--log-json` option, the logs will be output in JSON format with
131163
- `s`: Scope of the log entry.
132164
- `ns`: Namespace (database.collection format).
133165
- `elapsed_secs`: The duration in seconds for the specific operation to complete.
134-
- `total_elapsed_secs`: The cumulative time in seconds for the entire process, including all operations.
135166

136167
Example:
137168

138169
```json
139-
140170
{ "level": "info",
141171
"s": "clone",
142172
"ns": "db_1.coll_1",
@@ -149,7 +179,6 @@ Example:
149179
"elapsed_secs": 0,
150180
"time": "2025-02-23 11:26:03.857",
151181
"message": "Change replication stopped at 1740335163.1740335163 source cluster time" }
152-
153182
```
154183
155184
## HTTP API
@@ -202,29 +231,61 @@ Example:
202231
}
203232
```
204233
234+
### POST /pause
235+
236+
Pauses the replication process.
237+
238+
#### Response
239+
240+
- `ok`: Boolean indicating if the operation was successful.
241+
- `error` (optional): Error message if the operation failed.
242+
243+
Example:
244+
245+
```json
246+
{
247+
"ok": true
248+
}
249+
```
250+
251+
### POST /resume
252+
253+
Resumes the replication process.
254+
255+
#### Response
256+
257+
- `ok`: Boolean indicating if the operation was successful.
258+
- `error` (optional): Error message if the operation failed.
259+
260+
Example:
261+
262+
```json
263+
{
264+
"ok": true
265+
}
266+
```
267+
205268
### GET /status
206269
207-
The /status endpoint provides the current state of the MongoLink replication
208-
process, including its progress, lag, and event processing details.
270+
The /status endpoint provides the current state of the MongoLink replication process, including its progress, lag, and event processing details.
209271
210272
#### Response
211273
212274
- `ok`: indicates if the operation was successful.
213275
- `state`: the current state of the replication.
214-
- `info` (optional): provides additional information about the current state.
276+
- `info`: provides additional information about the current state.
215277
- `error` (optional): the error message if the operation failed.
216278
217-
- `lagTime` (optional): the current lag time in logical seconds between source and target clusters.
218-
- `eventsProcessed` (optional): the number of events processed.
219-
- `lastReplicatedOpTime` (optional): the last replicated operation time.
279+
- `lagTime`: the current lag time in logical seconds between source and target clusters.
280+
- `eventsProcessed`: the number of events processed.
281+
- `lastReplicatedOpTime`: the last replicated operation time.
220282
221-
- `initialSync.pauseOnInitialSync` (optional): indicates if the replication is paused on initial sync.
222283
- `initialSync.completed`: indicates if the initial sync is completed.
223284
- `initialSync.lagTime`: the lag time in logical seconds until the initial sync completed.
224285
225286
- `initialSync.cloneCompleted`: indicates if the cloning process is completed.
226-
- `initialSync.estimatedCloneSize` (optional): the estimated total size of the clone.
227-
- `initialSync.clonedSize` (optional): the size of the data that has been cloned.
287+
- `initialSync.estimatedCloneSize`: the estimated total size of the clone.
288+
- `initialSync.clonedSize`: the size of the data that has been cloned.
228289
229290
Example:
230291

config/const.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,8 @@ const (
3737
ReplInitialSyncCheckInterval = time.Second
3838
)
3939

40-
// CloneMaxWriteSizePerCollection is the maximum write size per collection during the cloning process.
40+
// CloneMaxWriteSizePerCollection is the maximum write size per collection during the cloning
41+
// process.
4142
const CloneMaxWriteSizePerCollection = 100 * MB
4243

4344
// MaxBSONSize is hardcoded maximum BSON document size. 16 mebibytes.

errors/errors.go

Lines changed: 53 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,56 +5,100 @@ import (
55
"fmt"
66
)
77

8+
// ErrUnsupported indicates an unsupported operation.
9+
var ErrUnsupported = errors.ErrUnsupported
10+
11+
// wrappedError wraps an error with an additional message.
812
type wrappedError struct {
913
cause error
1014
msg string
1115
}
1216

13-
func (w *wrappedError) Error() string { return w.msg + ": " + w.cause.Error() }
14-
func (w *wrappedError) Unwrap() error { return w.cause }
17+
func (w *wrappedError) Error() string {
18+
return w.msg + ": " + w.cause.Error()
19+
}
20+
21+
func (w *wrappedError) Unwrap() error {
22+
return w.cause
23+
}
1524

16-
// New calls [errors.New].
25+
// New creates a new error with the given text.
26+
//
27+
// Calls [errors.New].
28+
//
29+
//go:inline
1730
func New(text string) error {
1831
return errors.New(text) //nolint:err113
1932
}
2033

21-
// Errorf calls [fmt.Errorf].
34+
// Errorf formats an error according to a format specifier.
35+
//
36+
// Calls [fmt.Errorf].
37+
//
38+
//go:inline
2239
func Errorf(format string, vals ...any) error {
2340
return fmt.Errorf(format, vals...) //nolint:err113
2441
}
2542

43+
// Wrap adds a message to an existing error.
2644
func Wrap(cause error, text string) error {
2745
if cause == nil {
2846
return nil
2947
}
3048

49+
if text == "" {
50+
return cause
51+
}
52+
3153
return &wrappedError{cause: cause, msg: text}
3254
}
3355

56+
// Wrapf adds a formatted message to an existing error.
3457
func Wrapf(cause error, format string, vals ...any) error {
3558
if cause == nil {
3659
return nil
3760
}
3861

39-
return &wrappedError{cause: cause, msg: fmt.Sprintf(format, vals...)}
62+
msg := fmt.Sprintf(format, vals...)
63+
if msg == "" {
64+
return cause
65+
}
66+
67+
return &wrappedError{cause: cause, msg: msg}
4068
}
4169

42-
// Unwrap calls [errors.Unwrap].
70+
// Unwrap returns the cause of the error.
71+
//
72+
// Calls [errors.Unwrap].
73+
//
74+
//go:inline
4375
func Unwrap(err error) error {
4476
return errors.Unwrap(err)
4577
}
4678

47-
// Join calls [errors.Join].
79+
// Join combines multiple errors into one.
80+
//
81+
// Calls [errors.Join].
82+
//
83+
//go:inline
4884
func Join(errs ...error) error {
4985
return errors.Join(errs...)
5086
}
5187

52-
// Is calls [errors.Is].
88+
// Is checks if an error matches a target error.
89+
//
90+
// Calls [errors.Is].
91+
//
92+
//go:inline
5393
func Is(err, target error) bool {
5494
return errors.Is(err, target)
5595
}
5696

57-
// As calls [errors.As].
97+
// As checks if an error can be cast to a target type.
98+
//
99+
// Calls [errors.As].
100+
//
101+
//go:inline
58102
func As(err error, target any) bool {
59103
return errors.As(err, target)
60104
}

go.mod

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,12 @@ go 1.24
55
require (
66
github.com/rs/zerolog v1.33.0
77
github.com/spf13/cobra v1.9.1
8-
go.mongodb.org/mongo-driver/v2 v2.0.1
9-
golang.org/x/sync v0.11.0
8+
go.mongodb.org/mongo-driver/v2 v2.1.0
9+
golang.org/x/sync v0.12.0
1010
)
1111

1212
require (
13-
github.com/golang/snappy v0.0.4 // indirect
13+
github.com/golang/snappy v1.0.0 // indirect
1414
github.com/inconshreveable/mousetrap v1.1.0 // indirect
1515
github.com/klauspost/compress v1.18.0 // indirect
1616
github.com/mattn/go-colorable v0.1.14 // indirect
@@ -20,7 +20,7 @@ require (
2020
github.com/xdg-go/scram v1.1.2 // indirect
2121
github.com/xdg-go/stringprep v1.0.4 // indirect
2222
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
23-
golang.org/x/crypto v0.33.0 // indirect
24-
golang.org/x/sys v0.30.0 // indirect
25-
golang.org/x/text v0.22.0 // indirect
23+
golang.org/x/crypto v0.36.0 // indirect
24+
golang.org/x/sys v0.31.0 // indirect
25+
golang.org/x/text v0.23.0 // indirect
2626
)

0 commit comments

Comments
 (0)