Skip to content

Commit d667f7a

Browse files
author
Jim Posen
committed
Implement basic in-memory ReplayLog for use in tests.
1 parent 38e1504 commit d667f7a

File tree

2 files changed

+259
-0
lines changed

2 files changed

+259
-0
lines changed

replaylog.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package sphinx
22

33
import (
44
"crypto/sha256"
5+
"errors"
56
)
67

78
const (
@@ -15,6 +16,16 @@ const (
1516
// of a Hash256, and is used to detect duplicate sphinx packets.
1617
type HashPrefix [HashPrefixSize]byte
1718

19+
// errReplayLogAlreadyStarted is an error returned when Start() is called on a
20+
// ReplayLog after it is started and before it is stopped.
21+
var errReplayLogAlreadyStarted error = errors.New(
22+
"Replay log has already been started")
23+
24+
// errReplayLogNotStarted is an error returned when methods other than Start()
25+
// are called on a ReplayLog before it is started or after it is stopped.
26+
var errReplayLogNotStarted error = errors.New(
27+
"Replay log has not been started")
28+
1829
// hashSharedSecret Sha-256 hashes the shared secret and returns the first
1930
// HashPrefixSize bytes of the hash.
2031
func hashSharedSecret(sharedSecret *Hash256) *HashPrefix {
@@ -58,3 +69,119 @@ type ReplayLog interface {
5869
// that are replays and an error if one occurs.
5970
PutBatch(*Batch) (*ReplaySet, error)
6071
}
72+
73+
// MemoryReplayLog is a simple ReplayLog implementation that stores all added
74+
// sphinx packets and processed batches in memory with no persistence.
75+
//
76+
// This is designed for use just in testing.
77+
type MemoryReplayLog struct {
78+
batches map[string]*ReplaySet
79+
entries map[HashPrefix]uint32
80+
}
81+
82+
// NewMemoryReplayLog constructs a new MemoryReplayLog.
83+
func NewMemoryReplayLog() *MemoryReplayLog {
84+
return &MemoryReplayLog{}
85+
}
86+
87+
// Start initializes the log and must be called before any other methods.
88+
func (rl *MemoryReplayLog) Start() error {
89+
rl.batches = make(map[string]*ReplaySet)
90+
rl.entries = make(map[HashPrefix]uint32)
91+
return nil
92+
}
93+
94+
// Stop wipes the state of the log.
95+
func (rl *MemoryReplayLog) Stop() error {
96+
if rl.entries == nil || rl.batches == nil {
97+
return errReplayLogNotStarted
98+
}
99+
100+
rl.batches = nil
101+
rl.entries = nil
102+
return nil
103+
}
104+
105+
// Get retrieves an entry from the log given its hash prefix. It returns the
106+
// value stored and an error if one occurs. It returns ErrLogEntryNotFound
107+
// if the entry is not in the log.
108+
func (rl *MemoryReplayLog) Get(hash *HashPrefix) (uint32, error) {
109+
if rl.entries == nil || rl.batches == nil {
110+
return 0, errReplayLogNotStarted
111+
}
112+
113+
cltv, exists := rl.entries[*hash]
114+
if !exists {
115+
return 0, ErrLogEntryNotFound
116+
}
117+
118+
return cltv, nil
119+
}
120+
121+
// Put stores an entry into the log given its hash prefix and an accompanying
122+
// purposefully general type. It returns ErrReplayedPacket if the provided hash
123+
// prefix already exists in the log.
124+
func (rl *MemoryReplayLog) Put(hash *HashPrefix, cltv uint32) error {
125+
if rl.entries == nil || rl.batches == nil {
126+
return errReplayLogNotStarted
127+
}
128+
129+
_, exists := rl.entries[*hash]
130+
if exists {
131+
return ErrReplayedPacket
132+
}
133+
134+
rl.entries[*hash] = cltv
135+
return nil
136+
}
137+
138+
// Delete deletes an entry from the log given its hash prefix.
139+
func (rl *MemoryReplayLog) Delete(hash *HashPrefix) error {
140+
if rl.entries == nil || rl.batches == nil {
141+
return errReplayLogNotStarted
142+
}
143+
144+
delete(rl.entries, *hash)
145+
return nil
146+
}
147+
148+
// PutBatch stores a batch of sphinx packets into the log given their hash
149+
// prefixes and accompanying values. Returns the set of entries in the batch
150+
// that are replays and an error if one occurs.
151+
func (rl *MemoryReplayLog) PutBatch(batch *Batch) (*ReplaySet, error) {
152+
if rl.entries == nil || rl.batches == nil {
153+
return nil, errReplayLogNotStarted
154+
}
155+
156+
// Return the result when the batch was first processed to provide
157+
// idempotence.
158+
replays, exists := rl.batches[string(batch.id)]
159+
160+
if !exists {
161+
replays = NewReplaySet()
162+
for seqNum, entry := range batch.entries {
163+
err := rl.Put(&entry.hashPrefix, entry.cltv)
164+
if err == ErrReplayedPacket {
165+
replays.Add(seqNum)
166+
continue
167+
}
168+
169+
// This would be bad because we have already updated the entries
170+
// map, but no errors other than ErrReplayedPacket should occur.
171+
if err != nil {
172+
return nil, err
173+
}
174+
}
175+
176+
replays.Merge(batch.replaySet)
177+
rl.batches[string(batch.id)] = replays
178+
}
179+
180+
batch.replaySet = replays
181+
batch.isCommitted = true
182+
183+
return replays, nil
184+
}
185+
186+
// A compile time asserting *MemoryReplayLog implements the RelayLog interface.
187+
var _ ReplayLog = (*MemoryReplayLog)(nil)

replaylog_test.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package sphinx
2+
3+
import (
4+
"testing"
5+
)
6+
7+
// TestMemoryReplayLogStorageAndRetrieval tests that the non-batch methods on
8+
// MemoryReplayLog work as expected.
9+
func TestMemoryReplayLogStorageAndRetrieval(t *testing.T) {
10+
rl := NewMemoryReplayLog()
11+
rl.Start()
12+
defer rl.Stop()
13+
14+
var hashPrefix HashPrefix
15+
hashPrefix[0] = 1
16+
17+
var cltv1 uint32 = 1
18+
19+
// Attempt to lookup unknown sphinx packet.
20+
_, err := rl.Get(&hashPrefix)
21+
if err == nil {
22+
t.Fatalf("Expected ErrLogEntryNotFound")
23+
}
24+
if err != ErrLogEntryNotFound {
25+
t.Fatalf("Get failed - received unexpected error upon Get: %v", err)
26+
}
27+
28+
// Log incoming sphinx packet.
29+
err = rl.Put(&hashPrefix, cltv1)
30+
if err != nil {
31+
t.Fatalf("Put failed - received unexpected error upon Put: %v", err)
32+
}
33+
34+
// Attempt to replay sphinx packet.
35+
err = rl.Put(&hashPrefix, cltv1)
36+
if err == nil {
37+
t.Fatalf("Expected ErrReplayedPacket")
38+
}
39+
if err != ErrReplayedPacket {
40+
t.Fatalf("Put failed - received unexpected error upon Put: %v", err)
41+
}
42+
43+
// Lookup logged sphinx packet.
44+
cltv, err := rl.Get(&hashPrefix)
45+
if err != nil {
46+
t.Fatalf("Get failed - received unexpected error upon Get: %v", err)
47+
}
48+
if cltv != cltv1 {
49+
t.Fatalf("Get returned wrong value: expected %v, got %v", cltv1, cltv)
50+
}
51+
52+
// Delete sphinx packet from log.
53+
err = rl.Delete(&hashPrefix)
54+
if err != nil {
55+
t.Fatalf("Delete failed - received unexpected error upon Delete: %v", err)
56+
}
57+
58+
// Attempt to lookup deleted sphinx packet.
59+
_, err = rl.Get(&hashPrefix)
60+
if err == nil {
61+
t.Fatalf("Expected ErrLogEntryNotFound")
62+
}
63+
if err != ErrLogEntryNotFound {
64+
t.Fatalf("Get failed - received unexpected error upon Get: %v", err)
65+
}
66+
67+
// Reinsert incoming sphinx packet into the log.
68+
var cltv2 uint32 = 2
69+
err = rl.Put(&hashPrefix, cltv2)
70+
if err != nil {
71+
t.Fatalf("Put failed - received unexpected error upon Put: %v", err)
72+
}
73+
74+
// Lookup logged sphinx packet.
75+
cltv, err = rl.Get(&hashPrefix)
76+
if err != nil {
77+
t.Fatalf("Get failed - received unexpected error upon Get: %v", err)
78+
}
79+
if cltv != cltv2 {
80+
t.Fatalf("Get returned wrong value: expected %v, got %v", cltv2, cltv)
81+
}
82+
}
83+
84+
// TestMemoryReplayLogPutBatch tests that the batch adding of packets to a log
85+
// works as expected.
86+
func TestMemoryReplayLogPutBatch(t *testing.T) {
87+
rl := NewMemoryReplayLog()
88+
rl.Start()
89+
defer rl.Stop()
90+
91+
var hashPrefix1, hashPrefix2 HashPrefix
92+
hashPrefix1[0] = 1
93+
hashPrefix2[0] = 2
94+
95+
// Create a batch with a duplicated packet.
96+
batch1 := NewBatch([]byte{1})
97+
err := batch1.Put(1, &hashPrefix1, 1)
98+
if err != nil {
99+
t.Fatalf("Unexpected error adding entry to batch: %v", err)
100+
}
101+
err = batch1.Put(1, &hashPrefix1, 1)
102+
if err != nil {
103+
t.Fatalf("Unexpected error adding entry to batch: %v", err)
104+
}
105+
106+
replays, err := rl.PutBatch(batch1)
107+
if replays.Size() != 1 || !replays.Contains(1) {
108+
t.Fatalf("Unexpected replay set after adding batch 1 to log: %v", err)
109+
}
110+
111+
// Create a batch with one replayed packet and one valid one.
112+
batch2 := NewBatch([]byte{2})
113+
err = batch2.Put(1, &hashPrefix1, 1)
114+
if err != nil {
115+
t.Fatalf("Unexpected error adding entry to batch: %v", err)
116+
}
117+
err = batch2.Put(2, &hashPrefix2, 2)
118+
if err != nil {
119+
t.Fatalf("Unexpected error adding entry to batch: %v", err)
120+
}
121+
122+
replays, err = rl.PutBatch(batch2)
123+
if replays.Size() != 1 || !replays.Contains(1) {
124+
t.Fatalf("Unexpected replay set after adding batch 2 to log: %v", err)
125+
}
126+
127+
// Reprocess batch 2, which should be idempotent.
128+
replays, err = rl.PutBatch(batch2)
129+
if replays.Size() != 1 || !replays.Contains(1) {
130+
t.Fatalf("Unexpected replay set after adding batch 2 to log: %v", err)
131+
}
132+
}

0 commit comments

Comments
 (0)