diff --git a/.github/pr-badge.yml b/.github/pr-badge.yml new file mode 100644 index 00000000..b3ecc7f2 --- /dev/null +++ b/.github/pr-badge.yml @@ -0,0 +1,5 @@ +label: "JIRA" +url: "https://jira.percona.com/browse/$issuePrefix" +message: "$issuePrefix" +color: "green" +when: "$issuePrefix" diff --git a/main.go b/main.go index 588ec14a..e1607f4a 100644 --- a/main.go +++ b/main.go @@ -140,10 +140,19 @@ func main() { Use: "finalize", Short: "Finalize Cluster Replication", RunE: func(cmd *cobra.Command, _ []string) error { - return NewClient(port).Finalize(cmd.Context()) + ignoreHistoryLost, _ := cmd.Flags().GetBool("ignore-history-lost") + + finalizeOptions := finalizeRequest{ + IgnoreHistoryLost: ignoreHistoryLost, + } + + return NewClient(port).Finalize(cmd.Context(), finalizeOptions) }, } + finalizeCmd.Flags().Bool("ignore-history-lost", false, "Ignore history lost error") + finalizeCmd.Flags().MarkHidden("ignore-history-lost") + pauseCmd := &cobra.Command{ Use: "pause", Short: "Pause Cluster Replication", @@ -618,7 +627,33 @@ func (s *server) handleFinalize(w http.ResponseWriter, r *http.Request) { return } - err := s.mlink.Finalize(ctx) + var params finalizeRequest + + if r.ContentLength != 0 { + data, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, + http.StatusText(http.StatusInternalServerError), + http.StatusInternalServerError) + + return + } + + err = json.Unmarshal(data, ¶ms) + if err != nil { + http.Error(w, + http.StatusText(http.StatusBadRequest), + http.StatusBadRequest) + + return + } + } + + options := &mongolink.FinalizeOptions{ + IgnoreHistoryLost: params.IgnoreHistoryLost, + } + + err := s.mlink.Finalize(ctx, *options) if err != nil { writeResponse(w, finalizeResponse{Err: err.Error()}) @@ -719,6 +754,13 @@ type startResponse struct { Err string `json:"error,omitempty"` } +// finalizeRequest represents the request body for the /finalize endpoint. +type finalizeRequest struct { + // IgnoreHistoryLost indicates whether the operation can ignore the ChangeStreamHistoryLost + // error. + IgnoreHistoryLost bool `json:"ignoreHistoryLost,omitempty"` +} + // finalizeResponse represents the response body for the /finalize endpoint. type finalizeResponse struct { // Ok indicates if the operation was successful. @@ -800,13 +842,13 @@ func (c MongoLinkClient) Status(ctx context.Context) error { } // Start sends a request to start the cluster replication. -func (c MongoLinkClient) Start(ctx context.Context, startOptions startRequest) error { - return doClientRequest[startResponse](ctx, c.port, http.MethodPost, "start", startOptions) +func (c MongoLinkClient) Start(ctx context.Context, req startRequest) error { + return doClientRequest[startResponse](ctx, c.port, http.MethodPost, "start", req) } // Finalize sends a request to finalize the cluster replication. -func (c MongoLinkClient) Finalize(ctx context.Context) error { - return doClientRequest[finalizeResponse](ctx, c.port, http.MethodPost, "finalize", nil) +func (c MongoLinkClient) Finalize(ctx context.Context, req finalizeRequest) error { + return doClientRequest[finalizeResponse](ctx, c.port, http.MethodPost, "finalize", req) } // Pause sends a request to pause the cluster replication. @@ -819,24 +861,24 @@ func (c MongoLinkClient) Resume(ctx context.Context) error { return doClientRequest[resumeResponse](ctx, c.port, http.MethodPost, "resume", nil) } -func doClientRequest[T any](ctx context.Context, port, method, path string, options any) error { +func doClientRequest[T any](ctx context.Context, port, method, path string, body any) error { url := fmt.Sprintf("http://localhost:%s/%s", port, path) - data := []byte("") - if options != nil { + bodyData := []byte("") + if body != nil { var err error - data, err = json.Marshal(options) + bodyData, err = json.Marshal(body) if err != nil { return errors.Wrap(err, "encode request") } } - req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(data)) + req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(bodyData)) if err != nil { return errors.Wrap(err, "build request") } - log.Ctx(ctx).Debugf("POST /%s %s", path, string(data)) + log.Ctx(ctx).Debugf("POST /%s %s", path, string(bodyData)) res, err := http.DefaultClient.Do(req) if err != nil { diff --git a/mongolink/mongolink.go b/mongolink/mongolink.go index 09d6c146..13f637cc 100644 --- a/mongolink/mongolink.go +++ b/mongolink/mongolink.go @@ -206,6 +206,8 @@ func (ml *MongoLink) Status(ctx context.Context) *Status { } switch { + case ml.err != nil: + s.Error = ml.err case s.Repl.Err != nil: s.Error = errors.Wrap(s.Repl.Err, "Change Replication") case s.Clone.Err != nil: @@ -475,41 +477,51 @@ func (ml *MongoLink) doResume(context.Context) error { return nil } +type FinalizeOptions struct { + IgnoreHistoryLost bool +} + // Finalize finalizes the replication process. -func (ml *MongoLink) Finalize(ctx context.Context) error { +func (ml *MongoLink) Finalize(ctx context.Context, options FinalizeOptions) error { + status := ml.Status(ctx) + ml.lock.Lock() defer ml.lock.Unlock() - cloneStatus := ml.clone.Status() - if !cloneStatus.IsFinished() { + if status.State == StateFailed { + if !(options.IgnoreHistoryLost && errors.Is(status.Repl.Err, ErrOplogHistoryLost)) { + return errors.Wrap(status.Error, "failed state") + } + } + + if !status.Clone.IsFinished() { return errors.New("clone is not completed") } - replStatus := ml.repl.Status() - if !replStatus.IsStarted() { + if !status.Repl.IsStarted() { return errors.New("change replication is not started") } - if replStatus.LastReplicatedOpTime.Before(cloneStatus.FinishTS) { + if !status.InitialSyncCompleted { return errors.New("initial sync is not completed") } lg := log.Ctx(ctx) lg.Info("Starting finalization") - if replStatus.IsRunning() { + if status.Repl.IsRunning() { err := ml.repl.Pause(ctx) if err != nil { - return err + return errors.Wrap(err, "pause change replication") } <-ml.repl.Done() - replStatus = ml.repl.Status() - if replStatus.Err != nil { - ml.setFailed(errors.Wrap(replStatus.Err, "repl")) - - return err + err = ml.repl.Status().Err + if err != nil { + // no need to set the MongoLink failed status here. + // [MongoLink.setFailed] is called in [MongoLink.run]. + return errors.Wrap(err, "post-pause change replication") } } diff --git a/mongolink/repl.go b/mongolink/repl.go index 5ba78980..17b3ba64 100644 --- a/mongolink/repl.go +++ b/mongolink/repl.go @@ -16,9 +16,13 @@ import ( "github.com/percona-lab/percona-mongolink/list" "github.com/percona-lab/percona-mongolink/log" "github.com/percona-lab/percona-mongolink/sel" + "github.com/percona-lab/percona-mongolink/topo" ) -var ErrInvalidateEvent = errors.New("invalidate") +var ( + ErrInvalidateEvent = errors.New("invalidate") + ErrOplogHistoryLost = errors.New("oplog history is lost") +) // Repl handles replication from a source MongoDB to a target MongoDB. type Repl struct { @@ -273,7 +277,15 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) { go func() { err := r.watchChangeStream(watchCtx, opts, eventC) if err != nil && !errors.Is(err, context.Canceled) { - log.New("repl:loop").Error(err, "change stream") + if topo.IsChangeStreamHistoryLost(err) { + err = ErrOplogHistoryLost + } + + r.lock.Lock() + r.err = errors.Wrap(err, "watch change stream") + r.lock.Unlock() + + log.New("repl:loop").Error(err, "watch change stream") } }() @@ -282,7 +294,11 @@ func (r *Repl) run(opts *options.ChangeStreamOptionsBuilder) { err := r.replication(replCtx, eventC) if err != nil { - log.New("repl:loop").Error(err, "") + r.lock.Lock() + r.err = errors.Wrap(err, "replication") + r.lock.Unlock() + + log.New("repl:loop").Error(err, "replication") } } @@ -316,7 +332,7 @@ func (r *Repl) watchChangeStream( } if err := cur.Err(); err != nil { - return errors.Wrap(err, "next") + return errors.Wrap(err, "cursor") } return nil diff --git a/topo/error.go b/topo/error.go index 21a81313..ca49127f 100644 --- a/topo/error.go +++ b/topo/error.go @@ -16,6 +16,10 @@ func IsIndexOptionsConflict(err error) bool { return isMongoCommandError(err, "IndexOptionsConflict") } +func IsChangeStreamHistoryLost(err error) bool { + return isMongoCommandError(err, "ChangeStreamHistoryLost") +} + // isMongoCommandError checks if an error is a MongoDB error with the specified name. func isMongoCommandError(err error, name string) bool { for err != nil {