Skip to content

Commit fd8372a

Browse files
committed
Added optional timestamps to chaincode events by retrieving from Fabric
Signed-off-by: Jim Zhang <[email protected]>
1 parent c52b369 commit fd8372a

File tree

11 files changed

+168
-30
lines changed

11 files changed

+168
-30
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ require (
1111
github.com/google/certificate-transparency-go v1.1.1 // indirect
1212
github.com/google/uuid v1.3.0 // indirect
1313
github.com/gorilla/websocket v1.4.2
14+
github.com/hashicorp/golang-lru v0.5.4
1415
github.com/hyperledger/fabric-config v0.0.7 // indirect
1516
github.com/hyperledger/fabric-protos-go v0.0.0-20201028172056-a3136dde2354
1617
github.com/hyperledger/fabric-sdk-go v1.0.1-0.20210729165856-3be4ed253dcf

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -334,6 +334,8 @@ github.com/hashicorp/go-version v1.2.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09
334334
github.com/hashicorp/go.net v0.0.1/go.mod h1:hjKkEWcCURg++eb33jQU7oqQcI9XDCnUzHA0oac0k90=
335335
github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
336336
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
337+
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
338+
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
337339
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
338340
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
339341
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=

internal/events/eventstream.go

Lines changed: 30 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
3131
"github.com/hyperledger/firefly-fabconnect/internal/ws"
3232

33+
lru "github.com/hashicorp/golang-lru"
3334
log "github.com/sirupsen/logrus"
3435
)
3536

@@ -73,6 +74,7 @@ type StreamInfo struct {
7374
Webhook *webhookActionInfo `json:"webhook,omitempty"`
7475
WebSocket *webSocketActionInfo `json:"websocket,omitempty"`
7576
Timestamps bool `json:"timestamps,omitempty"` // Include block timestamps in the events generated
77+
TimestampCacheSize int `json:"timestampCacheSize,omitempty"`
7678
}
7779

