Skip to content

Commit 7518f22

Browse files
committed
DeliveryClient refactored a bit
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent efff161 commit 7518f22

File tree

4 files changed

+81
-60
lines changed

4 files changed

+81
-60
lines changed

test/basic_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,20 +131,20 @@ func TestSubmitAndReceive(t *testing.T) {
131131
endBlock := uint64(tt.numOfShards)
132132
errString := "cancelled pull from assembler: %d"
133133

134-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, tt.numOfShards+1, errString)
134+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, tt.numOfShards+1, errString, 30)
135135

136136
// Pull first two blocks and count them.
137137
startBlock = uint64(0)
138138
endBlock = uint64(1)
139139

140-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, int((endBlock-startBlock)+1), errString)
140+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, int((endBlock-startBlock)+1), errString, 30)
141141

142142
// Pull more block, then cancel.
143143
startBlock = uint64(1)
144144
endBlock = uint64(1000)
145145
errString = "cancelled pull from assembler: %d; pull ended: failed to receive a deliver response: rpc error: code = Canceled desc = grpc: the client connection is closing"
146146

147-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, 0, errString)
147+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, 0, errString, 30)
148148
})
149149
}
150150
}

test/batcher_test.go

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ import (
2424
)
2525

2626
const (
27-
// Number of shards in the test
28-
numOfShards = 1
2927
// Number of parties in the test
3028
numOfParties = 4
3129
)
@@ -49,15 +47,15 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
4947
require.NoError(t, err)
5048
require.NotNil(t, armaBinaryPath)
5149

52-
t.Logf("Running test with %d parties and %d shards", numOfParties, numOfShards)
50+
t.Logf("Running test with %d parties and %d shards", numOfParties, 1)
5351

5452
// Create a temporary directory for the test
5553
dir, err := os.MkdirTemp("", t.Name())
5654
require.NoError(t, err)
5755
defer os.RemoveAll(dir)
5856

5957
configPath := filepath.Join(dir, "config.yaml")
60-
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, numOfShards, "none", "none")
58+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 1, "none", "none")
6159
require.NoError(t, err)
6260
numOfArmaNodes := len(netInfo)
6361

@@ -111,24 +109,16 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
111109
totalTxSent += totalTxNumber
112110

113111
// Pull from Assemblers
114-
blockInfos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
112+
infos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
115113

116114
// Get the primary batcher
117-
partyBlockInfos := blockInfos[types.PartyID(1)]
118-
primaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
115+
primaryBatcherId := infos[types.PartyID(1)].Primary[PrimaryID{ShardID: 1}]
119116
primaryBatcher := armaNetwork.GeBatcher(t, primaryBatcherId, types.ShardID(1))
120-
correctParties := []types.PartyID{}
121117

122118
// 3. Stop the primary batcher
123119
t.Logf("Stopping primary batcher: party %d", primaryBatcher.PartyId)
124120
primaryBatcher.StopArmaNode()
125121

126-
for partyID := 1; partyID <= numOfParties; partyID++ {
127-
if primaryBatcherId != types.PartyID(partyID) {
128-
correctParties = append(correctParties, types.PartyID(partyID))
129-
}
130-
}
131-
132122
stalled := false
133123
routerToStall := armaNetwork.GetRouter(t, primaryBatcher.PartyId)
134124

@@ -150,20 +140,25 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
150140
}
151141
}
152142

153-
// test that the router of party get stalled in the some point
143+
// make sure the router of the faulty party got stalled
154144
require.True(t, stalled, "expected router to stall but it did not")
155145
broadcastClient.Stop()
156146

157147
totalTxSent += totalTxNumber
158148

