Skip to content

Commit dfefd0d

Browse files
authored
PML-78: Fail replication of oplog out of range error (#33)
1 parent 634e64c commit dfefd0d

File tree

5 files changed

+108
-29
lines changed

5 files changed

+108
-29
lines changed

.github/pr-badge.yml

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
label: "JIRA"
2+
url: "https://jira.percona.com/browse/$issuePrefix"
3+
message: "$issuePrefix"
4+
color: "green"
5+
when: "$issuePrefix"

main.go

Lines changed: 54 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -140,10 +140,19 @@ func main() {
140140
Use: "finalize",
141141
Short: "Finalize Cluster Replication",
142142
RunE: func(cmd *cobra.Command, _ []string) error {
143-
return NewClient(port).Finalize(cmd.Context())
143+
ignoreHistoryLost, _ := cmd.Flags().GetBool("ignore-history-lost")
144+
145+
finalizeOptions := finalizeRequest{
146+
IgnoreHistoryLost: ignoreHistoryLost,
147+
}
148+
149+
return NewClient(port).Finalize(cmd.Context(), finalizeOptions)
144150
},
145151
}
146152

153+
finalizeCmd.Flags().Bool("ignore-history-lost", false, "Ignore history lost error")
154+
finalizeCmd.Flags().MarkHidden("ignore-history-lost")
155+
147156
pauseCmd := &cobra.Command{
148157
Use: "pause",
149158
Short: "Pause Cluster Replication",
@@ -618,7 +627,33 @@ func (s *server) handleFinalize(w http.ResponseWriter, r *http.Request) {
618627
return
619628
}
620629

621-
err := s.mlink.Finalize(ctx)
630+
var params finalizeRequest
631+
632+
if r.ContentLength != 0 {
633+
data, err := io.ReadAll(r.Body)
634+
if err != nil {
635+
http.Error(w,
636+
http.StatusText(http.StatusInternalServerError),
637+
http.StatusInternalServerError)
638+
639+
return
640+
}
641+
642+
err = json.Unmarshal(data, &params)
643+
if err != nil {
644+
http.Error(w,
645+
http.StatusText(http.StatusBadRequest),
646+
http.StatusBadRequest)
647+
648+
return
649+
}
650+
}
651+
652+
options := &mongolink.FinalizeOptions{
653+
IgnoreHistoryLost: params.IgnoreHistoryLost,
654+
}
655+
656+
err := s.mlink.Finalize(ctx, *options)
622657
if err != nil {
623658
writeResponse(w, finalizeResponse{Err: err.Error()})
624659

@@ -719,6 +754,13 @@ type startResponse struct {
719754
Err string `json:"error,omitempty"`
720755
}
721756

757+
// finalizeRequest represents the request body for the /finalize endpoint.
758+
type finalizeRequest struct {
759+
// IgnoreHistoryLost indicates whether the operation can ignore the ChangeStreamHistoryLost
760+
// error.
761+
IgnoreHistoryLost bool `json:"ignoreHistoryLost,omitempty"`
762+
}
763+
722764
// finalizeResponse represents the response body for the /finalize endpoint.
723765
type finalizeResponse struct {
724766
// Ok indicates if the operation was successful.
@@ -800,13 +842,13 @@ func (c MongoLinkClient) Status(ctx context.Context) error {
800842
}
801843

802844
// Start sends a request to start the cluster replication.
803-
func (c MongoLinkClient) Start(ctx context.Context, startOptions startRequest) error {
804-
return doClientRequest[startResponse](ctx, c.port, http.MethodPost, "start", startOptions)
845+
func (c MongoLinkClient) Start(ctx context.Context, req startRequest) error {
846+
return doClientRequest[startResponse](ctx, c.port, http.MethodPost, "start", req)
805847
}
806848

807849
// Finalize sends a request to finalize the cluster replication.
808-
func (c MongoLinkClient) Finalize(ctx context.Context) error {
809-
return doClientRequest[finalizeResponse](ctx, c.port, http.MethodPost, "finalize", nil)
850+
func (c MongoLinkClient) Finalize(ctx context.Context, req finalizeRequest) error {
851+
return doClientRequest[finalizeResponse](ctx, c.port, http.MethodPost, "finalize", req)
810852
}
811853

812854
// Pause sends a request to pause the cluster replication.
@@ -819,24 +861,24 @@ func (c MongoLinkClient) Resume(ctx context.Context) error {
819861
return doClientRequest[resumeResponse](ctx, c.port, http.MethodPost, "resume", nil)
820862
}
821863

822-
func doClientRequest[T any](ctx context.Context, port, method, path string, options any) error {
864+
func doClientRequest[T any](ctx context.Context, port, method, path string, body any) error {
823865
url := fmt.Sprintf("http://localhost:%s/%s", port, path)
824866

825-
data := []byte("")
826-
if options != nil {
867+
bodyData := []byte("")
868+
if body != nil {
827869
var err error
828-
data, err = json.Marshal(options)
870+
bodyData, err = json.Marshal(body)
829871
if err != nil {
830872
return errors.Wrap(err, "encode request")
831873
}
832874
}
833875

834-
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(data))
876+
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(bodyData))
835877
if err != nil {
836878
return errors.Wrap(err, "build request")
837879
}
838880

839-
log.Ctx(ctx).Debugf("POST /%s %s", path, string(data))
881+
log.Ctx(ctx).Debugf("POST /%s %s", path, string(bodyData))
840882

841883
res, err := http.DefaultClient.Do(req)
842884
if err != nil {

mongolink/mongolink.go

Lines changed: 25 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -206,6 +206,8 @@ func (ml *MongoLink) Status(ctx context.Context) *Status {
206206
}
207207

208208
switch {
209+
case ml.err != nil:
210+
s.Error = ml.err
209211
case s.Repl.Err != nil:
210212
s.Error = errors.Wrap(s.Repl.Err, "Change Replication")
211213
case s.Clone.Err != nil:
@@ -475,41 +477,51 @@ func (ml *MongoLink) doResume(context.Context) error {
475477
return nil
476478
}
477479

480+
type FinalizeOptions struct {
481+
IgnoreHistoryLost bool
482+
}
483+
478484
// Finalize finalizes the replication process.
479-
func (ml *MongoLink) Finalize(ctx context.Context) error {
485+
func (ml *MongoLink) Finalize(ctx context.Context, options FinalizeOptions) error {
486+
status := ml.Status(ctx)
487+
480488
ml.lock.Lock()
481489
defer ml.lock.Unlock()
482490

483-
cloneStatus := ml.clone.Status()
484-
if !cloneStatus.IsFinished() {
491+
if status.State == StateFailed {
492+
if !(options.IgnoreHistoryLost && errors.Is(status.Repl.Err, ErrOplogHistoryLost)) {
493+
return errors.Wrap(status.Error, "failed state")
494+
}
495+
}
496+
497+
if !status.Clone.IsFinished() {
485498
return errors.New("clone is not completed")
486499
}
487500

488-
replStatus := ml.repl.Status()
489-
if !replStatus.IsStarted() {
501+
if !status.Repl.IsStarted() {
490502
return errors.New("change replication is not started")
491503
}
492504

493-
if replStatus.LastReplicatedOpTime.Before(cloneStatus.FinishTS) {
505+
if !status.InitialSyncCompleted {
494506
return errors.New("initial sync is not completed")
495507
}
496508

497509
lg := log.Ctx(ctx)
498510
lg.Info("Starting finalization")
499511

500-
if replStatus.IsRunning() {
512+
if status.Repl.IsRunning() {
501513
err := ml.repl.Pause(ctx)
502514
if err != nil {
503-
return err
515+
return errors.Wrap(err, "pause change replication")
504516
}
505517

506518
<-ml.repl.Done()
507519

508-
replStatus = ml.repl.Status()
509-
if replStatus.Err != nil {
510-
ml.setFailed(errors.Wrap(replStatus.Err, "repl"))
511-
512-
return err
520+
err = ml.repl.Status().Err
521+
if err != nil {
522+
// no need to set the MongoLink failed status here.
523+
// [MongoLink.setFailed] is called in [MongoLink.run].
524+
return errors.Wrap(err, "post-pause change replication")
513525
}
514526
}
515527

mongolink/repl.go

Lines changed: 20 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -16,9 +16,13 @@ import (
1616
"github.com/percona-lab/percona-mongolink/list"
1717
"github.com/percona-lab/percona-mongolink/log"
1818
"github.com/percona-lab/percona-mongolink/sel"
19+
"github.com/percona-lab/percona-mongolink/topo"
1920
)
2021

21-
var ErrInvalidateEvent = errors.New("invalidate")
22+
var (
23+
ErrInvalidateEvent = errors.New("invalidate")
24+
ErrOplogHistoryLost = errors.New("oplog history is lost")
25+
)
2226

2327
// Repl handles replication from a source MongoDB to a target MongoDB.
2428
type Repl struct {
@@ -273,7 +277,15 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) {
273277
go func() {
274278
err := r.watchChangeStream(watchCtx, opts, eventC)
275279
if err != nil && !errors.Is(err, context.Canceled) {
276-
log.New("repl:loop").Error(err, "change stream")
280+
if topo.IsChangeStreamHistoryLost(err) {
281+
err = ErrOplogHistoryLost
282+
}
283+
284+
r.lock.Lock()
285+
r.err = errors.Wrap(err, "watch change stream")
286+
r.lock.Unlock()
287+
288+
log.New("repl:loop").Error(err, "watch change stream")
277289
}
278290
}()
279291

@@ -282,7 +294,11 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) {
282294

283295
err := r.replication(replCtx, eventC)
284296
if err != nil {
285-
log.New("repl:loop").Error(err, "")
297+
r.lock.Lock()
298+
r.err = errors.Wrap(err, "replication")
299+
r.lock.Unlock()
300+
301+
log.New("repl:loop").Error(err, "replication")
286302
}
287303
}
288304

@@ -316,7 +332,7 @@ func (r *Repl) watchChangeStream(
316332
}
317333

318334
if err := cur.Err(); err != nil {
319-
return errors.Wrap(err, "next")
335+
return errors.Wrap(err, "cursor")
320336
}
321337

322338
return nil

topo/error.go

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -16,6 +16,10 @@ func IsIndexOptionsConflict(err error) bool {
1616
return isMongoCommandError(err, "IndexOptionsConflict")
1717
}
1818

19+
func IsChangeStreamHistoryLost(err error) bool {
20+
return isMongoCommandError(err, "ChangeStreamHistoryLost")
21+
}
22+
1923
// isMongoCommandError checks if an error is a MongoDB error with the specified name.
2024
func isMongoCommandError(err error, name string) bool {
2125
for err != nil {

0 commit comments

Comments
 (0)