Skip to content

Commit 5ca79dc

Browse files
jpeletiernolash
authored andcommitted
pss: Refactor. Step 2. Refactor forward cache (ethersphere#1742)
pss: Refactored forwardCache into a TTLSet + add mockable timer
1 parent 11c05dc commit 5ca79dc

File tree

20 files changed

+1065
-164
lines changed

20 files changed

+1065
-164
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ require (
7171
github.com/steakknife/bloomfilter v0.0.0-20180922174646-6819c0d2a570 // indirect
7272
github.com/steakknife/hamming v0.0.0-20180906055917-c99c65617cd3 // indirect
7373
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965
74+
github.com/tilinna/clock v1.0.2
7475
github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef // indirect
7576
github.com/uber-go/atomic v1.4.0 // indirect
7677
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,8 @@ github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0
249249
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
250250
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965 h1:V/AztY/q2oW5ghho7YMgUJQkKvSACHRxpeDyT5DxpIo=
251251
github.com/syndtr/goleveldb v0.0.0-20190318030020-c3a204f8e965/go.mod h1:9OrXJhf154huy1nPWmuSrkgjPUtUNhA+Zmy+6AESzuA=
252+
github.com/tilinna/clock v1.0.2 h1:6BO2tyAC9JbPExKH/z9zl44FLu1lImh3nDNKA0kgrkI=
253+
github.com/tilinna/clock v1.0.2/go.mod h1:ZsP7BcY7sEEz7ktc0IVy8Us6boDrK8VradlKRUGfOao=
252254
github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef h1:luEzjJzktS9eU0CmI0uApXHLP/lKzOoRPrJhd71J8ik=
253255
github.com/tyler-smith/go-bip39 v0.0.0-20181017060643-dbb3b84ba2ef/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2K9Zr6cf67kNs=
254256
github.com/uber-go/atomic v1.4.0 h1:yOuPqEq4ovnhEjpHmfFwsqBXDYbQeT6Nb0bwD6XnD5o=

pss/internal/ticker/ticker.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package ticker
2+
3+
import (
4+
"errors"
5+
"time"
6+
7+
"github.com/tilinna/clock"
8+
)
9+
10+
// Config defines the necessary information and dependencies to instantiate a Ticker
11+
type Config struct {
12+
Clock clock.Clock
13+
Interval time.Duration
14+
Callback func()
15+
}
16+
17+
// Ticker represents a periodic timer that invokes a callback
18+
type Ticker struct {
19+
quitC chan struct{}
20+
}
21+
22+
// ErrAlreadyStopped is returned if this service was already stopped and Stop() is called again
23+
var ErrAlreadyStopped = errors.New("Already stopped")
24+
25+
// New builds a ticker that will call the given callback function periodically
26+
func New(config *Config) *Ticker {
27+
28+
tk := &Ticker{
29+
quitC: make(chan struct{}),
30+
}
31+
ticker := config.Clock.NewTicker(config.Interval)
32+
go func() {
33+
defer ticker.Stop()
34+
for {
35+
select {
36+
case <-ticker.C:
37+
config.Callback()
38+
case <-tk.quitC:
39+
return
40+
}
41+
}
42+
}()
43+
return tk
44+
}
45+
46+
// Stop stops the timer and releases the goroutine running it.
47+
func (tk *Ticker) Stop() error {
48+
if tk.quitC == nil {
49+
return ErrAlreadyStopped
50+
}
51+
close(tk.quitC)
52+
tk.quitC = nil
53+
return nil
54+
}

pss/internal/ticker/ticker_test.go

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package ticker_test
2+
3+
import (
4+
"sync"
5+
"testing"
6+
"time"
7+
8+
"github.com/ethersphere/swarm/pss/internal/ticker"
9+
"github.com/tilinna/clock"
10+
)
11+
12+
// TestNewTicker tests whether the ticker calls a callback function periodically
13+
func TestNewTicker(t *testing.T) {
14+
var err error
15+
16+
testClock := clock.NewMock(time.Unix(0, 0))
17+
interval := 10 * time.Second
18+
19+
wg := sync.WaitGroup{}
20+
wg.Add(10)
21+
tickWait := make(chan bool)
22+
23+
testTicker := ticker.New(&ticker.Config{
24+
Interval: interval,
25+
Clock: testClock,
26+
Callback: func() {
27+
wg.Done()
28+
tickWait <- true
29+
},
30+
})
31+
32+
for i := 0; i < 10; i++ {
33+
testClock.Add(interval)
34+
<-tickWait
35+
}
36+
37+
wg.Wait()
38+
err = testTicker.Stop()
39+
if err != nil {
40+
t.Fatal(err)
41+
}
42+
43+
err = testTicker.Stop()
44+
if err != ticker.ErrAlreadyStopped {
45+
t.Fatal("Expected Stop() to return ticker.ErrAlreadyStopped when trying to stop an already stopped ticker")
46+
}
47+
}

pss/internal/ttlset/ttlset.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package ttlset
2+
3+
import (
4+
"sync"
5+
"time"
6+
7+
"github.com/tilinna/clock"
8+
)
9+
10+
// Config defines the TTLSet configuration
11+
type Config struct {
12+
EntryTTL time.Duration // time after which items are removed
13+
Clock clock.Clock // time reference
14+
}
15+
16+
// TTLSet implements a Set that automatically removes expired keys
17+
// after a predefined expiration time
18+
type TTLSet struct {
19+
Config
20+
set map[interface{}]setEntry
21+
lock sync.RWMutex
22+
}
23+
24+
type setEntry struct {
25+
expiresAt time.Time
26+
}
27+
28+
// New instances a TTLSet
29+
func New(config *Config) *TTLSet {
30+
ts := &TTLSet{
31+
set: make(map[interface{}]setEntry),
32+
Config: *config,
33+
}
34+
return ts
35+
}
36+
37+
// Add adds a new key to the set
38+
func (ts *TTLSet) Add(key interface{}) error {
39+
var entry setEntry
40+
var ok bool
41+
42+
ts.lock.Lock()
43+
defer ts.lock.Unlock()
44+
45+
if entry, ok = ts.set[key]; !ok {
46+
entry = setEntry{}
47+
}
48+
entry.expiresAt = ts.Clock.Now().Add(ts.EntryTTL)
49+
ts.set[key] = entry
50+
return nil
51+
}
52+
53+
// Has returns whether or not a key is already/still in the set
54+
func (ts *TTLSet) Has(key interface{}) bool {
55+
ts.lock.Lock()
56+
defer ts.lock.Unlock()
57+
58+
entry, ok := ts.set[key]
59+
if ok {
60+
if entry.expiresAt.After(ts.Clock.Now()) {
61+
return true
62+
}
63+
delete(ts.set, key) // since we're holding the lock, take the chance to delete a expired record
64+
}
65+
return false
66+
}
67+
68+
// GC will remove expired entries from the set
69+
func (ts *TTLSet) GC() {
70+
ts.lock.Lock()
71+
defer ts.lock.Unlock()
72+
for k, v := range ts.set {
73+
if v.expiresAt.Before(ts.Clock.Now()) {
74+
delete(ts.set, k)
75+
}
76+
}
77+
}
78+
79+
// Count returns the number of entries in the set
80+
func (ts *TTLSet) Count() int {
81+
ts.lock.Lock()
82+
defer ts.lock.Unlock()
83+
return len(ts.set)
84+
}

pss/internal/ttlset/ttlset_test.go

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package ttlset_test
2+
3+
import (
4+
"testing"
5+
"time"
6+
7+
"github.com/ethersphere/swarm/pss/internal/ttlset"
8+
"github.com/tilinna/clock"
9+
)
10+
11+
func TestTTLSet(t *testing.T) {
12+
var err error
13+
14+
testClock := clock.NewMock(time.Unix(0, 0))
15+
16+
testEntryTTL := 10 * time.Second
17+
testSet := ttlset.New(&ttlset.Config{
18+
EntryTTL: testEntryTTL,
19+
Clock: testClock,
20+
})
21+
22+
key1 := "some key"
23+
key2 := "some other key"
24+
25+
// check adding a key to the set
26+
err = testSet.Add(key1)
27+
if err != nil {
28+
t.Fatal((err))
29+
}
30+
31+
// check if the key is now there:
32+
hasKey := testSet.Has(key1)
33+
if !(hasKey == true) {
34+
t.Fatal("key1 should've been in the set, but Has() returned false")
35+
}
36+
37+
// check if Has() returns false when asked about a key that was never added:
38+
hasKey = testSet.Has("some made up key")
39+
if !(hasKey == false) {
40+
t.Fatal("Has() should have returned false when presented with a key that was never added")
41+
}
42+
43+
// Let some time pass, but not enough to have the key expire:
44+
testClock.Add(testEntryTTL / 2)
45+
46+
// check if the key is still there:
47+
hasKey = testSet.Has(key1)
48+
if !(hasKey == true) {
49+
t.Fatal("key1 should've been in the set, but Has() returned false")
50+
}
51+
52+
// Let some time pass well beyond the expiry time, so key1 expires:
53+
testClock.Add(testEntryTTL * 2)
54+
55+
// Add another key to the set:
56+
err = testSet.Add(key2)
57+
if err != nil {
58+
t.Fatal((err))
59+
}
60+
61+
hasKey = testSet.Has(key1)
62+
if !(hasKey == false) {
63+
t.Fatal("key1 should've been removed from the set, but Has() returned true")
64+
}
65+
66+
hasKey = testSet.Has(key2)
67+
if !(hasKey == true) {
68+
t.Fatal("key should remain in the set, but Has() returned false")
69+
}
70+
71+
// Let some time pass well beyond key2's expiry time, so key2 expires:
72+
testClock.Add(testEntryTTL * 2)
73+
74+
hasKey = testSet.Has(key2)
75+
if !(hasKey == false) {
76+
t.Fatal("key2 should have been wiped, but Has() returned true")
77+
}
78+
}
79+
80+
func TestGC(t *testing.T) {
81+
var err error
82+
83+
testClock := clock.NewMock(time.Unix(0, 0))
84+
85+
testEntryTTL := 10 * time.Second
86+
testSet := ttlset.New(&ttlset.Config{
87+
EntryTTL: testEntryTTL,
88+
Clock: testClock,
89+
})
90+
91+
key1 := "some key"
92+
key2 := "some later key"
93+
94+
// check adding a message to the cache
95+
err = testSet.Add(key1)
96+
if err != nil {
97+
t.Fatal((err))
98+
}
99+
100+
// move the clock 2 seconds
101+
testClock.Add(2 * time.Second)
102+
103+
// add a second key which will have a later expiration time
104+
err = testSet.Add(key2)
105+
if err != nil {
106+
t.Fatal((err))
107+
}
108+
109+
count := testSet.Count()
110+
if !(count == 2) {
111+
t.Fatal("Expected the set to contain 2 keys")
112+
}
113+
114+
testSet.GC() // attempt a cleanup. This cleanup should not affect any of the two keys, since they are not expired.
115+
116+
count = testSet.Count()
117+
if !(count == 2) {
118+
t.Fatal("Expected the set to still contain 2 keys")
119+
}
120+
121+
//Now, move the clock forward 9 seconds. This will expire key1 but still keep key2
122+
testClock.Add(9 * time.Second)
123+
testSet.GC() // invoke the internal cleaning function, which should wipe only key1
124+
count = testSet.Count()
125+
if !(count == 1) {
126+
t.Fatal("Expected the set to now have only 1 key")
127+
}
128+
129+
//Verify if key1 was wiped but key2 persists:
130+
hasKey := testSet.Has(key1)
131+
if !(hasKey == false) {
132+
t.Fatal("Expected the set to have removed key1")
133+
}
134+
135+
hasKey = testSet.Has(key2)
136+
if !(hasKey == true) {
137+
t.Fatal("Expected the set to still contain key2")
138+
}
139+
140+
//Now, move the clock some more time. This will wipe key2
141+
testClock.Add(7 * time.Second)
142+
testSet.GC() // invoke the internal cleaning function, which should wipe only key1
143+
144+
count = testSet.Count()
145+
// verify the map is now empty
146+
if !(count == 0) {
147+
t.Fatal("Expected the set to be empty")
148+
}
149+
150+
}

0 commit comments

Comments
 (0)