Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Repository Guidelines

## Project Structure & Modules
- `cmd/server`, `cmd/client`: entrypoints for running the KV server and client.
- `store/`: MVCC storage engine, OCC/TTL, and related tests.
- `kv/`: hybrid logical clock (HLC) utilities and KV interfaces.
- `adapter/`: protocol adapters (e.g., Redis), plus integration tests.
- `jepsen/`, `jepsen/redis/`: Jepsen test harnesses and workloads.
- `proto/`, `distribution/`, `internal/`: supporting protobufs, build assets, and shared helpers.

## Build, Test, and Development Commands
- `go test ./...` — run unit/integration tests. If macOS sandbox blocks `$GOCACHE`, prefer `GOCACHE=$(pwd)/.cache GOTMPDIR=$(pwd)/.cache/tmp go test ./...`.
- `GOCACHE=$(pwd)/.cache GOLANGCI_LINT_CACHE=$(pwd)/.golangci-cache golangci-lint run ./... --timeout=5m` — full lint suite.
- `cd jepsen && HOME=$(pwd)/tmp-home LEIN_HOME=$(pwd)/.lein LEIN_JVM_OPTS="-Duser.home=$(pwd)/tmp-home" /tmp/lein test` — Jepsen tests; for the Redis workload, `cd jepsen/redis` and point the same variables at `$(pwd)/../tmp-home` and `$(pwd)/../.lein`.
- `go run ./cmd/server` / `go run ./cmd/client` — start server or CLI locally.

## Coding Style & Naming
- Go code: `gofmt` + project lint rules (`golangci-lint`). Avoid adding `//nolint` unless absolutely required; prefer refactoring.
- Naming: Go conventions (MixedCaps for exported identifiers, short receiver names). Filenames stay lowercase; use underscores only where existing files already do.
- Logging: use `slog` where present; maintain structured keys (`key`, `commit_ts`, etc.).

## Testing Guidelines
- Unit tests co-located with packages (`*_test.go`); prefer table-driven cases.
- TTL/HLC behaviors live in `store/` and `kv/`; add coverage when touching clocks, OCC, or replication logic.
- Integration: run Jepsen suites after changes affecting replication, MVCC, or Redis adapter.
- `cd jepsen && HOME=$(pwd)/tmp-home LEIN_HOME=$(pwd)/.lein LEIN_JVM_OPTS="-Duser.home=$(pwd)/tmp-home" /tmp/lein test`
- `cd jepsen/redis && HOME=$(pwd)/../tmp-home LEIN_HOME=$(pwd)/../.lein LEIN_JVM_OPTS="-Duser.home=$(pwd)/../tmp-home" /tmp/lein test`

## Commit & Pull Request Guidelines
- Messages: short imperative summary (e.g., “Add HLC TTL handling”). Include scope when helpful (`store:`, `adapter:`).
- Pull requests: describe behavior change, risk, and test evidence (`go test`, lint, Jepsen). Add repro steps for bug fixes.

## Security & Configuration Tips
- The hybrid logical clock (HLC) derives from wall-clock milliseconds; keep system clocks reasonably synchronized across nodes.
- Avoid follower-local timestamps in persistence; timestamp issuance should originate from the Raft leader to prevent skewed OCC decisions.
17 changes: 16 additions & 1 deletion adapter/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,18 @@ import (
"github.com/hashicorp/raft"
)

