Skip to content

Commit 133fd3c

Browse files
committed
fixes for the review
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent c77c8ca commit 133fd3c

File tree

4 files changed

+78
-73
lines changed

4 files changed

+78
-73
lines changed

test/basic_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,20 +131,20 @@ func TestSubmitAndReceive(t *testing.T) {
131131
endBlock := uint64(tt.numOfShards)
132132
errString := "cancelled pull from assembler: %d"
133133

134-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, tt.numOfShards+1, errString)
134+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, tt.numOfShards+1, errString, 30)
135135

136136
// Pull first two blocks and count them.
137137
startBlock = uint64(0)
138138
endBlock = uint64(1)
139139

140-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, int((endBlock-startBlock)+1), errString)
140+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, int((endBlock-startBlock)+1), errString, 30)
141141

142142
// Pull more block, then cancel.
143143
startBlock = uint64(1)
144144
endBlock = uint64(1000)
145145
errString = "cancelled pull from assembler: %d; pull ended: failed to receive a deliver response: rpc error: code = Canceled desc = grpc: the client connection is closing"
146146

147-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, 0, errString)
147+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, 0, errString, 30)
148148
})
149149
}
150150
}

test/batcher_test.go

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ import (
2424
)
2525

2626
const (
27-
// Number of shards in the test
28-
numOfShards = 1
2927
// Number of parties in the test
3028
numOfParties = 4
3129
)
@@ -49,15 +47,15 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
4947
require.NoError(t, err)
5048
require.NotNil(t, armaBinaryPath)
5149

52-
t.Logf("Running test with %d parties and %d shards", numOfParties, numOfShards)
50+
t.Logf("Running test with %d parties and %d shards", numOfParties, 1)
5351

5452
// Create a temporary directory for the test
5553
dir, err := os.MkdirTemp("", t.Name())
5654
require.NoError(t, err)
5755
defer os.RemoveAll(dir)
5856

5957
configPath := filepath.Join(dir, "config.yaml")
60-
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, numOfShards, "none", "none")
58+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 1, "none", "none")
6159
require.NoError(t, err)
6260
numOfArmaNodes := len(netInfo)
6361

@@ -111,24 +109,16 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
111109
totalTxSent += totalTxNumber
112110

113111
// Pull from Assemblers
114-
blockInfos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
112+
infos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
115113

116114
// Get the primary batcher
117-
partyBlockInfos := blockInfos[types.PartyID(1)]
118-
primaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
115+
primaryBatcherId := infos[types.PartyID(1)].Primary[types.ShardID(1)]
119116
primaryBatcher := armaNetwork.GeBatcher(t, primaryBatcherId, types.ShardID(1))
120-
correctParties := []types.PartyID{}
121117

122118
// 3. Stop the primary batcher
123119
t.Logf("Stopping primary batcher: party %d", primaryBatcher.PartyId)
124120
primaryBatcher.StopArmaNode()
125121

126-
for partyID := 1; partyID <= numOfParties; partyID++ {
127-
if primaryBatcherId != types.PartyID(partyID) {
128-
correctParties = append(correctParties, types.PartyID(partyID))
129-
}
130-
}
131-
132122
stalled := false
133123
routerToStall := armaNetwork.GetRouter(t, primaryBatcher.PartyId)
134124

@@ -150,20 +140,25 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
150140
}
151141
}
152142

153-
// test that the router of party get stalled in the some point
143+
// make sure the router of the faulty party got stalled
154144
require.True(t, stalled, "expected router to stall but it did not")
155145
broadcastClient.Stop()
156146

157147
totalTxSent += totalTxNumber
158148

159149
// 5.
160-
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
161-
blockInfos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
162-
partyBlockInfos = blockInfos[correctParties[0]]
163-
newPrimaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
150+
// make sure assemblers of correct parties continue to get transactions (expect 2000 TXs).
151+
152+
correctParties := []types.PartyID{}
153+
for partyID := 1; partyID <= numOfParties; partyID++ {
154+
if primaryBatcherId != types.PartyID(partyID) {
155+
correctParties = append(correctParties, types.PartyID(partyID))
156+
}
157+
}
158+
infos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
164159

