Skip to content

Commit a4b43f3

Browse files
committed
initial implementation
1 parent bacef9d commit a4b43f3

17 files changed

+796
-1
lines changed

README.md

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,11 @@
11
# brahms
2-
Byzantine Resilient Gossip Protocol
2+
This is an experimental implementation of [Brahms: Byzantine resilient random membership sampling](https://www.cs.technion.ac.il/~gabik/publications/Brahms-COMNET.pdf). It describes a byzantine resilient protocol that creates a well-connected overlay network with each member only needing to knowing at most `O(∛n)` other peers.
3+
4+
## TODO
5+
- [ ] decide, use and test an actual network transport
6+
- [ ] instead of Node ids, work with ip addresses and ports
7+
- [ ] fix the myriad of race conditions on shared memory variables
8+
- [ ] implement validation of the sample by probing
9+
- [ ] implement a limited push with a small proof of work
10+
- [ ] adjust l1 and l2 as the network grobs using an esimate as described [here](https://research.neustar.biz/2012/07/09/sketch-of-the-day-k-minimum-values/)
11+
- [ ] fix send on closed channel bug with update calls with too short a context

brahms.go

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
package brahms
2+
3+
import (
4+
"context"
5+
"math/rand"
6+
"time"
7+
)
8+
9+
// Brahms implements the gossip protocol and takes an old view 'v' and returns a
10+
// new view.
11+
func Brahms(self NID, rnd *rand.Rand, p P, to time.Duration, s *Sampler, tr Transport, pushes <-chan NID, v View) View {
12+
13+
// reset push/pull views (line 21)
14+
push, pull := View{}, View{}
15+
16+
// perform sends and write results to these channels
17+
cpull := make(chan View, p.βl1())
18+
func() {
19+
ctx, cancel := context.WithTimeout(context.Background(), to)
20+
defer cancel()
21+
22+
// push our own id to peers picked from the current view (line 22)
23+
for id := range v.Pick(rnd, p.αl1()) {
24+
go tr.Push(ctx, self, id)
25+
}
26+
27+
// send pull requests to peers picked from the current view (line 25)
28+
for id := range v.Pick(rnd, p.βl1()) {
29+
go tr.Pull(ctx, cpull, id)
30+
}
31+
32+
// wait for time unit to be done, cancels any open pushes/pulls (line 27)
33+
<-ctx.Done()
34+
close(cpull)
35+
}()
36+
37+
// drain the buffer of all ids pushed to us (line 28)
38+
DRAIN:
39+
for {
40+
select {
41+
case id := <-pushes:
42+
push[id] = struct{}{}
43+
default:
44+
break DRAIN
45+
}
46+
}
47+
48+
// add all peers that we received as replies from our pull requests (line 32)
49+
for pv := range cpull {
50+
for id := range pv {
51+
52+
//NOTE: we divert from the paper by ignoring any pulls
53+
if id == self {
54+
continue
55+
}
56+
57+
pull[id] = struct{}{}
58+
}
59+
}
60+
61+
// only update our view if the nr of pushed ids was not too high (line 35)
62+
// NOTE: we divert from the paper here. We're happy to update if either pull
63+
// or push yielded us some nodes not necessarily both.
64+
if len(push) <= p.αl1() && (len(push) > 0 || len(pull) > 0) {
65+
66+
// construct our new view from what we've seen this round (line 36)
67+
v = push.Pick(rnd, p.αl1()).
68+
Concat(pull.Pick(rnd, p.βl1())).
69+
Concat(s.Sample().Pick(rnd, p.γl1()))
70+
}
71+
72+
// update the sampler with resuling push/pull (line 37)
73+
s.Update(push.Concat(pull))
74+
75+
return v
76+
}

brahms_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package brahms
2+
3+
import (
4+
"math/rand"
5+
"testing"
6+
"time"
7+
8+
"github.com/advanderveer/go-test"
9+
)
10+
11+
func TestBrahmsNoReply(t *testing.T) {
12+
p, _ := NewParams(0.1, 0.7, 0.2, 10, 2)
13+
r := rand.New(rand.NewSource(1))
14+
s := NewSampler(r, p.l2())
15+
self := NID{0x01}
16+
17+
p0 := make(chan NID)
18+
v0 := NewView(NID{0x01})
19+
tr0 := NewMockTransport()
20+
21+
v1 := Brahms(self, r, p, time.Millisecond*10, s, tr0, p0, v0)
22+
23+
//view should be unchanged transport returned nothing
24+
test.Equals(t, v0, v1)
25+
26+
//should have pushed our own nid
27+
test.Equals(t, true, tr0.DidPush(self))
28+
29+
//sample should be empty
30+
test.Equals(t, View{}, s.Sample())
31+
}
32+
33+
func TestBrahmsWithJustPushes(t *testing.T) {
34+
p, _ := NewParams(0.1, 0.7, 0.2, 10, 2)
35+
r := rand.New(rand.NewSource(1))
36+
s := NewSampler(r, p.l2())
37+
self := NID{0x01}
38+
39+
id1 := NID{0x01, 0x02}
40+
p0 := make(chan NID, 10)
41+
p0 <- id1
42+
v0 := NewView(NID{0x01})
43+
tr0 := NewMockTransport()
44+
45+
// with just a pull response we update the view with just that info
46+
v1 := Brahms(self, r, p, time.Millisecond*10, s, tr0, p0, v0)
47+
test.Equals(t, 0, len(p0))
48+
test.Equals(t, NewView(id1), v1)
49+
50+
// but the pushed id should have been added to the sample
51+
test.Equals(t, NewView(id1), s.Sample())
52+
53+
t.Run("with too many pushes", func(t *testing.T) {
54+
p1 := make(chan NID, 10)
55+
p1 <- NID{0xaa, 0xaa}
56+
p1 <- NID{0xbb, 0xbb} //with the given params this is too much push
57+
58+
v2 := Brahms(NID{0xff, 0xff}, r, p, time.Millisecond*10, s, tr0, p1, v0)
59+
60+
//with too many pushes the view shouldn't have changed
61+
test.Equals(t, v2, v0)
62+
})
63+
}
64+
65+
func TestBrahmsWithPullsAndPushes(t *testing.T) {
66+
p, _ := NewParams(0.1, 0.7, 0.2, 10, 2)
67+
r := rand.New(rand.NewSource(1))
68+
s := NewSampler(r, p.l2())
69+
self := NID{0x01}
70+
other := NID{0x02}
71+
72+
v0 := NewView(other)
73+
pull1 := NID{0x01, 0x02}
74+
push1 := NID{0x02, 0x02}
75+
p0 := make(chan NID, 10)
76+
p0 <- push1
77+
tr0 := NewMockTransport()
78+
tr0.SetPull(other, NewView(pull1, pull1))
79+
80+
//with both pushes and pulls the view should get updated
81+
v1 := Brahms(self, r, p, time.Millisecond*10, s, tr0, p0, v0)
82+
test.Equals(t, NewView(pull1, push1), v1)
83+
test.Equals(t, NewView(pull1, push1), s.Sample())
84+
}

core.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package brahms
2+
3+
import (
4+
"math/rand"
5+
"time"
6+
)
7+
8+
// Core implements the core algorithm
9+
type Core struct {
10+
rnd *rand.Rand
11+
self NID
12+
view View
13+
pushes chan NID
14+
params P
15+
sampler *Sampler
16+
tr Transport
17+
}
18+
19+
// NewCore initializes the core
20+
func NewCore(rnd *rand.Rand, self NID, v0 View, p P, tr Transport) (a *Core) {
21+
a = &Core{
22+
self: self,
23+
view: v0,
24+
pushes: make(chan NID, 100),
25+
params: p,
26+
sampler: NewSampler(rnd, p.l2()),
27+
tr: tr,
28+
rnd: rnd,
29+
}
30+
31+
//initialize the sampler with our initial view
32+
a.sampler.Update(v0)
33+
return
34+
}
35+
36+
// @TODO probe for sample validation
37+
38+
// ID returns this core's id
39+
func (h *Core) ID() NID { return h.self }
40+
41+
// UpdateView runs the algorithm to update the view
42+
func (h *Core) UpdateView(to time.Duration) {
43+
h.view = Brahms(h.self, h.rnd, h.params, to, h.sampler, h.tr, h.pushes, h.view)
44+
}
45+
46+
// HandlePull responds to pulls by returning a copy of our view
47+
func (h *Core) HandlePull() View {
48+
return h.view.Copy()
49+
}
50+
51+
// HandlePush handles incoming ids
52+
func (h *Core) HandlePush(id NID) {
53+
select {
54+
case h.pushes <- id:
55+
default: //push buffer is full, discard
56+
}
57+
}

core_test.go

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
package brahms
2+
3+
import (
4+
"bytes"
5+
"encoding/binary"
6+
"fmt"
7+
"math"
8+
"math/rand"
9+
"path/filepath"
10+
"testing"
11+
"time"
12+
13+
"github.com/advanderveer/go-test"
14+
)
15+
16+
func TestMiniNetCore(t *testing.T) {
17+
rnd := rand.New(rand.NewSource(1))
18+
prm, _ := NewParams(0.45, 0.45, 0.1, 100, 10)
19+
20+
//create a mini network with three cores
21+
tr := NewMemNetTransport()
22+
c1 := NewCore(rnd, NID{0x01}, NewView(NID{0x02}), prm, tr)
23+
tr.AddCore(c1)
24+
c2 := NewCore(rnd, NID{0x02}, NewView(NID{0x03}), prm, tr)
25+
tr.AddCore(c2)
26+
c3 := NewCore(rnd, NID{0x03}, NewView(), prm, tr)
27+
tr.AddCore(c3)
28+
29+
// after two iterations we should have a connected graph
30+
for i := 0; i < 2; i++ {
31+
c1.UpdateView(time.Millisecond)
32+
c2.UpdateView(time.Millisecond)
33+
c3.UpdateView(time.Millisecond)
34+
}
35+
36+
// view and sampler should show a connected graph
37+
test.Equals(t, NewView(NID{0x02}, NID{0x03}), c1.view)
38+
test.Equals(t, NewView(NID{0x02}, NID{0x03}), c1.sampler.Sample())
39+
test.Equals(t, NewView(NID{0x01}, NID{0x03}), c2.view)
40+
test.Equals(t, NewView(NID{0x01}, NID{0x03}), c2.sampler.Sample())
41+
test.Equals(t, NewView(NID{0x01}, NID{0x02}), c3.view)
42+
test.Equals(t, NewView(NID{0x01}, NID{0x02}), c3.sampler.Sample())
43+
}
44+
45+
func TestLargeNetwork(t *testing.T) {
46+
r := rand.New(rand.NewSource(1))
47+
n := uint64(100)
48+
q := 20
49+
m := 2.0
50+
l := int(math.Round(m * math.Pow(float64(n), 1.0/3)))
51+
p, _ := NewParams(
52+
0.45,
53+
0.45,
54+
0.1,
55+
l, l,
56+
)
57+
58+
tr := NewMemNetTransport()
59+
cores := make([]*Core, 0, n)
60+
for i := uint64(1); i <= n; i++ {
61+
id := NID{}
62+
other := NID{}
63+
binary.LittleEndian.PutUint64(id[:], i)
64+
binary.LittleEndian.PutUint64(other[:], i+1)
65+
if i == n {
66+
other = NID{0x01}
67+
}
68+
69+
c := NewCore(r, id, NewView(other), p, tr)
70+
tr.AddCore(c)
71+
cores = append(cores, c)
72+
}
73+
74+
for i := 0; i < q; i++ {
75+
buf := bytes.NewBuffer(nil)
76+
draw(t, buf, cores)
77+
drawPNG(t, buf, fmt.Sprintf(filepath.Join("draws", "network_%d.png"), i))
78+
79+
for _, c := range cores {
80+
c.UpdateView(time.Microsecond * 700)
81+
}
82+
}
83+
84+
var tot float64
85+
for _, c := range cores {
86+
tot += float64(len(c.view))
87+
}
88+
89+
fmt.Println("l1/l2:", p.l2(), "avg:", tot/float64(len(cores)))
90+
91+
}

draw_test.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package brahms
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"os"
7+
"os/exec"
8+
"testing"
9+
10+
"github.com/advanderveer/go-test"
11+
)
12+
13+
func drawPNG(t *testing.T, buf io.Reader, name string) {
14+
f, err := os.Create(name)
15+
test.Ok(t, err)
16+
defer f.Close()
17+
18+
cmd := exec.Command("neato", "-Tpng")
19+
cmd.Stdin = buf
20+
cmd.Stdout = f
21+
test.Ok(t, cmd.Run())
22+
}
23+
24+
func draw(t testing.TB, w io.Writer, cores []*Core) {
25+
fmt.Fprintln(w, `digraph {`)
26+
fmt.Fprintln(w, `layout=neato;`)
27+
fmt.Fprintln(w, `overlap=scalexy;`)
28+
fmt.Fprintln(w, `sep="+1";`)
29+
30+
for _, c := range cores {
31+
fmt.Fprintf(w, "\t"+`"%.6x" [style="filled,solid",label="%.6x"`, c.ID(), c.ID())
32+
fmt.Fprintf(w, `,fillcolor="#ffffff"`)
33+
fmt.Fprintf(w, "]\n")
34+
35+
for id := range c.view {
36+
fmt.Fprintf(w, "\t"+`"%.6x" -> "%.6x";`+"\n", c.ID(), id)
37+
}
38+
}
39+
40+
fmt.Fprintln(w, `}`)
41+
}

draws/.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
*.png

go.mod

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
module github.com/advanderveer/brahms
2+
3+
go 1.12
4+
5+
require (
6+
github.com/advanderveer/go-test v1.0.1
7+
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2
8+
)

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
github.com/advanderveer/go-test v1.0.1 h1:FuP/a1WOTT66sx7J6oeY+7ACmNEvBxiJh6ZdR1fc5pQ=
2+
github.com/advanderveer/go-test v1.0.1/go.mod h1:cTvlXX1T7hQFO4xVUN2FEVnwEkhzUZzapJOi4JCoX1I=
3+
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 h1:tdlZCpZ/P9DhczCTSixgIKmwPv6+wP5DGjqLYw5SUiA=
4+
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw=

0 commit comments

Comments
 (0)