Skip to content

Commit 5a87cc9

Browse files
committed
Add MVCC support and integrate HLC timestamps
1 parent d39b99e commit 5a87cc9

File tree

15 files changed

+1035
-180
lines changed

15 files changed

+1035
-180
lines changed

AGENTS.md

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
# Repository Guidelines
2+
3+
## Project Structure & Modules
4+
- `cmd/server`, `cmd/client`: entrypoints for running the KV server and client.
5+
- `store/`: MVCC storage engine, OCC/TTL, and related tests.
6+
- `kv/`: hybrid logical clock (HLC) utilities and KV interfaces.
7+
- `adapter/`: protocol adapters (e.g., Redis), plus integration tests.
8+
- `jepsen/`, `jepsen/redis/`: Jepsen test harnesses and workloads.
9+
- `proto/`, `distribution/`, `internal/`: supporting protobufs, build assets, and shared helpers.
10+
11+
## Build, Test, and Development Commands
12+
- `go test ./...` — run unit/integration tests. If macOS sandbox blocks `$GOCACHE`, prefer `GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./...`.
13+
- `GOCACHE=$(pwd)/.cache GOLANGCI_LINT_CACHE=$(pwd)/.golangci-cache golangci-lint run ./... --timeout=5m` — full lint suite.
14+
- `HOME=$(pwd)/jepsen/tmp-home LEIN_HOME=$(pwd)/jepsen/.lein LEIN_JVM_OPTS="-Duser.home=$(pwd)/jepsen/tmp-home" /tmp/lein test` (from `jepsen/` or `jepsen/redis/`) — Jepsen tests.
15+
- `go run ./cmd/server` / `go run ./cmd/client` — start server or CLI locally.
16+
17+
## Coding Style & Naming
18+
- Go code: `gofmt` + project lint rules (`golangci-lint`). Avoid adding `//nolint` unless absolutely required; prefer refactoring.
19+
- Naming: Go conventions (MixedCaps for exported identifiers, short receiver names). Filenames remain lowercase with underscores only where existing.
20+
- Logging: use `slog` where present; maintain structured keys (`key`, `commit_ts`, etc.).
21+
22+
## Testing Guidelines
23+
- Unit tests co-located with packages (`*_test.go`); prefer table-driven cases.
24+
- TTL/HLC behaviors live in `store/` and `kv/`; add coverage when touching clocks, OCC, or replication logic.
25+
- Integration: run Jepsen suites after changes affecting replication, MVCC, or Redis adapter.
26+
- `cd jepsen && HOME=$(pwd)/tmp-home LEIN_HOME=$(pwd)/.lein LEIN_JVM_OPTS="-Duser.home=$(pwd)/tmp-home" /tmp/lein test`
27+
- `cd jepsen/redis && HOME=$(pwd)/../tmp-home LEIN_HOME=$(pwd)/../.lein LEIN_JVM_OPTS="-Duser.home=$(pwd)/../tmp-home" /tmp/lein test`
28+
29+
## Commit & Pull Request Guidelines
30+
- Messages: short imperative summary (e.g., “Add HLC TTL handling”). Include scope when helpful (`store:`, `adapter:`).
31+
- Pull requests: describe behavior change, risk, and test evidence (`go test`, lint, Jepsen). Add repro steps for bug fixes.
32+
33+
## Security & Configuration Tips
34+
- Hybrid clock derives from wall-clock millis; keep system clock reasonably synchronized across nodes.
35+
- Avoid leader-local timestamps in persistence; timestamp issuance should originate from the Raft leader to prevent skewed OCC decisions.

adapter/internal.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,18 @@ import (
99
"github.com/hashicorp/raft"
1010
)
1111

12-
func NewInternal(txm kv.Transactional, r *raft.Raft) *Internal {
12+
func NewInternal(txm kv.Transactional, r *raft.Raft, clock *kv.HLC) *Internal {
1313
return &Internal{
1414
raft: r,
1515
transactionManager: txm,
16+
clock: clock,
1617
}
1718
}
1819

1920
type Internal struct {
2021
raft *raft.Raft
2122
transactionManager kv.Transactional
23+
clock *kv.HLC
2224

2325
pb.UnimplementedInternalServer
2426
}
@@ -33,6 +35,15 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
3335
return nil, errors.WithStack(ErrNotLeader)
3436
}
3537

