Skip to content

Commit 02c8d2d

Browse files
authored
Created snapshot test, optimised snapshots and fixed bug (#462)
1 parent f829e8b commit 02c8d2d

File tree

7 files changed

+246
-16
lines changed

7 files changed

+246
-16
lines changed

cluster/dragon/dragon.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"os"
1111
"path/filepath"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415

1516
"github.com/lni/dragonboat/v3/logger"
@@ -84,6 +85,8 @@ type Dragon struct {
8485
requestClientPool []remoting.Client
8586
requestClientPoolLock sync.Mutex
8687
healthChecker *remoting.HealthChecker
88+
saveSnapshotCount int64
89+
restoreSnapshotCount int64
8790
}
8891

8992
type snapshot struct {
@@ -1167,3 +1170,11 @@ func (d *Dragon) ensureNodeHostAvailable() error {
11671170
}
11681171
return nil
11691172
}
1173+
1174+
// SaveSnapshotCount returns the number of snapshots this node has saved
// (streamed to another node). Read atomically, so safe from any goroutine.
func (d *Dragon) SaveSnapshotCount() int64 {
	return atomic.LoadInt64(&d.saveSnapshotCount)
}
1177+
1178+
// RestoreSnapshotCount returns the number of snapshots this node has restored
// (received from another node). Read atomically, so safe from any goroutine.
func (d *Dragon) RestoreSnapshotCount() int64 {
	return atomic.LoadInt64(&d.restoreSnapshotCount)
}

cluster/dragon/locks_odsm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ func (s *locksODStateMachine) SaveSnapshot(i interface{}, writer io.Writer, i2 <
161161
}
162162
prefix := table.EncodeTableKeyPrefix(common.LocksTableID, locksClusterID, 16)
163163
log.Printf("Saving locks snapshot on node id %d for shard id %d prefix is %v", s.dragon.cnf.NodeID, locksClusterID, prefix)
164-
return saveSnapshotDataToWriter(snapshot, prefix, writer, locksClusterID)
164+
return saveSnapshotDataToWriter(s.dragon.pebble, snapshot, prefix, writer, locksClusterID)
165165
}
166166

167167
func (s *locksODStateMachine) RecoverFromSnapshot(reader io.Reader, i <-chan struct{}) error {

cluster/dragon/sequence_odsm.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ func (s *sequenceODStateMachine) SaveSnapshot(i interface{}, writer io.Writer, i
9292
}
9393
prefix := table.EncodeTableKeyPrefix(common.SequenceGeneratorTableID, tableSequenceClusterID, 16)
9494
log.Printf("Saving sequence snapshot on node id %d for shard id %d prefix is %v", s.dragon.cnf.NodeID, tableSequenceClusterID, prefix)
95-
err := saveSnapshotDataToWriter(snapshot, prefix, writer, tableSequenceClusterID)
95+
err := saveSnapshotDataToWriter(s.dragon.pebble, snapshot, prefix, writer, tableSequenceClusterID)
9696
log.Info("sequence shard save snapshot done")
9797
return err
9898
}

cluster/dragon/shard_odsm.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"io"
1313
"math"
1414
"sync"
15+
"sync/atomic"
1516
)
1617