func NewInternal(txm kv.Transactional, r *raft.Raft) *Internal {
func NewInternal(txm kv.Transactional, r *raft.Raft, clock *kv.HLC) *Internal {
return &Internal{
raft: r,
transactionManager: txm,
clock: clock,
}
}

// Internal is the handler registered with pb.RegisterInternalServer; its
// Forward method commits request batches through the transaction manager.
type Internal struct {
// raft is the Raft instance supplied to NewInternal.
raft *raft.Raft
// transactionManager commits the forwarded requests.
transactionManager kv.Transactional
// clock issues the start timestamp that Forward assigns to transactional
// requests arriving with Ts == 0.
// NOTE(review): Forward calls clock.Next() without a nil check — confirm
// every NewInternal caller passes a non-nil clock.
clock *kv.HLC

// Generated base type from the pb package, embedded in the server struct.
pb.UnimplementedInternalServer
}
Expand All @@ -33,6 +35,19 @@ func (i *Internal) Forward(_ context.Context, req *pb.ForwardRequest) (*pb.Forwa
return nil, errors.WithStack(ErrNotLeader)
}

// Ensure leader issues start_ts when followers forward txn groups without it.
if req.IsTxn {
var startTs uint64
for _, r := range req.Requests {
if r.Ts == 0 {
if startTs == 0 {
startTs = i.clock.Next()
}
r.Ts = startTs
}
}
}

r, err := i.transactionManager.Commit(req.Requests)
if err != nil {
return &pb.ForwardResponse{
Expand Down
7 changes: 3 additions & 4 deletions adapter/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,9 +322,8 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
cfg := buildRaftConfig(n, ports)

for i := 0; i < n; i++ {
st := store.NewRbMemoryStore()
trxSt := store.NewMemoryStoreDefaultTTL()
fsm := kv.NewKvFSM(st, trxSt)
st := store.NewMVCCStore()
fsm := kv.NewKvFSM(st)

port := ports[i]
grpcSock := lis[i].grpc
Expand All @@ -351,7 +350,7 @@ func setupNodes(t *testing.T, ctx context.Context, n int, ports []portsAdress) (
tm.Register(s)
pb.RegisterRawKVServer(s, gs)
pb.RegisterTransactionalKVServer(s, gs)
pb.RegisterInternalServer(s, NewInternal(trx, r))
pb.RegisterInternalServer(s, NewInternal(trx, r, coordinator.Clock()))

leaderhealth.Setup(r, s, []string{"Example"})
raftadmin.Register(s, r)
Expand Down
7 changes: 3 additions & 4 deletions cmd/server/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,8 @@ func run(eg *errgroup.Group) error {
}

for i := 0; i < 3; i++ {
st := store.NewRbMemoryStore()
trxSt := store.NewMemoryStoreDefaultTTL()
fsm := kv.NewKvFSM(st, trxSt)
st := store.NewMVCCStore()
fsm := kv.NewKvFSM(st)

r, tm, err := newRaft(strconv.Itoa(i), grpcAdders[i], fsm, i == 0, cfg)
if err != nil {
Expand All @@ -97,7 +96,7 @@ func run(eg *errgroup.Group) error {
tm.Register(s)
pb.RegisterRawKVServer(s, gs)
pb.RegisterTransactionalKVServer(s, gs)
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r))
pb.RegisterInternalServer(s, adapter.NewInternal(trx, r, coordinator.Clock()))
leaderhealth.Setup(r, s, []string{"RawKV"})
raftadmin.Register(s, r)

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ require (
github.com/tidwall/redcon v1.6.2
go.etcd.io/bbolt v1.4.3
golang.org/x/sync v0.19.0
golang.org/x/sys v0.38.0
google.golang.org/grpc v1.78.0
google.golang.org/protobuf v1.36.11
)
Expand Down Expand Up @@ -68,7 +69,6 @@ require (
github.com/tidwall/btree v1.1.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/text v0.31.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20251029180050-ab9386a59fda // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
37 changes: 26 additions & 11 deletions kv/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ func NewCoordinator(txm Transactional, r *raft.Raft) *Coordinate {
return &Coordinate{
transactionManager: txm,
raft: r,
clock: NewHLC(),
}
}

Expand All @@ -24,6 +25,7 @@ type CoordinateResponse struct {
// Coordinate is the Coordinator implementation. Dispatch either redirects an
// operation group or turns it into transactional or raw requests.
type Coordinate struct {
// transactionManager is the Transactional supplied to NewCoordinator.
transactionManager Transactional
// raft is the Raft instance supplied to NewCoordinator.
raft *raft.Raft
// clock is the HLC created in NewCoordinator; it supplies transaction
// start timestamps and the Ts of raw requests.
clock *HLC
}

var _ Coordinator = (*Coordinate)(nil)
Expand All @@ -39,8 +41,13 @@ func (c *Coordinate) Dispatch(reqs *OperationGroup[OP]) (*CoordinateResponse, er
return c.redirect(reqs)
}

if reqs.IsTxn && reqs.StartTS == 0 {
// Leader-only timestamp issuance to avoid divergence across shards.
reqs.StartTS = c.nextStartTS()
}

if reqs.IsTxn {
return c.dispatchTxn(reqs.Elems)
return c.dispatchTxn(reqs.Elems, reqs.StartTS)
}

return c.dispatchRaw(reqs.Elems)
Expand All @@ -56,10 +63,18 @@ func (c *Coordinate) RaftLeader() raft.ServerAddress {
return addr
}

func (c *Coordinate) dispatchTxn(reqs []*Elem[OP]) (*CoordinateResponse, error) {
// Clock returns the coordinator's HLC so that other components (for example
// the Internal server built by NewInternal) can draw timestamps from the same
// clock instance.
func (c *Coordinate) Clock() *HLC {
return c.clock
}

// nextStartTS returns the next timestamp from the coordinator's clock.
// Dispatch uses it as the start timestamp of a transactional operation group
// whose StartTS is zero.
func (c *Coordinate) nextStartTS() uint64 {
return c.clock.Next()
}

func (c *Coordinate) dispatchTxn(reqs []*Elem[OP], startTS uint64) (*CoordinateResponse, error) {
var logs []*pb.Request
for _, req := range reqs {
m := c.toTxnRequests(req)
m := c.toTxnRequests(req, startTS)
logs = append(logs, m...)
}

Expand Down Expand Up @@ -96,6 +111,7 @@ func (c *Coordinate) toRawRequest(req *Elem[OP]) *pb.Request {
return &pb.Request{
IsTxn: false,
Phase: pb.Phase_NONE,
Ts: c.clock.Next(),
Mutations: []*pb.Mutation{
{
Op: pb.Op_PUT,
Expand All @@ -109,6 +125,7 @@ func (c *Coordinate) toRawRequest(req *Elem[OP]) *pb.Request {
return &pb.Request{
IsTxn: false,
Phase: pb.Phase_NONE,
Ts: c.clock.Next(),
Mutations: []*pb.Mutation{
{
Op: pb.Op_DEL,
Expand All @@ -121,16 +138,14 @@ func (c *Coordinate) toRawRequest(req *Elem[OP]) *pb.Request {
panic("unreachable")
}

const defaultTxnLockTTLSeconds = uint64(30)

func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
func (c *Coordinate) toTxnRequests(req *Elem[OP], startTS uint64) []*pb.Request {
switch req.Op {
case Put:
return []*pb.Request{
{
IsTxn: true,
Phase: pb.Phase_PREPARE,
Ts: defaultTxnLockTTLSeconds,
Ts: startTS,
Mutations: []*pb.Mutation{
{
Key: req.Key,
Expand All @@ -141,7 +156,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
{
IsTxn: true,
Phase: pb.Phase_COMMIT,
Ts: defaultTxnLockTTLSeconds,
Ts: startTS,
Mutations: []*pb.Mutation{
{
Key: req.Key,
Expand All @@ -156,7 +171,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
{
IsTxn: true,
Phase: pb.Phase_PREPARE,
Ts: defaultTxnLockTTLSeconds,
Ts: startTS,
Mutations: []*pb.Mutation{
{
Key: req.Key,
Expand All @@ -166,7 +181,7 @@ func (c *Coordinate) toTxnRequests(req *Elem[OP]) []*pb.Request {
{
IsTxn: true,
Phase: pb.Phase_COMMIT,
Ts: defaultTxnLockTTLSeconds,
Ts: startTS,
Mutations: []*pb.Mutation{
{
Key: req.Key,
Expand Down Expand Up @@ -204,7 +219,7 @@ func (c *Coordinate) redirect(reqs *OperationGroup[OP]) (*CoordinateResponse, er
var requests []*pb.Request
if reqs.IsTxn {
for _, req := range reqs.Elems {
requests = append(requests, c.toTxnRequests(req)...)
requests = append(requests, c.toTxnRequests(req, reqs.StartTS)...)
}
} else {
for _, req := range reqs.Elems {
Expand Down
Loading
Loading