Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 11 additions & 10 deletions core/abc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (

arma_types "github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/hyperledger/fabric-x-orderer/core"
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
"github.com/hyperledger/fabric-x-orderer/node/router"
"github.com/hyperledger/fabric-x-orderer/testutil"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -47,7 +48,7 @@ type bafSender struct {

func (s *bafSender) SendBAF(baf arma_types.BatchAttestationFragment) {
ba := arma_types.NewSimpleBatchAttestationFragment(baf.Shard(), baf.Primary(), baf.Seq(), baf.Digest(), baf.Signer(), nil, 0, nil)
s.totalOrder.SubmitRequest((&core.ControlEvent{BAF: ba}).Bytes())
s.totalOrder.SubmitRequest((&state.ControlEvent{BAF: ba}).Bytes())
}

type naiveblock struct {
Expand Down Expand Up @@ -100,11 +101,11 @@ func (s *shardReplicator) Replicate(shard arma_types.ShardID) <-chan arma_types.
}

type stateProvider struct {
s *core.State
s *state.State
}

func (s *stateProvider) GetLatestStateChan() <-chan *core.State {
stateChan := make(chan *core.State, 1)
func (s *stateProvider) GetLatestStateChan() <-chan *state.State {
stateChan := make(chan *state.State, 1)
stateChan <- s.s
return stateChan
}
Expand Down Expand Up @@ -169,14 +170,14 @@ func TestAssemblerBatcherConsenter(t *testing.T) {

totalOrder := make(naiveTotalOrder, 1000)

initialState := &core.State{
initialState := &state.State{
Threshold: 1,
N: 1,
ShardCount: uint16(shardCount),
}

for shardID := uint16(0); shardID < initialState.ShardCount; shardID++ {
initialState.Shards = append(initialState.Shards, core.ShardTerm{Shard: arma_types.ShardID(shardID), Term: 1})
initialState.Shards = append(initialState.Shards, state.ShardTerm{Shard: arma_types.ShardID(shardID), Term: 1})
}

consenter := &core.Consenter{
Expand Down Expand Up @@ -218,9 +219,9 @@ func TestAssemblerBatcherConsenter(t *testing.T) {

var batchers []*core.Batcher

state := core.State{N: uint16(shardCount)}
s := state.State{N: uint16(shardCount)}
for shard := 0; shard < shardCount; shard++ {
state.Shards = append(state.Shards, core.ShardTerm{Shard: arma_types.ShardID(shard)})
s.Shards = append(s.Shards, state.ShardTerm{Shard: arma_types.ShardID(shard)})
}

var parties []arma_types.PartyID
Expand All @@ -236,7 +237,7 @@ func TestAssemblerBatcherConsenter(t *testing.T) {
batchers = append(batchers, batcher)
}

sp := &stateProvider{s: &state}
sp := &stateProvider{s: &s}

for i := 0; i < shardCount; i++ {
sc := &shardCommitter{
Expand All @@ -247,7 +248,7 @@ func TestAssemblerBatcherConsenter(t *testing.T) {
batchers[i].BatchAcker = acker
batchers[i].Ledger = sc
batchers[i].BatchPuller = nil
batchers[i].N = state.N
batchers[i].N = s.N
batchers[i].StateProvider = sp
batchers[i].ID = arma_types.PartyID(i)
batchers[i].Start()
Expand Down
7 changes: 4 additions & 3 deletions core/batcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"time"

"github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
"github.com/pkg/errors"
)

Expand All @@ -41,7 +42,7 @@ type BatchPuller interface {

//go:generate counterfeiter -o mocks/state_provider.go . StateProvider
type StateProvider interface {
GetLatestStateChan() <-chan *State
GetLatestStateChan() <-chan *state.State
}

//go:generate counterfeiter -o mocks/complainer.go . Complainer
Expand Down Expand Up @@ -178,7 +179,7 @@ func (b *Batcher) Stop() {
b.running.Wait()
}

func (b *Batcher) getTerm(state *State) uint64 {
func (b *Batcher) getTerm(state *state.State) uint64 {
term := uint64(math.MaxUint64)
for _, shard := range state.Shards {
if shard.Shard == b.Shard {
Expand Down Expand Up @@ -215,7 +216,7 @@ func (b *Batcher) getTermAndNotifyChange() {
// and that tx is in the pool of other batchers but again not enough (less than f+1 batchers)
// to prevent a case where such a tx falls through the cracks, after a term change
// all batchers resubmit to their pools txs in batches with their BAFs still in pending state
func (b *Batcher) resubmitPendingBAFs(state *State, prevPrimary types.PartyID) {
func (b *Batcher) resubmitPendingBAFs(state *state.State, prevPrimary types.PartyID) {
for _, baf := range state.Pending {
if baf.Signer() == b.ID && baf.Primary() == prevPrimary {
b.Logger.Debugf("found pending BAF signed by me (id: %d) from prev primary: %d ; %s", b.ID, prevPrimary, baf.String())
Expand Down
61 changes: 31 additions & 30 deletions core/batcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
arma_types "github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/hyperledger/fabric-x-orderer/core"
"github.com/hyperledger/fabric-x-orderer/core/mocks"
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
"github.com/hyperledger/fabric-x-orderer/testutil"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -141,7 +142,7 @@ func TestPrimaryChangeToSecondary(t *testing.T) {
batcher.Ledger = ledger

stateProvider := &mocks.FakeStateProvider{}
stateChan := make(chan *core.State)
stateChan := make(chan *state.State)
stateProvider.GetLatestStateChanReturns(stateChan)
batcher.StateProvider = stateProvider

Expand All @@ -158,8 +159,8 @@ func TestPrimaryChangeToSecondary(t *testing.T) {
return stateProvider.GetLatestStateChanCallCount() == 1
}, 10*time.Second, 10*time.Millisecond)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 0,
Expand All @@ -173,8 +174,8 @@ func TestPrimaryChangeToSecondary(t *testing.T) {

require.NotZero(t, pool.NextRequestsCallCount())

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 1,
Expand Down Expand Up @@ -223,7 +224,7 @@ func TestSecondaryChangeToPrimary(t *testing.T) {
batcher.Ledger = ledger

stateProvider := &mocks.FakeStateProvider{}
stateChan := make(chan *core.State)
stateChan := make(chan *state.State)
stateProvider.GetLatestStateChanReturns(stateChan)
batcher.StateProvider = stateProvider

Expand All @@ -240,8 +241,8 @@ func TestSecondaryChangeToPrimary(t *testing.T) {
return stateProvider.GetLatestStateChanCallCount() == 1
}, 10*time.Second, 10*time.Millisecond)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 0,
Expand All @@ -256,8 +257,8 @@ func TestSecondaryChangeToPrimary(t *testing.T) {

require.Zero(t, pool.NextRequestsCallCount())

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 1,
Expand Down Expand Up @@ -319,7 +320,7 @@ func TestSecondaryChangeToSecondary(t *testing.T) {
batcher.MemPool = pool

stateProvider := &mocks.FakeStateProvider{}
stateChan := make(chan *core.State)
stateChan := make(chan *state.State)
stateProvider.GetLatestStateChanReturns(stateChan)
batcher.StateProvider = stateProvider

Expand All @@ -329,8 +330,8 @@ func TestSecondaryChangeToSecondary(t *testing.T) {
return stateProvider.GetLatestStateChanCallCount() == 1
}, 10*time.Second, 10*time.Millisecond)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 0,
Expand All @@ -343,8 +344,8 @@ func TestSecondaryChangeToSecondary(t *testing.T) {
return ledger.AppendCallCount() == 1
}, 10*time.Second, 10*time.Millisecond)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 1,
Expand Down Expand Up @@ -404,7 +405,7 @@ func TestPrimaryChangeToPrimary(t *testing.T) {
batcher.Ledger = ledger

stateProvider := &mocks.FakeStateProvider{}
stateChan := make(chan *core.State)
stateChan := make(chan *state.State)
stateProvider.GetLatestStateChanReturns(stateChan)
batcher.StateProvider = stateProvider

Expand All @@ -414,8 +415,8 @@ func TestPrimaryChangeToPrimary(t *testing.T) {
return stateProvider.GetLatestStateChanCallCount() == 1
}, 10*time.Second, 10*time.Millisecond)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 0,
Expand All @@ -429,8 +430,8 @@ func TestPrimaryChangeToPrimary(t *testing.T) {

require.NotZero(t, pool.NextRequestsCallCount())

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 4,
Expand Down Expand Up @@ -513,7 +514,7 @@ func TestPrimaryWaitingAndTermChange(t *testing.T) {
batcher.Ledger = ledger

stateProvider := &mocks.FakeStateProvider{}
stateChan := make(chan *core.State)
stateChan := make(chan *state.State)
stateProvider.GetLatestStateChanReturns(stateChan)
batcher.StateProvider = stateProvider

Expand All @@ -526,8 +527,8 @@ func TestPrimaryWaitingAndTermChange(t *testing.T) {

batcher.Start()

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 0,
Expand All @@ -543,8 +544,8 @@ func TestPrimaryWaitingAndTermChange(t *testing.T) {
return pool.NextRequestsCallCount() == 10
}, 10*time.Second, 10*time.Millisecond)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 1,
Expand Down Expand Up @@ -593,7 +594,7 @@ func TestResubmitPending(t *testing.T) {
batcher.Ledger = ledger

stateProvider := &mocks.FakeStateProvider{}
stateChan := make(chan *core.State)
stateChan := make(chan *state.State)
stateProvider.GetLatestStateChanReturns(stateChan)
batcher.StateProvider = stateProvider

Expand All @@ -610,8 +611,8 @@ func TestResubmitPending(t *testing.T) {
return stateProvider.GetLatestStateChanCallCount() == 1
}, 10*time.Second, 10*time.Millisecond)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 0,
Expand All @@ -634,8 +635,8 @@ func TestResubmitPending(t *testing.T) {
notMyBAF := arma_types.NewSimpleBatchAttestationFragment(batch.Shard(), batch.Primary(), batch.Seq(), batch.Digest(), arma_types.PartyID(batcherID+1), nil, 0, nil)
myBAFWithOtherPrimary := arma_types.NewSimpleBatchAttestationFragment(batch.Shard(), batch.Primary()+1, batch.Seq(), batch.Digest(), arma_types.PartyID(batcherID), nil, 0, nil)

stateChan <- &core.State{
Shards: []core.ShardTerm{
stateChan <- &state.State{
Shards: []state.ShardTerm{
{
Shard: 0,
Term: 1,
Expand Down
15 changes: 8 additions & 7 deletions core/consenter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"slices"

"github.com/hyperledger/fabric-x-orderer/common/types"
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
)

type TotalOrder interface {
Expand All @@ -26,17 +27,17 @@ type BatchAttestationDB interface {
type Consenter struct {
Logger types.Logger
DB BatchAttestationDB
BAFDeserializer BAFDeserializer
State *State
BAFDeserializer state.BAFDeserializer
State *state.State
}

func (c *Consenter) SimulateStateTransition(prevState *State, requests [][]byte) (*State, [][]types.BatchAttestationFragment) {
func (c *Consenter) SimulateStateTransition(prevState *state.State, requests [][]byte) (*state.State, [][]types.BatchAttestationFragment) {
controlEvents, err := requestsToControlEvents(requests, c.BAFDeserializer.Deserialize)
if err != nil {
panic(err)
}

filteredControlEvents := make([]ControlEvent, 0, len(controlEvents))
filteredControlEvents := make([]state.ControlEvent, 0, len(controlEvents))

// Iterate over all control events and prune those that already exist in our DB
for _, ce := range controlEvents {
Expand Down Expand Up @@ -131,10 +132,10 @@ func indexBAFs(batchAttestationFragments []types.BatchAttestationFragment) map[s
return index
}

func requestsToControlEvents(requests [][]byte, fragmentFromBytes func([]byte) (types.BatchAttestationFragment, error)) ([]ControlEvent, error) {
events := make([]ControlEvent, 0, len(requests))
func requestsToControlEvents(requests [][]byte, fragmentFromBytes func([]byte) (types.BatchAttestationFragment, error)) ([]state.ControlEvent, error) {
events := make([]state.ControlEvent, 0, len(requests))
for i := 0; i < len(requests); i++ {
ce := ControlEvent{}
ce := state.ControlEvent{}
if err := ce.FromBytes(requests[i], fragmentFromBytes); err != nil {
return nil, err
}
Expand Down
Loading