165160
// check that the primary batcher has changed
166-
require.NotEqual(t, primaryBatcherId, newPrimaryBatcherId, "expected primary batcher not to remain the same")
161+
require.True(t, infos[correctParties[0]].TermChanged, "expected primary batcher not to remain the same")
167162

168163
// 6.
169164
t.Logf("Restarting Batcher: party %d", primaryBatcher.PartyId)
@@ -172,7 +167,7 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
172167

173168
testutil.WaitReady(t, readyChan, 1, 10)
174169

175-
PullFromAssemblers(t, uc, []types.PartyID{primaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
170+
PullFromAssemblers(t, uc, []types.PartyID{primaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
176171

177172
// 7.
178173
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
@@ -196,8 +191,8 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
196191
totalTxSent += totalTxNumber
197192

198193
// Pull from Assemblers
199-
// make sure clients of all the parties get transactions (expect 3000 TXs).
200-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
194+
// make sure assemblers of all the parties get transactions (expect 3000 TXs).
195+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
201196
}
202197

203198
// Simulates a scenario where a secondary batcher node is stopped and restarted.
@@ -219,15 +214,15 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
219214
require.NoError(t, err)
220215
require.NotNil(t, armaBinaryPath)
221216

222-
t.Logf("Running test with %d parties and %d shards", numOfParties, numOfShards)
217+
t.Logf("Running test with %d parties and %d shards", numOfParties, 1)
223218

224219
// Create a temporary directory for the test
225220
dir, err := os.MkdirTemp("", t.Name())
226221
require.NoError(t, err)
227222
defer os.RemoveAll(dir)
228223

229224
configPath := filepath.Join(dir, "config.yaml")
230-
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, numOfShards, "none", "none")
225+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 1, "none", "none")
231226
require.NoError(t, err)
232227
numOfArmaNodes := len(netInfo)
233228

@@ -283,10 +278,8 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
283278
totalTxSent += totalTxNumber
284279

285280
// Pull from Assemblers
286-
blockInfos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
287-
288-
partyBlockInfos := blockInfos[types.PartyID(1)]
289-
primaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
281+
infos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
282+
primaryBatcherId := infos[types.PartyID(1)].Primary[types.ShardID(1)]
290283
correctParties := []types.PartyID{}
291284

292285
var secondaryBatcher *testutil.ArmaNodeInfo = nil
@@ -326,20 +319,18 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
326319
}
327320
}
328321

329-
// test that the router of party get stalled in the some point
322+
// make sure the router of the faulty party got stalled
330323
require.True(t, stalled, "expected router to stall but it did not")
331324
broadcastClient.Stop()
332325

333326
totalTxSent += totalTxNumber
334327

335328
// 5.
336-
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
337-
blockInfos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
338-
partyBlockInfos = blockInfos[correctParties[0]]
339-
newPrimaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
329+
// make sure assemblers of correct parties continue to get transactions (expect 2000 TXs).
330+
infos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
340331

341332
// make sure the primary batcher did not change
342-
require.Equal(t, primaryBatcherId, newPrimaryBatcherId, "expected primary batcher to remain the same")
333+
require.False(t, infos[correctParties[0]].TermChanged, "expected primary batcher to remain the same")
343334

344335
// 6.
345336
t.Logf("Restarting Batcher %d of party %d", secondaryBatcher.PartyId, secondaryBatcher.PartyId)
@@ -348,7 +339,7 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
348339

349340
testutil.WaitReady(t, readyChan, 1, 10)
350341

