Skip to content

Commit 634e64c

Browse files
authored
[PML-49] recovery on OS process crash (#32)
1 parent 7a76f21 commit 634e64c

File tree

11 files changed

+715
-34
lines changed

11 files changed

+715
-34
lines changed

config/const.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,23 @@ const (
2020
const (
2121
// MongoLinkDatabase is the name of the MongoDB database used by MongoLink.
2222
MongoLinkDatabase = "percona_mongolink"
23+
// TickCollection is the name of the collection used for ticks during change replication.
24+
TickCollection = "ticks"
25+
// RecoveryCollection is the name of the collection used for recovery data.
26+
RecoveryCollection = "checkpoints"
27+
// HeartbeatCollection is the name of the collection used for heartbeats.
28+
HeartbeatCollection = "heartbeats"
29+
)
2330

24-
// TickCollection is the name of the collection used for ticks.
25-
TickCollection = "tick"
31+
const (
32+
// RecoveryCheckpointingInternal is the interval for recovery checkpointing.
33+
RecoveryCheckpointingInternal = time.Minute
34+
// HeartbeatInternal is the interval for heartbeats.
35+
HeartbeatInternal = 30 * time.Second
36+
// HeartbeatTimeout is the timeout duration for heartbeats.
37+
HeartbeatTimeout = 5 * time.Second
38+
// StaleHeartbeatDuration is the duration after which a heartbeat is considered stale.
39+
StaleHeartbeatDuration = HeartbeatInternal + HeartbeatTimeout
2640
)
2741

2842
// Change stream and replication settings.

heartbeat.go

Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"go.mongodb.org/mongo-driver/v2/bson"
8+
"go.mongodb.org/mongo-driver/v2/mongo"
9+
"go.mongodb.org/mongo-driver/v2/mongo/options"
10+
11+
"github.com/percona-lab/percona-mongolink/config"
12+
"github.com/percona-lab/percona-mongolink/errors"
13+
"github.com/percona-lab/percona-mongolink/log"
14+
)
15+
16+
var errConcurrentProcess = errors.New("detected concurrent process")
17+
18+
const heartbeatID = "mongolink"
19+
20+
type StopHeartbeat func(context.Context) error
21+
22+
func RunHeartbeat(ctx context.Context, m *mongo.Client) (StopHeartbeat, error) {
23+
lastBeat, err := doFirstHeartbeat(ctx, m)
24+
if err != nil {
25+
return nil, errors.Wrap(err, "first")
26+
}
27+
28+
lg := log.New("heartbeat")
29+
lg.With(log.Int64("hb", lastBeat)).Trace("")
30+
31+
ctx, cancel := context.WithCancel(context.Background())
32+
33+
go func() {
34+
lastBeat := lastBeat
35+
36+
for {
37+
time.Sleep(config.HeartbeatInternal)
38+
39+
savedBeat, err := doHeartbeat(ctx, m, lastBeat)
40+
if err != nil {
41+
lg.Error(err, "beat")
42+
43+
continue
44+
}
45+
46+
lastBeat = savedBeat
47+
lg.With(log.Int64("hb", lastBeat)).Trace("")
48+
}
49+
}()
50+
51+
stop := func(ctx context.Context) error {
52+
cancel()
53+
54+
return DeleteHeartbeat(ctx, m)
55+
}
56+
57+
return stop, nil
58+
}
59+
60+
func doFirstHeartbeat(ctx context.Context, m *mongo.Client) (int64, error) {
61+
timeoutCtx, cancel := context.WithTimeout(ctx, config.HeartbeatTimeout)
62+
defer cancel()
63+
64+
currBeat := time.Now().Unix()
65+
66+
_, err := m.Database(config.MongoLinkDatabase).
67+
Collection(config.HeartbeatCollection).
68+
InsertOne(timeoutCtx, bson.D{{"_id", heartbeatID}, {"time", currBeat}})
69+
if err == nil {
70+
return currBeat, nil
71+
}
72+
73+
if !mongo.IsDuplicateKeyError(err) {
74+
return 0, err //nolint:wrapcheck
75+
}
76+
77+
raw, err := m.Database(config.MongoLinkDatabase).
78+
Collection(config.HeartbeatCollection).
79+
FindOne(ctx, bson.D{{"_id", heartbeatID}}).
80+
Raw()
81+
if err != nil {
82+
return 0, errors.Wrap(err, "find")
83+
}
84+
85+
lastBeat, _ := raw.Lookup("time").AsInt64OK()
86+
87+
if time.Since(time.Unix(lastBeat, 0)) < config.StaleHeartbeatDuration {
88+
return 0, errConcurrentProcess
89+
}
90+
91+
currBeat, err = doHeartbeat(ctx, m, lastBeat)
92+
if err != nil {
93+
return 0, errors.Wrap(err, "beat")
94+
}
95+
96+
return currBeat, nil
97+
}
98+
99+
func doHeartbeat(ctx context.Context, m *mongo.Client, lastBeat int64) (int64, error) {
100+
timeoutCtx, cancel := context.WithTimeout(ctx, config.HeartbeatTimeout)
101+
defer cancel()
102+
103+
currBeat := time.Now().Unix()
104+
105+
raw, err := m.Database(config.MongoLinkDatabase).
106+
Collection(config.HeartbeatCollection).
107+
FindOneAndUpdate(timeoutCtx,
108+
bson.D{{"_id", heartbeatID}},
109+
bson.D{{"$set", bson.D{{"time", currBeat}}}},
110+
options.FindOneAndUpdate().SetReturnDocument(options.Before)).
111+
Raw()
112+
if err != nil {
113+
return 0, err //nolint:wrapcheck
114+
}
115+
116+
savedBeat, _ := raw.Lookup("time").AsInt64OK()
117+
if savedBeat != lastBeat {
118+
return 0, errConcurrentProcess
119+
}
120+
121+
return currBeat, nil
122+
}
123+
124+
func DeleteHeartbeat(ctx context.Context, m *mongo.Client) error {
125+
_, err := m.Database(config.MongoLinkDatabase).
126+
Collection(config.HeartbeatCollection).
127+
DeleteOne(ctx, bson.D{{"_id", heartbeatID}})
128+
129+
return err //nolint:wrapcheck
130+
}

log/log.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,12 @@ func Tx(txn *int64, lsid []byte) AttrFn {
8585
}
8686
}
8787

88+
func Int64(key string, val int64) AttrFn {
89+
return func(l zerolog.Context) zerolog.Context {
90+
return l.Int64(key, val)
91+
}
92+
}
93+
8894
// New creates a new Logger with the specified scope.
8995
func New(scope string) Logger {
9096
log := zerolog.Ctx(context.Background()).With().Logger()

0 commit comments

Comments
 (0)