Skip to content

Commit 6911734

Browse files
committed
Custom rcu-like structure
1 parent bc80956 commit 6911734

File tree

2 files changed

+138
-9
lines changed

2 files changed

+138
-9
lines changed

main.go

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"net/http"
77
"netexp/netdev"
88
"netexp/pipeline"
9+
"netexp/rcu"
910
"os"
10-
"sync"
1111
"time"
1212
)
1313

@@ -17,9 +17,10 @@ var (
1717
getver bool
1818
)
1919

20+
type Metrics []byte
21+
2022
var (
21-
mu sync.RWMutex // guards the metrics
22-
metrics []byte
23+
rcuMetrics *rcu.RCU[Metrics]
2324
)
2425

2526
func main() {
@@ -40,6 +41,11 @@ func main() {
4041
return
4142
}
4243

44+
// Init rcu
45+
rcuMetrics = rcu.New[Metrics]()
46+
// Add the first element.
47+
rcuMetrics.Rotate()
48+
4349
serve()
4450
gather()
4551
}
@@ -50,9 +56,12 @@ func serve() {
5056
})
5157

5258
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
53-
mu.RLock()
54-
defer mu.RUnlock()
55-
w.Write(metrics)
59+
latest, done := rcuMetrics.Latest()
60+
61+
if latest != nil {
62+
w.Write(*latest)
63+
done()
64+
}
5665
})
5766

5867
go func() {
@@ -68,6 +77,8 @@ func serve() {
6877
func gather() {
6978
p := pipeline.New([]int{1, 5, 10, 15, 30, 60})
7079

80+
timer := time.NewTicker(60 * time.Second)
81+
7182
for {
7283
data, err := netdev.ReadNetDev()
7384
if err != nil {
@@ -79,9 +90,16 @@ func gather() {
7990
panic(fmt.Errorf("could not get traffic: %w", err))
8091
}
8192

82-
mu.Lock()
83-
metrics = p.Step(recv, trns)
84-
mu.Unlock()
93+
m := p.Step(recv, trns)
94+
95+
// Non blocking. It expected to be fast.
96+
rcuMetrics.Assign(m)
97+
98+
select {
99+
case <-timer.C:
100+
rcuMetrics.Rotate()
101+
default:
102+
}
85103

86104
time.Sleep(time.Second)
87105
}

rcu/rcu.go

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
// read, copy, update
2+
package rcu
3+
4+
import (
5+
"slices"
6+
"sync"
7+
)
8+
9+
// Represents a single unit of data that "RCU" Holds.
10+
type Element[T any] struct {
11+
data T
12+
mu *sync.Mutex // guards the "refCount" down below.
13+
refCount int // Read & Writes on "refCount" only happens under the mu lock.
14+
}
15+
16+
// RCU is a structure that provides a safe way to Write and read
17+
// data. All readers are guaranteed to access to the second latest
18+
// buffer, Using its "Latest()" method.
19+
type RCU[T any] struct {
20+
elements []Element[T]
21+
mu sync.RWMutex
22+
}
23+
24+
func New[T any]() *RCU[T] {
25+
return &RCU[T]{
26+
// 10 capacity guarantees that no reallocation occur, if and
27+
// only if the program doesn't append more than that. Which
28+
// is unlikely to happen if we configure a timeout deadline on
29+
// the HTTP server.
30+
elements: make([]Element[T], 0, 10),
31+
mu: sync.RWMutex{},
32+
}
33+
}
34+
35+
// Rotate adds a new instance of Element to the Elements slice and
36+
// also removes unreferenced elements from the beginning of the slice.
37+
func (rcu *RCU[T]) Rotate() {
38+
rcu.mu.Lock()
39+
defer rcu.mu.Unlock()
40+
41+
newElem := Element[T]{
42+
refCount: 0,
43+
mu: &sync.Mutex{},
44+
}
45+
46+
rcu.elements = append(rcu.elements, newElem)
47+
48+
if len(rcu.elements) <= 2 {
49+
return // So there is nothing to clean up.
50+
}
51+
52+
// Only check up to last two (protect the last two: current and
53+
// previous elements). And do not waste your time if its lock
54+
// acquired.
55+
til := 0
56+
for i := 0; i < len(rcu.elements)-2; i++ {
57+
58+
ok := rcu.elements[i].mu.TryLock()
59+
60+
if !ok {
61+
break
62+
}
63+
64+
if rcu.elements[i].refCount > 0 {
65+
rcu.elements[i].mu.Unlock()
66+
break // Stop if we hit a referenced element; We only remove consecutive unreferenced elements.
67+
}
68+
til++
69+
rcu.elements[i].mu.Unlock()
70+
}
71+
72+
if til > 0 {
73+
rcu.elements = slices.Delete(rcu.elements, 0, til)
74+
}
75+
}
76+
77+
type RefDecrementFunc func()
78+
79+
// returns the most recent valid element. The caller is reponsible for
80+
// decrementing the refCount using the returned "RefDecrementFunc".
81+
func (rcu *RCU[T]) Latest() (*T, RefDecrementFunc) {
82+
rcu.mu.RLock()
83+
84+
if len(rcu.elements) >= 2 {
85+
index := len(rcu.elements) - 2
86+
87+
elem := &rcu.elements[index]
88+
rcu.mu.RUnlock()
89+
90+
elem.mu.Lock()
91+
elem.refCount++
92+
elem.mu.Unlock()
93+
94+
return &elem.data, func() {
95+
elem.mu.Lock()
96+
elem.refCount--
97+
elem.mu.Unlock()
98+
}
99+
}
100+
101+
rcu.mu.RUnlock()
102+
return nil, nil
103+
}
104+
105+
// Assigns data to the last index of "elements" slice. It doesn't
106+
// need mutual exclution, because only one goroutine manipulates the
107+
// rcu slice.
108+
func (rcu *RCU[T]) Assign(data T) {
109+
l := len(rcu.elements)
110+
rcu.elements[l-1].data = data
111+
}

0 commit comments

Comments
 (0)