351-
PullFromAssemblers(t, uc, []types.PartyID{secondaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
342+
PullFromAssemblers(t, uc, []types.PartyID{secondaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
352343

353344
// 7.
354345
// make sure 2f+1 routers are receiving TXs w/o problems
@@ -373,6 +364,6 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
373364
totalTxSent += totalTxNumber
374365

375366
// Pull from Assemblers
376-
// make sure clients of all the parties get transactions (expect 3000 TXs).
377-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
367+
// make sure assemblers of all the parties get transactions (expect 3000 TXs).
368+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
378369
}

test/consensus_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestSubmitStopThenRestartConsenter(t *testing.T) {
7878
parties[i] = types.PartyID(i + 1)
7979
}
8080
errString := "cancelled pull from assembler: %d"
81-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 1000, 0, errString)
81+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 1000, 0, errString, 30)
8282

8383
partyToRestart := types.PartyID(3)
8484
consenterToRestart := armaNetwork.GetConsenter(t, partyToRestart)
@@ -98,7 +98,7 @@ func TestSubmitStopThenRestartConsenter(t *testing.T) {
9898
correctParties = append(correctParties, types.PartyID(i+1))
9999
}
100100
}
101-
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, 1500, 0, errString)
101+
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, 1500, 0, errString, 30)
102102

103103
consenterToRestart.RestartArmaNode(t, readyChan, numOfParties)
104104
testutil.WaitReady(t, readyChan, 1, 10)
@@ -111,5 +111,5 @@ func TestSubmitStopThenRestartConsenter(t *testing.T) {
111111

112112
waitForTxSent.Wait()
113113

114-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 2000, 0, errString)
114+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 2000, 0, errString, 30)
115115
}

test/utils_test.go

Lines changed: 43 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"fmt"
1919
"os"
2020
"sync"
21-
"sync/atomic"
2221
"testing"
2322
"time"
2423

@@ -33,7 +32,6 @@ import (
3332
nodeconfig "github.com/hyperledger/fabric-x-orderer/node/config"
3433
"github.com/hyperledger/fabric-x-orderer/node/consensus"
3534
"github.com/hyperledger/fabric-x-orderer/node/crypto"
36-
"github.com/hyperledger/fabric-x-orderer/node/ledger"
3735
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
3836
"github.com/hyperledger/fabric-x-orderer/node/router"
3937
"github.com/hyperledger/fabric-x-orderer/testutil"
@@ -329,41 +327,50 @@ func sendTxn(workerID int, txnNum int, routers []*router.Router) {
329327
}
330328
}
331329

332-
func PullFromAssemblers(t *testing.T, userConfig *armageddon.UserConfig, parties []types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int, errString string) map[types.PartyID][]types.BatchID {
330+
type BlockPullerInfo struct {
331+
TotalTxs uint64
332+
TotalBlocks uint64
333+
Primary map[types.ShardID]types.PartyID
334+
TermChanged bool
335+
}
336+
337+
func PullFromAssemblers(t *testing.T, userConfig *armageddon.UserConfig, parties []types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int, errString string, timeout int) map[types.PartyID]*BlockPullerInfo {
333338
var waitForPullDone sync.WaitGroup
334-
blockInfos := make(map[types.PartyID][]types.BatchID, len(parties))
339+
pullInfos := make(map[types.PartyID]*BlockPullerInfo, len(parties))
340+
lock := sync.Mutex{}
335341

336342
for _, partyID := range parties {
337343
waitForPullDone.Add(1)
338344

339345
go func() {
340346
defer waitForPullDone.Done()
341347

342-
totalTxs, totalBlocks, bInfos, err := PullFromAssembler(t, userConfig, partyID, startBlock, endBlock, transactions, blocks)
343-
blockInfos[partyID] = bInfos
348+
pullInfo, err := PullFromAssembler(t, userConfig, partyID, startBlock, endBlock, transactions, blocks, timeout)
349+
lock.Lock()
350+
defer lock.Unlock()
351+
pullInfos[partyID] = pullInfo
344352
errString := fmt.Sprintf(errString, partyID)
345353
require.ErrorContains(t, err, errString)
346354
// TODO: check that we get all of the submitted transactions
347-
require.LessOrEqual(t, uint64(transactions), totalTxs)
348-
if blocks > 0 {
349-
require.LessOrEqual(t, uint64(blocks), totalBlocks)
350-
}
355+
require.GreaterOrEqual(t, int64(pullInfo.TotalTxs), int64(transactions))
356+
require.GreaterOrEqual(t, int64(pullInfo.TotalBlocks), int64(blocks))
351357
}()
352358
}
353359

354360
waitForPullDone.Wait()
355-
return blockInfos
361+
return pullInfos
356362
}
357363

358-
func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int) (uint64, uint64, []types.BatchID, error) {
364+
func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int, timeout int) (*BlockPullerInfo, error) {
359365
require.NotNil(t, userConfig)
360366
dc := client.NewDeliverClient(userConfig)
361-
toCtx, toCancel := context.WithTimeout(context.Background(), 60*time.Second)
367+
toCtx, toCancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
362368
defer toCancel()
363369

364370
totalTxs := uint64(0)
365371
totalBlocks := uint64(0)
366-
blockInfos := make([]types.BatchID, 0)
372+
primaryMap := make(map[types.ShardID]types.PartyID)
373+
termChanged := false
367374

368375
expectedNumOfTxs := uint64(transactions + 1)
369376
expectedNumOfBlocks := uint64(blocks)
@@ -376,23 +383,30 @@ func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID
376383
return errors.New("nil block header")
377384
}
378385

