Skip to content

Batcher restart recover#146

Merged
HagarMeir merged 7 commits intohyperledger:mainfrom
gengur:batcher_restart_recover
Aug 10, 2025
Merged

Batcher restart recover#146
HagarMeir merged 7 commits intohyperledger:mainfrom
gengur:batcher_restart_recover

Conversation

@gengur
Copy link
Contributor

@gengur gengur commented Jul 28, 2025

Integration tests - Batcher restart and recover
issue #84

@gengur gengur requested review from HagarMeir and tock-ibm July 28, 2025 12:39
@gengur gengur marked this pull request as draft July 28, 2025 12:40
@gengur gengur force-pushed the batcher_restart_recover branch 10 times, most recently from 59dada3 to 80448d9 Compare August 4, 2025 14:37
@gengur gengur marked this pull request as ready for review August 4, 2025 15:59
@@ -0,0 +1,366 @@
/*
Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0

This comment was marked as resolved.

Copyright IBM Corp. All Rights Reserved.
SPDX-License-Identifier: Apache-2.0
*/
package test

This comment was marked as resolved.


const (
// Number of shards in the test
numOfShards = 4

This comment was marked as resolved.

// Number of parties in the test
numOfParties = 4
// Placeholder for a value that doesn't matter in the test
doesntMatter = -1

This comment was marked as resolved.

require.ErrorContains(t, err, errString)
require.Equal(t, uint64(transactions), totalTxs)
require.Equal(t, uint64(blocks), totalBlocks)
require.LessOrEqual(t, uint64(transactions), totalTxs)

This comment was marked as resolved.

}
}

func WaitForAssemblersGotAtLeast(t *testing.T, armaNetwork *ArmaNetwork, parties []types.PartyID, waitFor int, duration time.Duration) {

This comment was marked as resolved.

This comment was marked as resolved.

This comment was marked as resolved.

doesntMatter = -1
)

func TestPrimaryBatcherRestartRecover(t *testing.T) {

This comment was marked as resolved.

PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")
}

func TestSecondaryBatcherRestartRecover(t *testing.T) {

This comment was marked as resolved.

"github.com/hyperledger/fabric-x-orderer/node/comm"
)

type StreamInfo struct {

This comment was marked as resolved.

// Pull from Assemblers
PullFromAssemblers(t, uc, parties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")

partyToRestart := types.PartyID(3)

This comment was marked as resolved.

// make sure clients of correct parties continue to get transactions (expect 2000 TXs).
PullFromAssemblers(t, uc, correctParties, 0, math.MaxUint64, totalTxSent, doesntMatter, "cancelled pull from assembler: %d")

// for shardId := 2; shardId <= 2; shardId++ {

This comment was marked as resolved.

@gengur gengur force-pushed the batcher_restart_recover branch 5 times, most recently from 7ab097c to b16e475 Compare August 6, 2025 15:24
func PullFromAssembler(t *testing.T, userConfig *armageddon.UserConfig, partyID types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int) (uint64, uint64, []types.BatchID, error) {
require.NotNil(t, userConfig)
dc := client.NewDeliverClient(userConfig)
toCtx, toCancel := context.WithTimeout(context.Background(), 30*time.Second)

This comment was marked as resolved.

require.Equal(t, uint64(transactions), totalTxs)
require.Equal(t, uint64(blocks), totalBlocks)
// TODO: check that we get all of the submitted transactions
require.LessOrEqual(t, uint64(transactions), totalTxs)

This comment was marked as resolved.

// TODO: check that we get all of the submitted transactions
require.LessOrEqual(t, uint64(transactions), totalTxs)
if blocks > 0 {
require.LessOrEqual(t, uint64(blocks), totalBlocks)

This comment was marked as resolved.


const (
// Number of shards in the test
numOfShards = 1

This comment was marked as resolved.

partyBlockInfos := blockInfos[types.PartyID(1)]
primaryBatcherId := partyBlockInfos[len(partyBlockInfos)-1].Primary()
primaryBatcher := armaNetwork.GeBatcher(t, primaryBatcherId, types.ShardID(1))
correctParties := []types.PartyID{}

This comment was marked as resolved.

}
}

// test that the router of party get stalled in the some point

This comment was marked as resolved.

totalTxSent += totalTxNumber

// 5.
// make sure clients of correct parties continue to get transactions (expect 2000 TXs).

This comment was marked as resolved.

txContent := prepareTx(i, 64, []byte("sessionNumber"))
err = broadcastClient.SendTx(txContent)
if err != nil {
require.ErrorContains(t, err, fmt.Sprintf("received error response from %s: INTERNAL_SERVER_ERROR", routerToStall.Listener.Addr().String())) // only such errors are permitted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should this still happen? maybe check there is no error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it may happen since it takes time to renew faulty streams.

totalTxSent += totalTxNumber

// Pull from Assemblers
// make sure clients of all the parties get transactions (expect 3000 TXs).

This comment was marked as resolved.

@gengur gengur force-pushed the batcher_restart_recover branch 4 times, most recently from 713b820 to 7518f22 Compare August 8, 2025 18:06
require.Equal(t, uint64(blocks), totalBlocks)
// TODO: check that we get all of the submitted transactions
require.GreaterOrEqual(t, pullInfo.TotalTxs, uint64(transactions))
if blocks > 0 {

This comment was marked as resolved.


if shardID != types.ShardIDConsensus {
pID := PrimaryID{ShardID: shardID}
primaryIDBytes := block.GetMetadata().GetMetadata()[common.BlockMetadataIndex_ORDERER][0:2]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you using this and not the AssemblerBatchIdOrderingInfoAndTxCountFromBlock func?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we don't need all the stuff that returns it

return errors.New("nil block header")
}

atomic.AddUint64(&totalBlocks, uint64(1))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how come this needs to be atomic but there is no lock over the map?
I guess you can remove this atomic add?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need it's legacy

ShardID types.ShardID
}

func (p *PrimaryID) Digest() types.ShardID {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this for?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need, shardID is a comparable

}

func PullFromAssemblers(t *testing.T, userConfig *armageddon.UserConfig, parties []types.PartyID, startBlock uint64, endBlock uint64, transactions int, blocks int, errString string) {
type PrimaryID struct {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why create a type for this? just use shardID

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, shardID is comparable Fixed

gengur added 7 commits August 10, 2025 03:22
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
Signed-off-by: Genady Gurevich <genadyg@il.ibm.com>
@gengur gengur force-pushed the batcher_restart_recover branch from 7518f22 to 133fd3c Compare August 10, 2025 10:11
@HagarMeir HagarMeir merged commit 3cd436f into hyperledger:main Aug 10, 2025
5 checks passed
@gengur gengur deleted the batcher_restart_recover branch August 10, 2025 12:21
@HagarMeir
Copy link
Contributor

@gengur you still have a TODO for next PR to check that all transactions actually got in

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants