Skip to content

Commit d7e2ec3

Browse files
authored
Merge branch 'master' into surrealdb
Signed-off-by: Chiru B <33750251+itsezc@users.noreply.github.com>
2 parents 0a2de34 + 60bd963 commit d7e2ec3

File tree

12 files changed

+95
-22
lines changed

12 files changed

+95
-22
lines changed

.github/workflows/pr.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ jobs:
9494
fail-fast: false
9595
matrix:
9696
arch: [amd64, arm64]
97-
test: [sqlite, mysql, postgres, cockroachdb, schema-migration, nats, nats-embedded, nats-socket]
97+
test: [sqlite, litestream, mysql, postgres, cockroachdb, schema-migration, nats, nats-embedded, nats-socket]
9898
runs-on: ${{ matrix.arch == 'arm64' && 'ubuntu-24.04-arm' || 'ubuntu-latest' }}
9999
steps:
100100
- name: Download Artifacts

go.mod

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ require (
88
github.com/golang/protobuf v1.5.4
99
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
1010
github.com/jackc/pgx/v5 v5.8.0
11-
github.com/klauspost/compress v1.18.2
11+
github.com/klauspost/compress v1.18.3
1212
github.com/mattn/go-sqlite3 v1.14.33
1313
github.com/nats-io/jsm.go v0.3.0
1414
github.com/nats-io/nats-server/v2 v2.12.2
@@ -18,6 +18,7 @@ require (
1818
github.com/shengdoushi/base58 v1.0.0
1919
github.com/sirupsen/logrus v1.9.3
2020
github.com/surrealdb/surrealdb.go v1.0.0
21+
github.com/sirupsen/logrus v1.9.4
2122
github.com/tidwall/btree v1.8.1
2223
github.com/urfave/cli/v2 v2.27.7
2324
go.etcd.io/etcd/api/v3 v3.6.7

go.sum

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
100100
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
101101
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
102102
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
103-
github.com/klauspost/compress v1.18.2 h1:iiPHWW0YrcFgpBYhsA6D1+fqHssJscY/Tm/y2Uqnapk=
104-
github.com/klauspost/compress v1.18.2/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
103+
github.com/klauspost/compress v1.18.3 h1:9PJRvfbmTabkOX8moIpXPbMMbYN60bWImDDU7L+/6zw=
104+
github.com/klauspost/compress v1.18.3/go.mod h1:R0h/fSBs8DE4ENlcrlib3PsXS61voFxhIs2DeRhCvJ4=
105105
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
106106
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
107107
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
@@ -162,8 +162,8 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
162162
github.com/shengdoushi/base58 v1.0.0 h1:tGe4o6TmdXFJWoI31VoSWvuaKxf0Px3gqa3sUWhAxBs=
163163
github.com/shengdoushi/base58 v1.0.0/go.mod h1:m5uIILfzcKMw6238iWAhP4l3s5+uXyF3+bJKUNhAL9I=
164164
github.com/sirupsen/logrus v1.8.1/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0=
165-
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
166-
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
165+
github.com/sirupsen/logrus v1.9.4 h1:TsZE7l11zFCLZnZ+teH4Umoq5BhEIfIzfRDZ1Uzql2w=
166+
github.com/sirupsen/logrus v1.9.4/go.mod h1:ftWc9WdOfJ0a92nsE2jF5u5ZwH8Bv2zdeOC42RjbV2g=
167167
github.com/soheilhy/cmux v0.1.5 h1:jjzc5WVemNEDTLwv9tlmemhC73tI08BNOIGwBOo10Js=
168168
github.com/soheilhy/cmux v0.1.5/go.mod h1:T7TcVDs9LWfQgPlPsdngu6I6QIoyIFZDDC6sNE1GqG0=
169169
github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o=
@@ -270,7 +270,6 @@ golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7w
270270
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
271271
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
272272
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
273-
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
274273
golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
275274
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
276275
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=

pkg/drivers/sqlite/sqlite.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"database/sql"
88
"fmt"
99
"os"
10+
"strings"
1011
"sync"
1112

1213
"github.com/k3s-io/kine/pkg/drivers"
@@ -39,16 +40,20 @@ var (
3940
`CREATE INDEX IF NOT EXISTS kine_id_deleted_index ON kine (id,deleted)`,
4041
`CREATE INDEX IF NOT EXISTS kine_prev_revision_index ON kine (prev_revision)`,
4142
`CREATE UNIQUE INDEX IF NOT EXISTS kine_name_prev_revision_uindex ON kine (name, prev_revision)`,
42-
`PRAGMA wal_checkpoint(TRUNCATE)`,
4343
}
4444
)
4545

4646
func New(ctx context.Context, wg *sync.WaitGroup, cfg *drivers.Config) (bool, server.Backend, error) {
47-
backend, _, err := NewVariant(ctx, wg, "sqlite3", cfg)
47+
backend, _, err := NewVariant(ctx, wg, "sqlite3", cfg, false)
4848
return false, backend, err
4949
}
5050

51-
func NewVariant(ctx context.Context, wg *sync.WaitGroup, driverName string, cfg *drivers.Config) (server.Backend, *generic.Generic, error) {
51+
func NewWithLitestream(ctx context.Context, wg *sync.WaitGroup, cfg *drivers.Config) (bool, server.Backend, error) {
52+
backend, _, err := NewVariant(ctx, wg, "litestream", cfg, true)
53+
return false, backend, err
54+
}
55+
56+
func NewVariant(ctx context.Context, wg *sync.WaitGroup, driverName string, cfg *drivers.Config, litestream bool) (server.Backend, *generic.Generic, error) {
5257
dataSourceName := cfg.DataSourceName
5358
if dataSourceName == "" {
5459
if err := os.MkdirAll("./db", 0700); err != nil {
@@ -57,6 +62,15 @@ func NewVariant(ctx context.Context, wg *sync.WaitGroup, driverName string, cfg
5762
dataSourceName = "./db/state.db?_journal=WAL&cache=shared&_busy_timeout=30000&_txlock=immediate"
5863
}
5964

65+
noCompactCheckpoint := strings.Contains(dataSourceName, "_kine_disable_compact_wal_checkpoint")
66+
noAutoCheckpoint := strings.Contains(dataSourceName, "_kine_disable_wal_autocheckpoint")
67+
68+
if driverName == "litestream" {
69+
logrus.Infof("Litestream compatibility options enabled (all WAL checkpointing disabled)")
70+
noCompactCheckpoint = true
71+
noAutoCheckpoint = true
72+
}
73+
6074
dialect, err := generic.Open(ctx, wg, driverName, dataSourceName, cfg.ConnectionPoolConfig, "?", false, cfg.MetricsRegisterer)
6175
if err != nil {
6276
return nil, nil, err
@@ -81,7 +95,11 @@ func NewVariant(ctx context.Context, wg *sync.WaitGroup, driverName string, cfg
8195
kd.deleted != 0 AND
8296
kd.id <= ?
8397
)`
84-
dialect.PostCompactSQL = `PRAGMA wal_checkpoint(FULL)`
98+
if noCompactCheckpoint {
99+
logrus.Infof("WAL checkpoint on compact is disabled")
100+
} else {
101+
dialect.PostCompactSQL = `PRAGMA wal_checkpoint(FULL)`
102+
}
85103
dialect.TranslateErr = func(err error) error {
86104
if err, ok := err.(sqlite3.Error); ok && err.ExtendedCode == sqlite3.ErrConstraintUnique {
87105
return server.ErrKeyExists
@@ -98,17 +116,26 @@ func NewVariant(ctx context.Context, wg *sync.WaitGroup, driverName string, cfg
98116
return err.Error()
99117
}
100118

101-
if err := setup(dialect.DB); err != nil {
119+
if err := setup(dialect.DB, noCompactCheckpoint, noAutoCheckpoint); err != nil {
102120
return nil, nil, errors.Wrap(err, "setup db")
103121
}
104122

105123
dialect.Migrate(context.Background())
106124
return logstructured.New(sqllog.New(dialect, cfg.CompactInterval, cfg.CompactIntervalJitter, cfg.CompactTimeout, cfg.CompactMinRetain, cfg.CompactBatchSize, cfg.PollBatchSize)), dialect, nil
107125
}
108126

109-
func setup(db *sql.DB) error {
127+
func setup(db *sql.DB, noCheckpointing, noAutoCheckpoint bool) error {
110128
logrus.Infof("Configuring database table schema and indexes, this may take a moment...")
111129

130+
schema := append([]string{}, schema...)
131+
if !noCheckpointing {
132+
schema = append(schema, `PRAGMA wal_checkpoint(TRUNCATE)`)
133+
}
134+
if noAutoCheckpoint {
135+
logrus.Infof("WAL auto-checkpoint is disabled")
136+
schema = append(schema, `PRAGMA wal_autocheckpoint = 0`)
137+
}
138+
112139
for _, stmt := range schema {
113140
logrus.Tracef("SETUP EXEC : %v", util.Stripped(stmt))
114141
_, err := db.Exec(stmt)
@@ -122,6 +149,13 @@ func setup(db *sql.DB) error {
122149
}
123150

124151
func init() {
152+
sql.Register("litestream", &sqlite3.SQLiteDriver{
153+
ConnectHook: func(conn *sqlite3.SQLiteConn) (err error) {
154+
return conn.SetFileControlInt("main", sqlite3.SQLITE_FCNTL_PERSIST_WAL, 1)
155+
},
156+
})
157+
125158
drivers.Register("sqlite", New)
159+
drivers.Register("litestream", NewWithLitestream)
126160
drivers.SetDefault("sqlite")
127161
}

pkg/endpoint/endpoint.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -287,7 +287,7 @@ func (l *loggingServerStream) SendMsg(m any) error {
287287
start := time.Now()
288288
defer func() {
289289
if wr, ok := m.(*etcdserverpb.WatchResponse); ok {
290-
logrus.Tracef("STREAM STATS WATCH SEND DONE id=%d, revision=%d, events=%d, size=%d, time=%s", wr.WatchId, wr.Header.Revision, len(wr.Events), wr.Size(), time.Since(start).Truncate(time.Microsecond))
290+
logrus.Tracef("STREAM STATS WATCH SEND DONE id=%d, revision=%d, events=%d, size=%d, reason=%q, time=%s", wr.WatchId, wr.Header.Revision, len(wr.Events), wr.Size(), wr.CancelReason, time.Since(start).Truncate(time.Microsecond))
291291
return
292292
}
293293
if p, ok := m.(proto.Message); ok {

pkg/logstructured/logstructured.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,9 +78,6 @@ func (l *LogStructured) get(ctx context.Context, key, rangeEnd string, limit, re
7878
if err != nil {
7979
return 0, nil, err
8080
}
81-
if revision != 0 {
82-
rev = revision
83-
}
8481
if len(events) == 0 {
8582
return rev, nil, nil
8683
}
@@ -202,8 +199,6 @@ func (l *LogStructured) List(ctx context.Context, prefix, startKey string, limit
202199
return currentRev, nil, err
203200
}
204201
return l.List(ctx, prefix, startKey, limit, currentRev, keysOnly)
205-
} else if revision != 0 {
206-
rev = revision
207202
}
208203

209204
kvs := make([]*server.KeyValue, 0, len(events))

pkg/logstructured/sqllog/sql.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,7 @@ func (s *SQLLog) List(ctx context.Context, prefix, startKey string, limit, revis
355355
return 0, nil, err
356356
}
357357

358-
if revision > 0 && len(result) == 0 {
358+
if revision != 0 && len(result) == 0 {
359359
// a zero length result won't have the compact or current revisions so get them manually
360360
rev, err = s.CurrentRevision(ctx)
361361
if err != nil {

pkg/server/list.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,10 @@ func (l *LimitedServer) list(ctx context.Context, r *etcdserverpb.RangeRequest)
1919
prefix = prefix + "/"
2020
}
2121
start := string(r.Key)
22-
revision := r.Revision
22+
revision := int64(0)
23+
if r.Revision > 0 {
24+
revision = r.Revision
25+
}
2326

2427
if r.CountOnly {
2528
rev, count, err := l.backend.Count(ctx, prefix, start, revision)

pkg/server/types.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
var (
1313
ErrNotSupported = status.New(codes.InvalidArgument, "etcdserver: unsupported operations in txn request").Err()
14+
ErrInvalidWatch = status.New(codes.InvalidArgument, "etcdserver: unsupported options in watch request").Err()
1415

1516
ErrKeyExists = rpctypes.ErrGRPCDuplicateKey
1617
ErrCompacted = rpctypes.ErrGRPCCompacted

pkg/server/watch.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,14 @@ type watcher struct {
7676

7777
func (w *watcher) Start(ctx context.Context, r *etcdserverpb.WatchCreateRequest) {
7878
if r.WatchId != clientv3.AutoWatchID {
79-
logrus.Warnf("WATCH START server=%d, id=%d ignoring request with client-provided id", w.id, r.WatchId)
79+
logrus.Warnf("WATCH START server=%d, id=%d rejecting request with client-provided id", w.id, r.WatchId)
80+
w.CancelEarly(ctx, ErrInvalidWatch)
81+
return
82+
}
83+
84+
if r.StartRevision < 0 {
85+
logrus.Warnf("WATCH START server=%d rejecting request with negative StartRevision=%d", w.id, r.StartRevision)
86+
w.CancelEarly(ctx, ErrCompacted)
8087
return
8188
}
8289

@@ -231,6 +238,26 @@ func (w *watcher) removeWatch(watchID int64) bool {
231238
return false
232239
}
233240

241+
func (w *watcher) CancelEarly(ctx context.Context, err error) {
242+
rev, err := w.backend.CurrentRevision(ctx)
243+
if err != nil {
244+
logrus.Warnf("Failed to get current revision for early watch cancel: %v", err)
245+
return
246+
}
247+
248+
err = w.server.Send(&etcdserverpb.WatchResponse{
249+
Header: txnHeader(rev),
250+
WatchId: clientv3.InvalidWatchID,
251+
Canceled: true,
252+
Created: true,
253+
CancelReason: err.Error(),
254+
})
255+
256+
if err != nil && !clientv3.IsConnCanceled(err) {
257+
logrus.Errorf("WATCH Failed to send early cancel response for server=%d: %v", w.id, err)
258+
}
259+
}
260+
234261
func (w *watcher) Cancel(watchID, revision, compactRev int64, err error) {
235262
// do not send WatchResponse for unknown watch ID
236263
if !w.removeWatch(watchID) {

0 commit comments

Comments
 (0)