Skip to content

Commit 88eb49d

Browse files
committed
adjust buffering to improve stability; begin documentation; fix notifier
err handling
1 parent 2b51d34 commit 88eb49d

File tree

4 files changed

+62
-15
lines changed

4 files changed

+62
-15
lines changed

core/blockchain_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2227,8 +2227,8 @@ func BenchmarkBlockChain_1x1000ValueTransferToExisting(b *testing.B) {
22272227

22282228
func BenchmarkBlockChain_1x1000Executions(b *testing.B) {
22292229
var (
2230-
numTxs= 1000
2231-
numBlocks= 1
2230+
numTxs = 1000
2231+
numBlocks = 1
22322232
)
22332233
b.StopTimer()
22342234
b.ResetTimer()

statediff/api.go

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -56,23 +56,30 @@ func (api *PublicStateDiffAPI) Stream(ctx context.Context) (*rpc.Subscription, e
5656

5757
go func() {
5858
// subscribe to events from the state diff service
59-
payloadChannel := make(chan Payload, 10)
60-
quitChan := make(chan bool)
59+
payloadChannel := make(chan Payload, chainEventChanSize)
60+
quitChan := make(chan bool, 1)
6161
api.sds.Subscribe(rpcSub.ID, payloadChannel, quitChan)
6262
// loop and await state diff payloads and relay them to the subscriber with the notifier
6363
for {
6464
select {
6565
case packet := <-payloadChannel:
66-
if err := notifier.Notify(rpcSub.ID, packet); err != nil {
67-
log.Error("Failed to send state diff packet", "err", err)
66+
if notifyErr := notifier.Notify(rpcSub.ID, packet); notifyErr != nil {
67+
log.Error("Failed to send state diff packet; error: " + notifyErr.Error())
68+
unSubErr := api.sds.Unsubscribe(rpcSub.ID)
69+
if unSubErr != nil {
70+
log.Error("Failed to unsubscribe from the state diff service; error: " + unSubErr.Error())
71+
}
72+
return
6873
}
6974
case err := <-rpcSub.Err():
70-
log.Error("State diff service rpcSub error", err)
71-
err = api.sds.Unsubscribe(rpcSub.ID)
7275
if err != nil {
73-
log.Error("Failed to unsubscribe from the state diff service", err)
76+
log.Error("State diff service rpcSub error: " + err.Error())
77+
err = api.sds.Unsubscribe(rpcSub.ID)
78+
if err != nil {
79+
log.Error("Failed to unsubscribe from the state diff service; error: " + err.Error())
80+
}
81+
return
7482
}
75-
return
7683
case <-quitChan:
7784
// don't need to unsubscribe, statediff service does so before sending the quit signal
7885
return

statediff/doc.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
// Copyright 2019 The go-ethereum Authors
2+
// This file is part of the go-ethereum library.
3+
//
4+
// The go-ethereum library is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Lesser General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
//
9+
// The go-ethereum library is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Lesser General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Lesser General Public License
15+
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
16+
17+
/*
18+
This work is adapted from work by Charles Crain at https://github.com/jpmorganchase/quorum/blob/9b7fd9af8082795eeeb6863d9746f12b82dd5078/statediff/statediff.go
19+
20+
Package statediff provides an auxiliary service that processes state diff objects from incoming chain events,
21+
relaying the objects to any rpc subscriptions.
22+
23+
Rpc subscriptions to the service can be created using the rpc.Client.Subscribe() method,
24+
with the "statediff" namespace, a statediff.Payload channel, and the name of the statediff api's rpc method- "stream".
25+
26+
e.g.
27+
28+
stateDiffPayloadChan := make(chan statediff.Payload, 20000)
29+
rpcSub, err := Subscribe(context.Background(), "statediff", stateDiffPayloadChan, "stream"})
30+
for {
31+
select {
32+
case stateDiffPayload := <- stateDiffPayloadChan:
33+
processPayload(stateDiffPayload)
34+
case err := <= rpcSub.Err():
35+
log.Error(err)
36+
}
37+
}
38+
39+
*/
40+
package statediff

statediff/service.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import (
3434
"github.com/ethereum/go-ethereum/rpc"
3535
)
3636

37+
const chainEventChanSize = 20000
38+
3739
type blockChain interface {
3840
SubscribeChainEvent(ch chan<- core.ChainEvent) event.Subscription
3941
GetBlockByHash(hash common.Hash) *types.Block
@@ -127,13 +129,11 @@ func (sds *Service) Loop(chainEventCh chan core.ChainEvent) {
127129
}
128130
sds.lastBlock = currentBlock
129131
if parentBlock == nil {
130-
log.Error("Parent block is nil, skipping this block",
131-
"parent block hash", parentHash.String(),
132-
"current block number", currentBlock.Number())
132+
log.Error(fmt.Sprintf("Parent block is nil, skipping this block (%d)", currentBlock.Number()))
133133
continue
134134
}
135135
if err := sds.processStateDiff(currentBlock, parentBlock); err != nil {
136-
log.Error("Error building statediff", "block number", currentBlock.Number(), "error", err)
136+
log.Error(fmt.Sprintf("Error building statediff for block %d; error: ", currentBlock.Number()) + err.Error())
137137
}
138138
case err := <-errCh:
139139
log.Warn("Error from chain event subscription, breaking loop", "error", err)
@@ -210,7 +210,7 @@ func (sds *Service) Unsubscribe(id rpc.ID) error {
210210
func (sds *Service) Start(*p2p.Server) error {
211211
log.Info("Starting statediff service")
212212

213-
chainEventCh := make(chan core.ChainEvent, 10)
213+
chainEventCh := make(chan core.ChainEvent, chainEventChanSize)
214214
go sds.Loop(chainEventCh)
215215

216216
return nil

0 commit comments

Comments
 (0)