Skip to content

Commit f6fdf2c

Browse files
authored
Multi-Shoveler Open and Close (#131)
* start up multiple reflectors * Concurrent ldb downloads * Add rotating reader and tests * add convenience methods * use go 1.13 friendly atomic interface * add support for multiple LDB copies in InitContainer * only create dbs if they don't exist * reopen and close last rotated reader * create a changelog for first ldb in multimode
1 parent 4839bef commit f6fdf2c

File tree

14 files changed

+840
-75
lines changed

14 files changed

+840
-75
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ require (
2323
github.com/segmentio/go-sqlite3 v1.12.0
2424
github.com/segmentio/stats/v4 v4.6.2
2525
github.com/stretchr/testify v1.8.1
26+
golang.org/x/sync v0.3.0
2627
)
2728

2829
require (

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
156156
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
157157
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
158158
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
159+
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
160+
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
159161
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
160162
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
161163
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=

ldb_reader.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
// across multiple processes.
2929
type LDBReader struct {
3030
Db *sql.DB
31+
path string
3132
pkCache map[string]schema.PrimaryKey // keyed by ldbTableName()
3233
getRowByKeyStmtCache map[string]*sql.Stmt // keyed by ldbTableName()
3334
getRowsByKeyPrefixStmtCache map[prefixCacheKey]*sql.Stmt
@@ -46,13 +47,17 @@ var (
4647
ErrNoLedgerUpdates = errors.New("no ledger updates have been received yet")
4748
)
4849

50+
// RowRetriever is the row-lookup surface shared by the readers in this package.
// LDBRotatingReader satisfies it by delegating both methods to its active LDBReader,
// and RotatingReader / CustomerRotatingReader return their result as this interface.
type RowRetriever interface {
51+
// NOTE(review): presumably returns the rows whose primary key starts with key — confirm against LDBReader.
GetRowsByKeyPrefix(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error)
52+
// NOTE(review): presumably fills out with the row matching key and reports whether one was found — confirm against LDBReader.
GetRowByKey(ctx context.Context, out interface{}, familyName string, tableName string, key ...interface{}) (found bool, err error)
53+
}
54+
4955
func newLDBReader(path string) (*LDBReader, error) {
5056
db, err := newLDB(path)
5157
if err != nil {
5258
return nil, err
5359
}
54-
55-
return &LDBReader{Db: db}, nil
60+
return &LDBReader{Db: db, path: path}, nil
5661
}
5762

5863
func newVersionedLDBReader(dirPath string) (*LDBReader, error) {
@@ -344,7 +349,6 @@ func (reader *LDBReader) closeDB() error {
344349
if reader.Db != nil {
345350
return reader.Db.Close()
346351
}
347-
348352
return nil
349353
}
350354

@@ -369,7 +373,7 @@ func (reader *LDBReader) Ping(ctx context.Context) bool {
369373
// to the type of each PK column.
370374
func convertKeyBeforeQuery(pk schema.PrimaryKey, key []interface{}) error {
371375
for i, k := range key {
372-
// sanity check on th elength of the pk field type slice
376+
// sanity check on the length of the pk field type slice
373377
if i >= len(pk.Types) {
374378
return errors.New("insufficient key field type data")
375379
}

ldb_rotating_reader.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
package ctlstore
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"github.com/segmentio/ctlstore/pkg/errs"
8+
"github.com/segmentio/ctlstore/pkg/globalstats"
9+
"github.com/segmentio/ctlstore/pkg/ldb"
10+
"github.com/segmentio/events/v2"
11+
"github.com/segmentio/stats/v4"
12+
"path"
13+
"strconv"
14+
"sync/atomic"
15+
"time"
16+
)
17+
18+
// LDBRotatingReader reads data from multiple LDBs on a rotating schedule.
19+
// The main benefit is relieving read pressure on a particular ldb file when it becomes inactive,
20+
// allowing sqlite maintenance.
21+
type LDBRotatingReader struct {
22+
active int32 // index into dbs of the reader currently serving queries; read and written with sync/atomic
23+
dbs []*LDBReader // one reader per ldb path, in the order the paths were given
24+
schedule []int8 // 60 entries indexed by minute of the hour; each value is an index into dbs
25+
now func() time.Time // clock used to look up the schedule; setActive defaults it to time.Now
26+
tickerInterval time.Duration // how often rotate re-checks the schedule; rotate defaults it to 1 minute
27+
}
28+
29+
// RotationPeriod is the number of minutes each reader stays active before rotating to the next one.
30+
type RotationPeriod int
31+
32+
const (
33+
// Every30 rotates on each 30 minute mark of the hour.
34+
Every30 RotationPeriod = 30
35+
// Every20 rotates on each 20 minute mark of the hour.
36+
Every20 RotationPeriod = 20
37+
// Every15 rotates on each 15 minute mark of the hour.
38+
Every15 RotationPeriod = 15
39+
// Every10 rotates on each 10 minute mark of the hour.
40+
Every10 RotationPeriod = 10
41+
// Every6 rotates on each 6 minute mark of the hour.
42+
Every6 RotationPeriod = 6
43+
44+
// For simpler migration, the first ldb retains the original name.
45+
defaultPath = DefaultCtlstorePath + ldb.DefaultLDBFilename
46+
// ldbFormat is the path template for the additional ldbs; defaultPaths fills %d with numbers starting at 2.
ldbFormat = DefaultCtlstorePath + "ldb-%d.db"
47+
)
48+
49+
func defaultPaths(count int) []string {
50+
paths := []string{defaultPath}
51+
for i := 1; i < count; i++ {
52+
paths = append(paths, fmt.Sprintf(ldbFormat, i+1))
53+
}
54+
return paths
55+
}
56+
57+
// RotatingReader creates a new reader that rotates which ldb it reads from on a rotation period with the default location in /var/spool/ctlstore
58+
func RotatingReader(ctx context.Context, minutesPerRotation RotationPeriod, ldbsCount int) (RowRetriever, error) {
59+
return CustomerRotatingReader(ctx, minutesPerRotation, defaultPaths(ldbsCount)...)
60+
}
61+
62+
// CustomerRotatingReader creates a new reader that rotates which ldb it reads from on a rotation period
63+
func CustomerRotatingReader(ctx context.Context, minutesPerRotation RotationPeriod, ldbPaths ...string) (RowRetriever, error) {
64+
r, err := rotatingReader(minutesPerRotation, ldbPaths...)
65+
if err != nil {
66+
return nil, err
67+
}
68+
r.setActive()
69+
go r.rotate(ctx)
70+
return r, nil
71+
}
72+
73+
func rotatingReader(minutesPerRotation RotationPeriod, ldbPaths ...string) (*LDBRotatingReader, error) {
74+
if len(ldbPaths) < 2 {
75+
return nil, errors.New("RotatingReader requires more than 1 ldb")
76+
}
77+
if !isValid(minutesPerRotation) {
78+
return nil, errors.New(fmt.Sprintf("invalid rotation period: %v", minutesPerRotation))
79+
}
80+
if len(ldbPaths) > 60/int(minutesPerRotation) {
81+
return nil, errors.New("cannot have more ldbs than rotations per hour")
82+
}
83+
var r LDBRotatingReader
84+
for _, p := range ldbPaths {
85+
events.Log("Opening ldb %s for reading", p)
86+
reader, err := newLDBReader(p)
87+
if err != nil {
88+
return nil, err
89+
}
90+
r.dbs = append(r.dbs, reader)
91+
}
92+
r.schedule = make([]int8, 60)
93+
idx := 0
94+
for i := 1; i < 61; i++ {
95+
r.schedule[i-1] = int8(idx % len(ldbPaths))
96+
if i%int(minutesPerRotation) == 0 {
97+
idx++
98+
}
99+
}
100+
return &r, nil
101+
}
102+
103+
func (r *LDBRotatingReader) setActive() {
104+
if r.now == nil {
105+
r.now = time.Now
106+
}
107+
atomic.StoreInt32(&r.active, int32(r.schedule[r.now().Minute()]))
108+
}
109+
110+
// GetRowsByKeyPrefix delegates to the active LDBReader
111+
func (r *LDBRotatingReader) GetRowsByKeyPrefix(ctx context.Context, familyName string, tableName string, key ...interface{}) (*Rows, error) {
112+
return r.dbs[atomic.LoadInt32(&r.active)].GetRowsByKeyPrefix(ctx, familyName, tableName, key...)
113+
}
114+
115+
// GetRowByKey delegates to the active LDBReader
116+
func (r *LDBRotatingReader) GetRowByKey(ctx context.Context, out interface{}, familyName string, tableName string, key ...interface{}) (found bool, err error) {
117+
return r.dbs[atomic.LoadInt32(&r.active)].GetRowByKey(ctx, out, familyName, tableName, key...)
118+
}
119+
120+
// rotate by default checks every 1 minute if the active db has changed according to schedule
121+
func (r *LDBRotatingReader) rotate(ctx context.Context) {
122+
if r.tickerInterval == 0 {
123+
r.tickerInterval = 1 * time.Minute
124+
}
125+
ticker := time.NewTicker(r.tickerInterval)
126+
127+
for {
128+
select {
129+
case <-ctx.Done():
130+
return
131+
case <-ticker.C:
132+
next := r.schedule[r.now().Minute()]
133+
last := atomic.LoadInt32(&r.active)
134+
135+
// move the next to active and close and reopen the last one
136+
if int32(next) != last {
137+
atomic.StoreInt32(&r.active, int32(next))
138+
stats.Incr("rotating_reader.rotate")
139+
globalstats.Set("rotating_reader.active", next)
140+
err := r.dbs[last].Close()
141+
if err != nil {
142+
events.Log("failed to close LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err)
143+
errs.Incr("rotating_reader.closing_ldbreader", stats.T("id", strconv.Itoa(int(last))))
144+
return
145+
}
146+
147+
reader, err := newLDBReader(r.dbs[last].path)
148+
if err != nil {
149+
events.Log("failed to open LDBReader for %s on rotation: %{error}v", r.dbs[last].path, err)
150+
errs.Incr("rotating_reader.opening_ldbreader",
151+
stats.T("id", strconv.Itoa(int(last))),
152+
stats.T("path", path.Base(r.dbs[last].path)))
153+
return
154+
}
155+
r.dbs[last] = reader
156+
157+
}
158+
}
159+
}
160+
}
161+
162+
func isValid(rf RotationPeriod) bool {
163+
switch rf {
164+
case Every6, Every10, Every15, Every20, Every30:
165+
return true
166+
}
167+
return false
168+
}

0 commit comments

Comments
 (0)