Skip to content

Commit a4a403e

Browse files
author
Travis Jeffery
authored
Merge pull request #77 from travisjeffery/tests
more broker tests
2 parents be0ab37 + 37c8608 commit a4a403e

File tree

2 files changed

+56
-26
lines changed

2 files changed

+56
-26
lines changed

broker/broker.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -256,8 +256,7 @@ func (b *Broker) handleLeaderAndISR(header *protocol.RequestHeader, req *protoco
256256
}
257257
for i, p := range req.PartitionStates {
258258
partition, err := b.partition(p.Topic, p.Partition)
259-
// TODO: seems ok to have protocol.ErrUnknownTopicOrPartition here?
260-
if err != protocol.ErrNone {
259+
if err != protocol.ErrUnknownTopicOrPartition && err != protocol.ErrNone {
261260
setErr(i, p, err)
262261
continue
263262
}
@@ -289,6 +288,7 @@ func (b *Broker) handleLeaderAndISR(header *protocol.RequestHeader, req *protoco
289288
continue
290289
}
291290
}
291+
resp.Partitions[i] = &protocol.LeaderAndISRPartition{Partition: p.Partition, Topic: p.Topic, ErrorCode: protocol.ErrNone.Code()}
292292
}
293293
return resp
294294
}

broker/broker_test.go

Lines changed: 54 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -444,6 +444,40 @@ func TestBroker_Run(t *testing.T) {
444444
}}}},
445445
},
446446
},
447+
{
448+
name: "leader and isr leader new partition",
449+
fields: newFields(),
450+
args: args{
451+
requestCh: make(chan jocko.Request, 2),
452+
responseCh: make(chan jocko.Response, 2),
453+
requests: []jocko.Request{{
454+
Header: &protocol.RequestHeader{CorrelationID: 2},
455+
Request: &protocol.LeaderAndISRRequest{
456+
PartitionStates: []*protocol.PartitionState{
457+
{
458+
Topic: "the-topic",
459+
Partition: 1,
460+
ISR: []int32{1},
461+
Replicas: []int32{1},
462+
Leader: 1,
463+
ZKVersion: 1,
464+
},
465+
},
466+
}},
467+
},
468+
responses: []jocko.Response{{
469+
Header: &protocol.RequestHeader{CorrelationID: 2},
470+
Response: &protocol.Response{CorrelationID: 2, Body: &protocol.LeaderAndISRResponse{
471+
Partitions: []*protocol.LeaderAndISRPartition{
472+
{
473+
ErrorCode: protocol.ErrNone.Code(),
474+
Partition: 1,
475+
Topic: "the-topic",
476+
},
477+
},
478+
}}}},
479+
},
480+
},
447481
}
448482
for _, tt := range tests {
449483
os.RemoveAll("/tmp/jocko")
@@ -1225,42 +1259,32 @@ func TestBroker_deletePartitions(t *testing.T) {
12251259
}
12261260

12271261
func TestBroker_Shutdown(t *testing.T) {
1228-
type fields struct {
1229-
logger *simplelog.Logger
1230-
id int32
1231-
topicMap map[string][]*jocko.Partition
1232-
replicators map[*jocko.Partition]*Replicator
1233-
brokerAddr string
1234-
logDir string
1235-
raft jocko.Raft
1236-
serf jocko.Serf
1237-
shutdownCh chan struct{}
1238-
shutdown bool
1239-
}
12401262
tests := []struct {
12411263
name string
12421264
fields fields
12431265
wantErr bool
12441266
}{
1245-
// TODO: Add test cases.
1267+
{
1268+
name: "shutdown ok",
1269+
fields: newFields(),
1270+
wantErr: false,
1271+
},
12461272
}
12471273
for _, tt := range tests {
12481274
t.Run(tt.name, func(t *testing.T) {
1249-
b := &Broker{
1250-
logger: tt.fields.logger,
1251-
id: tt.fields.id,
1252-
topicMap: tt.fields.topicMap,
1253-
replicators: tt.fields.replicators,
1254-
brokerAddr: tt.fields.brokerAddr,
1255-
logDir: tt.fields.logDir,
1256-
raft: tt.fields.raft,
1257-
serf: tt.fields.serf,
1258-
shutdownCh: tt.fields.shutdownCh,
1259-
shutdown: tt.fields.shutdown,
1275+
b, err := New(tt.fields.id, Addr(tt.fields.brokerAddr), Serf(tt.fields.serf), Raft(tt.fields.raft), Logger(tt.fields.logger), RaftCommands(tt.fields.raftCommands), LogDir(tt.fields.logDir))
1276+
if err != nil {
1277+
t.Errorf("Broker.New() error = %v, wanted nil", err)
12601278
}
12611279
if err := b.Shutdown(); (err != nil) != tt.wantErr {
12621280
t.Errorf("Broker.Shutdown() error = %v, wantErr %v", err, tt.wantErr)
12631281
}
1282+
if tt.fields.raft.ShutdownInvoked != true {
1283+
t.Errorf("did not shutdown raft")
1284+
}
1285+
if tt.fields.serf.ShutdownInvoked != true {
1286+
t.Errorf("did not shutdown raft")
1287+
}
12641288
})
12651289
}
12661290
}
@@ -1415,6 +1439,9 @@ func newFields() fields {
14151439
MemberFn: func(memberID int32) *jocko.ClusterMember {
14161440
return &jocko.ClusterMember{ID: 1}
14171441
},
1442+
ShutdownFn: func() error {
1443+
return nil
1444+
},
14181445
}
14191446
raft := &mock.Raft{
14201447
AddrFn: func() string {
@@ -1438,6 +1465,9 @@ func newFields() fields {
14381465
ApplyFn: func(jocko.RaftCommand) error {
14391466
return nil
14401467
},
1468+
ShutdownFn: func() error {
1469+
return nil
1470+
},
14411471
}
14421472
return fields{
14431473
topicMap: make(map[string][]*jocko.Partition),

0 commit comments

Comments
 (0)