Skip to content

Commit 68f556f

Browse files
author
Hongyu Zhou
committed
reverted unwanted changes
1 parent 04be134 commit 68f556f

File tree

4 files changed

+128
-76
lines changed

4 files changed

+128
-76
lines changed

.github/workflows/build-ctlstore.yml

Lines changed: 0 additions & 37 deletions
This file was deleted.

Dockerfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ RUN apk --no-cache add sqlite
2222

2323
COPY --from=0 /bin/chamber /bin/chamber
2424
COPY --from=0 /usr/local/bin/ctlstore /usr/local/bin/
25-
COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/
25+
COPY --from=0 /usr/local/bin/ctlstore-cli /usr/local/bin/

pkg/cmd/ctlstore/main.go

Lines changed: 28 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"github.com/segmentio/ctlstore/pkg/errs"
2424
executivepkg "github.com/segmentio/ctlstore/pkg/executive"
2525
heartbeatpkg "github.com/segmentio/ctlstore/pkg/heartbeat"
26+
"github.com/segmentio/ctlstore/pkg/ldbwriter"
2627
"github.com/segmentio/ctlstore/pkg/ledger"
2728
reflectorpkg "github.com/segmentio/ctlstore/pkg/reflector"
2829
sidecarpkg "github.com/segmentio/ctlstore/pkg/sidecar"
@@ -46,22 +47,25 @@ type sidecarConfig struct {
4647
}
4748

4849
type reflectorCliConfig struct {
49-
LDBPath string `conf:"ldb-path" help:"Path to LDB file" validate:"nonzero"`
50-
ChangelogPath string `conf:"changelog-path" help:"Path to changelog file"`
51-
ChangelogSize int `conf:"changelog-size" help:"Maximum size of the changelog file"`
52-
UpstreamDriver string `conf:"upstream-driver" help:"Upstream driver name (e.g. sqlite3)" validate:"nonzero"`
53-
UpstreamDSN string `conf:"upstream-dsn" help:"Upstream DSN (e.g. path to file if sqlite3)" validate:"nonzero"`
54-
UpstreamLedgerTable string `conf:"upstream-ledger-table" help:"Table on the upstream to look for statement ledger"`
55-
BootstrapURL string `conf:"bootstrap-url" help:"Bootstraps LDB from an S3 URL"`
56-
BootstrapRegion string `conf:"bootstrap-region" help:"If specified, indicates which region in which the S3 bucket lives"`
57-
PollInterval time.Duration `conf:"poll-interval" help:"How often to pull the upstream" validate:"nonzero"`
58-
PollJitterCoefficient float64 `conf:"poll-jitter-coefficient" help:"Coefficient for poll jittering"`
59-
QueryBlockSize int `conf:"query-block-size" help:"Number of ledger entries to get at once"`
60-
Debug bool `conf:"debug" help:"Turns on debug logging"`
61-
LedgerHealth ledgerHealthConfig `conf:"ledger-latency" help:"Configure ledger latency behavior"`
62-
Dogstatsd dogstatsdConfig `conf:"dogstatsd" help:"dogstatsd Configuration"`
63-
MetricsBind string `conf:"metrics-bind" help:"address to serve Prometheus metircs"`
64-
WALPollInterval time.Duration `conf:"wal-poll-interval" help:"How often to pull the sqlite's wal size and status. 0 indicates disabled monitoring'"`
50+
LDBPath string `conf:"ldb-path" help:"Path to LDB file" validate:"nonzero"`
51+
ChangelogPath string `conf:"changelog-path" help:"Path to changelog file"`
52+
ChangelogSize int `conf:"changelog-size" help:"Maximum size of the changelog file"`
53+
UpstreamDriver string `conf:"upstream-driver" help:"Upstream driver name (e.g. sqlite3)" validate:"nonzero"`
54+
UpstreamDSN string `conf:"upstream-dsn" help:"Upstream DSN (e.g. path to file if sqlite3)" validate:"nonzero"`
55+
UpstreamLedgerTable string `conf:"upstream-ledger-table" help:"Table on the upstream to look for statement ledger"`
56+
BootstrapURL string `conf:"bootstrap-url" help:"Bootstraps LDB from an S3 URL"`
57+
BootstrapRegion string `conf:"bootstrap-region" help:"If specified, indicates which region in which the S3 bucket lives"`
58+
PollInterval time.Duration `conf:"poll-interval" help:"How often to pull the upstream" validate:"nonzero"`
59+
PollJitterCoefficient float64 `conf:"poll-jitter-coefficient" help:"Coefficient for poll jittering"`
60+
QueryBlockSize int `conf:"query-block-size" help:"Number of ledger entries to get at once"`
61+
Debug bool `conf:"debug" help:"Turns on debug logging"`
62+
LedgerHealth ledgerHealthConfig `conf:"ledger-latency" help:"Configure ledger latency behavior"`
63+
Dogstatsd dogstatsdConfig `conf:"dogstatsd" help:"dogstatsd Configuration"`
64+
MetricsBind string `conf:"metrics-bind" help:"address to serve Prometheus metircs"`
65+
WALPollInterval time.Duration `conf:"wal-poll-interval" help:"How often to pull the sqlite's wal size and status. 0 indicates disabled monitoring'"`
66+
WALCheckpointThresholdSize int `conf:"wal-checkpoint-threshold-size" help:"Performs a checkpoint after the WAL file exceeds this size in bytes"`
67+
WALCheckpointType ldbwriter.CheckpointType `conf:"wal-checkpoint-type" help:"what type of checkpoint to manually perform once the wal size is exceeded"`
68+
BusyTimeoutMS int `conf:"busy-timeout-ms" help:"Set a busy timeout on the connection string for sqlite in milliseconds"`
6569
}
6670

6771
type executiveCliConfig struct {
@@ -490,6 +494,9 @@ func defaultReflectorCLIConfig(isSupervisor bool) reflectorCliConfig {
490494
},
491495
// disabled by default
492496
WALPollInterval: 0,
497+
// 8 MB, double what a "healthy" WAL file should be https://www.sqlite.org/compile.html#default_wal_autocheckpoint
498+
WALCheckpointThresholdSize: 8 * 1024 * 1024,
499+
WALCheckpointType: ldbwriter.Passive,
493500
}
494501
if isSupervisor {
495502
// the supervisor runs as an ECS task, so it cannot yet set
@@ -547,7 +554,10 @@ func newReflector(cliCfg reflectorCliConfig, isSupervisor bool) (*reflectorpkg.R
547554
QueryBlockSize: cliCfg.QueryBlockSize,
548555
PollTimeout: 5 * time.Second,
549556
},
550-
WALPollInterval: cliCfg.WALPollInterval,
551-
DoMonitorWAL: cliCfg.WALPollInterval > 0,
557+
WALPollInterval: cliCfg.WALPollInterval,
558+
DoMonitorWAL: cliCfg.WALPollInterval > 0,
559+
WALCheckpointThresholdSize: cliCfg.WALCheckpointThresholdSize,
560+
WALCheckpointType: cliCfg.WALCheckpointType,
561+
BusyTimeoutMS: cliCfg.BusyTimeoutMS,
552562
})
553563
}

pkg/reflector/wal_monitor_test.go

Lines changed: 99 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,26 @@ import (
55
"fmt"
66
"os"
77
"sync"
8+
"sync/atomic"
89
"testing"
910
"time"
1011

1112
"github.com/segmentio/ctlstore/pkg/ldbwriter"
1213
)
1314

1415
type fake struct {
15-
size int64
16-
err error
17-
wg sync.WaitGroup
18-
statCallCount int
19-
checkCallCount int
16+
size int64
17+
err error
18+
wg sync.WaitGroup
19+
statCallCount atomic.Int64
20+
cpCallCount atomic.Int64
2021
}
2122

