Skip to content

Commit 3cd436f

Browse files
authored
Batcher restart recover (#146)
* Integration tests - Batcher restart and recover #84 Signed-off-by: Genady Gurevich <genadyg@il.ibm.com> * a commit for review Signed-off-by: Genady Gurevich <genadyg@il.ibm.com> * fixed a linter issue Signed-off-by: Genady Gurevich <genadyg@il.ibm.com> * Integration tests - Batcher restart and recover #84 Signed-off-by: Genady Gurevich <genadyg@il.ibm.com> * a commit for review Signed-off-by: Genady Gurevich <genadyg@il.ibm.com> * a little refactoring Signed-off-by: Genady Gurevich <genadyg@il.ibm.com> * fixes for the review Signed-off-by: Genady Gurevich <genadyg@il.ibm.com> --------- Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent 96134d8 commit 3cd436f

File tree

4 files changed

+422
-24
lines changed

4 files changed

+422
-24
lines changed

test/basic_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -131,20 +131,20 @@ func TestSubmitAndReceive(t *testing.T) {
131131
endBlock := uint64(tt.numOfShards)
132132
errString := "cancelled pull from assembler: %d"
133133

134-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, tt.numOfShards+1, errString)
134+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, tt.numOfShards+1, errString, 30)
135135

136136
// Pull first two blocks and count them.
137137
startBlock = uint64(0)
138138
endBlock = uint64(1)
139139

140-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, int((endBlock-startBlock)+1), errString)
140+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, int((endBlock-startBlock)+1), errString, 30)
141141

142142
// Pull more block, then cancel.
143143
startBlock = uint64(1)
144144
endBlock = uint64(1000)
145145
errString = "cancelled pull from assembler: %d; pull ended: failed to receive a deliver response: rpc error: code = Canceled desc = grpc: the client connection is closing"
146146

147-
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, 0, errString)
147+
PullFromAssemblers(t, uc, parties, startBlock, endBlock, 0, 0, errString, 30)
148148
})
149149
}
150150
}

test/batcher_test.go

