Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 27 additions & 9 deletions common/tools/armageddon/armageddon.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"sync"
"time"

"github.com/hyperledger/fabric-lib-go/common/flogging"

"github.com/hyperledger/fabric-x-orderer/config/protos"

"github.com/hyperledger/fabric-x-orderer/testutil/fabric"
Expand Down Expand Up @@ -86,6 +88,8 @@ func init() {
grpclog.SetLoggerV2(grpclog.NewLoggerV2(io.Discard, io.Discard, io.Discard))
}

var logger = flogging.MustGetLogger("armageddon")

type NetworkCryptoConfig struct {
// map from party to its crypto config
PartyToCryptoConfig map[types.PartyID]CryptoConfigPerParty
Expand Down Expand Up @@ -257,6 +261,8 @@ func (cli *CLI) Run(args []string) {
generate(cli.genConfigFile, cli.outputDir, cli.useTLS)
} else if *cli.version == 2 {
generateConfigAndCrypto(cli.genConfigFile, cli.outputDir, cli.sampleConfigPath)
logger.Infof("Configuration material was created successfully in %s", *cli.outputDir)

} else {
fmt.Fprintf(os.Stderr, "Invalid version: %d", *cli.version)
os.Exit(-1)
Expand Down Expand Up @@ -910,6 +916,8 @@ func submit(userConfigFile **os.File, transactions *int, rate *int, txSize *int)
keyValMap: make(map[string]bool),
mutex: sync.Mutex{},
}

logger.Infof("Submit starts.....")
var waitForTxToBeSentAndReceived sync.WaitGroup
waitForTxToBeSentAndReceived.Add(2)
go func() {
Expand All @@ -927,7 +935,7 @@ func submit(userConfigFile **os.File, transactions *int, rate *int, txSize *int)

waitForTxToBeSentAndReceived.Wait()
elapsed := time.Since(start)

logger.Infof("Submit Finished.....")
// report results
reportResults(*transactions, elapsed, txDelayTimes, numOfBlocks, *txSize)
}
Expand Down Expand Up @@ -977,7 +985,7 @@ func receive(userConfigFile **os.File, pullFromPartyId *int, receiveOutputDir *s

// pull blocks from the assembler and report statistics to statistics.csv file
pullBlocksFromAssemblerAndCollectStatistics(userConfig, *pullFromPartyId, *receiveOutputDir, *expectedNumOfTxs)
fmt.Printf("Receive command finished, statistics can be found in: %v\n", path.Join(*receiveOutputDir, "statistics.csv"))
logger.Infof("Receive command finished, statistics can be found in: %v\n", path.Join(*receiveOutputDir, "statistics.csv"))
}

func trimPortFromEndpoint(endpoint string) string {
Expand Down Expand Up @@ -1221,6 +1229,7 @@ func pullBlocksFromAssemblerAndCollectStatistics(userConfig *UserConfig, pullFro
// pull blocks from the assembler
// if a flag is given, stop when finish receiving all txs
go func() {
logger.Infof("starting pulling blocks from the assembler")
var txsTotal int
for {
block, err := pullBlock(stream, endpointToPullFrom, gRPCAssemblerClientConn)
Expand All @@ -1240,7 +1249,10 @@ func pullBlocksFromAssemblerAndCollectStatistics(userConfig *UserConfig, pullFro
blockChan <- blockWithTime
txsTotal += len(blockWithTime.block.Data.Data)

if expectedNumOfTxs > 0 && expectedNumOfTxs == txsTotal {
logger.Debugf("block with %d txs was pulled from the assembler, overall %d txs were received at this moment", len(blockWithTime.block.Data.Data), txsTotal)

if expectedNumOfTxs > 0 && expectedNumOfTxs <= txsTotal {
logger.Debugf("overall %d txs were received, finished pulling", txsTotal)
waitToFinish.Done()
return
}
Expand All @@ -1262,6 +1274,7 @@ func pullBlocksFromAssemblerAndCollectStatistics(userConfig *UserConfig, pullFro
// iterate over txs in block
for j := 0; j < txs; j++ {
env, err := protoutil.GetEnvelopeFromBlock(blockWithTime.block.Data.Data[j])
logger.Debugf("tx %x was received from the assembler", env.Payload)
if err != nil {
fmt.Fprintf(os.Stderr, "failed to get envelope from block: %v", err)
os.Exit(3)
Expand All @@ -1274,7 +1287,8 @@ func pullBlocksFromAssemblerAndCollectStatistics(userConfig *UserConfig, pullFro
}
statisticsAggregator.Add(txs, 1, sumOfDelayTimes, sumOfTxsSize)

if expectedNumOfTxs > 0 && expectedNumOfTxs == txsTotal {
if expectedNumOfTxs > 0 && expectedNumOfTxs <= txsTotal {
logger.Infof("%d txs were expected and overall %d were successfully received", expectedNumOfTxs, txsTotal)
close(stopChan)
waitToFinish.Done()
return
Expand All @@ -1283,6 +1297,7 @@ func pullBlocksFromAssemblerAndCollectStatistics(userConfig *UserConfig, pullFro
}()

waitToFinish.Wait()
logger.Debugf("exit pulling blocks from the assembler")
}

func calculateDelayOfTx(env *common.Envelope, acceptedTime time.Time) time.Duration {
Expand Down Expand Up @@ -1322,7 +1337,8 @@ func pullBlock(stream ab.AtomicBroadcast_DeliverClient, endpointToPullFrom strin
func sendTx(txsMap *protectedMap, streams []ab.AtomicBroadcast_BroadcastClient, i int, txSize int, sessionNumber []byte) {
payload := prepareTx(i, txSize, sessionNumber)
if txsMap != nil {
txsMap.Add(string(payload[:32]))
logger.Debugf("Add tx %x to the map", payload)
txsMap.Add(string(payload))
}
for j := 0; j < len(streams); j++ {
err := streams[j].Send(&common.Envelope{Payload: payload})
Expand Down Expand Up @@ -1356,17 +1372,18 @@ func reportResults(transactions int, elapsed time.Duration, txDelayTimesResult f
avgTxDelay := txDelayTimesResult / float64(transactions)
avgBlockRate := float64(numOfBlocksResult) / elapsed.Seconds()
avgBlockSize := transactions / numOfBlocksResult
fmt.Printf("SUCCESS: number of txs: %d, tx size: %d bytes, elapsed time: %v, avg. tx rate: %.2f, avg. tx delay: %vs, num of blocks: %d, avg. block rate: %v, avg. block size: %v txs\n", transactions, txSize, elapsed, avgTxRate, avgTxDelay, numOfBlocksResult, avgBlockRate, avgBlockSize)
logger.Infof("SUCCESS: number of txs: %d, tx size: %d bytes, elapsed time: %v, avg. tx rate: %.2f, avg. tx delay: %vs, num of blocks: %d, avg. block rate: %v, avg. block size: %v txs\n", transactions, txSize, elapsed, avgTxRate, avgTxDelay, numOfBlocksResult, avgBlockRate, avgBlockSize)
}

func reportLoadResults(transactions int, elapsed time.Duration, txSize int) {
avgTxSendingRate := float64(transactions) / elapsed.Seconds()
fmt.Printf("Load command finished, sent %d TXs in %v seconds, TX size %d, avg. tx sending rate: %.2f\n", transactions, elapsed, txSize, avgTxSendingRate)
logger.Infof("Load command finished, sent %d TXs in %v seconds, TX size %d, avg. tx sending rate: %.2f\n", transactions, elapsed, txSize, avgTxSendingRate)
}

// manageStatistics manages a statistics queue and every hour writes the queue to a CSV file
func manageStatistics(receiveOutputDir string, statisticChan <-chan Statistics, stopChan <-chan bool, startTime float64, expectedTxs int, pullFrom int, timeIntervalToSampleStat time.Duration) {
filePath := path.Join(receiveOutputDir, "statistics.csv")
logger.Infof("Statistics are written to: %v\n", filePath)
fmt.Printf("Statistics are written to: %v\n", filePath)
file, err := os.OpenFile(filePath, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0o644)
if err != nil {
Expand Down Expand Up @@ -1443,7 +1460,7 @@ func writeStatisticsToCSV(file *os.File, statistic Statistics, timeIntervalToSam
fmt.Sprintf("%d", avgNumOfTxsInBlock),
})
if err != nil {
fmt.Printf("failed to write to CSV: %v", err)
logger.Errorf("failed to write to CSV: %v", err)
}
}

Expand Down Expand Up @@ -1548,6 +1565,7 @@ func receiveResponseFromAssembler(userConfig *UserConfig, txsMap *protectedMap,

// 2. delete the tx from the map
if txsMap != nil {
logger.Debugf("remove tx %x from the map", env.Payload)
txsMap.Remove(string(env.Payload))
}
}
Expand All @@ -1558,7 +1576,7 @@ func receiveResponseFromAssembler(userConfig *UserConfig, txsMap *protectedMap,
continue
}

if (txsMap != nil && txsMap.IsEmpty()) || numOfTxsCalculated == expectedNumOfTxs {
if (txsMap != nil && txsMap.IsEmpty()) && numOfTxsCalculated >= expectedNumOfTxs {
break
}
}
Expand Down