38+
// Ensure leader issues start_ts when followers forward txn groups without it.
39+
if req.IsTxn {
40+
for _, r := range req.Requests {
41+
if r.Ts == 0 {
42+
r.Ts = i.clock.Next()
43+
}
44+
}
45+
}
46+
3647
r, err := i.transactionManager.Commit(req.Requests)
3748
if err != nil {
3849
return &pb.ForwardResponse{

adapter/test_util.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,8 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
322322
cfg := buildRaftConfig(n, ports)
323323

324324
for i := 0; i < n; i++ {
325-
st := store.NewRbMemoryStore()
326-
trxSt := store.NewMemoryStoreDefaultTTL()
327-
fsm := kv.NewKvFSM(st, trxSt)
325+
st := store.NewMVCCStore()
326+
fsm := kv.NewKvFSM(st)
328327

329328
port := ports[i]
330329
grpcSock := lis[i].grpc
@@ -351,7 +350,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
351350
tm.Register(s)
352351
pb.RegisterRawKVServer(s, gs)
353352
pb.RegisterTransactionalKVServer(s, gs)
354-
pb.RegisterInternalServer(s, NewInternal(trx, r))
353+
pb.RegisterInternalServer(s, NewInternal(trx, r, coordinator.Clock()))
355354

356355
leaderhealth.Setup(r, s, []string{"Example"})
357356
raftadmin.Register(s, r)

cmd/server/demo.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -81,9 +81,8 @@ func run(eg *errgroup.Group) error {
8181
}
8282

8383
for i := 0; i < 3; i++ {
84-
st := store.NewRbMemoryStore()
85-
trxSt := store.NewMemoryStoreDefaultTTL()
86-
fsm := kv.NewKvFSM(st, trxSt)
84+
st := store.NewMVCCStore()
85+
fsm := kv.NewKvFSM(st)
8786

8887
r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg)
8988
if err != nil {
@@ -97,7 +96,7 @@ func run(eg *errgroup.Group) error {
9796
tm.Register(s)
9897
pb.RegisterRawKVServer(s, gs)
9998
pb.RegisterTransactionalKVServer(s, gs)
100-
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r))
99+
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r, coordinator.Clock()))
101100
leaderhealth.Setup(r, s, []string{"RawKV"})
102101
raftadmin.Register(s, r)
103102

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ require (
2525
github.com/tidwall/redcon v1.6.2
2626
go.etcd.io/bbolt v1.4.3
2727
golang.org/x/sync v0.19.0
28+
golang.org/x/sys v0.38.0
2829
google.golang.org/grpc v1.78.0
2930
google.golang.org/protobuf v1.36.11
3031
)
@@ -68,7 +69,6 @@ require (
6869
github.com/tidwall/btree v1.1.0 // indirect
6970
github.com/tidwall/match v1.1.1 // indirect
7071
golang.org/x/net v0.47.0 // indirect
71-
golang.org/x/sys v0.38.0 // indirect
7272
golang.org/x/text v0.31.0 // indirect
7373
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
7474
gopkg.in/yaml.v3 v3.0.1 // indirect

kv/coordinator.go

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ func NewCoordinator(txm Transactional, r *raft.Raft) *Coordinate {
1414
return &Coordinate{
1515
transactionManager: txm,
1616
raft: r,
17+
clock: NewHLC(),
1718
}
1819
}
1920

@@ -24,6 +25,7 @@ type CoordinateResponse struct {
2425
type Coordinate struct {
2526
transactionManager Transactional
2627
raft *raft.Raft
28+
clock *HLC
2729
}
2830

2931
var _ Coordinator = (*Coordinate)(nil)
@@ -39,8 +41,13 @@ func (c *Coordinate) Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, er
3941
return c.redirect(reqs)
4042
}
4143

44+
if reqs.IsTxn && reqs.StartTS == 0 {
45+
// Leader-only timestamp issuance to avoid divergence across shards.
46+
reqs.StartTS = c.nextStartTS()
47+
}
48+
4249
if reqs.IsTxn {
43-
return c.dispatchTxn(reqs.Elems)
50+
return c.dispatchTxn(reqs.Elems, reqs.StartTS)
4451
}
4552

4653
return c.dispatchRaw(reqs.Elems)
@@ -56,10 +63,18 @@ func (c *Coordinate) RaftLeader() raft.ServerAddress {
5663
return addr
5764
}
5865

59-
func (c *Coordinate) dispatchTxn(reqs []*Elem[OP]) (*CoordinateResponse, error) {
66+
func (c *Coordinate) Clock() *HLC {
67+
return c.clock
68+
}
69+
70+
func (c *Coordinate) nextStartTS() uint64 {
71+
return c.clock.Next()
72+
}
73+
74+
func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateResponse, error) {
6075
var logs []*pb.Request
6176
for _, req := range reqs {
62-
m := c.toTxnRequests(req)
77+
m := c.toTxnRequests(req, startTS)
6378
logs = append(logs, m...)
6479
}
6580

@@ -96,6 +111,7 @@ func (c *Coordinate) toRawRequest(req *Elem[OP]) *pb.Request {
96111
return &pb.Request{
97112
IsTxn: false,
98113
Phase: pb.Phase_NONE,
114+
Ts: c.clock.Next(),
99115
Mutations: []*pb.Mutation{
100116
{
101117
Op: pb.Op_PUT,
@@ -109,6 +125,7 @@ func (c *Coordinate) toRawRequest(req *Elem[OP]) *pb.Request {
109125
return &pb.Request{
110126
IsTxn: false,
111127
Phase: pb.Phase_NONE,
128+
Ts: c.clock.Next(),
112129
Mutations: []*pb.Mutation{
113130
{
114131
Op: pb.Op_DEL,
@@ -121,16 +138,14 @@ func (c *Coordinate) toRawRequest(req *Elem[OP]) *pb.Request {
121138
panic("unreachable")
122139
}
123140

124-
const defaultTxnLockTTLSeconds = uint64(30)
125-
126-
func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
141+
func (c *Coordinate) toTxnRequests(req *Elem[OP], startTS uint64) []*pb.Request {
127142
switch req.Op {
128143
case Put:
129144
return []*pb.Request{
130145
{
131146
IsTxn: true,
132147
Phase: pb.Phase_PREPARE,
133-
Ts: defaultTxnLockTTLSeconds,
148+
Ts: startTS,
134149
Mutations: []*pb.Mutation{
135150
{
136151
Key: req.Key,
@@ -141,7 +156,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
141156
{
142157
IsTxn: true,
143158
Phase: pb.Phase_COMMIT,
144-
Ts: defaultTxnLockTTLSeconds,
159+
Ts: startTS,
145160
Mutations: []*pb.Mutation{
146161
{
147162
Key: req.Key,
@@ -156,7 +171,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
156171
{
157172
IsTxn: true,
158173
Phase: pb.Phase_PREPARE,
159-
Ts: defaultTxnLockTTLSeconds,
174+
Ts: startTS,
160175
Mutations: []*pb.Mutation{
161176
{
162177
Key: req.Key,
@@ -166,7 +181,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
166181
{
167182
IsTxn: true,
168183
Phase: pb.Phase_COMMIT,
169-
Ts: defaultTxnLockTTLSeconds,
184+
Ts: startTS,
170185
Mutations: []*pb.Mutation{
171186
{
172187
Key: req.Key,
@@ -204,7 +219,7 @@ func (c *Coordinate) redirect(reqs *OperationGroup[OP]) (*CoordinateResponse, er
204219
var requests []*pb.Request
205220
if reqs.IsTxn {
206221
for _, req := range reqs.Elems {
207-
requests = append(requests, c.toTxnRequests(req)...)
222+
requests = append(requests, c.toTxnRequests(req, reqs.StartTS)...)
208223
}
209224
} else {
210225
for _, req := range reqs.Elems {

0 commit comments

Comments
 (0)