1718
const (
@@ -328,23 +329,32 @@ func (s *ShardOnDiskStateMachine) PrepareSnapshot() (interface{}, error) {
328329
}
329330

330331
func (s *ShardOnDiskStateMachine) SaveSnapshot(i interface{}, writer io.Writer, _ <-chan struct{}) error {
331-
log.Debugf("data shard %d saving snapshot", s.shardID)
332+
log.Debugf("data shard %d saving snapshot on node id %d", s.shardID, s.dragon.cnf.NodeID)
332333
snapshot, ok := i.(*pebble.Snapshot)
333334
if !ok {
334335
panic("not a snapshot")
335336
}
336337
prefix := make([]byte, 0, 8)
337338
prefix = common.AppendUint64ToBufferBE(prefix, s.shardID)
338339
log.Debugf("Saving data snapshot on node id %d for shard id %d prefix is %v", s.dragon.cnf.NodeID, s.shardID, prefix)
339-
err := saveSnapshotDataToWriter(snapshot, prefix, writer, s.shardID)
340-
log.Debugf("data shard %d save snapshot done", s.shardID)
341-
return err
340+
err := saveSnapshotDataToWriter(s.dragon.pebble, snapshot, prefix, writer, s.shardID)
341+
atomic.AddInt64(&s.dragon.saveSnapshotCount, 1)
342+
if err != nil {
343+
// According to the docs for IOnDiskStateMachine we should return ErrSnapshotStreaming
344+
if errors.Is(err, statemachine.ErrSnapshotStreaming) {
345+
return err
346+
}
347+
log.Errorf("failure in streaming snapshot %+v", err)
348+
return statemachine.ErrSnapshotStreaming
349+
}
350+
log.Debugf("data shard %d save snapshot done on node id %d", s.shardID, s.dragon.cnf.NodeID)
351+
return nil
342352
}
343353

344354
func (s *ShardOnDiskStateMachine) RecoverFromSnapshot(reader io.Reader, i <-chan struct{}) error {
345355
s.lock.Lock()
346356
defer s.lock.Unlock()
347-
log.Debugf("data shard %d recover from snapshot", s.shardID)
357+
log.Debugf("data shard %d recover from snapshot on node %d", s.shardID, s.dragon.cnf.NodeID)
348358
s.dedupSequences = make(map[string]uint64)
349359
startPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), s.shardID)
350360
endPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), s.shardID+1)
@@ -357,7 +367,8 @@ func (s *ShardOnDiskStateMachine) RecoverFromSnapshot(reader io.Reader, i <-chan
357367
return err
358368
}
359369
s.maybeTriggerRemoteWriteOccurred()
360-
log.Debugf("data shard %d recover from snapshot done", s.shardID)
370+
log.Debugf("data shard %d recover from snapshot done on node %d", s.shardID, s.dragon.cnf.NodeID)
371+
atomic.AddInt64(&s.dragon.restoreSnapshotCount, 1)
361372
return nil
362373
}
363374

cluster/dragon/sm_utils.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dragon
22