159149
// 5.
160-
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
161-
blockInfos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
162-
partyBlockInfos = blockInfos[correctParties[0]]
163-
newPrimaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
150+
// make sure assemblers of correct parties continue to get transactions (expect 2000 TXs).
151+
152+
correctParties := []types.PartyID{}
153+
for partyID := 1; partyID <= numOfParties; partyID++ {
154+
if primaryBatcherId != types.PartyID(partyID) {
155+
correctParties = append(correctParties, types.PartyID(partyID))
156+
}
157+
}
158+
infos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
164159

165160
// check that the primary batcher has changed
166-
require.NotEqual(t, primaryBatcherId, newPrimaryBatcherId, "expected primary batcher not to remain the same")
161+
require.True(t, infos[correctParties[0]].TermChanged, "expected primary batcher not to remain the same")
167162

168163
// 6.
169164
t.Logf("Restarting Batcher: party %d", primaryBatcher.PartyId)
@@ -172,7 +167,7 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
172167

173168
testutil.WaitReady(t, readyChan, 1, 10)
174169

175-
PullFromAssemblers(t, uc, []types.PartyID{primaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
170+
PullFromAssemblers(t, uc, []types.PartyID{primaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
176171

177172
// 7.
178173
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
@@ -196,8 +191,8 @@ func TestPrimaryBatcherRestartRecover(t *testing.T) {
196191
totalTxSent += totalTxNumber
197192

198193
// Pull from Assemblers
199-
// make sure clients of all the parties get transactions (expect 3000 TXs).
200-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
194+
// make sure assemblers of all the parties get transactions (expect 3000 TXs).
195+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
201196
}
202197

203198
// Simulates a scenario where a secondary batcher node is stopped and restarted.
@@ -219,15 +214,15 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
219214
require.NoError(t, err)
220215
require.NotNil(t, armaBinaryPath)
221216

222-
t.Logf("Running test with %d parties and %d shards", numOfParties, numOfShards)
217+
t.Logf("Running test with %d parties and %d shards", numOfParties, 1)
223218

224219
// Create a temporary directory for the test
225220
dir, err := os.MkdirTemp("", t.Name())
226221
require.NoError(t, err)
227222
defer os.RemoveAll(dir)
228223

229224
configPath := filepath.Join(dir, "config.yaml")
230-
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, numOfShards, "none", "none")
225+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 1, "none", "none")
231226
require.NoError(t, err)
232227
numOfArmaNodes := len(netInfo)
233228

@@ -283,10 +278,8 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
283278
totalTxSent += totalTxNumber
284279

285280
// Pull from Assemblers
286-
blockInfos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
287-
288-
partyBlockInfos := blockInfos[types.PartyID(1)]
289-
primaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
281+
infos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
282+
primaryBatcherId := infos[types.PartyID(1)].Primary[PrimaryID{ShardID: 1}]
290283
correctParties := []types.PartyID{}
291284

292285
var secondaryBatcher *testutil.ArmaNodeInfo = nil
@@ -326,20 +319,18 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
326319
}
327320
}
328321

329-
// test that the router of party get stalled in the some point
322+
// make sure the router of the faulty party got stalled
330323
require.True(t, stalled, "expected router to stall but it did not")
331324
broadcastClient.Stop()
332325

333326
totalTxSent += totalTxNumber
334327

335328
// 5.
336-
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
337-
blockInfos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
338-
partyBlockInfos = blockInfos[correctParties[0]]
339-
newPrimaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
329+
// make sure assemblers of correct parties continue to get transactions (expect 2000 TXs).
330+
infos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
340331

341332
// make sure the primary batcher did not change
342-
require.Equal(t, primaryBatcherId, newPrimaryBatcherId, "expected primary batcher to remain the same")
333+
require.False(t, infos[correctParties[0]].TermChanged, "expected primary batcher to remain the same")
343334

344335
// 6.
345336
t.Logf("Restarting Batcher %d of party %d", secondaryBatcher.PartyId, secondaryBatcher.PartyId)
@@ -348,7 +339,7 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
348339

349340
testutil.WaitReady(t, readyChan, 1, 10)
350341