7880
type webhookActionInfo struct {
@@ -91,26 +93,27 @@ type webSocketActionInfo struct {
9193
type eventHandler func(*eventData)
9294

9395
type eventStream struct {
94-
sm subscriptionManager
95-
allowPrivateIPs bool
96-
spec *StreamInfo
97-
eventStream chan *eventData
98-
eventHandler eventHandler
99-
stopped bool
100-
processorDone bool
101-
pollingInterval time.Duration
102-
pollerDone bool
103-
inFlight uint64
104-
batchCond *sync.Cond
105-
batchQueue *list.List
106-
batchCount uint64
107-
initialRetryDelay time.Duration
108-
backoffFactor float64
109-
updateInProgress bool
110-
updateInterrupt chan struct{} // a zero-sized struct used only for signaling (hand rolled alternative to context)
111-
updateWG *sync.WaitGroup // Wait group for the go routines to reply back after they have stopped
112-
action eventStreamAction
113-
wsChannels ws.WebSocketChannels
96+
sm subscriptionManager
97+
allowPrivateIPs bool
98+
spec *StreamInfo
99+
eventStream chan *eventData
100+
eventHandler eventHandler
101+
stopped bool
102+
processorDone bool
103+
pollingInterval time.Duration
104+
pollerDone bool
105+
inFlight uint64
106+
batchCond *sync.Cond
107+
batchQueue *list.List
108+
batchCount uint64
109+
initialRetryDelay time.Duration
110+
backoffFactor float64
111+
updateInProgress bool
112+
updateInterrupt chan struct{} // a zero-sized struct used only for signaling (hand rolled alternative to context)
113+
updateWG *sync.WaitGroup // Wait group for the go routines to reply back after they have stopped
114+
action eventStreamAction
115+
wsChannels ws.WebSocketChannels
116+
blockTimestampCache *lru.Cache
114117
}
115118

116119
type eventStreamAction interface {
@@ -149,6 +152,9 @@ func newEventStream(sm subscriptionManager, spec *StreamInfo, wsChannels ws.WebS
149152
} else {
150153
spec.ErrorHandling = ErrorHandlingSkip
151154
}
155+
if spec.TimestampCacheSize == 0 {
156+
spec.TimestampCacheSize = DefaultTimestampCacheSize
157+
}
152158

153159
a = &eventStream{
154160
sm: sm,
@@ -164,6 +170,10 @@ func newEventStream(sm subscriptionManager, spec *StreamInfo, wsChannels ws.WebS
164170
}
165171
a.eventHandler = a.handleEvent
166172

173+
if a.blockTimestampCache, err = lru.New(spec.TimestampCacheSize); err != nil {
174+
return nil, errors.Errorf(errors.EventStreamsCreateStreamResourceErr, err)
175+
}
176+
167177
if a.pollingInterval == 0 {
168178
// Let's us do this from UTs, without exposing it
169179
a.pollingInterval = 10 * time.Millisecond

internal/events/eventstream_test.go

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ func TestProcessEventsEnd2EndWebhook(t *testing.T) {
380380
&StreamInfo{
381381
BatchSize: 1,
382382
Webhook: &webhookActionInfo{},
383-
Timestamps: false,
383+
Timestamps: true,
384384
}, db, 200)
385385
defer svr.Close()
386386

@@ -392,9 +392,15 @@ func TestProcessEventsEnd2EndWebhook(t *testing.T) {
392392
wg := &sync.WaitGroup{}
393393
wg.Add(1)
394394
go func() {
395+
// the block event
395396
e1s := <-eventStream
396397
assert.Equal(1, len(e1s))
397398
assert.Equal(uint64(11), e1s[0].BlockNumber)
399+
// the chaincode event
400+
e2s := <-eventStream
401+
assert.Equal(1, len(e2s))
402+
assert.Equal(uint64(10), e2s[0].BlockNumber)
403+
assert.Equal(int64(1000000), e2s[0].Timestamp)
398404
wg.Done()
399405
}()
400406
wg.Wait()
@@ -417,7 +423,7 @@ func TestProcessEventsEnd2EndCatchupWebhook(t *testing.T) {
417423
_ = db.Init()
418424
sm, stream, svr, eventStream := newTestStreamForBatching(
419425
&StreamInfo{
420-
BatchSize: 1,
426+
BatchSize: 2,
421427
Webhook: &webhookActionInfo{},
422428
Timestamps: false,
423429
}, db, 200)
@@ -432,11 +438,9 @@ func TestProcessEventsEnd2EndCatchupWebhook(t *testing.T) {
432438
wg.Add(1)
433439
go func() {
434440
e1s := <-eventStream
435-
assert.Equal(1, len(e1s))
441+
assert.Equal(2, len(e1s))
436442
assert.Equal(uint64(1), e1s[0].BlockNumber)
437-
e2s := <-eventStream
438-
assert.Equal(1, len(e2s))
439-
assert.Equal(uint64(11), e2s[0].BlockNumber)
443+
assert.Equal(uint64(11), e1s[1].BlockNumber)
440444
wg.Done()
441445
}()
442446
wg.Wait()
@@ -516,6 +520,10 @@ func TestProcessEventsEnd2EndWithReset(t *testing.T) {
516520
e1s := <-eventStream
517521
assert.Equal(1, len(e1s))
518522
assert.Equal(uint64(11), e1s[0].BlockNumber)
523+
// the chaincode event
524+
e2s := <-eventStream
525+
assert.Equal(1, len(e2s))
526+
assert.Equal(uint64(10), e2s[0].BlockNumber)
519527
wg.Done()
520528
}()
521529
wg.Wait()
@@ -625,7 +633,7 @@ func TestPauseResumeAfterCheckpoint(t *testing.T) {
625633
wg := &sync.WaitGroup{}
626634
wg.Add(1)
627635
go func() {
628-
for i := 0; i < 1; i++ {
636+
for i := 0; i < 2; i++ {
629637
<-eventStream
630638
}
631639
wg.Done()
@@ -691,7 +699,7 @@ func TestPauseResumeBeforeCheckpoint(t *testing.T) {
691699
wg := &sync.WaitGroup{}
692700
wg.Add(1)
693701
go func() {
694-
for i := 0; i < 1; i++ {
702+
for i := 0; i < 2; i++ {
695703
<-eventStream
696704
}
697705
wg.Done()
@@ -731,7 +739,7 @@ func TestMarkStaleOnError(t *testing.T) {
731739
wg := &sync.WaitGroup{}
732740
wg.Add(1)
733741
go func() {
734-
for i := 0; i < 1; i++ {
742+
for i := 0; i < 2; i++ {
735743
<-eventStream
736744
}
737745
wg.Done()
@@ -809,7 +817,7 @@ func TestStoreCheckpointStoreError(t *testing.T) {
809817
wg := &sync.WaitGroup{}
810818
wg.Add(1)
811819
go func() {
812-
for i := 0; i < 1; i++ {
820+
for i := 0; i < 2; i++ {
813821
<-eventStream
814822
}
815823
wg.Done()
@@ -1079,6 +1087,7 @@ func TestUpdateStreamMissingWebhookURL(t *testing.T) {
10791087
wg := &sync.WaitGroup{}
10801088
wg.Add(1)
10811089
go func() {
1090+
<-eventStream
10821091
<-eventStream
10831092
wg.Done()
10841093
}()
@@ -1122,6 +1131,7 @@ func TestUpdateStreamInvalidWebhookURL(t *testing.T) {
11221131
wg := &sync.WaitGroup{}
11231132
wg.Add(1)
11241133
go func() {
1134+
<-eventStream
11251135
<-eventStream
11261136
wg.Done()
11271137
}()

internal/events/subscription.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,13 +140,35 @@ func (s *subscription) processNewEvents() {
140140
EventName: ccEvent.EventName,
141141
Payload: ccEvent.Payload,
142142
}
143+
if s.ep.stream.spec.Timestamps {
144+
s.getEventTimestamp(event)
145+
}
143146
if err := s.ep.processEventEntry(s.info, event); err != nil {
144147
log.Errorf("Failed to process event: %s", err)
145148
}
146149
}
147150
}
148151
}
149152

153+
func (s *subscription) getEventTimestamp(evt *eventsapi.EventEntry) {
154+
// the key in the cache is the block number represented as a string
155+
blockNumber := strconv.FormatUint(evt.BlockNumber, 10)
156+
if ts, ok := s.ep.stream.blockTimestampCache.Get(blockNumber); ok {
157+
// we found the timestamp for the block in our local cache, assert it's type and return, no need to query the chain
158+
evt.Timestamp = ts.(int64)
159+
return
160+
}
161+
// we didn't find the timestamp in our cache, query the node for the block header where we can find the timestamp
162+
_, block, err := s.client.QueryBlock(s.info.ChannelId, evt.BlockNumber, s.info.Signer)
163+
if err != nil {
164+
log.Errorf("Unable to retrieve block[%s] timestamp: %s", blockNumber, err)
165+
evt.Timestamp = 0 // set to 0, we were not able to retrieve the timestamp.
166+
return
167+
}
168+
evt.Timestamp = block.Timestamp
169+
s.ep.stream.blockTimestampCache.Add(blockNumber, evt.Timestamp)
170+
}
171+
150172
func (s *subscription) unsubscribe(deleting bool) {
151173
log.Infof("%s: Unsubscribing existing filter (deleting=%t)", s.info.ID, deleting)
152174
s.deleting = deleting

internal/events/test_helper.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/hyperledger/fabric-protos-go/peer"
3030
"github.com/hyperledger/firefly-fabconnect/internal/conf"
3131
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
32+
"github.com/hyperledger/firefly-fabconnect/internal/fabric/utils"
3233
"github.com/hyperledger/firefly-fabconnect/internal/kvstore"
3334
mockfabric "github.com/hyperledger/firefly-fabconnect/mocks/fabric/client"
3435
mockkvstore "github.com/hyperledger/firefly-fabconnect/mocks/kvstore"
@@ -190,8 +191,18 @@ func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient {
190191
Height: 10,
191192
},
192193
}
194+
rawBlock := &utils.RawBlock{
195+
Header: &common.BlockHeader{
196+
Number: uint64(20),
197+
},
198+
}
199+
block := &utils.Block{
200+
Number: uint64(20),
201+
Timestamp: int64(1000000),
202+
}
193203
rpc.On("SubscribeEvent", mock.Anything, mock.Anything).Return(nil, roBlockEventChan, roCCEventChan, nil)
194204
rpc.On("QueryChainInfo", mock.Anything, mock.Anything).Return(res, nil)
205+
rpc.On("QueryBlock", mock.Anything, mock.Anything, mock.Anything).Return(rawBlock, block, nil)
195206
rpc.On("Unregister", mock.Anything).Return()
196207

197208
go func() {
@@ -203,6 +214,11 @@ func mockRPCClient(fromBlock string, withReset ...bool) *mockfabric.RPCClient {
203214
blockEventChan <- &fab.BlockEvent{
204215
Block: constructBlock(11),
205216
}
217+
ccEventChan <- &fab.CCEvent{
218+
BlockNumber: uint64(10),
219+
TxID: "3144a3ad43dcc11374832bbb71561320de81fd80d69cc8e26a9ea7d3240a5e84",
220+
ChaincodeID: "asset_transfer",
221+
}
206222
if len(withReset) > 0 {
207223
blockEventChan <- &fab.BlockEvent{
208224
Block: constructBlock(11),

internal/fabric/client/api.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ import (
2222
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/fab"
2323
"github.com/hyperledger/fabric-sdk-go/pkg/common/providers/msp"
2424
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
25+
"github.com/hyperledger/firefly-fabconnect/internal/fabric/utils"
2526
)
2627

2728
type ChaincodeSpec struct {
@@ -54,6 +55,7 @@ type RPCClient interface {
5455
Invoke(channelId, signer, chaincodeName, method string, args []string, isInit bool) (*TxReceipt, error)
5556
Query(channelId, signer, chaincodeName, method string, args []string) ([]byte, error)
5657
QueryChainInfo(channelId, signer string) (*fab.BlockchainInfoResponse, error)
58+
QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error)
5759
QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error)
5860
SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error)
5961
Unregister(*RegistrationWrapper)

internal/fabric/client/client_ccp.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
mspImpl "github.com/hyperledger/fabric-sdk-go/pkg/msp"
3030
"github.com/hyperledger/firefly-fabconnect/internal/errors"
3131
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
32+
"github.com/hyperledger/firefly-fabconnect/internal/fabric/utils"
3233
log "github.com/sirupsen/logrus"
3334
)
3435

@@ -148,6 +149,19 @@ func (w *ccpRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.Blockchai
148149
return result, nil
149150
}
150151

152+
func (w *ccpRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) {
153+
log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber)
154+
155+
rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer)
156+
if err != nil {
157+
log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err)
158+
return nil, nil, err
159+
}
160+
161+
log.Tracef("RPC [%s] <-- success", channelId)
162+
return rawblock, block, nil
163+
}
164+
151165
// The returned registration must be closed when done
152166
func (w *ccpRPCWrapper) SubscribeEvent(subInfo *eventsapi.SubscriptionInfo, since uint64) (*RegistrationWrapper, <-chan *fab.BlockEvent, <-chan *fab.CCEvent, error) {
153167
reg, blockEventCh, ccEventCh, err := w.eventClientWrapper.subscribeEvent(subInfo, since)

internal/fabric/client/client_gateway_clientside.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/hyperledger/fabric-sdk-go/pkg/gateway"
2626
"github.com/hyperledger/firefly-fabconnect/internal/errors"
2727
eventsapi "github.com/hyperledger/firefly-fabconnect/internal/events/api"
28+
"github.com/hyperledger/firefly-fabconnect/internal/fabric/utils"
2829
log "github.com/sirupsen/logrus"
2930
)
3031

@@ -110,6 +111,19 @@ func (w *gwRPCWrapper) QueryChainInfo(channelId, signer string) (*fab.Blockchain
110111
return result, nil
111112
}
112113

114+
func (w *gwRPCWrapper) QueryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) {
115+
log.Tracef("RPC [%s] --> QueryBlock %v", channelId, blockNumber)
116+
117+
rawblock, block, err := w.ledgerClientWrapper.queryBlock(channelId, blockNumber, signer)
118+
if err != nil {
119+
log.Errorf("Failed to query block %v on channel %s. %s", blockNumber, channelId, err)
120+
return nil, nil, err
121+
}
122+
123+
log.Tracef("RPC [%s] <-- success", channelId)
124+
return rawblock, block, nil
125+
}
126+
113127
func (w *gwRPCWrapper) QueryTransaction(channelId, signer, txId string) (map[string]interface{}, error) {
114128
log.Tracef("RPC [%s] --> QueryTransaction %s", channelId, txId)
115129

internal/fabric/client/ledger.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,19 @@ func (l *ledgerClientWrapper) queryChainInfo(channelId, signer string) (*fab.Blo
6060
return result, nil
6161
}
6262

63+
func (l *ledgerClientWrapper) queryBlock(channelId string, blockNumber uint64, signer string) (*utils.RawBlock, *utils.Block, error) {
64+
client, err := l.getLedgerClient(channelId, signer)
65+
if err != nil {
66+
return nil, nil, errors.Errorf("Failed to get channel client. %s", err)
67+
}
68+
result, err := client.QueryBlock(blockNumber)
69+
if err != nil {
70+
return nil, nil, err
71+
}
72+
rawblock, block, err := utils.DecodeBlock(result)
73+
return rawblock, block, err
74+
}
75+
6376
func (l *ledgerClientWrapper) queryTransaction(channelId, signer, txId string) (map[string]interface{}, error) {
6477
client, err := l.getLedgerClient(channelId, signer)
6578
if err != nil {

0 commit comments

Comments
 (0)