2223
func (f *fake) Stat() func(m *WALMonitor) {
2324
return func(m *WALMonitor) {
2425
m.walSizeFunc = func(p string) (int64, error) {
2526
defer f.wg.Done()
26-
f.statCallCount++
27+
f.statCallCount.Add(1)
2728
v, err := m.getWALSize(p)
2829
f.size = v
2930
f.err = err
@@ -43,13 +44,14 @@ func (f *fake) Ticker() func(m *WALMonitor) {
4344
func (f *fake) Checkpointer() func(m *WALMonitor) {
4445
return func(m *WALMonitor) {
4546
m.cpTesterFunc = func() (*ldbwriter.PragmaWALResult, error) {
46-
f.checkCallCount++
47+
defer f.wg.Done()
48+
f.cpCallCount.Add(1)
4749
return nil, fmt.Errorf("fail")
4850
}
4951
}
5052
}
5153

52-
func TestWALMonitorSize(t *testing.T) {
54+
func TestWALMonitorTooSmall(t *testing.T) {
5355
tmpdir := t.TempDir()
5456
f, err := os.CreateTemp(tmpdir, "*.ldb-wal")
5557
if err != nil {
@@ -66,10 +68,57 @@ func TestWALMonitorSize(t *testing.T) {
6668
}
6769

6870
var fake fake
69-
fake.wg.Add(1)
71+
fake.wg.Add(2)
7072
mon := NewMonitor(MonitorConfig{
71-
PollInterval: time.Millisecond,
72-
Path: f.Name(),
73+
PollInterval: time.Millisecond,
74+
Path: f.Name(),
75+
WALCheckpointThresholdSize: int64(n + 1),
76+
}, nil, fake.Stat(), fake.Ticker(), fake.Checkpointer())
77+
78+
ctx, cancel := context.WithCancel(context.Background())
79+
go mon.Start(ctx)
80+
// wait for fake stat call
81+
fake.wg.Wait()
82+
cancel()
83+
84+
if fake.statCallCount.Load() == 0 {
85+
t.Errorf("Stat should have been called at least once")
86+
}
87+
88+
if fake.cpCallCount.Load() != 0 {
89+
t.Errorf("Checkpoint should not have been called since the file wasn't large enough")
90+
}
91+
if fake.err != nil {
92+
t.Errorf("unexpected error on stat: %v", fake.err)
93+
}
94+
95+
if int64(n) != fake.size {
96+
t.Errorf("expected file size of %d, got %d", n, fake.size)
97+
}
98+
}
99+
100+
func TestWALMonitorBigEnough(t *testing.T) {
101+
tmpdir := t.TempDir()
102+
f, err := os.CreateTemp(tmpdir, "*.ldb-wal")
103+
if err != nil {
104+
t.Fatal(err)
105+
}
106+
107+
n, err := f.WriteString("some random bytes!")
108+
if err != nil {
109+
t.Fatal(err)
110+
}
111+
112+
if f.Sync() != nil {
113+
t.Fatal(err)
114+
}
115+
116+
var fake fake
117+
fake.wg.Add(2)
118+
mon := NewMonitor(MonitorConfig{
119+
PollInterval: time.Millisecond,
120+
Path: f.Name(),
121+
WALCheckpointThresholdSize: int64(n - 1),
73122
}, nil, fake.Stat(), fake.Ticker(), fake.Checkpointer())
74123

75124
ctx, cancel := context.WithCancel(context.Background())
@@ -78,11 +127,11 @@ func TestWALMonitorSize(t *testing.T) {
78127
fake.wg.Wait()
79128
cancel()
80129

81-
if fake.statCallCount == 0 {
130+
if fake.statCallCount.Load() == 0 {
82131
t.Errorf("Stat should have been called at least once")
83132
}
84133

85-
if fake.checkCallCount == 0 {
134+
if fake.cpCallCount.Load() == 0 {
86135
t.Errorf("Checkpoint should have been called at least once")
87136
}
88137
if fake.err != nil {
@@ -103,16 +152,16 @@ func TestNoWALPath(t *testing.T) {
103152

104153
mon.Start(context.Background())
105154

106-
if fake.statCallCount != 0 {
155+
if fake.statCallCount.Load() != 0 {
107156
t.Errorf("Stat should not have been called")
108157
}
109158

110-
if fake.checkCallCount != 0 {
159+
if fake.cpCallCount.Load() != 0 {
111160
t.Errorf("Checkpoint should not have been called")
112161
}
113162
}
114163

115-
func TestWALMonitorStopsOnError(t *testing.T) {
164+
func TestWALMonitorStopsOnStatError(t *testing.T) {
116165
var fake fake
117166
fake.wg.Add(5)
118167
mon := NewMonitor(MonitorConfig{
@@ -122,11 +171,41 @@ func TestWALMonitorStopsOnError(t *testing.T) {
122171

123172
mon.Start(context.Background())
124173
fake.wg.Wait()
125-
if fake.statCallCount != 5 {
126-
t.Errorf("Stat should have been called 5 times, got %d", fake.statCallCount)
174+
if fake.statCallCount.Load() != 5 {
175+
t.Errorf("Stat should have been called 5 times, got %d", fake.statCallCount.Load())
127176
}
128177

129-
if fake.checkCallCount != 5 {
130-
t.Errorf("Checkpoint should have have been called 5 times, got %d", fake.checkCallCount)
178+
if fake.cpCallCount.Load() != 0 {
179+
t.Errorf("Checkpoint should not have been called")
180+
}
181+
}
182+
183+
func TestWALMonitorStopsOnCheckpointError(t *testing.T) {
184+
tmpdir := t.TempDir()
185+
f, err := os.CreateTemp(tmpdir, "*.ldb-wal")
186+
if err != nil {
187+
t.Fatal(err)
188+
}
189+
190+
_, err = f.WriteString("some random bytes!")
191+
if err != nil {
192+
t.Fatal(err)
193+
}
194+
195+
var fake fake
196+
fake.wg.Add(10)
197+
mon := NewMonitor(MonitorConfig{
198+
PollInterval: 50 * time.Microsecond,
199+
Path: f.Name(),
200+
}, nil, fake.Stat(), fake.Checkpointer())
201+
202+
mon.Start(context.Background())
203+
fake.wg.Wait()
204+
if fake.statCallCount.Load() != 5 {
205+
t.Errorf("Stat should have been called 5 times, got %d", fake.statCallCount.Load())
206+
}
207+
208+
if fake.cpCallCount.Load() != 5 {
209+
t.Errorf("Checkpoint should not have been called")
131210
}
132211
}

0 commit comments

Comments
 (0)