Skip to content

Commit db2bf06

Browse files
committed
using "sync/atomic" primitives
1 parent 6911734 commit db2bf06

File tree

2 files changed

+10
-130
lines changed

2 files changed

+10
-130
lines changed

main.go

Lines changed: 4 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,11 @@ import (
1313

1414
var (
1515
version = "0.3.8"
16+
metrics rcu.RCU[[]byte]
1617
listen string
1718
getver bool
1819
)
1920

20-
type Metrics []byte
21-
22-
var (
23-
rcuMetrics *rcu.RCU[Metrics]
24-
)
25-
2621
func main() {
2722
// get options from flags
2823
flag.StringVar(&listen, "listen", ":9298", "network address to listen on")
@@ -41,11 +36,6 @@ func main() {
4136
return
4237
}
4338

44-
// Init rcu
45-
rcuMetrics = rcu.New[Metrics]()
46-
// Add the first element.
47-
rcuMetrics.Rotate()
48-
4939
serve()
5040
gather()
5141
}
@@ -56,12 +46,7 @@ func serve() {
5646
})
5747

5848
http.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
59-
latest, done := rcuMetrics.Latest()
60-
61-
if latest != nil {
62-
w.Write(*latest)
63-
done()
64-
}
49+
w.Write(*metrics.Load())
6550
})
6651

6752
go func() {
@@ -77,8 +62,6 @@ func serve() {
7762
func gather() {
7863
p := pipeline.New([]int{1, 5, 10, 15, 30, 60})
7964

80-
timer := time.NewTicker(60 * time.Second)
81-
8265
for {
8366
data, err := netdev.ReadNetDev()
8467
if err != nil {
@@ -90,16 +73,9 @@ func gather() {
9073
panic(fmt.Errorf("could not get traffic: %w", err))
9174
}
9275

93-
m := p.Step(recv, trns)
76+
buf := p.Step(recv, trns)
9477

95-
// Non blocking. It expected to be fast.
96-
rcuMetrics.Assign(m)
97-
98-
select {
99-
case <-timer.C:
100-
rcuMetrics.Rotate()
101-
default:
102-
}
78+
metrics.Store(&buf)
10379

10480
time.Sleep(time.Second)
10581
}

rcu/rcu.go

Lines changed: 6 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,111 +1,15 @@
1-
// read, copy, update
21
package rcu
32

4-
import (
5-
"slices"
6-
"sync"
7-
)
3+
import "sync/atomic"
84

9-
// Represents a single unit of data that "RCU" Holds.
10-
type Element[T any] struct {
11-
data T
12-
mu *sync.Mutex // guards the "refCount" down below.
13-
refCount int // Read & Writes on "refCount" only happens under the mu lock.
14-
}
15-
16-
// RCU is a structure that provides a safe way to Write and read
17-
// data. All readers are guaranteed to access to the second latest
18-
// buffer, Using its "Latest()" method.
195
type RCU[T any] struct {
20-
elements []Element[T]
21-
mu sync.RWMutex
22-
}
23-
24-
func New[T any]() *RCU[T] {
25-
return &RCU[T]{
26-
// 10 capacity guarantees that no reallocation occur, if and
27-
// only if the program doesn't append more than that. Which
28-
// is unlikely to happen if we configure a timeout deadline on
29-
// the HTTP server.
30-
elements: make([]Element[T], 0, 10),
31-
mu: sync.RWMutex{},
32-
}
33-
}
34-
35-
// Rotate adds a new instance of Element to the Elements slice and
36-
// also removes unreferenced elements from the beginning of the slice.
37-
func (rcu *RCU[T]) Rotate() {
38-
rcu.mu.Lock()
39-
defer rcu.mu.Unlock()
40-
41-
newElem := Element[T]{
42-
refCount: 0,
43-
mu: &sync.Mutex{},
44-
}
45-
46-
rcu.elements = append(rcu.elements, newElem)
47-
48-
if len(rcu.elements) <= 2 {
49-
return // So there is nothing to clean up.
50-
}
51-
52-
// Only check up to last two (protect the last two: current and
53-
// previous elements). And do not waste your time if its lock
54-
// acquired.
55-
til := 0
56-
for i := 0; i < len(rcu.elements)-2; i++ {
57-
58-
ok := rcu.elements[i].mu.TryLock()
59-
60-
if !ok {
61-
break
62-
}
63-
64-
if rcu.elements[i].refCount > 0 {
65-
rcu.elements[i].mu.Unlock()
66-
break // Stop if we hit a referenced element; We only remove consecutive unreferenced elements.
67-
}
68-
til++
69-
rcu.elements[i].mu.Unlock()
70-
}
71-
72-
if til > 0 {
73-
rcu.elements = slices.Delete(rcu.elements, 0, til)
74-
}
6+
p atomic.Pointer[T]
757
}
768

77-
type RefDecrementFunc func()
78-
79-
// returns the most recent valid element. The caller is reponsible for
80-
// decrementing the refCount using the returned "RefDecrementFunc".
81-
func (rcu *RCU[T]) Latest() (*T, RefDecrementFunc) {
82-
rcu.mu.RLock()
83-
84-
if len(rcu.elements) >= 2 {
85-
index := len(rcu.elements) - 2
86-
87-
elem := &rcu.elements[index]
88-
rcu.mu.RUnlock()
89-
90-
elem.mu.Lock()
91-
elem.refCount++
92-
elem.mu.Unlock()
93-
94-
return &elem.data, func() {
95-
elem.mu.Lock()
96-
elem.refCount--
97-
elem.mu.Unlock()
98-
}
99-
}
100-
101-
rcu.mu.RUnlock()
102-
return nil, nil
9+
func (r *RCU[T]) Store(t *T) {
10+
r.p.Store(t)
10311
}
10412

105-
// Assigns data to the last index of "elements" slice. It doesn't
106-
// need mutual exclution, because only one goroutine manipulates the
107-
// rcu slice.
108-
func (rcu *RCU[T]) Assign(data T) {
109-
l := len(rcu.elements)
110-
rcu.elements[l-1].data = data
13+
func (r *RCU[T]) Load() *T {
14+
return r.p.Load()
11115
}

0 commit comments

Comments
 (0)