Skip to content

Commit 7ab097c

Browse files
committed
refactored primary/secondary batcher identification, removed waitforAssembler... func
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent 6136159 commit 7ab097c

File tree

4 files changed

+269
-196
lines changed

4 files changed

+269
-196
lines changed

test/batcher_test.go

Lines changed: 245 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
/*
22
Copyright IBM Corp. All Rights Reserved.
3+
34
SPDX-License-Identifier: Apache-2.0
45
*/
6+
57
package test
68

79
import (
@@ -16,23 +18,32 @@ import (
1618
"github.com/hyperledger/fabric-x-orderer/common/types"
1719
"github.com/hyperledger/fabric-x-orderer/testutil"
1820
"github.com/hyperledger/fabric-x-orderer/testutil/client"
19-
"github.com/onsi/gomega/gbytes"
2021
"github.com/onsi/gomega/gexec"
2122
"github.com/stretchr/testify/assert"
2223
"github.com/stretchr/testify/require"
2324
)
2425

2526
const (
2627
// Number of shards in the test
27-
numOfShards = 4
28+
numOfShards = 1
2829
// Number of parties in the test
2930
numOfParties = 4
30-
// Placeholder for a value that doesn't matter in the test
31-
doesntMatter = -1
3231
)
3332

34-
func TestBatcherRestartRecover(t *testing.T) {
35-
// compile arma
33+
// Simulates a scenario where the primary batcher
34+
// is stopped and then restarted, while ensuring that the system can recover and
35+
// continue processing transactions. The test involves the following steps:
36+
// 1. Compile and run the arma nodes.
37+
// 2. Submit a batch of transactions to the network.
38+
// 3. Identify and stop the primary batcher for one of the parties.
39+
// 4. Continue submitting transactions, expecting the stopped party's router to stall.
40+
// 5. Verify that the correct number of transactions have been processed by the assemblers.
41+
// 6. Restart the stopped primary batcher and ensure the stalled transactions are processed.
42+
// 7. Submit additional transactions and verify that all parties receive the expected number
43+
// of transactions.
44+
45+
func TestPrimaryBatcherRestartRecover(t *testing.T) {
46+
// 1. compile arma
3647
armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")})
3748
defer gexec.CleanupBuildArtifacts()
3849
require.NoError(t, err)
@@ -45,17 +56,13 @@ func TestBatcherRestartRecover(t *testing.T) {
4556
require.NoError(t, err)
4657
defer os.RemoveAll(dir)
4758

48-
// 1.
4959
configPath := filepath.Join(dir, "config.yaml")
5060
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, numOfShards, "none", "none")
5161
require.NoError(t, err)
5262
numOfArmaNodes := len(netInfo)
53-
// 2.
63+
5464
armageddon.NewCLI().Run([]string{"generate", "--config", configPath, "--output", dir, "--version", "2"})
5565

56-
// 3.
57-
// run arma nodes
58-
// NOTE: if one of the nodes is not started within 10 seconds, there is no point in continuing the test, so fail it
5966
readyChan := make(chan struct{}, numOfArmaNodes)
6067
armaNetwork := testutil.RunArmaNodes(t, dir, armaBinaryPath, readyChan, netInfo)
6168
defer armaNetwork.Stop()
@@ -66,7 +73,7 @@ func TestBatcherRestartRecover(t *testing.T) {
6673
assert.NoError(t, err)
6774
assert.NotNil(t, uc)
6875

69-
// 4. Send To Routers
76+
// 2. Send To Routers
7077
totalTxNumber := 1000
7178
fillInterval := 10 * time.Millisecond
7279
fillFrequency := 1000 / int(fillInterval.Milliseconds())
@@ -102,76 +109,207 @@ func TestBatcherRestartRecover(t *testing.T) {
102109
}
103110

104111
totalTxSent += totalTxNumber
105-
testutil.WaitForAssemblersReady(t, armaNetwork, parties, totalTxSent, 15)
106112

107113
// Pull from Assemblers
108-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
114+
blockInfos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
115+
116+
// Get the primary batcher
117+
partyBlockInfos := blockInfos[types.PartyID(1)]
118+
primaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
119+
primaryBatcher := armaNetwork.GeBatcher(t, primaryBatcherId, types.ShardID(1))
120+
correctParties := []types.PartyID{}
109121

110-
partyToRestart := types.PartyID(3)
111-
correctParties := []types.PartyID{types.PartyID(1), types.PartyID(2), types.PartyID(4)}
112-
routerToStall := armaNetwork.GetRouter(t, partyToRestart)
122+
// 3. Stop the primary batcher
123+
t.Logf("Stopping primary batcher: party %d", primaryBatcher.PartyId)
124+
primaryBatcher.StopArmaNode()
125+
126+
for partyID := 1; partyID <= numOfParties; partyID++ {
127+
if primaryBatcherId != types.PartyID(partyID) {
128+
correctParties = append(correctParties, types.PartyID(partyID))
129+
}
130+
}
113131

114-
for shardId := 2; shardId <= 2; shardId++ {
115-
batcherToStop := armaNetwork.GeBatcher(t, partyToRestart, types.ShardID(shardId))
116-
isPrimary, _ := gbytes.Say("acting as primary").Match(batcherToStop.RunInfo.Session.Err)
132+
stalled := false
133+
routerToStall := armaNetwork.GetRouter(t, primaryBatcher.PartyId)
117134

118-
batcherToStop.StopArmaNode()
135+
// 4.
136+
// make sure 2f+1 routers are receiving TXs w/o problems
137+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
119138

120-
if !isPrimary {
121-
isSecondary, _ := gbytes.Say("acting as secondary").Match(batcherToStop.RunInfo.Session.Err)
122-
require.True(t, isSecondary)
139+
for i := 0; i < totalTxNumber; i++ {
140+
status := rl.GetToken()
141+
if !status {
142+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
143+
os.Exit(3)
144+
}
145+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
146+
err = broadcastClient.SendTx(txContent)
147+
if err != nil {
148+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
149+
stalled = true
123150
}
151+
}
152+
153+
// test that the stopped party's router gets stalled at some point
154+
require.True(t, stalled, "expected router to stall but it did not")
155+
broadcastClient.Stop()
156+
157+
totalTxSent += totalTxNumber
158+
159+
// 5.
160+
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
161+
blockInfos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
162+
partyBlockInfos = blockInfos[correctParties[0]]
163+
newPrimaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
164+
165+
// check that the primary batcher has changed
166+
require.NotEqual(t, primaryBatcherId, newPrimaryBatcherId, "expected primary batcher not to remain the same")
124167

125-
t.Logf("Batcher %d of party %d is down", shardId, partyToRestart)
126-
stalled := false
127-
128-
// make sure 2f+1 routers are receiving TXs w/o problems
129-
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
130-
131-
for i := 0; i < totalTxNumber; i++ {
132-
status := rl.GetToken()
133-
if !status {
134-
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
135-
os.Exit(3)
136-
}
137-
txContent := prepareTx(i, 64, []byte("sessionNumber"))
138-
err = broadcastClient.SendTx(txContent)
139-
if err != nil {
140-
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
141-
stalled = true
142-
}
168+
// 6.
169+
t.Logf("Restarting Batcher: party %d", primaryBatcher.PartyId)
170+
// restart the batcher
171+
primaryBatcher.RestartArmaNode(t, readyChan, numOfParties)
172+
173+
testutil.WaitReady(t, readyChan, 1, 10)
174+
175+
PullFromAssemblers(t, uc, []types.PartyID{primaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
176+
177+
// 7.
178+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
179+
180+
for i := 0; i < totalTxNumber; i++ {
181+
status := rl.GetToken()
182+
if !status {
183+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
184+
os.Exit(3)
143185
}
186+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
187+
err = broadcastClient.SendTx(txContent)
188+
if err != nil {
189+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String())) // only such errors are permitted
190+
}
191+
}
192+
193+
t.Log("Finished submit")
194+
broadcastClient.Stop()
195+
196+
totalTxSent += totalTxNumber
197+
198+
// Pull from Assemblers
199+
// make sure clients of all the parties get transactions (expect 3000 TXs).
200+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
201+
}
202+
203+
// Simulates a scenario where a secondary batcher node is stopped and restarted.
204+
// The test ensures that even after stopping a secondary batcher, the system continues to process transactions
205+
// correctly, and once the node is restarted, it recovers and resumes normal operation. The steps include:
206+
// 1. Compile and run the arma nodes.
207+
// 2. Submit a batch of transactions to the network.
208+
// 3. Identify and stop the secondary batcher for one of the parties.
209+
// 4. Continue submitting transactions, expecting the stopped party's router to stall.
210+
// 5. Verify that the correct number of transactions have been processed by the assemblers.
211+
// 6. Restart the stopped secondary batcher and ensure the stalled transactions are processed.
212+
// 7. Submit additional transactions and verify that all parties receive the expected number
213+
// of transactions.
214+
215+
func TestSecondaryBatcherRestartRecover(t *testing.T) {
216+
// 1. compile arma
217+
armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")})
218+
defer gexec.CleanupBuildArtifacts()
219+
require.NoError(t, err)
220+
require.NotNil(t, armaBinaryPath)
221+
222+
t.Logf("Running test with %d parties and %d shards", numOfParties, numOfShards)
223+
224+
// Create a temporary directory for the test
225+
dir, err := os.MkdirTemp("", t.Name())
226+
require.NoError(t, err)
227+
defer os.RemoveAll(dir)
228+
229+
configPath := filepath.Join(dir, "config.yaml")
230+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, numOfShards, "none", "none")
231+
require.NoError(t, err)
232+
numOfArmaNodes := len(netInfo)
233+
234+
armageddon.NewCLI().Run([]string{"generate", "--config", configPath, "--output", dir, "--version", "2"})
235+
236+
// run arma nodes
237+
// NOTE: if one of the nodes is not started within 10 seconds, there is no point in continuing the test, so fail it
238+
readyChan := make(chan struct{}, numOfArmaNodes)
239+
armaNetwork := testutil.RunArmaNodes(t, dir, armaBinaryPath, readyChan, netInfo)
240+
defer armaNetwork.Stop()
241+
242+
testutil.WaitReady(t, readyChan, numOfArmaNodes, 10)
243+
244+
uc, err := testutil.GetUserConfig(dir, 1)
245+
assert.NoError(t, err)
246+
assert.NotNil(t, uc)
144247

145-
// test that the router of party get stalled in the some point
146-
require.True(t, stalled, "expected router to stall but it did not")
147-
broadcastClient.Stop()
248+
// 2. Send To Routers
249+
totalTxNumber := 1000
250+
fillInterval := 10 * time.Millisecond
251+
fillFrequency := 1000 / int(fillInterval.Milliseconds())
252+
rate := 500
253+
totalTxSent := 0
254+
255+
capacity := rate / fillFrequency
256+
rl, err := armageddon.NewRateLimiter(rate, fillInterval, capacity)
257+
if err != nil {
258+
fmt.Fprintf(os.Stderr, "failed to start a rate limiter")
259+
os.Exit(3)
260+
}
148261

149-
totalTxSent += totalTxNumber
262+
broadcastClient := client.NewBroadCastTxClient(uc, 10*time.Second)
150263

151-
// wait for the transactions to be processed by correct assemblers
152-
testutil.WaitForAssemblersReady(t, armaNetwork, correctParties, totalTxSent, 60)
153-
if isPrimary {
154-
testutil.WaitForComplaint(t, armaNetwork, parties, types.ShardID(shardId), 60)
155-
testutil.WaitForTermChange(t, armaNetwork, parties, types.ShardID(shardId), 60)
264+
for i := 0; i < totalTxNumber; i++ {
265+
status := rl.GetToken()
266+
if !status {
267+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
268+
os.Exit(3)
156269
}
270+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
271+
err = broadcastClient.SendTx(txContent)
272+
require.NoError(t, err)
273+
}
157274

158-
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
159-
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
275+
t.Log("Finished submit")
276+
broadcastClient.Stop()
277+
278+
parties := []types.PartyID{}
279+
for partyID := 1; partyID <= numOfParties; partyID++ {
280+
parties = append(parties, types.PartyID(partyID))
160281
}
161282

162-
for shardId := 2; shardId <= 2; shardId++ {
163-
t.Logf("Restarting Batcher %d of party %d", shardId, partyToRestart)
164-
// restart the batcher
165-
batcherToRestart := armaNetwork.GeBatcher(t, partyToRestart, types.ShardID(shardId))
166-
batcherToRestart.RestartArmaNode(t, readyChan, numOfParties)
283+
totalTxSent += totalTxNumber
284+
285+
// Pull from Assemblers
286+
blockInfos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
167287

168-
testutil.WaitReady(t, readyChan, 1, 10)
288+
partyBlockInfos := blockInfos[types.PartyID(1)]
289+
primaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
290+
correctParties := []types.PartyID{}
169291

170-
testutil.WaitForAssemblersReady(t, armaNetwork, []types.PartyID{partyToRestart}, totalTxSent, 15)
292+
var secondaryBatcher *testutil.ArmaNodeInfo = nil
171293

172-
PullFromAssemblers(t, uc, []types.PartyID{partyToRestart}, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
294+
// 3. Identify and stop the secondary batcher
295+
for partyId := 1; partyId <= numOfParties; partyId++ {
296+
if primaryBatcherId != types.PartyID(partyId) && secondaryBatcher == nil {
297+
secondaryBatcher = armaNetwork.GeBatcher(t, types.PartyID(partyId), types.ShardID(1))
298+
} else {
299+
correctParties = append(correctParties, types.PartyID(partyId))
300+
}
173301
}
174302

303+
require.NotNil(t, secondaryBatcher, "Secondary batcher not found for party %d", secondaryBatcher.PartyId)
304+
305+
t.Logf("Stopping secondary batcher: party %d", secondaryBatcher.PartyId)
306+
secondaryBatcher.StopArmaNode()
307+
308+
stalled := false
309+
routerToStall := armaNetwork.GetRouter(t, secondaryBatcher.PartyId)
310+
311+
// 4. Send To Routers
312+
// make sure 2f+1 routers are receiving TXs w/o problems
175313
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
176314

177315
for i := 0; i < totalTxNumber; i++ {
@@ -182,18 +320,59 @@ func TestBatcherRestartRecover(t *testing.T) {
182320
}
183321
txContent := prepareTx(i, 64, []byte("sessionNumber"))
184322
err = broadcastClient.SendTx(txContent)
185-
require.NoError(t, err)
323+
if err != nil {
324+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
325+
stalled = true
326+
}
186327
}
187328

188-
t.Log("Finished submit")
329+
// test that the stopped party's router gets stalled at some point
330+
require.True(t, stalled, "expected router to stall but it did not")
189331
broadcastClient.Stop()
190332

191333
totalTxSent += totalTxNumber
192334

193-
// wait for the transactions to be processed
194-
testutil.WaitForAssemblersReady(t, armaNetwork, parties, totalTxSent, 15)
335+
// 5.
336+
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
337+
blockInfos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
338+
partyBlockInfos = blockInfos[correctParties[0]]
339+
newPrimaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
340+
341+
// make sure the primary batcher did not change
342+
require.Equal(t, primaryBatcherId, newPrimaryBatcherId, "expected primary batcher to remain the same")
343+
344+
// 6.
345+
t.Logf("Restarting Batcher %d of party %d", secondaryBatcher.PartyId, secondaryBatcher.PartyId)
346+
// restart the batcher
347+
secondaryBatcher.RestartArmaNode(t, readyChan, numOfParties)
348+
349+
testutil.WaitReady(t, readyChan, 1, 10)
350+
351+
PullFromAssemblers(t, uc, []types.PartyID{secondaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
352+
353+
// 7.
354+
// make sure 2f+1 routers are receiving TXs w/o problems
355+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
356+
357+
for i := 0; i < totalTxNumber; i++ {
358+
status := rl.GetToken()
359+
if !status {
360+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
361+
os.Exit(3)
362+
}
363+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
364+
err = broadcastClient.SendTx(txContent)
365+
if err != nil {
366+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String())) // only such errors are permitted
367+
}
368+
}
369+
370+
t.Log("Finished submit")
371+
broadcastClient.Stop()
372+
373+
totalTxSent += totalTxNumber
195374

196375
// Pull from Assemblers
197376
// make sure clients of all the parties get transactions (expect 3000 TXs).
198-
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
377+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d")
199378
}

0 commit comments

Comments
 (0)