Lines changed: 369 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,369 @@
1+
/*
2+
Copyright IBM Corp. All Rights Reserved.
3+
4+
SPDX-License-Identifier: Apache-2.0
5+
*/
6+
7+
package test
8+
9+
import (
10+
"fmt"
11+
"math"
12+
"os"
13+
"path/filepath"
14+
"testing"
15+
"time"
16+
17+
"github.com/hyperledger/fabric-x-orderer/common/tools/armageddon"
18+
"github.com/hyperledger/fabric-x-orderer/common/types"
19+
"github.com/hyperledger/fabric-x-orderer/testutil"
20+
"github.com/hyperledger/fabric-x-orderer/testutil/client"
21+
"github.com/onsi/gomega/gexec"
22+
"github.com/stretchr/testify/assert"
23+
"github.com/stretchr/testify/require"
24+
)
25+
26+
const (
27+
// Number of parties in the test
28+
numOfParties = 4
29+
)
30+
31+
// Simulates a scenario where the primary batcher
32+
// is stopped and then restarted, while ensuring that the system can recover and
33+
// continue processing transactions. The test involves the following steps:
34+
// 1. Compile and run the arma nodes.
35+
// 2. Submit a batch of transactions to the network.
36+
// 3. Identify and stop the primary batcher for one of the parties.
37+
// 4. Continue submitting transactions, expecting the stopped party's router to stall.
38+
// 5. Verify that the correct number of transactions have been processed by the assemblers.
39+
// 6. Restart the stopped primary batcher and ensure the stalled transactions are processed.
40+
// 7. Submit additional transactions and verify that all parties receive the expected number
41+
// of transactions.
42+
43+
func TestPrimaryBatcherRestartRecover(t *testing.T) {
44+
// 1. compile arma
45+
armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")})
46+
defer gexec.CleanupBuildArtifacts()
47+
require.NoError(t, err)
48+
require.NotNil(t, armaBinaryPath)
49+
50+
t.Logf("Running test with %d parties and %d shards", numOfParties, 1)
51+
52+
// Create a temporary directory for the test
53+
dir, err := os.MkdirTemp("", t.Name())
54+
require.NoError(t, err)
55+
defer os.RemoveAll(dir)
56+
57+
configPath := filepath.Join(dir, "config.yaml")
58+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 1, "none", "none")
59+
require.NoError(t, err)
60+
numOfArmaNodes := len(netInfo)
61+
62+
armageddon.NewCLI().Run([]string{"generate", "--config", configPath, "--output", dir, "--version", "2"})
63+
64+
readyChan := make(chan struct{}, numOfArmaNodes)
65+
armaNetwork := testutil.RunArmaNodes(t, dir, armaBinaryPath, readyChan, netInfo)
66+
defer armaNetwork.Stop()
67+
68+
testutil.WaitReady(t, readyChan, numOfArmaNodes, 10)
69+
70+
uc, err := testutil.GetUserConfig(dir, 1)
71+
assert.NoError(t, err)
72+
assert.NotNil(t, uc)
73+
74+
// 2. Send To Routers
75+
totalTxNumber := 1000
76+
fillInterval := 10 * time.Millisecond
77+
fillFrequency := 1000 / int(fillInterval.Milliseconds())
78+
rate := 500
79+
totalTxSent := 0
80+
81+
capacity := rate / fillFrequency
82+
rl, err := armageddon.NewRateLimiter(rate, fillInterval, capacity)
83+
if err != nil {
84+
fmt.Fprintf(os.Stderr, "failed to start a rate limiter")
85+
os.Exit(3)
86+
}
87+
88+
broadcastClient := client.NewBroadCastTxClient(uc, 10*time.Second)
89+
90+
for i := 0; i < totalTxNumber; i++ {
91+
status := rl.GetToken()
92+
if !status {
93+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
94+
os.Exit(3)
95+
}
96+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
97+
err = broadcastClient.SendTx(txContent)
98+
require.NoError(t, err)
99+
}
100+
101+
t.Log("Finished submit")
102+
broadcastClient.Stop()
103+
104+
parties := []types.PartyID{}
105+
for partyID := 1; partyID <= numOfParties; partyID++ {
106+
parties = append(parties, types.PartyID(partyID))
107+
}
108+
109+
totalTxSent += totalTxNumber
110+
111+
// Pull from Assemblers
112+
infos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
113+
114+
// Get the primary batcher
115+
primaryBatcherId := infos[types.PartyID(1)].Primary[types.ShardID(1)]
116+
primaryBatcher := armaNetwork.GeBatcher(t, primaryBatcherId, types.ShardID(1))
117+
118+
// 3. Stop the primary batcher
119+
t.Logf("Stopping primary batcher: party %d", primaryBatcher.PartyId)
120+
primaryBatcher.StopArmaNode()
121+
122+
stalled := false
123+
routerToStall := armaNetwork.GetRouter(t, primaryBatcher.PartyId)
124+
125+
// 4.
126+
// make sure 2f+1 routers are receiving TXs w/o problems
127+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
128+
129+
for i := 0; i < totalTxNumber; i++ {
130+
status := rl.GetToken()
131+
if !status {
132+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
133+
os.Exit(3)
134+
}
135+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
136+
err = broadcastClient.SendTx(txContent)
137+
if err != nil {
138+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
139+
stalled = true
140+
}
141+
}
142+
143+
// make sure the router of the faulty party got stalled
144+
require.True(t, stalled, "expected router to stall but it did not")
145+
broadcastClient.Stop()
146+
147+
totalTxSent += totalTxNumber
148+
149+
// 5.
150+
// make sure assemblers of correct parties continue to get transactions (expect 2000 TXs).
151+
152+
correctParties := []types.PartyID{}
153+
for partyID := 1; partyID <= numOfParties; partyID++ {
154+
if primaryBatcherId != types.PartyID(partyID) {
155+
correctParties = append(correctParties, types.PartyID(partyID))
156+
}
157+
}
158+
infos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
159+
160+
// check that the primary batcher has changed
161+
require.True(t, infos[correctParties[0]].TermChanged, "expected primary batcher not to remain the same")
162+
163+
// 6.
164+
t.Logf("Restarting Batcher: party %d", primaryBatcher.PartyId)
165+
// restart the batcher
166+
primaryBatcher.RestartArmaNode(t, readyChan, numOfParties)
167+
168+
testutil.WaitReady(t, readyChan, 1, 10)
169+
170+
PullFromAssemblers(t, uc, []types.PartyID{primaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
171+
172+
// 7.
173+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
174+
175+
for i := 0; i < totalTxNumber; i++ {
176+
status := rl.GetToken()
177+
if !status {
178+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
179+
os.Exit(3)
180+
}
181+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
182+
err = broadcastClient.SendTx(txContent)
183+
if err != nil {
184+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String())) // only such errors are permitted
185+
}
186+
}
187+
188+
t.Log("Finished submit")
189+
broadcastClient.Stop()
190+
191+
totalTxSent += totalTxNumber
192+
193+
// Pull from Assemblers
194+
// make sure assemblers of all the parties get transactions (expect 3000 TXs).
195+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
196+
}
197+
198+
// Simulates a scenario where a secondary batcher node is stopped and restarted.
199+
// The test ensures that even after stopping a secondary batcher, the system continues to process transactions
200+
// correctly, and once the node is restarted, it recovers and resumes normal operation. The steps include:
201+
// 1. Compile and run the arma nodes.
202+
// 2. Submit a batch of transactions to the network.
203+
// 3. Identify and stop the secondary batcher for one of the parties.
204+
// 4. Continue submitting transactions, expecting the stopped party's router to stall.
205+
// 5. Verify that the correct number of transactions have been processed by the assemblers.
206+
// 6. Restart the stopped secondary batcher and ensure the stalled transactions are processed.
207+
// 7. Submit additional transactions and verify that all parties receive the expected number
208+
// of transactions.
209+
210+
func TestSecondaryBatcherRestartRecover(t *testing.T) {
211+
// 1. compile arma
212+
armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")})
213+
defer gexec.CleanupBuildArtifacts()
214+
require.NoError(t, err)
215+
require.NotNil(t, armaBinaryPath)
216+
217+
t.Logf("Running test with %d parties and %d shards", numOfParties, 1)
218+
219+
// Create a temporary directory for the test
220+
dir, err := os.MkdirTemp("", t.Name())
221+
require.NoError(t, err)
222+
defer os.RemoveAll(dir)
223+
224+
configPath := filepath.Join(dir, "config.yaml")
225+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, 1, "none", "none")
226+
require.NoError(t, err)
227+
numOfArmaNodes := len(netInfo)
228+
229+
armageddon.NewCLI().Run([]string{"generate", "--config", configPath, "--output", dir, "--version", "2"})
230+
231+
// run arma nodes
232+
// NOTE: if one of the nodes is not started within 10 seconds, there is no point in continuing the test, so fail it
233+
readyChan := make(chan struct{}, numOfArmaNodes)
234+
armaNetwork := testutil.RunArmaNodes(t, dir, armaBinaryPath, readyChan, netInfo)
235+
defer armaNetwork.Stop()
236+
237+
testutil.WaitReady(t, readyChan, numOfArmaNodes, 10)
238+
239+
uc, err := testutil.GetUserConfig(dir, 1)
240+
assert.NoError(t, err)
241+
assert.NotNil(t, uc)
242+
243+
// 2. Send To Routers
244+
totalTxNumber := 1000
245+
fillInterval := 10 * time.Millisecond
246+
fillFrequency := 1000 / int(fillInterval.Milliseconds())
247+
rate := 500
248+
totalTxSent := 0
249+
250+
capacity := rate / fillFrequency
251+
rl, err := armageddon.NewRateLimiter(rate, fillInterval, capacity)
252+
if err != nil {
253+
fmt.Fprintf(os.Stderr, "failed to start a rate limiter")
254+
os.Exit(3)
255+
}
256+
257+
broadcastClient := client.NewBroadCastTxClient(uc, 10*time.Second)
258+
259+
for i := 0; i < totalTxNumber; i++ {
260+
status := rl.GetToken()
261+
if !status {
262+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
263+
os.Exit(3)
264+
}
265+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
266+
err = broadcastClient.SendTx(txContent)
267+
require.NoError(t, err)
268+
}
269+
270+
t.Log("Finished submit")
271+
broadcastClient.Stop()
272+
273+
parties := []types.PartyID{}
274+
for partyID := 1; partyID <= numOfParties; partyID++ {
275+
parties = append(parties, types.PartyID(partyID))
276+
}
277+
278+
totalTxSent += totalTxNumber
279+
280+
// Pull from Assemblers
281+
infos := PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
282+
primaryBatcherId := infos[types.PartyID(1)].Primary[types.ShardID(1)]
283+
correctParties := []types.PartyID{}
284+
285+
var secondaryBatcher *testutil.ArmaNodeInfo = nil
286+
287+
// 3. Identify and stop the secondary batcher
288+
for partyId := 1; partyId <= numOfParties; partyId++ {
289+
if primaryBatcherId != types.PartyID(partyId) && secondaryBatcher == nil {
290+
secondaryBatcher = armaNetwork.GeBatcher(t, types.PartyID(partyId), types.ShardID(1))
291+
} else {
292+
correctParties = append(correctParties, types.PartyID(partyId))
293+
}
294+
}
295+
296+
require.NotNil(t, secondaryBatcher, "Secondary batcher not found for party %d", secondaryBatcher.PartyId)
297+
298+
t.Logf("Stopping secondary batcher: party %d", secondaryBatcher.PartyId)
299+
secondaryBatcher.StopArmaNode()
300+
301+
stalled := false
302+
routerToStall := armaNetwork.GetRouter(t, secondaryBatcher.PartyId)
303+
304+
// 4. Send To Routers
305+
// make sure 2f+1 routers are receiving TXs w/o problems
306+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
307+
308+
for i := 0; i < totalTxNumber; i++ {
309+
status := rl.GetToken()
310+
if !status {
311+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
312+
os.Exit(3)
313+
}
314+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
315+
err = broadcastClient.SendTx(txContent)
316+
if err != nil {
317+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
318+
stalled = true
319+
}
320+
}
321+
322+
// make sure the router of the faulty party got stalled
323+
require.True(t, stalled, "expected router to stall but it did not")
324+
broadcastClient.Stop()
325+
326+
totalTxSent += totalTxNumber
327+
328+
// 5.
329+
// make sure assemblers of correct parties continue to get transactions (expect 2000 TXs).
330+
infos = PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
331+
332+
// make sure the primary batcher did not change
333+
require.False(t, infos[correctParties[0]].TermChanged, "expected primary batcher to remain the same")
334+
335+
// 6.
336+
t.Logf("Restarting Batcher %d of party %d", secondaryBatcher.PartyId, secondaryBatcher.PartyId)
337+
// restart the batcher
338+
secondaryBatcher.RestartArmaNode(t, readyChan, numOfParties)
339+
340+
testutil.WaitReady(t, readyChan, 1, 10)
341+
342+
PullFromAssemblers(t, uc, []types.PartyID{secondaryBatcher.PartyId}, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
343+
344+
// 7.
345+
// make sure 2f+1 routers are receiving TXs w/o problems
346+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
347+
348+
for i := 0; i < totalTxNumber; i++ {
349+
status := rl.GetToken()
350+
if !status {
351+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
352+
os.Exit(3)
353+
}
354+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
355+
err = broadcastClient.SendTx(txContent)
356+
if err != nil {
357+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String())) // only such errors are permitted
358+
}
359+
}
360+
361+
t.Log("Finished submit")
362+
broadcastClient.Stop()
363+
364+
totalTxSent += totalTxNumber
365+
366+
// Pull from Assemblers
367+
// make sure assemblers of all the parties get transactions (expect 3000 TXs).
368+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, -1, "cancelled pull from assembler: %d", 60)
369+
}

0 commit comments

Comments
 (0)