Skip to content

Commit d39b99e

Browse files
committed
Refactor list meta serialization to binary format
1 parent 2ab2fb1 commit d39b99e

File tree

6 files changed

+271
-58
lines changed

6 files changed

+271
-58
lines changed

adapter/dynamodb_transcoder.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ func (t *dynamodbTranscoder) valueAttrToOps(key []byte, val attributeValue) (*kv
101101
Tail: int64(len(val.L)),
102102
Len: int64(len(val.L)),
103103
}
104-
b, err := json.Marshal(meta)
104+
b, err := store.MarshalListMeta(meta)
105105
if err != nil {
106106
return nil, errors.WithStack(err)
107107
}

adapter/redis.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package adapter
33
import (
44
"bytes"
55
"context"
6-
"encoding/json"
76
"math"
87
"net"
98
"sort"
@@ -742,7 +741,7 @@ func (t *txnContext) buildListElems() ([]*kv.Elem[kv.OP], error) {
742741

743742
st.meta.Len += int64(len(st.appends))
744743
st.meta.Tail = st.meta.Head + st.meta.Len
745-
metaBytes, err := json.Marshal(st.meta)
744+
metaBytes, err := store.MarshalListMeta(st.meta)
746745
if err != nil {
747746
return nil, errors.WithStack(err)
748747
}
@@ -854,7 +853,7 @@ func (r *RedisServer) buildRPushOps(meta store.ListMeta, key []byte, values [][]
854853
meta.Len += int64(len(values))
855854
meta.Tail = meta.Head + meta.Len
856855

857-
b, err := json.Marshal(meta)
856+
b, err := store.MarshalListMeta(meta)
858857
if err != nil {
859858
return nil, meta, errors.WithStack(err)
860859
}

store/bolt_store.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,22 @@ func (t *boltStoreTxn) Exists(_ context.Context, key []byte) (bool, error) {
175175
return t.bucket.Get(key) != nil, nil
176176
}
177177

178+
func (t *boltStoreTxn) Scan(_ context.Context, start []byte, end []byte, limit int) ([]*KVPair, error) {
179+
if limit <= 0 {
180+
return nil, nil
181+
}
182+
183+
var res []*KVPair
184+
c := t.bucket.Cursor()
185+
for k, v := c.Seek(start); k != nil && (end == nil || bytes.Compare(k, end) < 0); k, v = c.Next() {
186+
res = append(res, &KVPair{Key: k, Value: v})
187+
if len(res) >= limit {
188+
break
189+
}
190+
}
191+
return res, nil
192+
}
193+
178194
func (s *boltStore) Txn(ctx context.Context, fn func(ctx context.Context, txn Txn) error) error {
179195
btxn, err := s.bbolt.Begin(true)
180196
if err != nil {

store/list_store.go

Lines changed: 95 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,21 +3,23 @@ package store
33
import (
44
"bytes"
55
"context"
6-
"encoding/hex"
7-
"encoding/json"
6+
"encoding/binary"
87
"math"
98

109
"github.com/cockroachdb/errors"
1110
)
1211

1312
// Wide-column style list storage using per-element keys.
14-
// Item keys: !lst|itm|<userKey>|%020d
15-
// Meta key : !lst|meta|<userKey> -> {"h":head,"t":tail,"l":len}
13+
// Item keys: !lst|itm|<userKey><seq(8-byte sortable binary)>
14+
// Meta key : !lst|meta|<userKey> -> [Head(8)][Tail(8)][Len(8)]
1615

1716
const (
1817
ListMetaPrefix = "!lst|meta|"
1918
ListItemPrefix = "!lst|itm|"
20-
ListSeqWidth = 20
19+
// limit per scan when deleting to avoid OOM.
20+
deleteBatchSize = 1024
21+
listMetaBinarySize = 24
22+
scanAdvanceByte = byte(0x00)
2123
)
2224

2325
type ListMeta struct {
@@ -43,24 +45,28 @@ func (s *ListStore) IsList(ctx context.Context, key []byte) (bool, error) {
4345

4446
// PutList replaces the entire list.
4547
func (s *ListStore) PutList(ctx context.Context, key []byte, list []string) error {
46-
// delete existing meta/items (best-effort)
47-
if err := s.deleteList(ctx, key); err != nil {
48-
return err
49-
}
50-
5148
meta := ListMeta{Head: 0, Tail: int64(len(list)), Len: int64(len(list))}
52-
metaBytes, err := json.Marshal(meta)
49+
metaBytes, err := marshalListMeta(meta)
5350
if err != nil {
5451
return errors.WithStack(err)
5552
}
5653

5754
return errors.WithStack(s.store.Txn(ctx, func(ctx context.Context, txn Txn) error {
55+
scanTxn, ok := txn.(ScanTxn)
56+
if !ok {
57+
return errors.WithStack(ErrNotSupported)
58+
}
59+
60+
if err := s.deleteListTxn(ctx, scanTxn, key); err != nil {
61+
return err
62+
}
63+
5864
for i, v := range list {
59-
if err := txn.Put(ctx, ListItemKey(key, int64(i)), []byte(v)); err != nil {
65+
if err := scanTxn.Put(ctx, ListItemKey(key, int64(i)), []byte(v)); err != nil {
6066
return errors.WithStack(err)
6167
}
6268
}
63-
if err := txn.Put(ctx, ListMetaKey(key), metaBytes); err != nil {
69+
if err := scanTxn.Put(ctx, ListMetaKey(key), metaBytes); err != nil {
6470
return errors.WithStack(err)
6571
}
6672
return nil
@@ -107,7 +113,7 @@ func (s *ListStore) RPush(ctx context.Context, key []byte, values ...string) (in
107113
}
108114
meta.Len += int64(len(values))
109115
meta.Tail = meta.Head + meta.Len
110-
metaBytes, err := json.Marshal(meta)
116+
metaBytes, err := marshalListMeta(meta)
111117
if err != nil {
112118
return errors.WithStack(err)
113119
}
@@ -168,11 +174,8 @@ func (s *ListStore) LoadMeta(ctx context.Context, key []byte) (ListMeta, bool, e
168174
if len(val) == 0 {
169175
return ListMeta{}, false, nil
170176
}
171-
var meta ListMeta
172-
if err := json.Unmarshal(val, &meta); err != nil {
173-
return ListMeta{}, false, errors.WithStack(err)
174-
}
175-
return meta, true, nil
177+
meta, err := unmarshalListMeta(val)
178+
return meta, err == nil, errors.WithStack(err)
176179
}
177180

178181
func (s *ListStore) loadMetaTxn(ctx context.Context, txn Txn, key []byte) (ListMeta, bool, error) {
@@ -186,31 +189,44 @@ func (s *ListStore) loadMetaTxn(ctx context.Context, txn Txn, key []byte) (ListM
186189
if len(val) == 0 {
187190
return ListMeta{}, false, nil
188191
}
189-
var meta ListMeta
190-
if err := json.Unmarshal(val, &meta); err != nil {
191-
return ListMeta{}, false, errors.WithStack(err)
192-
}
193-
return meta, true, nil
192+
meta, err := unmarshalListMeta(val)
193+
return meta, err == nil, errors.WithStack(err)
194194
}
195195

196-
func (s *ListStore) deleteList(ctx context.Context, key []byte) error {
197-
start := ListItemKey(key, mathMinInt64) // use smallest seq
198-
end := ListItemKey(key, mathMaxInt64)
196+
// deleteListTxn deletes list items and metadata within the provided transaction.
197+
func (s *ListStore) deleteListTxn(ctx context.Context, txn ScanTxn, key []byte) error {
198+
start := ListItemKey(key, mathMinInt64) // inclusive
199+
end := ListItemKey(key, mathMaxInt64) // inclusive sentinel
199200

200-
items, err := s.store.Scan(ctx, start, end, math.MaxInt)
201-
if err != nil && !errors.Is(err, ErrKeyNotFound) {
202-
return errors.WithStack(err)
203-
}
201+
for {
202+
kvs, err := txn.Scan(ctx, start, end, deleteBatchSize)
203+
if err != nil && !errors.Is(err, ErrKeyNotFound) {
204+
return errors.WithStack(err)
205+
}
206+
if len(kvs) == 0 {
207+
break
208+
}
204209

205-
return errors.WithStack(s.store.Txn(ctx, func(ctx context.Context, txn Txn) error {
206-
for _, kvp := range items {
210+
for _, kvp := range kvs {
207211
if err := txn.Delete(ctx, kvp.Key); err != nil {
208212
return errors.WithStack(err)
209213
}
210214
}
211-
_ = txn.Delete(ctx, ListMetaKey(key))
212-
return nil
213-
}))
215+
216+
// advance start just after the last processed key to guarantee forward progress
217+
lastKey := kvs[len(kvs)-1].Key
218+
start = append(bytes.Clone(lastKey), scanAdvanceByte)
219+
220+
if len(kvs) < deleteBatchSize {
221+
break
222+
}
223+
}
224+
225+
// delete meta last (ignore missing)
226+
if err := txn.Delete(ctx, ListMetaKey(key)); err != nil && !errors.Is(err, ErrKeyNotFound) {
227+
return errors.WithStack(err)
228+
}
229+
return nil
214230
}
215231

216232
// ListMetaKey builds the metadata key for a user key.
@@ -220,20 +236,55 @@ func ListMetaKey(userKey []byte) []byte {
220236

221237
// ListItemKey builds the item key for a user key and sequence number.
222238
func ListItemKey(userKey []byte, seq int64) []byte {
223-
// Offset sign bit (seq ^ minInt64) to preserve order, then big-endian encode and hex.
239+
// Offset sign bit (seq ^ minInt64) to preserve order, then big-endian encode (8 bytes).
224240
var raw [8]byte
225241
encodeSortableInt64(raw[:], seq)
226-
hexSeq := make([]byte, hex.EncodedLen(len(raw)))
227-
hex.Encode(hexSeq, raw[:])
228242

229-
buf := make([]byte, 0, len(ListItemPrefix)+len(userKey)+1+len(hexSeq))
243+
buf := make([]byte, 0, len(ListItemPrefix)+len(userKey)+len(raw))
230244
buf = append(buf, ListItemPrefix...)
231245
buf = append(buf, userKey...)
232-
buf = append(buf, '|')
233-
buf = append(buf, hexSeq...)
246+
buf = append(buf, raw[:]...)
234247
return buf
235248
}
236249

250+
// MarshalListMeta encodes ListMeta into a fixed 24-byte binary format.
251+
func MarshalListMeta(meta ListMeta) ([]byte, error) { return marshalListMeta(meta) }
252+
253+
// UnmarshalListMeta decodes ListMeta from the fixed 24-byte binary format.
254+
func UnmarshalListMeta(b []byte) (ListMeta, error) { return unmarshalListMeta(b) }
255+
256+
func marshalListMeta(meta ListMeta) ([]byte, error) {
257+
if meta.Head < 0 || meta.Tail < 0 || meta.Len < 0 {
258+
return nil, errors.WithStack(errors.Newf("list meta contains negative value: head=%d tail=%d len=%d", meta.Head, meta.Tail, meta.Len))
259+
}
260+
261+
buf := make([]byte, listMetaBinarySize)
262+
binary.BigEndian.PutUint64(buf[0:8], uint64(meta.Head))
263+
binary.BigEndian.PutUint64(buf[8:16], uint64(meta.Tail))
264+
binary.BigEndian.PutUint64(buf[16:24], uint64(meta.Len))
265+
return buf, nil
266+
}
267+
268+
func unmarshalListMeta(b []byte) (ListMeta, error) {
269+
if len(b) != listMetaBinarySize {
270+
return ListMeta{}, errors.Wrap(errors.Newf("invalid list meta length: %d", len(b)), "unmarshal list meta")
271+
}
272+
273+
head := binary.BigEndian.Uint64(b[0:8])
274+
tail := binary.BigEndian.Uint64(b[8:16])
275+
length := binary.BigEndian.Uint64(b[16:24])
276+
277+
if head > math.MaxInt64 || tail > math.MaxInt64 || length > math.MaxInt64 {
278+
return ListMeta{}, errors.New("list meta value overflows int64")
279+
}
280+
281+
return ListMeta{
282+
Head: int64(head),
283+
Tail: int64(tail),
284+
Len: int64(length),
285+
}, nil
286+
}
287+
237288
// encodeSortableInt64 writes seq with sign bit flipped (seq ^ minInt64) in big-endian order.
238289
const sortableInt64Bytes = 8
239290

@@ -286,11 +337,10 @@ func ExtractListUserKey(key []byte) []byte {
286337
return bytes.TrimPrefix(key, []byte(ListMetaPrefix))
287338
case IsListItemKey(key):
288339
trimmed := bytes.TrimPrefix(key, []byte(ListItemPrefix))
289-
idx := bytes.LastIndexByte(trimmed, '|')
290-
if idx == -1 {
340+
if len(trimmed) < sortableInt64Bytes {
291341
return nil
292342
}
293-
return trimmed[:idx]
343+
return trimmed[:len(trimmed)-sortableInt64Bytes]
294344
default:
295345
return nil
296346
}

store/list_store_test.go

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package store
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"sync"
78
"testing"
89

10+
"github.com/cockroachdb/errors"
911
"github.com/stretchr/testify/assert"
1012
)
1113

@@ -80,3 +82,71 @@ func TestListStore_RPushConcurrent(t *testing.T) {
8082
assert.NoError(t, err)
8183
assert.Len(t, list, n)
8284
}
85+
86+
// failingScanStore simulates a transaction commit failure to verify atomicity.
87+
type failingScanStore struct {
88+
inner ScanStore
89+
fail bool
90+
}
91+
92+
func newFailingScanStore(inner ScanStore, fail bool) *failingScanStore {
93+
return &failingScanStore{inner: inner, fail: fail}
94+
}
95+
96+
func (s *failingScanStore) Get(ctx context.Context, key []byte) ([]byte, error) {
97+
return s.inner.Get(ctx, key)
98+
}
99+
100+
func (s *failingScanStore) Put(ctx context.Context, key []byte, value []byte) error {
101+
return s.inner.Put(ctx, key, value)
102+
}
103+
104+
func (s *failingScanStore) Delete(ctx context.Context, key []byte) error {
105+
return s.inner.Delete(ctx, key)
106+
}
107+
108+
func (s *failingScanStore) Exists(ctx context.Context, key []byte) (bool, error) {
109+
return s.inner.Exists(ctx, key)
110+
}
111+
112+
func (s *failingScanStore) Snapshot() (io.ReadWriter, error) { return nil, ErrNotSupported }
113+
func (s *failingScanStore) Restore(io.Reader) error { return ErrNotSupported }
114+
func (s *failingScanStore) Close() error { return nil }
115+
116+
func (s *failingScanStore) Scan(ctx context.Context, start []byte, end []byte, limit int) ([]*KVPair, error) {
117+
return s.inner.Scan(ctx, start, end, limit)
118+
}
119+
120+
// Txn executes the function; if fail is set, it aborts commit and returns an error.
121+
func (s *failingScanStore) Txn(ctx context.Context, f func(ctx context.Context, txn Txn) error) error {
122+
err := s.inner.Txn(ctx, func(ctx context.Context, txn Txn) error {
123+
if s.fail {
124+
return errors.New("injected commit failure")
125+
}
126+
return f(ctx, txn)
127+
})
128+
129+
return err
130+
}
131+
132+
func TestListStore_PutList_RollbackOnTxnFailure(t *testing.T) {
133+
t.Parallel()
134+
135+
ctx := context.Background()
136+
rawBase := NewRbMemoryStore()
137+
ls := NewListStore(rawBase)
138+
139+
initial := []string{"a", "b", "c"}
140+
assert.NoError(t, ls.PutList(ctx, []byte("k"), initial))
141+
142+
failStore := newFailingScanStore(rawBase, true)
143+
lsFail := NewListStore(failStore)
144+
145+
err := lsFail.PutList(ctx, []byte("k"), []string{"x", "y"})
146+
assert.Error(t, err, "expected injected failure")
147+
148+
// Original list must remain intact because txn never committed.
149+
out, err := ls.GetList(ctx, []byte("k"))
150+
assert.NoError(t, err)
151+
assert.Equal(t, initial, out)
152+
}

0 commit comments

Comments
 (0)