Skip to content

Commit 5039022

Browse files
craig[bot] and pav-kv committed
Merge #152665
152665: wag: introduce WAG storage r=arulajmani a=pav-kv This PR introduces the WAG sequence writer and the corresponding Store-local schema. The writes are tested with a basic `echotest` imitating the future use of the WAG. Resolves #149604 Co-authored-by: Pavel Kalinnikov <[email protected]>
2 parents 9a77ab6 + adbf895 commit 5039022

File tree

15 files changed

+380
-5
lines changed

15 files changed

+380
-5
lines changed

pkg/BUILD.bazel

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,7 @@ ALL_TESTS = [
265265
"//pkg/kv/kvserver/kvflowcontrol/rac2:rac2_test",
266266
"//pkg/kv/kvserver/kvflowcontrol/replica_rac2:replica_rac2_test",
267267
"//pkg/kv/kvserver/kvstorage/snaprecv:snaprecv_test",
268+
"//pkg/kv/kvserver/kvstorage/wag:wag_test",
268269
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
269270
"//pkg/kv/kvserver/leases:leases_test",
270271
"//pkg/kv/kvserver/liveness:liveness_test",
@@ -1564,6 +1565,8 @@ GO_TARGETS = [
15641565
"//pkg/kv/kvserver/kvstorage/snaprecv:snaprecv",
15651566
"//pkg/kv/kvserver/kvstorage/snaprecv:snaprecv_test",
15661567
"//pkg/kv/kvserver/kvstorage/wag/wagpb:wagpb",
1568+
"//pkg/kv/kvserver/kvstorage/wag:wag",
1569+
"//pkg/kv/kvserver/kvstorage/wag:wag_test",
15671570
"//pkg/kv/kvserver/kvstorage:kvstorage",
15681571
"//pkg/kv/kvserver/kvstorage:kvstorage_test",
15691572
"//pkg/kv/kvserver/leases:leases",

pkg/keys/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ var (
226226
// SupporterMeta stores the highest timestamp at which support has been
227227
// withdrawn.
228228
localStoreLivenessSupporterMeta = []byte("slsm")
229+
// localStoreWAGNodeSuffix is the suffix for WAG nodes.
230+
localStoreWAGNodeSuffix = []byte("wagn")
229231
// localStoreLivenessSupportFor stores the Store Liveness support by the local
230232
// store for a store in the cluster. It includes the epoch and expiration of
231233
// support.

pkg/keys/doc.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,7 @@ var _ = [...]interface{}{
219219
StoreLivenessSupporterMetaKey, // "slsm"
220220
StoreCachedSettingsKey, // "stng"
221221
StoreLastUpKey, // "uptm"
222+
StoreWAGNodeKey, // "wagn"
222223

223224
// 5. Range lock keys for all replicated locks. All range locks share
224225
// LocalRangeLockTablePrefix. Locks can be acquired on global keys and on

pkg/keys/keys.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,32 @@ func DecodeStoreLivenessSupportForKey(key roachpb.Key) (roachpb.NodeID, roachpb.
140140
return nodeID, storeID, nil
141141
}
142142

143+
// StoreWAGPrefix returns the key prefix for WAG nodes.
144+
func StoreWAGPrefix() roachpb.Key {
145+
return MakeStoreKey(localStoreWAGNodeSuffix, nil)
146+
}
147+
148+
// StoreWAGNodeKey returns the key for a WAG node at the given index.
149+
func StoreWAGNodeKey(index uint64) roachpb.Key {
150+
return MakeStoreKey(localStoreWAGNodeSuffix, encoding.EncodeUint64Ascending(nil, index))
151+
}
152+
153+
// DecodeWAGNodeKey returns the index of the WAG node from its key.
154+
func DecodeWAGNodeKey(key roachpb.Key) (uint64, error) {
155+
suffix, detail, err := DecodeStoreKey(key)
156+
if err != nil {
157+
return 0, err
158+
}
159+
if !suffix.Equal(localStoreWAGNodeSuffix) {
160+
return 0, errors.Errorf("key with suffix %q != %q", suffix, localStoreWAGNodeSuffix)
161+
}
162+
detail, index, err := encoding.DecodeUint64Ascending(detail)
163+
if len(detail) != 0 {
164+
return 0, errors.Errorf("invalid key has trailing garbage: %q", detail)
165+
}
166+
return index, err
167+
}
168+
143169
// StoreCachedSettingsKey returns a store-local key for store's cached settings.
144170
func StoreCachedSettingsKey(settingKey roachpb.Key) roachpb.Key {
145171
return MakeStoreKey(localStoreCachedSettingsSuffix, encoding.EncodeBytesAscending(nil, settingKey))

pkg/keys/printer.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ var constSubKeyDict = []struct {
146146
{"/clusterVersion", localStoreClusterVersionSuffix},
147147
{"/nodeTombstone", localStoreNodeTombstoneSuffix},
148148
{"/cachedSettings", localStoreCachedSettingsSuffix},
149+
{"/wag", localStoreWAGNodeSuffix},
149150
{"/lossOfQuorumRecovery/applied", localStoreUnsafeReplicaRecoverySuffix},
150151
{"/lossOfQuorumRecovery/status", localStoreLossOfQuorumRecoveryStatusSuffix},
151152
{"/lossOfQuorumRecovery/cleanup", localStoreLossOfQuorumRecoveryCleanupActionsSuffix},
@@ -167,10 +168,19 @@ func cachedSettingsKeyPrint(buf *redact.StringBuilder, key roachpb.Key) {
167168
buf.Print(settingKey.String())
168169
}
169170

171+
func wagNodeKeyPrint(buf *redact.StringBuilder, key roachpb.Key) {
172+
index, err := DecodeWAGNodeKey(key)
173+
if err != nil {
174+
buf.Printf("<invalid: %s>", err)
175+
}
176+
buf.Printf("%d", index)
177+
}
178+
170179
func localStoreKeyPrint(buf *redact.StringBuilder, _ []encoding.Direction, key roachpb.Key) {
171180
for _, v := range constSubKeyDict {
172181
if bytes.HasPrefix(key, v.key) {
173182
buf.Print(v.name)
183+
// TODO(pav-kv): make this switch more efficient with a lookup.
174184
if v.key.Equal(localStoreNodeTombstoneSuffix) {
175185
buf.SafeRune('/')
176186
nodeTombstoneKeyPrint(
@@ -186,6 +196,9 @@ func localStoreKeyPrint(buf *redact.StringBuilder, _ []encoding.Direction, key r
186196
lossOfQuorumRecoveryEntryKeyPrint(
187197
buf, append(roachpb.Key(nil), append(LocalStorePrefix, key...)...),
188198
)
199+
} else if v.key.Equal(localStoreWAGNodeSuffix) {
200+
buf.SafeRune('/')
201+
wagNodeKeyPrint(buf, append(append(roachpb.Key(nil), LocalStorePrefix...), key...))
189202
}
190203
return
191204
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
2+
3+
go_library(
4+
name = "wag",
5+
srcs = ["store.go"],
6+
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag",
7+
visibility = ["//visibility:public"],
8+
deps = [
9+
"//pkg/keys",
10+
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
11+
"//pkg/storage",
12+
],
13+
)
14+
15+
go_test(
16+
name = "wag_test",
17+
srcs = ["store_test.go"],
18+
data = glob(["testdata/**"]),
19+
embed = [":wag"],
20+
deps = [
21+
"//pkg/kv/kvpb",
22+
"//pkg/kv/kvserver/kvstorage/wag/wagpb",
23+
"//pkg/kv/kvserver/print",
24+
"//pkg/roachpb",
25+
"//pkg/storage",
26+
"//pkg/testutils/echotest",
27+
"//pkg/util/leaktest",
28+
"//pkg/util/log",
29+
"@com_github_stretchr_testify//require",
30+
],
31+
)
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package wag
7+
8+
import (
9+
"context"
10+
"iter"
11+
"sync/atomic"
12+
13+
"github.com/cockroachdb/cockroach/pkg/keys"
14+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb"
15+
"github.com/cockroachdb/cockroach/pkg/storage"
16+
)
17+
18+
// Seq is the WAG sequencer. It hands out consecutive unique sequence numbers
// so that WAG nodes can be written in a topologically sorted order.
type Seq struct {
	// index is the last allocated index into the WAG nodes sequence.
	// TODO(pav-kv): initialize it on store restarts.
	index atomic.Uint64
}

// Next reserves count consecutive WAG node indices and returns the first one.
// The caller owns indices [Next, Next+count) for writing WAG nodes, and must
// not use any index outside this span.
//
// Ordering is the caller's responsibility: conflicting writers must call Next
// and perform the corresponding writes following the topological order of
// their mutations (e.g. the replica lifecycle events of a single range must be
// ordered). Independent / concurrent writers may call Next and write in any
// order (e.g. different ranges can write their events concurrently).
func (s *Seq) Next(count uint64) uint64 {
	// Add returns the last index of the newly reserved span; step back to its
	// first index.
	last := s.index.Add(count)
	return last - count + 1
}
40+
41+
// Write puts the WAG node under the specific sequence number into the given
42+
// writer. The index must have been allocated to the caller by the sequencer.
43+
func Write(w storage.Writer, index uint64, node wagpb.Node) error {
44+
data, err := node.Marshal() // nolint:protomarshal
45+
if err != nil {
46+
return err
47+
}
48+
return w.PutUnversioned(keys.StoreWAGNodeKey(index), data)
49+
}
50+
51+
// Iterator helps to scan the WAG sequence.
52+
//
53+
// var iter wag.Iterator
54+
// for node := range iter.Iter(ctx, reader) {
55+
// // process node
56+
// }
57+
// if err := iter.Error(); err != nil {
58+
// return err
59+
// }
60+
//
61+
// TODO(pav-kv): make it more flexible, e.g. iterate from a particular index.
62+
type Iterator struct {
63+
// err is the last error encountered during the WAG iteration.
64+
err error
65+
}
66+
67+
// Iter returns an iterator that scans the WAG sequence.
68+
func (it *Iterator) Iter(ctx context.Context, r storage.Reader) iter.Seq[wagpb.Node] {
69+
prefix := keys.StoreWAGPrefix()
70+
mi, err := r.NewMVCCIterator(ctx, storage.MVCCKeyIterKind, storage.IterOptions{
71+
UpperBound: prefix.PrefixEnd(),
72+
})
73+
if err != nil {
74+
it.err = err
75+
return nil
76+
}
77+
mi.SeekGE(storage.MakeMVCCMetadataKey(prefix))
78+
79+
return func(yield func(wagpb.Node) bool) {
80+
defer mi.Close()
81+
for ; ; mi.Next() {
82+
if ok, err := mi.Valid(); err != nil || !ok {
83+
it.err = err
84+
return
85+
}
86+
v, err := mi.UnsafeValue()
87+
if err != nil {
88+
it.err = err
89+
return
90+
}
91+
var node wagpb.Node
92+
if it.err = node.Unmarshal(v); it.err != nil { // nolint:protounmarshal
93+
return
94+
}
95+
if !yield(node) {
96+
return
97+
}
98+
}
99+
}
100+
}
101+
102+
// Error returns the last encountered error during the iteration.
103+
func (it *Iterator) Error() error {
104+
return it.err
105+
}
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
// Copyright 2025 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the CockroachDB Software License
4+
// included in the /LICENSE file.
5+
6+
package wag
7+
8+
import (
9+
"context"
10+
"fmt"
11+
"path/filepath"
12+
"strings"
13+
"testing"
14+
15+
"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
16+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb"
17+
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/print"
18+
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/storage"
20+
"github.com/cockroachdb/cockroach/pkg/testutils/echotest"
21+
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
22+
"github.com/cockroachdb/cockroach/pkg/util/log"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
func TestWrite(t *testing.T) {
27+
defer leaktest.AfterTest(t)()
28+
defer log.Scope(t).Close(t)
29+
30+
eng := storage.NewDefaultInMemForTesting()
31+
defer eng.Close()
32+
s := store{eng: eng}
33+
34+
var out string
35+
write := func(name string, f func(w storage.Writer) error) {
36+
b := eng.NewWriteBatch()
37+
defer b.Close()
38+
require.NoError(t, f(b))
39+
40+
str, err := print.DecodeWriteBatch(b.Repr())
41+
require.NoError(t, err)
42+
out += fmt.Sprintf(">> %s\n%s", name, str)
43+
44+
require.NoError(t, b.Commit(false /* sync */))
45+
}
46+
47+
id := roachpb.FullReplicaID{RangeID: 123, ReplicaID: 4}
48+
write("create", func(w storage.Writer) error { return createReplica(&s, w, id) })
49+
write("init", func(w storage.Writer) error { return initReplica(&s, w, id, 10) })
50+
write("split", func(w storage.Writer) error { return splitReplica(&s, w, id, 200) })
51+
52+
// TODO(pav-kv): the trailing \n in DecodeWriteBatch is duplicated with
53+
// recursion. Remove it, and let the caller handle new lines.
54+
out = strings.ReplaceAll(out, "\n\n", "\n")
55+
echotest.Require(t, out, filepath.Join("testdata", t.Name()+".txt"))
56+
57+
// Smoke check that the iterator works.
58+
var iter Iterator
59+
count := 0
60+
for range iter.Iter(context.Background(), s.eng) {
61+
count++
62+
}
63+
require.NoError(t, iter.Error())
64+
require.Equal(t, 4, count)
65+
}
66+
67+
type store struct {
68+
eng storage.Engine
69+
seq Seq
70+
}
71+
72+
func createReplica(s *store, w storage.Writer, id roachpb.FullReplicaID) error {
73+
b := s.eng.NewWriteBatch()
74+
defer b.Close()
75+
if err := writeStateMachine(b, "state-machine-key", "state"); err != nil {
76+
return err
77+
}
78+
return Write(w, s.seq.Next(1), wagpb.Node{
79+
Addr: wagpb.Addr{RangeID: id.RangeID, ReplicaID: id.ReplicaID, Index: 0},
80+
Type: wagpb.NodeType_NodeCreate,
81+
Mutation: wagpb.Mutation{Batch: b.Repr()},
82+
})
83+
}
84+
85+
func initReplica(s *store, w storage.Writer, id roachpb.FullReplicaID, index uint64) error {
86+
return Write(w, s.seq.Next(1), wagpb.Node{
87+
Addr: wagpb.Addr{RangeID: id.RangeID, ReplicaID: id.ReplicaID, Index: kvpb.RaftIndex(index)},
88+
Type: wagpb.NodeType_NodeSnap,
89+
Mutation: wagpb.Mutation{Ingestion: &wagpb.Ingestion{
90+
SSTs: []string{"tmp/1.sst", "tmp/2.sst"},
91+
}},
92+
})
93+
}
94+
95+
func splitReplica(s *store, w storage.Writer, id roachpb.FullReplicaID, index uint64) error {
96+
b := s.eng.NewWriteBatch()
97+
defer b.Close()
98+
if err := writeStateMachine(b, "lhs-key", "lhs-state"); err != nil {
99+
return err
100+
} else if err := writeStateMachine(b, "rhs-key", "rhs-state"); err != nil {
101+
return err
102+
}
103+
104+
seq := s.seq.Next(2)
105+
if err := Write(w, seq, wagpb.Node{
106+
Addr: wagpb.Addr{RangeID: id.RangeID, ReplicaID: id.ReplicaID, Index: kvpb.RaftIndex(index - 1)},
107+
Type: wagpb.NodeType_NodeApply,
108+
}); err != nil {
109+
return err
110+
}
111+
return Write(w, seq+1, wagpb.Node{
112+
Addr: wagpb.Addr{RangeID: id.RangeID, ReplicaID: id.ReplicaID, Index: kvpb.RaftIndex(index)},
113+
Type: wagpb.NodeType_NodeSplit,
114+
Mutation: wagpb.Mutation{Batch: b.Repr()},
115+
Create: 567, // the RHS range ID
116+
})
117+
}
118+
119+
func writeStateMachine(w storage.Writer, k, v string) error {
120+
return w.PutUnversioned(roachpb.Key(k), []byte(v))
121+
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
echo
2+
----
3+
>> create
4+
Put: 0,0 /Local/Store/wag/1 (0x01737761676e000000000000000100): NodeCreate r123/4:0
5+
> Put: 0,0 "state-machine-key" (0x73746174652d6d616368696e652d6b657900): "state"
6+
>> init
7+
Put: 0,0 /Local/Store/wag/2 (0x01737761676e000000000000000200): NodeSnap r123/4:10
8+
ingestion: SSTs:"tmp/1.sst" SSTs:"tmp/2.sst"
9+
>> split
10+
Put: 0,0 /Local/Store/wag/3 (0x01737761676e000000000000000300): NodeApply r123/4:199
11+
Put: 0,0 /Local/Store/wag/4 (0x01737761676e000000000000000400): NodeSplit r123/4:200 create:567
12+
> Put: 0,0 "lhs-key" (0x6c68732d6b657900): "lhs-state"
13+
> Put: 0,0 "rhs-key" (0x7268732d6b657900): "rhs-state"

pkg/kv/kvserver/kvstorage/wag/wagpb/BUILD.bazel

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ go_proto_library(
2121
visibility = ["//visibility:public"],
2222
deps = [
2323
"//pkg/kv/kvserver/kvserverpb",
24+
"//pkg/roachpb", # keep
2425
"@com_github_gogo_protobuf//gogoproto",
2526
],
2627
)
@@ -31,5 +32,8 @@ go_library(
3132
embed = [":wagpb_go_proto"],
3233
importpath = "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvstorage/wag/wagpb",
3334
visibility = ["//visibility:public"],
34-
deps = ["//pkg/kv/kvpb"],
35+
deps = [
36+
"//pkg/kv/kvpb",
37+
"@com_github_cockroachdb_redact//:redact",
38+
],
3539
)

0 commit comments

Comments
 (0)