351-
PullFromAssemblers(t, uc, []types.PartyID{secondaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
342+
PullFromAssemblers(t, uc, []types.PartyID{secondaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
352343

353344
// 7.
354345
// make sure 2f+1 routers are receiving TXs w/o problems
@@ -373,6 +364,6 @@ func TestSecondaryBatcherRestartRecover(t *testing.T) {
373364
totalTxSent += totalTxNumber
374365

375366
// Pull from Assemblers
376-
// make sure clients of all the parties get transactions (expect 3000 TXs).
377-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
367+
// make sure assemblers of all the parties get transactions (expect 3000 TXs).
368+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
378369
}

test/consensus_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func TestSubmitStopThenRestartConsenter(t *testing.T) {
7878
parties[i] = types.PartyID(i + 1)
7979
}
8080
errString := "cancelled pull from assembler: %d"
81-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 1000, 0, errString)
81+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 1000, 0, errString, 30)
8282

8383
partyToRestart := types.PartyID(3)
8484
consenterToRestart := armaNetwork.GetConsenter(t, partyToRestart)
@@ -98,7 +98,7 @@ func TestSubmitStopThenRestartConsenter(t *testing.T) {
9898
correctParties = append(correctParties, types.PartyID(i+1))
9999
}
100100
}
101-
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, 1500, 0, errString)
101+
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, 1500, 0, errString, 30)
102102

103103
consenterToRestart.RestartArmaNode(t, readyChan, numOfParties)
104104
testutil.WaitReady(t, readyChan, 1, 10)
@@ -111,5 +111,5 @@ func TestSubmitStopThenRestartConsenter(t *testing.T) {
111111

112112
waitForTxSent.Wait()
113113

114-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 2000, 0, errString)
114+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, 2000, 0, errString, 30)
115115
}

test/utils_test.go

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ import (
3333
nodeconfig "github.com/hyperledger/fabric-x-orderer/node/config"
3434
"github.com/hyperledger/fabric-x-orderer/node/consensus"
3535
"github.com/hyperledger/fabric-x-orderer/node/crypto"
36-
"github.com/hyperledger/fabric-x-orderer/node/ledger"
3736
protos "github.com/hyperledger/fabric-x-orderer/node/protos/comm"
3837
"github.com/hyperledger/fabric-x-orderer/node/router"
3938
"github.com/hyperledger/fabric-x-orderer/testutil"
@@ -329,41 +328,60 @@ func sendTxn(workerID int, txnNum int, routers []*router.Router) {
329328
}
330329
}
331330

