Skip to content

Commit 369c7bf

Browse files
authored
remove naive batch (#243)
Signed-off-by: Yoav Tock <tock@il.ibm.com>
1 parent 28749f7 commit 369c7bf

File tree

2 files changed

+19
-172
lines changed

2 files changed

+19
-172
lines changed

node/assembler/assembler_role.go

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ SPDX-License-Identifier: Apache-2.0
77
package assembler
88

99
import (
10-
"encoding/hex"
1110
"errors"
1211
"sync"
1312
"time"
@@ -88,7 +87,6 @@ func (a *AssemblerRole) processOrderedBatchAttestations() {
8887
return
8988
}
9089

91-
t1 := time.Now()
9290
batch, err := a.collateAttestationWithBatch(oba.BatchAttestation())
9391
if err != nil {
9492
if errors.Is(err, utils.ErrOperationCancelled) {
@@ -97,7 +95,6 @@ func (a *AssemblerRole) processOrderedBatchAttestations() {
9795
}
9896
a.Logger.Panicf("Something went wrong while fetching the batch %v", oba.BatchAttestation())
9997
}
100-
a.Logger.Infof("Located batch for digest %s within %v", ShortDigestString(oba.BatchAttestation().Digest()), time.Since(t1))
10198
a.Ledger.Append(batch, oba.OrderingInfo())
10299
}
103100
a.Logger.Infof("Finished processing incoming OrderedBatchAttestations from consensus")
@@ -109,18 +106,6 @@ func (a *AssemblerRole) collateAttestationWithBatch(ba types.BatchAttestation) (
109106
if err != nil {
110107
return nil, err
111108
}
112-
a.Logger.Infof("Retrieved batch with %d requests for attestation %s from index within %v", len(batch.Requests()), ShortDigestString(ba.Digest()), time.Since(t1))
109+
a.Logger.Infof("Retrieved full batch with %d requests from index within %s, BatchID: %s", len(batch.Requests()), time.Since(t1), types.BatchIDToString(ba))
113110
return batch, nil
114111
}
115-
116-
// ShortDigestString provides a short string from a potentially long (32B) digest.
117-
func ShortDigestString(digest []byte) string {
118-
if digest == nil {
119-
return "nil"
120-
}
121-
if len(digest) <= 8 {
122-
return hex.EncodeToString(digest)
123-
}
124-
125-
return hex.EncodeToString(digest[:8])
126-
}

node/assembler/assembler_role_test.go

Lines changed: 18 additions & 156 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ package assembler_test
88

99
import (
1010
"encoding/binary"
11-
"encoding/hex"
12-
"encoding/json"
1311
"fmt"
1412
"sync"
1513
"sync/atomic"
@@ -18,6 +16,7 @@ import (
1816

1917
"github.com/hyperledger/fabric-x-orderer/common/types"
2018
"github.com/hyperledger/fabric-x-orderer/node/assembler"
19+
"github.com/hyperledger/fabric-x-orderer/node/consensus/state"
2120
"github.com/hyperledger/fabric-x-orderer/testutil"
2221
"github.com/stretchr/testify/assert"
2322
)
@@ -31,39 +30,13 @@ func (n naiveOrderedBatchAttestationReplicator) Replicate() <-chan types.Ordered
3130
type naiveReplication struct {
3231
subscribers []chan types.Batch
3332
i uint32
34-
stopped int32
3533
}
3634

3735
func (r *naiveReplication) Replicate(_ types.ShardID) <-chan types.Batch {
3836
j := atomic.AddUint32(&r.i, 1)
3937
return r.subscribers[j-1]
4038
}
4139

42-
func (r *naiveReplication) PullBatches(_ types.PartyID) <-chan types.Batch {
43-
j := atomic.AddUint32(&r.i, 1)
44-
return r.subscribers[j-1]
45-
}
46-
47-
func (r *naiveReplication) Stop() {
48-
atomic.StoreInt32(&r.stopped, 0x1)
49-
}
50-
51-
func (r *naiveReplication) Append(partyID types.PartyID, batchSeq types.BatchSequence, batchedRequests types.BatchedRequests) {
52-
for _, s := range r.subscribers {
53-
s <- types.NewSimpleBatch(0, partyID, batchSeq, batchedRequests, 0)
54-
}
55-
}
56-
57-
func (r *naiveReplication) Height(partyID types.PartyID) uint64 {
58-
// TODO use in test
59-
return 0
60-
}
61-
62-
func (r *naiveReplication) RetrieveBatchByNumber(partyID types.PartyID, seq uint64) types.Batch {
63-
// TODO use in test
64-
return nil
65-
}
66-
6740
type naiveIndex struct {
6841
sync.Map
6942
}
@@ -90,132 +63,21 @@ func (n *naiveIndex) PopOrWait(batchId types.BatchID) (types.Batch, error) {
9063

9164
func (n *naiveIndex) Stop() {}
9265

93-
type naiveBatchAttestation struct {
94-
primary types.PartyID
95-
seq types.BatchSequence
96-
node types.PartyID
97-
shard types.ShardID
98-
digest []byte
99-
}
100-
101-
func (nba *naiveBatchAttestation) Epoch() uint64 {
102-
return 0
103-
}
104-
105-
func (nba *naiveBatchAttestation) GarbageCollect() [][]byte {
106-
return nil
107-
}
108-
109-
func (nba *naiveBatchAttestation) Signer() types.PartyID {
110-
return nba.node
111-
}
112-
113-
func (nba *naiveBatchAttestation) Primary() types.PartyID {
114-
return nba.primary
115-
}
116-
117-
func (nba *naiveBatchAttestation) Seq() types.BatchSequence {
118-
return nba.seq
119-
}
120-
121-
func (nba *naiveBatchAttestation) Shard() types.ShardID {
122-
return nba.shard
123-
}
124-
125-
func (nba *naiveBatchAttestation) Digest() []byte {
126-
return nba.digest
127-
}
128-
129-
func (nba *naiveBatchAttestation) Fragments() []types.BatchAttestationFragment {
130-
return nil
131-
}
132-
133-
func (nba *naiveBatchAttestation) Serialize() []byte {
134-
m := make(map[string]interface{})
135-
m["seq"] = nba.seq
136-
m["node"] = nba.node
137-
m["shard"] = nba.shard
138-
m["digest"] = hex.EncodeToString(nba.digest)
139-
140-
bytes, err := json.Marshal(m)
141-
if err != nil {
142-
panic(err)
143-
}
144-
145-
return bytes
146-
}
147-
148-
func (nba *naiveBatchAttestation) Deserialize(bytes []byte) error {
149-
m := make(map[string]interface{})
150-
if err := json.Unmarshal(bytes, &m); err != nil {
151-
panic(err)
152-
}
153-
154-
node := m["node"]
155-
shard := m["shard"]
156-
seq := m["seq"]
157-
dig := m["digest"]
158-
159-
nba.node = types.PartyID(node.(float64))
160-
nba.shard = types.ShardID(shard.(float64))
161-
nba.seq = types.BatchSequence(seq.(float64))
162-
nba.digest, _ = hex.DecodeString(dig.(string))
163-
164-
return nil
165-
}
166-
167-
type naiveOrderingInfo struct {
168-
num uint64
169-
}
170-
171-
func (noi *naiveOrderingInfo) String() string {
172-
return fmt.Sprintf("BlockNumber: %d", noi.num)
173-
}
174-
175-
type naiveOrderedBatchAttestation struct {
176-
ba types.BatchAttestation
177-
orderingInfo types.OrderingInfo
178-
}
179-
180-
func (noba *naiveOrderedBatchAttestation) BatchAttestation() types.BatchAttestation {
181-
return noba.ba
182-
}
183-
184-
func (noba *naiveOrderedBatchAttestation) OrderingInfo() types.OrderingInfo {
185-
return noba.orderingInfo
186-
}
187-
18866
type naiveAssemblerLedger chan types.OrderedBatchAttestation
18967

19068
func (n naiveAssemblerLedger) Append(batch types.Batch, orderingInfo types.OrderingInfo) {
191-
noba := &naiveOrderedBatchAttestation{
192-
ba: &naiveBatchAttestation{
193-
primary: batch.Primary(),
194-
seq: batch.Seq(),
195-
node: batch.Primary(),
196-
shard: batch.Shard(),
197-
digest: batch.Digest(),
198-
},
199-
orderingInfo: orderingInfo,
69+
noba := &state.AvailableBatchOrdered{
70+
AvailableBatch: state.NewAvailableBatch(batch.Primary(), batch.Shard(), batch.Seq(), batch.Digest()),
71+
OrderingInformation: orderingInfo.(*state.OrderingInformation),
20072
}
73+
20174
n <- noba
20275
}
20376

20477
func (n naiveAssemblerLedger) Close() {
20578
close(n)
20679
}
20780

208-
func TestNaive(t *testing.T) {
209-
nba := &naiveBatchAttestation{}
210-
nba.seq = 100
211-
nba.node = 1
212-
nba.digest = []byte{1, 2, 3, 4, 5, 6, 7, 8}
213-
214-
nba2 := &naiveBatchAttestation{}
215-
nba2.Deserialize(nba.Serialize())
216-
assert.Equal(t, nba, nba2)
217-
}
218-
21981
func TestAssembler(t *testing.T) {
22082
shardCount := 4
22183
batchNum := 20
@@ -236,7 +98,7 @@ func TestAssembler(t *testing.T) {
23698
batches = append(batches, batchesForShard)
23799
}
238100

239-
r, ledger, nobar, assembler := createAssembler(t, shardCount)
101+
r, ledger, nobar, assembler := createAssemblerRole(t, shardCount)
240102

241103
assembler.Run()
242104

@@ -245,20 +107,15 @@ func TestAssembler(t *testing.T) {
245107
time.Sleep(time.Millisecond)
246108
}
247109

248-
totalOrder := make(chan *naiveBatchAttestation)
110+
totalOrder := make(chan *state.AvailableBatch)
249111

250112
for shardID := 0; shardID < shardCount; shardID++ {
251113
go func(shard int) {
252114
for _, batch := range batches[shard] {
253115

254116
r.subscribers[shard] <- batch
255117

256-
nba := &naiveBatchAttestation{
257-
primary: batch.Primary(),
258-
seq: batch.Seq(),
259-
shard: batch.Shard(),
260-
digest: batch.Digest(),
261-
}
118+
nba := state.NewAvailableBatch(batch.Primary(), batch.Shard(), batch.Seq(), batch.Digest())
262119
totalOrder <- nba
263120
}
264121
}(shardID)
@@ -268,9 +125,14 @@ func TestAssembler(t *testing.T) {
268125
var num uint64
269126

270127
for nba := range totalOrder {
271-
noba := &naiveOrderedBatchAttestation{
272-
ba: nba,
273-
orderingInfo: &naiveOrderingInfo{num: num},
128+
noba := &state.AvailableBatchOrdered{
129+
AvailableBatch: nba,
130+
OrderingInformation: &state.OrderingInformation{
131+
BlockHeader: &state.BlockHeader{Number: num, PrevHash: []byte{0x08}, Digest: []byte{0x09}},
132+
DecisionNum: types.DecisionNum(num),
133+
BatchIndex: 0,
134+
BatchCount: 1,
135+
},
274136
}
275137
nobar <- noba
276138
num++
@@ -279,14 +141,14 @@ func TestAssembler(t *testing.T) {
279141

280142
for i := uint64(0); i < uint64(batchNum*shardCount); i++ {
281143
noba := <-ledger
282-
assert.Equal(t, fmt.Sprintf("BlockNumber: %d", i), noba.OrderingInfo().String())
144+
assert.Equal(t, fmt.Sprintf("DecisionNum: %d, BatchIndex: 0, BatchCount: 1; No. Sigs: 0, BlockHeader: Number: %d, PrevHash: 08, Digest: 09", i, i), noba.OrderingInfo().String())
283145
delete(digests, string(noba.BatchAttestation().Digest()))
284146
}
285147

286148
assert.Len(t, digests, 0)
287149
}
288150

289-
func createAssembler(t *testing.T, shardCount int) (*naiveReplication, naiveAssemblerLedger, naiveOrderedBatchAttestationReplicator, *assembler.AssemblerRole) {
151+
func createAssemblerRole(t *testing.T, shardCount int) (*naiveReplication, naiveAssemblerLedger, naiveOrderedBatchAttestationReplicator, *assembler.AssemblerRole) {
290152
index := &naiveIndex{}
291153

292154
r := &naiveReplication{}

0 commit comments

Comments
 (0)