Skip to content

Commit 0178cad

Browse files
committed
Allow finalization of the oplog out of range error.
1 parent 77f1f39 commit 0178cad

File tree

3 files changed

+69
-15
lines changed

3 files changed

+69
-15
lines changed

main.go

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -140,10 +140,21 @@ func main() {
140140
Use: "finalize",
141141
Short: "Finalize Cluster Replication",
142142
RunE: func(cmd *cobra.Command, _ []string) error {
143-
return NewClient(port).Finalize(cmd.Context())
143+
allowOnOplogOORerror, err := cmd.Flags().GetBool("allow-on-oplog-oor-error")
144+
if err != nil {
145+
return err //nolint:wrapcheck
146+
}
147+
148+
finalizeOptions := finalizeRequest{
149+
AllowOnOplogOORerror: allowOnOplogOORerror,
150+
}
151+
152+
return NewClient(port).Finalize(cmd.Context(), finalizeOptions)
144153
},
145154
}
146155

156+
finalizeCmd.Flags().Bool("allow-on-oplog-oor-error", false, "Allow on oplog out of range error")
157+
147158
pauseCmd := &cobra.Command{
148159
Use: "pause",
149160
Short: "Pause Cluster Replication",
@@ -618,7 +629,33 @@ func (s *server) handleFinalize(w http.ResponseWriter, r *http.Request) {
618629
return
619630
}
620631

621-
err := s.mlink.Finalize(ctx)
632+
var params finalizeRequest
633+
634+
if r.ContentLength != 0 {
635+
data, err := io.ReadAll(r.Body)
636+
if err != nil {
637+
http.Error(w,
638+
http.StatusText(http.StatusInternalServerError),
639+
http.StatusInternalServerError)
640+
641+
return
642+
}
643+
644+
err = json.Unmarshal(data, &params)
645+
if err != nil {
646+
http.Error(w,
647+
http.StatusText(http.StatusBadRequest),
648+
http.StatusBadRequest)
649+
650+
return
651+
}
652+
}
653+
654+
options := &mongolink.FinalizeOptions{
655+
AllowOnOplogOORError: params.AllowOnOplogOORerror,
656+
}
657+
658+
err := s.mlink.Finalize(ctx, *options)
622659
if err != nil {
623660
writeResponse(w, finalizeResponse{Err: err.Error()})
624661

@@ -719,6 +756,12 @@ type startResponse struct {
719756
Err string `json:"error,omitempty"`
720757
}
721758

759+
// finalizeRequest represents the request body for the /finalize endpoint.
760+
type finalizeRequest struct {
761+
// AllowOnOplogOORerror indicates if the operation should be allowed on oplog out of range error.
762+
AllowOnOplogOORerror bool `json:"allowOnOplogOorError,omitempty"`
763+
}
764+
722765
// finalizeResponse represents the response body for the /finalize endpoint.
723766
type finalizeResponse struct {
724767
// Ok indicates if the operation was successful.
@@ -800,13 +843,13 @@ func (c MongoLinkClient) Status(ctx context.Context) error {
800843
}
801844

802845
// Start sends a request to start the cluster replication.
803-
func (c MongoLinkClient) Start(ctx context.Context, startOptions startRequest) error {
804-
return doClientRequest[startResponse](ctx, c.port, http.MethodPost, "start", startOptions)
846+
func (c MongoLinkClient) Start(ctx context.Context, req startRequest) error {
847+
return doClientRequest[startResponse](ctx, c.port, http.MethodPost, "start", req)
805848
}
806849

807850
// Finalize sends a request to finalize the cluster replication.
808-
func (c MongoLinkClient) Finalize(ctx context.Context) error {
809-
return doClientRequest[finalizeResponse](ctx, c.port, http.MethodPost, "finalize", nil)
851+
func (c MongoLinkClient) Finalize(ctx context.Context, req finalizeRequest) error {
852+
return doClientRequest[finalizeResponse](ctx, c.port, http.MethodPost, "finalize", req)
810853
}
811854

812855
// Pause sends a request to pause the cluster replication.
@@ -819,24 +862,24 @@ func (c MongoLinkClient) Resume(ctx context.Context) error {
819862
return doClientRequest[resumeResponse](ctx, c.port, http.MethodPost, "resume", nil)
820863
}
821864

822-
func doClientRequest[T any](ctx context.Context, port, method, path string, options any) error {
865+
func doClientRequest[T any](ctx context.Context, port, method, path string, body any) error {
823866
url := fmt.Sprintf("http://localhost:%s/%s", port, path)
824867

825-
data := []byte("")
826-
if options != nil {
868+
bodyData := []byte("")
869+
if body != nil {
827870
var err error
828-
data, err = json.Marshal(options)
871+
bodyData, err = json.Marshal(body)
829872
if err != nil {
830873
return errors.Wrap(err, "encode request")
831874
}
832875
}
833876

834-
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(data))
877+
req, err := http.NewRequestWithContext(ctx, method, url, bytes.NewReader(bodyData))
835878
if err != nil {
836879
return errors.Wrap(err, "build request")
837880
}
838881

839-
log.Ctx(ctx).Debugf("POST /%s %s", path, string(data))
882+
log.Ctx(ctx).Debugf("POST /%s %s", path, string(bodyData))
840883

841884
res, err := http.DefaultClient.Do(req)
842885
if err != nil {

mongolink/mongolink.go

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -475,14 +475,18 @@ func (ml *MongoLink) doResume(context.Context) error {
475475
return nil
476476
}
477477

478+
type FinalizeOptions struct {
479+
AllowOnOplogOORError bool
480+
}
481+
478482
// Finalize finalizes the replication process.
479-
func (ml *MongoLink) Finalize(ctx context.Context) error {
483+
func (ml *MongoLink) Finalize(ctx context.Context, options FinalizeOptions) error {
480484
status := ml.Status(ctx)
481485

482486
ml.lock.Lock()
483487
defer ml.lock.Unlock()
484488

485-
if status.State == StateFailed {
489+
if status.State == StateFailed && !options.AllowOnOplogOORError {
486490
return errors.New("failed state")
487491
}
488492

@@ -501,7 +505,9 @@ func (ml *MongoLink) Finalize(ctx context.Context) error {
501505
lg := log.Ctx(ctx)
502506
lg.Info("Starting finalization")
503507

504-
if status.Repl.IsRunning() {
508+
oplogOORError := options.AllowOnOplogOORError && status.Repl.IsOplogOORError()
509+
510+
if status.Repl.IsRunning() && !oplogOORError {
505511
err := ml.repl.Pause(ctx)
506512
if err != nil {
507513
return err

mongolink/repl.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ func (rs *ReplStatus) IsPaused() bool {
7070
return !rs.PauseTime.IsZero()
7171
}
7272

73+
//go:inline
74+
func (rs *ReplStatus) IsOplogOORError() bool {
75+
return rs.Err != nil && strings.Contains(rs.Err.Error(), "resume point may no longer be in the oplog")
76+
}
77+
7378
func NewRepl(source, target *mongo.Client, catalog *Catalog, nsFilter sel.NSFilter) *Repl {
7479
return &Repl{
7580
source: source,

0 commit comments

Comments
 (0)