Skip to content

Commit 51dcea9

Browse files
authored
PML-144: Support resuming from a failure (#84)
1 parent 601ab52 commit 51dcea9

File tree

4 files changed

+79
-15
lines changed

4 files changed

+79
-15
lines changed

main.go

Lines changed: 44 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -223,7 +223,13 @@ var resumeCmd = &cobra.Command{
223223
return err
224224
}
225225

226-
return NewClient(port).Resume(cmd.Context())
226+
fromFailure, _ := cmd.Flags().GetBool("from-failure")
227+
228+
resumeOptions := resumeRequest{
229+
FromFailure: fromFailure,
230+
}
231+
232+
return NewClient(port).Resume(cmd.Context(), resumeOptions)
227233
},
228234
}
229235

@@ -369,7 +375,9 @@ func main() {
369375
startCmd.Flags().MarkHidden("pause-on-initial-sync") //nolint:errcheck
370376

371377
pauseCmd.Flags().Int("port", DefaultServerPort, "Port number")
378+
372379
resumeCmd.Flags().Int("port", DefaultServerPort, "Port number")
380+
resumeCmd.Flags().Bool("from-failure", false, "Reuse from failure")
373381

374382
finalizeCmd.Flags().Int("port", DefaultServerPort, "Port number")
375383
finalizeCmd.Flags().Bool("ignore-history-lost", false, "Ignore history lost error")
@@ -876,7 +884,33 @@ func (s *server) handleResume(w http.ResponseWriter, r *http.Request) {
876884
return
877885
}
878886

879-
err := s.mlink.Resume(ctx)
887+
var params resumeRequest
888+
889+
if r.ContentLength != 0 {
890+
data, err := io.ReadAll(r.Body)
891+
if err != nil {
892+
http.Error(w,
893+
http.StatusText(http.StatusInternalServerError),
894+
http.StatusInternalServerError)
895+
896+
return
897+
}
898+
899+
err = json.Unmarshal(data, &params)
900+
if err != nil {
901+
http.Error(w,
902+
http.StatusText(http.StatusBadRequest),
903+
http.StatusBadRequest)
904+
905+
return
906+
}
907+
}
908+
909+
options := &mongolink.ResumeOptions{
910+
ResumeFromFailure: params.FromFailure,
911+
}
912+
913+
err := s.mlink.Resume(ctx, *options)
880914
if err != nil {
881915
writeResponse(w, resumeResponse{Err: err.Error()})
882916

@@ -984,6 +1018,12 @@ type pauseResponse struct {
9841018
Err string `json:"error,omitempty"`
9851019
}
9861020

1021+
// resumeRequest represents the request body for the /resume endpoint.
1022+
type resumeRequest struct {
1023+
// FromFailure indicates whether to resume from a failed state.
1024+
FromFailure bool `json:"fromFailure,omitempty"`
1025+
}
1026+
9871027
// resumeResponse represents the response body for the /resume
9881028
// endpoint.
9891029
type resumeResponse struct {
@@ -1022,8 +1062,8 @@ func (c MongoLinkClient) Pause(ctx context.Context) error {
10221062
}
10231063

10241064
// Resume sends a request to resume the cluster replication.
1025-
func (c MongoLinkClient) Resume(ctx context.Context) error {
1026-
return doClientRequest[resumeResponse](ctx, c.port, http.MethodPost, "resume", nil)
1065+
func (c MongoLinkClient) Resume(ctx context.Context, req resumeRequest) error {
1066+
return doClientRequest[resumeResponse](ctx, c.port, http.MethodPost, "resume", req)
10271067
}
10281068

10291069
func doClientRequest[T any](ctx context.Context, port int, method, path string, body any) error {

mongolink/clone.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,13 @@ func (c *Clone) Status() CloneStatus {
158158
}
159159
}
160160

161+
func (c *Clone) resetError() {
162+
c.lock.Lock()
163+
defer c.lock.Unlock()
164+
165+
c.err = nil
166+
}
167+
161168
func (c *Clone) Done() <-chan struct{} {
162169
c.lock.Lock()
163170
defer c.lock.Unlock()

mongolink/mongolink.go

Lines changed: 21 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ func (ml *MongoLink) Recover(ctx context.Context, data []byte) error {
204204
}
205205

206206
if cp.State == StateRunning {
207-
return ml.doResume(ctx)
207+
return ml.doResume(ctx, false)
208208
}
209209

210210
return nil
@@ -275,6 +275,12 @@ func (ml *MongoLink) Status(ctx context.Context) *Status {
275275
return s
276276
}
277277

278+
func (ml *MongoLink) resetError() {
279+
ml.err = nil
280+
ml.clone.resetError()
281+
ml.repl.resetError()
282+
}
283+
278284
// StartOptions represents the options for starting the MongoLink.
279285
type StartOptions struct {
280286
// PauseOnInitialSync indicates whether to finalize after the initial sync.
@@ -318,7 +324,6 @@ func (ml *MongoLink) Start(_ context.Context, options *StartOptions) error {
318324
ml.state = StateRunning
319325

320326
go ml.run()
321-
go ml.onStateChanged(StateRunning)
322327

323328
return nil
324329
}
@@ -541,16 +546,20 @@ func (ml *MongoLink) doPause(ctx context.Context) error {
541546
return nil
542547
}
543548

549+
type ResumeOptions struct {
550+
ResumeFromFailure bool
551+
}
552+
544553
// Resume resumes the replication process.
545-
func (ml *MongoLink) Resume(ctx context.Context) error {
554+
func (ml *MongoLink) Resume(ctx context.Context, options ResumeOptions) error {
546555
ml.lock.Lock()
547556
defer ml.lock.Unlock()
548557

549-
if ml.state != StatePaused {
550-
return errors.New("cannot resume: not paused")
558+
if ml.state != StatePaused && !(ml.state == StateFailed && options.ResumeFromFailure) {
559+
return errors.New("cannot resume: not paused or not resuming from failure")
551560
}
552561

553-
err := ml.doResume(ctx)
562+
err := ml.doResume(ctx, options.ResumeFromFailure)
554563
if err != nil {
555564
log.New("mongolink").Error(err, "Resume Cluster Replication")
556565

@@ -562,18 +571,19 @@ func (ml *MongoLink) Resume(ctx context.Context) error {
562571
return nil
563572
}
564573

565-
func (ml *MongoLink) doResume(context.Context) error {
574+
func (ml *MongoLink) doResume(_ context.Context, fromFailure bool) error {
566575
replStatus := ml.repl.Status()
567576

568-
if !replStatus.IsStarted() {
569-
return errors.New("cannot resume: replication is not started")
577+
if !replStatus.IsStarted() && !fromFailure {
578+
return errors.New("cannot resume: replication is not started or not resuming from failure")
570579
}
571580

572-
if !replStatus.IsPaused() {
573-
return errors.New("cannot resume: replication is not paused")
581+
if !replStatus.IsPaused() && fromFailure {
582+
return errors.New("cannot resume: replication is not paused or not resuming from failure")
574583
}
575584

576585
ml.state = StateRunning
586+
ml.resetError()
577587

578588
go ml.run()
579589
go ml.onStateChanged(StateRunning)

mongolink/repl.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,13 @@ func (r *Repl) Status() ReplStatus {
177177
}
178178
}
179179

180+
func (r *Repl) resetError() {
181+
r.lock.Lock()
182+
defer r.lock.Unlock()
183+
184+
r.err = nil
185+
}
186+
180187
func (r *Repl) Done() <-chan struct{} {
181188
r.lock.Lock()
182189
defer r.lock.Unlock()

0 commit comments

Comments
 (0)