Skip to content

Commit 59dada3

Browse files
committed
a minor fix
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
1 parent 735182b commit 59dada3

File tree

3 files changed

+264
-62
lines changed

3 files changed

+264
-62
lines changed

test/batcher_test.go

Lines changed: 214 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ const (
3131
doesntMatter = -1
3232
)
3333

34-
func TestBatcherRestartRecover(t *testing.T) {
34+
func TestPrimaryBatcherRestartRecover(t *testing.T) {
3535
// compile arma
3636
armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")})
3737
defer gexec.CleanupBuildArtifacts()
@@ -102,7 +102,7 @@ func TestBatcherRestartRecover(t *testing.T) {
102102
}
103103

104104
totalTxSent += totalTxNumber
105-
testutil.WaitForAssemblersReady(t, armaNetwork, parties, totalTxSent, 15)
105+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, parties, totalTxSent, 15)
106106

107107
// Pull from Assemblers
108108
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
@@ -111,67 +111,232 @@ func TestBatcherRestartRecover(t *testing.T) {
111111
correctParties := []types.PartyID{types.PartyID(1), types.PartyID(2), types.PartyID(4)}
112112
routerToStall := armaNetwork.GetRouter(t, partyToRestart)
113113

114-
for shardId := 2; shardId <= 2; shardId++ {
114+
var primaryBatcher *testutil.ArmaNodeInfo = nil
115+
116+
for shardId := 1; shardId <= numOfShards; shardId++ {
115117
batcherToStop := armaNetwork.GeBatcher(t, partyToRestart, types.ShardID(shardId))
116-
isPrimary, _ := gbytes.Say("acting as primary").Match(batcherToStop.RunInfo.Session.Err)
118+
isPrimaryBatcher, _ := gbytes.Say("acting as primary").Match(batcherToStop.RunInfo.Session.Err)
119+
if isPrimaryBatcher {
120+
primaryBatcher = batcherToStop
121+
t.Logf("Stopping primary batcher %d of party %d", shardId, partyToRestart)
122+
batcherToStop.StopArmaNode()
123+
break
124+
}
125+
}
126+
127+
require.NotNil(t, primaryBatcher, "Primary batcher not found for party %d", partyToRestart)
128+
129+
stalled := false
117130

118-
batcherToStop.StopArmaNode()
131+
// make sure 2f+1 routers are receiving TXs w/o problems
132+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
119133

120-
if !isPrimary {
121-
isSecondary, _ := gbytes.Say("acting as secondary").Match(batcherToStop.RunInfo.Session.Err)
122-
require.True(t, isSecondary)
134+
for i := 0; i < totalTxNumber; i++ {
135+
status := rl.GetToken()
136+
if !status {
137+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
138+
os.Exit(3)
123139
}
140+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
141+
err = broadcastClient.SendTx(txContent)
142+
if err != nil {
143+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
144+
stalled = true
145+
}
146+
}
147+
148+
// test that the router of party get stalled in the some point
149+
require.True(t, stalled, "expected router to stall but it did not")
150+
broadcastClient.Stop()
151+
152+
totalTxSent += totalTxNumber
124153

125-
t.Logf("Batcher %d of party %d is down", shardId, partyToRestart)
126-
stalled := false
127-
128-
// make sure 2f+1 routers are receiving TXs w/o problems
129-
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
130-
131-
for i := 0; i < totalTxNumber; i++ {
132-
status := rl.GetToken()
133-
if !status {
134-
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
135-
os.Exit(3)
136-
}
137-
txContent := prepareTx(i, 64, []byte("sessionNumber"))
138-
err = broadcastClient.SendTx(txContent)
139-
if err != nil {
140-
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
141-
stalled = true
142-
}
154+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, correctParties, totalTxSent, 60)
155+
156+
require.NoError(t, testutil.WaitForComplaint(t, armaNetwork, parties, primaryBatcher.ShardId, 60))
157+
require.NoError(t, testutil.WaitForTermChange(t, armaNetwork, parties, primaryBatcher.ShardId, 60))
158+
159+
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
160+
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
161+
162+
// for shardId := 2; shardId <= 2; shardId++ {
163+
t.Logf("Restarting Batcher %d of party %d", primaryBatcher.PartyId, partyToRestart)
164+
// restart the batcher
165+
primaryBatcher.RestartArmaNode(t, readyChan, numOfParties)
166+
167+
testutil.WaitReady(t, readyChan, 1, 10)
168+
169+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, []types.PartyID{partyToRestart}, totalTxSent, 15)
170+
171+
PullFromAssemblers(t, uc, []types.PartyID{partyToRestart}, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
172+
173+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
174+
175+
for i := 0; i < totalTxNumber; i++ {
176+
status := rl.GetToken()
177+
if !status {
178+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
179+
os.Exit(3)
143180
}
181+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
182+
err = broadcastClient.SendTx(txContent)
183+
if err != nil {
184+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String())) //only such errors are permitted
185+
}
186+
}
144187

145-
// test that the router of party get stalled in the some point
146-
require.True(t, stalled, "expected router to stall but it did not")
147-
broadcastClient.Stop()
188+
t.Log("Finished submit")
189+
broadcastClient.Stop()
190+
191+
totalTxSent += totalTxNumber
192+
193+
// wait for the transactions to be processed
194+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, parties, totalTxSent, 15)
195+
196+
// Pull from Assemblers
197+
// make sure clients of all the parties get transactions (expect 3000 TXs).
198+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
199+
}
148200