379-
atomic.AddUint64(&totalBlocks, uint64(1))
380-
atomic.AddUint64(&totalTxs, uint64(len(block.GetData().GetData())))
381-
batchId, _, _, err := ledger.AssemblerBatchIdOrderingInfoAndTxCountFromBlock(block)
382-
if err != nil {
383-
return err
384-
}
385-
blockInfos = append(blockInfos, batchId)
386+
shardIDBytes := block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_ORDERER][2:4]
387+
shardID := types.ShardID(binary.BigEndian.Uint16(shardIDBytes))
386388

387-
if blocks > 0 {
388-
if atomic.CompareAndSwapUint64(&totalBlocks, expectedNumOfBlocks, uint64(blocks)) {
389-
toCancel()
389+
if shardID != types.ShardIDConsensus {
390+
primaryIDBytes := block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_ORDERER][0:2]
391+
primaryID := types.PartyID(binary.BigEndian.Uint16(primaryIDBytes))
392+
393+
if pr, ok := primaryMap[shardID]; !ok {
394+
primaryMap[shardID] = primaryID
395+
} else if pr != primaryID {
396+
t.Logf("primary id changed from %d to %d", pr, primaryID)
397+
termChanged = true
398+
primaryMap[shardID] = primaryID
390399
}
391400
}
392-
if transactions > 0 {
393-
if atomic.CompareAndSwapUint64(&totalTxs, expectedNumOfTxs, uint64(transactions)) {
394-
toCancel()
395-
}
401+
402+
totalBlocks++
403+
totalTxs += uint64(len(block.GetData().GetData()))
404+
405+
if blocks > 0 && totalBlocks >= expectedNumOfBlocks {
406+
toCancel()
407+
}
408+
if transactions > 0 && totalTxs >= expectedNumOfTxs {
409+
toCancel()
396410
}
397411

398412
return nil
@@ -401,5 +415,5 @@ func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID
401415
fmt.Printf("Pulling from party: %d\n", partyID)
402416
err := dc.PullBlocks(toCtx, partyID, startBlock, endBlock, handler)
403417
fmt.Printf("Finished pull and count: blocks %d, txs %d from party: %d\n", totalBlocks, totalTxs, partyID)
404-
return totalTxs, totalBlocks, blockInfos, err
418+
return &BlockPullerInfo{TotalTxs: totalTxs, TotalBlocks: totalBlocks, Primary: primaryMap, TermChanged: termChanged}, err
405419
}

0 commit comments

Comments
 (0)