Skip to content

Commit 7898870

Browse files
committed
Complete coverage
1 parent 6d03114 commit 7898870

File tree

2 files changed

+41
-7
lines changed

2 files changed

+41
-7
lines changed

event.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,18 @@ package goneli
22

33
import (
44
"fmt"
5+
6+
"github.com/obsidiandynamics/libstdgo/concurrent"
57
)
68

79
// Barrier is a callback function for handling Neli events during group rebalancing.
810
type Barrier func(e Event)
911

1012
// NopBarrier returns a no-op barrier implementation.
1113
func NopBarrier() Barrier {
12-
return func(e Event) {}
14+
return func(e Event) {
15+
concurrent.Nop()
16+
}
1317
}
1418

1519
// Event encapsulates a Neli event.

neli_test.go

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -145,9 +145,9 @@ func TestErrorDuringSubscribe(t *testing.T) {
145145
}
146146

147147
func TestPulseNotLeader(t *testing.T) {
148-
_, _, config, b := fixtures{}.create()
148+
_, _, config, _ := fixtures{}.create()
149149

150-
n, err := New(config, b.barrier())
150+
n, err := New(config)
151151
require.Nil(t, err)
152152

153153
isLeader, err := n.Pulse(1 * time.Millisecond)
@@ -159,9 +159,9 @@ func TestPulseNotLeader(t *testing.T) {
159159
}
160160

161161
func TestPulseAfterClose(t *testing.T) {
162-
_, _, config, b := fixtures{}.create()
162+
_, _, config, _ := fixtures{}.create()
163163

164-
n, err := New(config, b.barrier())
164+
n, err := New(config)
165165
require.Nil(t, err)
166166

167167
assertNoError(t, n.Close)
@@ -173,9 +173,9 @@ func TestPulseAfterClose(t *testing.T) {
173173
}
174174

175175
func TestDeadline(t *testing.T) {
176-
_, _, config, b := fixtures{}.create()
176+
_, _, config, _ := fixtures{}.create()
177177

178-
n, err := New(config, b.barrier())
178+
n, err := New(config)
179179
require.Nil(t, err)
180180

181181
assert.Equal(t, n.Deadline().Last(), time.Unix(0, 0))
@@ -258,6 +258,36 @@ func TestBasicLeaderElectionAndRevocation(t *testing.T) {
258258
assertNoError(t, p.Await)
259259
}
260260

261+
func TestLeaderElectionAndRevocation_nopBarrier(t *testing.T) {
262+
m, cons, config, _ := fixtures{}.create()
263+
264+
n, err := New(config)
265+
require.Nil(t, err)
266+
267+
onLeaderCnt := concurrent.NewAtomicCounter()
268+
p, err := n.Background(func() {
269+
onLeaderCnt.Inc()
270+
})
271+
require.Nil(t, err)
272+
273+
// Starts off in a non-leader state
274+
assert.Equal(t, false, n.IsLeader())
275+
276+
// Assign leadership via the rebalance listener and wait for the assignment to take effect
277+
cons.rebalanceEvents <- assignedPartitions(0, 1, 2)
278+
wait(t).UntilAsserted(isTrue(n.IsLeader))
279+
wait(t).UntilAsserted(m.ContainsEntries().
280+
Having(scribe.LogLevel(scribe.Info)).
281+
Having(scribe.MessageEqual("Elected as leader")).
282+
Passes(scribe.Count(1)))
283+
m.Reset()
284+
wait(t).UntilAsserted(atLeast(1, onLeaderCnt.GetInt))
285+
286+
assertNoError(t, n.Close)
287+
n.Await()
288+
assertNoError(t, p.Await)
289+
}
290+
261291
func TestNonFatalErrorInReadMessage(t *testing.T) {
262292
m, cons, config, _ := fixtures{}.create()
263293

0 commit comments

Comments
 (0)