149-
totalTxSent += totalTxNumber
201+
func TestSecondaryBatcherRestartRecover(t *testing.T) {
202+
// compile arma
203+
armaBinaryPath, err := gexec.BuildWithEnvironment("github.com/hyperledger/fabric-x-orderer/cmd/arma", []string{"GOPRIVATE=" + os.Getenv("GOPRIVATE")})
204+
defer gexec.CleanupBuildArtifacts()
205+
require.NoError(t, err)
206+
require.NotNil(t, armaBinaryPath)
207+
208+
t.Logf("Running test with %d parties and %d shards", numOfParties, numOfShards)
209+
210+
// Create a temporary directory for the test
211+
dir, err := os.MkdirTemp("", t.Name())
212+
require.NoError(t, err)
213+
defer os.RemoveAll(dir)
214+
215+
// 1.
216+
configPath := filepath.Join(dir, "config.yaml")
217+
netInfo := testutil.CreateNetwork(t, configPath, numOfParties, numOfShards, "none", "none")
218+
require.NoError(t, err)
219+
numOfArmaNodes := len(netInfo)
220+
// 2.
221+
armageddon.NewCLI().Run([]string{"generate", "--config", configPath, "--output", dir, "--version", "2"})
222+
223+
// 3.
224+
// run arma nodes
225+
// NOTE: if one of the nodes is not started within 10 seconds, there is no point in continuing the test, so fail it
226+
readyChan := make(chan struct{}, numOfArmaNodes)
227+
armaNetwork := testutil.RunArmaNodes(t, dir, armaBinaryPath, readyChan, netInfo)
228+
defer armaNetwork.Stop()
150229

151-
// wait for the transactions to be processed by correct assemblers
152-
testutil.WaitForAssemblersReady(t, armaNetwork, correctParties, totalTxSent, 60)
153-
if isPrimary {
154-
testutil.WaitForComplaint(t, armaNetwork, parties, types.ShardID(shardId), 60)
155-
testutil.WaitForTermChange(t, armaNetwork, parties, types.ShardID(shardId), 60)
230+
testutil.WaitReady(t, readyChan, numOfArmaNodes, 10)
231+
232+
uc, err := testutil.GetUserConfig(dir, 1)
233+
assert.NoError(t, err)
234+
assert.NotNil(t, uc)
235+
236+
// 4. Send To Routers
237+
totalTxNumber := 1000
238+
fillInterval := 10 * time.Millisecond
239+
fillFrequency := 1000 / int(fillInterval.Milliseconds())
240+
rate := 500
241+
totalTxSent := 0
242+
243+
capacity := rate / fillFrequency
244+
rl, err := armageddon.NewRateLimiter(rate, fillInterval, capacity)
245+
if err != nil {
246+
fmt.Fprintf(os.Stderr, "failed to start a rate limiter")
247+
os.Exit(3)
248+
}
249+
250+
broadcastClient := client.NewBroadCastTxClient(uc, 10*time.Second)
251+
252+
for i := 0; i < totalTxNumber; i++ {
253+
status := rl.GetToken()
254+
if !status {
255+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
256+
os.Exit(3)
156257
}
258+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
259+
err = broadcastClient.SendTx(txContent)
260+
require.NoError(t, err)
261+
}
262+
263+
t.Log("Finished submit")
264+
broadcastClient.Stop()
265+
266+
parties := []types.PartyID{}
267+
for partyID := 1; partyID <= numOfParties; partyID++ {
268+
parties = append(parties, types.PartyID(partyID))
269+
}
270+
271+
totalTxSent += totalTxNumber
272+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, parties, totalTxSent, 15)
273+
274+
// Pull from Assemblers
275+
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
157276

158-
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
159-
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
277+
partyToRestart := types.PartyID(3)
278+
correctParties := []types.PartyID{types.PartyID(1), types.PartyID(2), types.PartyID(4)}
279+
routerToStall := armaNetwork.GetRouter(t, partyToRestart)
280+
281+
var secondaryBatcher *testutil.ArmaNodeInfo = nil
282+
283+
for shardId := 1; shardId <= numOfShards; shardId++ {
284+
batcherToStop := armaNetwork.GeBatcher(t, partyToRestart, types.ShardID(shardId))
285+
isSecondary, _ := gbytes.Say("acting as secondary").Match(batcherToStop.RunInfo.Session.Err)
286+
if isSecondary {
287+
secondaryBatcher = batcherToStop
288+
t.Logf("Stopping secondary batcher %d of party %d", shardId, partyToRestart)
289+
batcherToStop.StopArmaNode()
290+
break
291+
}
160292
}
161293

162-
for shardId := 2; shardId <= 2; shardId++ {
163-
t.Logf("Restarting Batcher %d of party %d", shardId, partyToRestart)
164-
// restart the batcher
165-
batcherToRestart := armaNetwork.GeBatcher(t, partyToRestart, types.ShardID(shardId))
166-
batcherToRestart.RestartArmaNode(t, readyChan, numOfParties)
294+
require.NotNil(t, secondaryBatcher, "Secondary batcher not found for party %d", partyToRestart)
167295

168-
testutil.WaitReady(t, readyChan, 1, 10)
296+
stalled := false
169297

170-
testutil.WaitForAssemblersReady(t, armaNetwork, []types.PartyID{partyToRestart}, totalTxSent, 15)
298+
// make sure 2f+1 routers are receiving TXs w/o problems
299+
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
171300

172-
PullFromAssemblers(t, uc, []types.PartyID{partyToRestart}, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
301+
for i := 0; i < totalTxNumber; i++ {
302+
status := rl.GetToken()
303+
if !status {
304+
fmt.Fprintf(os.Stderr, "failed to send tx %d", i+1)
305+
os.Exit(3)
306+
}
307+
txContent := prepareTx(i, 64, []byte("sessionNumber"))
308+
err = broadcastClient.SendTx(txContent)
309+
if err != nil {
310+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String()))
311+
stalled = true
312+
}
173313
}
174314

315+
// test that the router of party get stalled in the some point
316+
require.True(t, stalled, "expected router to stall but it did not")
317+
broadcastClient.Stop()
318+
319+
totalTxSent += totalTxNumber
320+
321+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, correctParties, totalTxSent, 60)
322+
323+
require.Error(t, testutil.WaitForComplaint(t, armaNetwork, parties, secondaryBatcher.ShardId, 30))
324+
require.Error(t, testutil.WaitForTermChange(t, armaNetwork, parties, secondaryBatcher.ShardId, 30))
325+
326+
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
327+
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
328+
329+
// for shardId := 2; shardId <= 2; shardId++ {
330+
t.Logf("Restarting Batcher %d of party %d", secondaryBatcher.PartyId, partyToRestart)
331+
// restart the batcher
332+
secondaryBatcher.RestartArmaNode(t, readyChan, numOfParties)
333+
334+
testutil.WaitReady(t, readyChan, 1, 10)
335+
336+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, []types.PartyID{partyToRestart}, totalTxSent, 15)
337+
338+
PullFromAssemblers(t, uc, []types.PartyID{partyToRestart}, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
339+
175340
broadcastClient = client.NewBroadCastTxClient(uc, 10*time.Second)
176341

177342
for i := 0; i < totalTxNumber; i++ {
@@ -182,7 +347,9 @@ func TestBatcherRestartRecover(t *testing.T) {
182347
}
183348
txContent := prepareTx(i, 64, []byte("sessionNumber"))
184349
err = broadcastClient.SendTx(txContent)
185-
require.NoError(t, err)
350+
if err != nil {
351+
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String())) //only such errors are permitted
352+
}
186353
}
187354

188355
t.Log("Finished submit")
@@ -191,7 +358,7 @@ func TestBatcherRestartRecover(t *testing.T) {
191358
totalTxSent += totalTxNumber
192359

193360
// wait for the transactions to be processed
194-
testutil.WaitForAssemblersReady(t, armaNetwork, parties, totalTxSent, 15)
361+
testutil.WaitForAssemblersGotAtLeast(t, armaNetwork, parties, totalTxSent, 15)
195362

196363
// Pull from Assemblers
197364
// make sure clients of all the parties get transactions (expect 3000 TXs).

test/utils_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,7 +336,7 @@ func PullFromAssemblers(t *testing.T, userConfig *armageddon.UserConfig, parties
336336
totalTxs, totalBlocks, err := PullFromAssembler(t, userConfig, partyID, startBlock, endBlock, transactions, blocks)
337337
errString := fmt.Sprintf(errString, partyID)
338338
require.ErrorContains(t, err, errString)
339-
require.Equal(t, uint64(transactions), totalTxs)
339+
require.LessOrEqual(t, uint64(transactions), totalTxs)
340340
if blocks > 0 {
341341
require.LessOrEqual(t, uint64(blocks), totalBlocks)
342342
}

0 commit comments

Comments
 (0)