332-
func PullFromAssemblers(t *testing.T, userConfig *armageddon.UserConfig, parties []types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int, errString string) map[types.PartyID][]types.BatchID {
331+
type PrimaryID struct {
332+
ShardID types.ShardID
333+
}
334+
335+
func (p *PrimaryID) Digest() types.ShardID {
336+
return p.ShardID
337+
}
338+
339+
type BlockPullerInfo struct {
340+
TotalTxs uint64
341+
TotalBlocks uint64
342+
Primary map[PrimaryID]types.PartyID
343+
TermChanged bool
344+
}
345+
346+
func PullFromAssemblers(t *testing.T, userConfig *armageddon.UserConfig, parties []types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int, errString string, timeout int) map[types.PartyID]*BlockPullerInfo {
333347
var waitForPullDone sync.WaitGroup
334-
blockInfos := make(map[types.PartyID][]types.BatchID, len(parties))
348+
pullInfos := make(map[types.PartyID]*BlockPullerInfo, len(parties))
349+
lock := sync.Mutex{}
335350

336351
for _, partyID := range parties {
337352
waitForPullDone.Add(1)
338353

339354
go func() {
340355
defer waitForPullDone.Done()
341356

342-
totalTxs, totalBlocks, bInfos, err := PullFromAssembler(t, userConfig, partyID, startBlock, endBlock, transactions, blocks)
343-
blockInfos[partyID] = bInfos
357+
pullInfo, err := PullFromAssembler(t, userConfig, partyID, startBlock, endBlock, transactions, blocks, timeout)
358+
lock.Lock()
359+
defer lock.Unlock()
360+
pullInfos[partyID] = pullInfo
344361
errString := fmt.Sprintf(errString, partyID)
345362
require.ErrorContains(t, err, errString)
346363
// TODO: check that we get all of the submitted transactions
347-
require.LessOrEqual(t, uint64(transactions), totalTxs)
364+
require.GreaterOrEqual(t, pullInfo.TotalTxs, uint64(transactions))
348365
if blocks > 0 {
349-
require.LessOrEqual(t, uint64(blocks), totalBlocks)
366+
require.GreaterOrEqual(t, pullInfo.TotalBlocks, uint64(blocks))
350367
}
351368
}()
352369
}
353370

354371
waitForPullDone.Wait()
355-
return blockInfos
372+
return pullInfos
356373
}
357374

358-
func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int) (uint64, uint64, []types.BatchID, error) {
375+
func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int, timeout int) (*BlockPullerInfo, error) {
359376
require.NotNil(t, userConfig)
360377
dc := client.NewDeliverClient(userConfig)
361-
toCtx, toCancel := context.WithTimeout(context.Background(), 60*time.Second)
378+
toCtx, toCancel := context.WithTimeout(context.Background(), time.Duration(timeout)*time.Second)
362379
defer toCancel()
363380

364381
totalTxs := uint64(0)
365382
totalBlocks := uint64(0)
366-
blockInfos := make([]types.BatchID, 0)
383+
primaryMap := make(map[PrimaryID]types.PartyID)
384+
termChanged := false
367385

368386
expectedNumOfTxs := uint64(transactions + 1)
369387
expectedNumOfBlocks := uint64(blocks)
@@ -378,11 +396,23 @@ func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID
378396

379397
atomic.AddUint64(&totalBlocks, uint64(1))
380398
atomic.AddUint64(&totalTxs, uint64(len(block.GetData().GetData())))
381-
batchId, _, _, err := ledger.AssemblerBatchIdOrderingInfoAndTxCountFromBlock(block)
382-
if err != nil {
383-
return err
399+
400+
shardIDBytes := block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_ORDERER][2:4]
401+
shardID := types.ShardID(binary.BigEndian.Uint16(shardIDBytes))
402+
403+
if shardID != types.ShardIDConsensus {
404+
pID := PrimaryID{ShardID: shardID}
405+
primaryIDBytes := block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_ORDERER][0:2]
406+
primaryID := types.PartyID(binary.BigEndian.Uint16(primaryIDBytes))
407+
408+
if pr, ok := primaryMap[pID]; !ok {
409+
primaryMap[pID] = primaryID
410+
} else if pr != primaryID {
411+
t.Logf("primary id changed from %d to %d", pr, primaryID)
412+
termChanged = true
413+
primaryMap[pID] = primaryID
414+
}
384415
}
385-
blockInfos = append(blockInfos, batchId)
386416

387417
if blocks > 0 {
388418
if atomic.CompareAndSwapUint64(&totalBlocks, expectedNumOfBlocks, uint64(blocks)) {
@@ -401,5 +431,5 @@ func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID
401431
fmt.Printf("Pulling from party: %d\n", partyID)
402432
err := dc.PullBlocks(toCtx, partyID, startBlock, endBlock, handler)
403433
fmt.Printf("Finished pull and count: blocks %d, txs %d from party: %d\n", totalBlocks, totalTxs, partyID)
404-
return totalTxs, totalBlocks, blockInfos, err
434+
return &BlockPullerInfo{TotalTxs: totalTxs, TotalBlocks: totalBlocks, Primary: primaryMap, TermChanged: termChanged}, err
405435
}

0 commit comments

Comments
 (0)