33
import (
4+
"bufio"
45
"io"
56
"io/ioutil"
67
"math"
@@ -18,11 +19,21 @@ import (
1819
var syncWriteOptions = &pebble.WriteOptions{Sync: true}
1920
var nosyncWriteOptions = &pebble.WriteOptions{Sync: false}
2021

21-
func saveSnapshotDataToWriter(snapshot *pebble.Snapshot, prefix []byte, writer io.Writer, shardID uint64) error {
22+
// Buffer sizes used when streaming snapshot data. Larger than the bufio
// default to reduce the number of underlying reads/writes.
const (
	saveSnapshotBufferSize    = 32 * 1024
	restoreSnapshotBufferSize = 32 * 1024
)
26+
27+
func saveSnapshotDataToWriter(peb *pebble.DB, snapshot *pebble.Snapshot, prefix []byte, writer io.Writer, shardID uint64) error {
2228
var w writeCloseSyncer
2329
w, ok := writer.(writeCloseSyncer)
2430
if !ok {
25-
w = &nopWriteCloseSyncer{writer}
31+
// We use our own buffered writer so we can increase buffer size and also sync
32+
bufWriter := bufio.NewWriterSize(writer, saveSnapshotBufferSize)
33+
w = &snapshotWriteCloseSyncer{
34+
peb: peb,
35+
bufWriter: bufWriter,
36+
}
2637
}
2738
tbl := sstable.NewWriter(w, sstable.WriterOptions{})
2839
upper := common.IncrementBytesBigEndian(prefix)
@@ -53,6 +64,9 @@ func saveSnapshotDataToWriter(snapshot *pebble.Snapshot, prefix []byte, writer i
5364
return errors.WithStack(err)
5465
}
5566
}
67+
if err := iter.Close(); err != nil {
68+
return errors.WithStack(err)
69+
}
5670
if err := tbl.Close(); err != nil {
5771
return errors.WithStack(err)
5872
}
@@ -79,7 +93,8 @@ func restoreSnapshotDataFromReader(peb *pebble.DB, startPrefix []byte, endPrefix
7993
}()
8094

8195
log.Info("Copying reader to temp file")
82-
if _, err := io.Copy(f, reader); err != nil {
96+
bufReader := bufio.NewReaderSize(reader, restoreSnapshotBufferSize)
97+
if _, err := io.Copy(f, bufReader); err != nil {
8398
return errors.WithStack(err)
8499
}
85100
if err := f.Sync(); err != nil {
@@ -149,12 +164,34 @@ type writeCloseSyncer interface {
149164
Sync() error
150165
}
151166

152-
type nopWriteCloseSyncer struct{ io.Writer }
167+
// snapshotWriteCloseSyncer wraps the writer supplied for snapshot streaming
// with a larger buffer, and implements Sync by syncing pebble.
type snapshotWriteCloseSyncer struct {
	peb        *pebble.DB    // pebble instance synced on Sync()
	bufWriter  *bufio.Writer // buffered wrapper around the snapshot stream writer
	needsFlush bool          // true when bytes have been buffered since the last flush
}
153172

154-
func (w *nopWriteCloseSyncer) Close() error {
155-
return nil
173+
func (w *snapshotWriteCloseSyncer) Write(p []byte) (n int, err error) {
174+
n, err = w.bufWriter.Write(p)
175+
if err == nil {
176+
w.needsFlush = true
177+
}
178+
return
156179
}
157180

158-
func (w *nopWriteCloseSyncer) Sync() error {
181+
func (w *snapshotWriteCloseSyncer) Close() error {
182+
if w.needsFlush {
183+
w.needsFlush = false
184+
return w.bufWriter.Flush()
185+
}
159186
return nil
160187
}
188+
189+
// Sync flushes any buffered bytes to the underlying writer and then syncs
// pebble so the snapshot data is durable before Sync returns.
func (w *snapshotWriteCloseSyncer) Sync() error {
	if w.needsFlush {
		if err := w.bufWriter.Flush(); err != nil {
			return err
		}
		w.needsFlush = false
	}
	// syncPebble forces a pebble WAL sync; see sm_utils.go
	return syncPebble(w.peb)
}

cluster/dragon/sm_utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func TestSaveRestoreSnapshot(t *testing.T) {
4141

4242
buf := &bytes.Buffer{}
4343
snapshotPrefix := table.EncodeTableKeyPrefix(tableID, shardIDs[1], 20)
44-
require.NoError(t, saveSnapshotDataToWriter(srcDB.NewSnapshot(), snapshotPrefix, buf, shardIDToSnapshot))
44+
require.NoError(t, saveSnapshotDataToWriter(srcDB, srcDB.NewSnapshot(), snapshotPrefix, buf, shardIDToSnapshot))
4545

4646
startPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), shardIDToSnapshot)
4747
endPrefix := common.AppendUint64ToBufferBE(make([]byte, 0, 8), shardIDToSnapshot+1)
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package snapshottest
2+
3+
import (
4+
"fmt"
5+
log "github.com/sirupsen/logrus"
6+
"github.com/squareup/pranadb/cluster"
7+
"github.com/squareup/pranadb/cluster/dragon"
8+
"github.com/squareup/pranadb/conf"
9+
"github.com/squareup/pranadb/errors"
10+
"github.com/squareup/pranadb/table"
11+
"github.com/stretchr/testify/require"
12+
"io/ioutil"
13+
"testing"
14+
"time"
15+
)
16+
17+
// TestSnapshot verifies dragonboat snapshot streaming end to end: two of three
// nodes are started, data is written, then the third node is started and must
// receive exactly one streamed snapshot containing all the data.
func TestSnapshot(t *testing.T) {
	dataDir, err := ioutil.TempDir("", "dragon-test")
	if err != nil {
		panic("failed to create temp dir")
	}
	// We start just two nodes of the three node cluster - this provides a quorum so they will accept writes
	dragonCluster, err := startDragonNodes(dataDir, 0, 2)
	if err != nil {
		panic(fmt.Sprintf("failed to start dragon cluster %+v", err))
	}
	node := dragonCluster[0]

	// Write a bunch of data to a shard
	numKeys := 50
	batchSize := 1
	var wb *cluster.WriteBatch
	shardID := node.GetAllShardIDs()[0]
	tableID := uint64(1056)
	var keys [][]byte
	var vals []string
	for i := 0; i < numKeys; i++ {
		skey := fmt.Sprintf("some-key-%06d", i)
		value := fmt.Sprintf("some-value-%d", i)
		// Keys are the table/shard prefix followed by the string key
		key := table.EncodeTableKeyPrefix(tableID, shardID, 16)
		key = append(key, []byte(skey)...)
		keys = append(keys, key)
		vals = append(vals, value)
		if wb == nil {
			wb = cluster.NewWriteBatch(shardID)
		}
		wb.AddPut(key, []byte(value))
		if i%batchSize == 0 {
			err := node.WriteBatch(wb)
			require.NoError(t, err)
			log.Info("wrote batch")
			wb = nil
		}
	}
	// Flush any remaining partial batch
	if wb != nil {
		err := node.WriteBatch(wb)
		require.NoError(t, err)
		log.Info("wrote batch")
	}

	// At this point no snapshots should have been done.
	// We use an IOnDiskStateMachine and Dragon will NOT create snapshots periodically because the state machine
	// is persisted to disk anyway (that's not true for standard state machines)
	// Snapshots are only created when another node starts and needs to get its state machine in sync with the rest
	// of the group. In this case a snapshot stream is requested from an existing node, the snapshot is created and
	// streamed to the new node
	require.Equal(t, int64(0), dragonCluster[0].SaveSnapshotCount())
	require.Equal(t, int64(0), dragonCluster[0].RestoreSnapshotCount())
	require.Equal(t, int64(0), dragonCluster[2].SaveSnapshotCount())
	require.Equal(t, int64(0), dragonCluster[2].RestoreSnapshotCount())

	// Now start another node
	// This should trigger a snapshot on one of the existing nodes which should then be streamed to the new node
	// so it can get up to date
	dragonCluster2, err := startDragonNodes(dataDir, 1)
	if err != nil {
		panic(fmt.Sprintf("failed to start dragon cluster %+v", err))
	}
	// The new node restores exactly one snapshot and saves none
	require.Equal(t, int64(0), dragonCluster2[1].SaveSnapshotCount())
	require.Equal(t, int64(1), dragonCluster2[1].RestoreSnapshotCount())

	require.Equal(t, int64(0), dragonCluster[0].RestoreSnapshotCount())
	require.Equal(t, int64(0), dragonCluster[2].RestoreSnapshotCount())

	// Exactly one of the two original nodes should have saved (streamed) the snapshot
	require.True(t, dragonCluster[0].SaveSnapshotCount() == 1 || dragonCluster[2].SaveSnapshotCount() == 1)
	require.False(t, dragonCluster[0].SaveSnapshotCount() == 1 && dragonCluster[2].SaveSnapshotCount() == 1)

	// Now we read the data back from this node
	// We read it directly from Pebble to make sure it's actually got to that node
	for i, key := range keys {
		va, err := dragonCluster2[1].LocalGet(key)
		require.NoError(t, err)
		require.Equal(t, vals[i], string(va))
	}

	stopDragonCluster(dragonCluster)
	stopDragonCluster(dragonCluster2)
}
99+
100+
// startDragonNodes starts the dragon nodes with the given ids (from a fixed
// three-node cluster) against the shared dataDir, waits for each to start, and
// returns a slice indexed by node id - entries for unstarted nodes are nil.
func startDragonNodes(dataDir string, nodes ...int) ([]*dragon.Dragon, error) {

	nodeAddresses := []string{
		"localhost:63101",
		"localhost:63102",
		"localhost:63103",
	}

	// Set of node ids we've been asked to start
	requiredNodes := map[int]struct{}{}
	for _, node := range nodes {
		requiredNodes[node] = struct{}{}
	}

	var chans []chan error
	clusterNodes := make([]*dragon.Dragon, len(nodeAddresses))
	for i := 0; i < len(nodeAddresses); i++ {
		_, ok := requiredNodes[i]
		if !ok {
			continue
		}
		// Each node reports its startup result on its own channel
		ch := make(chan error)
		chans = append(chans, ch)
		cnf := conf.NewDefaultConfig()
		cnf.NodeID = i
		cnf.ClusterID = 123
		cnf.RaftAddresses = nodeAddresses
		cnf.NumShards = 10
		cnf.DataDir = dataDir
		cnf.ReplicationFactor = 3
		cnf.TestServer = true
		// Low snapshot/compaction thresholds so snapshotting triggers quickly in the test
		cnf.DataSnapshotEntries = 10
		cnf.DataCompactionOverhead = 5
		clus, err := dragon.NewDragon(*cnf)
		if err != nil {
			return nil, err
		}
		clusterNodes[i] = clus
		clus.RegisterShardListenerFactory(&cluster.DummyShardListenerFactory{})
		clus.SetRemoteQueryExecutionCallback(&cluster.DummyRemoteQueryExecutionCallback{})

		go startDragonNode(clus, ch)
	}

	// Wait for every requested node to finish starting
	for _, ch := range chans {
		err, ok := <-ch
		if !ok {
			return nil, errors.Error("channel was closed")
		}
		if err != nil {
			return nil, errors.WithStack(err)
		}
	}

	// Give the cluster a little time to settle before it is used
	time.Sleep(5 * time.Second)
	return clusterNodes, nil
}
156+
157+
func startDragonNode(clus cluster.Cluster, ch chan error) {
158+
err := clus.Start()
159+
ch <- err
160+
}
161+
162+
func stopDragonCluster(dragonCluster []*dragon.Dragon) {
163+
for _, dragon := range dragonCluster {
164+
if dragon != nil {
165+
err := dragon.Stop()
166+
if err != nil {
167+
panic(fmt.Sprintf("failed to stop dragon cluster %+v", err))
168+
}
169+
}
170+
}
171+
}

0 commit comments

Comments
 (0)