Skip to content

Commit f1b00cf

Browse files
rjl493456442karalabe
authored andcommitted
core: re-omit new log event when logs rebirth
1 parent 442320a commit f1b00cf

File tree

2 files changed

+224
-9
lines changed

2 files changed

+224
-9
lines changed

core/blockchain.go

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1401,21 +1401,25 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
14011401
commonBlock *types.Block
14021402
deletedTxs types.Transactions
14031403
deletedLogs []*types.Log
1404+
rebirthLogs []*types.Log
14041405
// collectLogs collects the logs that were generated during the
14051406
// processing of the block that corresponds with the given hash.
1406-
// These logs are later announced as deleted.
1407-
collectLogs = func(hash common.Hash) {
1408-
// Coalesce logs and set 'Removed'.
1407+
// These logs are later announced as deleted or reborn
1408+
collectLogs = func(hash common.Hash, removed bool) {
14091409
number := bc.hc.GetBlockNumber(hash)
14101410
if number == nil {
14111411
return
14121412
}
14131413
receipts := rawdb.ReadReceipts(bc.db, hash, *number)
14141414
for _, receipt := range receipts {
14151415
for _, log := range receipt.Logs {
1416-
del := *log
1417-
del.Removed = true
1418-
deletedLogs = append(deletedLogs, &del)
1416+
l := *log
1417+
if removed {
1418+
l.Removed = true
1419+
deletedLogs = append(deletedLogs, &l)
1420+
} else {
1421+
rebirthLogs = append(rebirthLogs, &l)
1422+
}
14191423
}
14201424
}
14211425
}
@@ -1428,7 +1432,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
14281432
oldChain = append(oldChain, oldBlock)
14291433
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
14301434

1431-
collectLogs(oldBlock.Hash())
1435+
collectLogs(oldBlock.Hash(), true)
14321436
}
14331437
} else {
14341438
// reduce new chain and append new chain blocks for inserting later on
@@ -1452,7 +1456,7 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
14521456
oldChain = append(oldChain, oldBlock)
14531457
newChain = append(newChain, newBlock)
14541458
deletedTxs = append(deletedTxs, oldBlock.Transactions()...)
1455-
collectLogs(oldBlock.Hash())
1459+
collectLogs(oldBlock.Hash(), true)
14561460

14571461
oldBlock, newBlock = bc.GetBlock(oldBlock.ParentHash(), oldBlock.NumberU64()-1), bc.GetBlock(newBlock.ParentHash(), newBlock.NumberU64()-1)
14581462
if oldBlock == nil {
@@ -1478,6 +1482,10 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
14781482
for i := len(newChain) - 1; i >= 0; i-- {
14791483
// insert the block in the canonical way, re-writing history
14801484
bc.insert(newChain[i])
1485+
// collect reborn logs due to chain reorg(except head block)
1486+
if i != 0 {
1487+
collectLogs(newChain[i].Hash(), false)
1488+
}
14811489
// write lookup entries for hash based transaction/receipt searches
14821490
rawdb.WriteTxLookupEntries(bc.db, newChain[i])
14831491
addedTxs = append(addedTxs, newChain[i].Transactions()...)
@@ -1495,6 +1503,9 @@ func (bc *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
14951503
if len(deletedLogs) > 0 {
14961504
go bc.rmLogsFeed.Send(RemovedLogsEvent{deletedLogs})
14971505
}
1506+
if len(rebirthLogs) > 0 {
1507+
go bc.logsFeed.Send(rebirthLogs)
1508+
}
14981509
if len(oldChain) > 0 {
14991510
go func() {
15001511
for _, block := range oldChain {

core/blockchain_test.go

Lines changed: 205 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -884,7 +884,6 @@ func TestChainTxReorgs(t *testing.T) {
884884
}
885885

886886
func TestLogReorgs(t *testing.T) {
887-
888887
var (
889888
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
890889
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
@@ -930,6 +929,211 @@ func TestLogReorgs(t *testing.T) {
930929
}
931930
}
932931

932+
func TestLogRebirth(t *testing.T) {
933+
var (
934+
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
935+
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
936+
db = ethdb.NewMemDatabase()
937+
// this code generates a log
938+
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
939+
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
940+
genesis = gspec.MustCommit(db)
941+
signer = types.NewEIP155Signer(gspec.Config.ChainID)
942+
newLogCh = make(chan bool)
943+
)
944+
945+
// listenNewLog checks whether the received logs number is equal with expected.
946+
listenNewLog := func(sink chan []*types.Log, expect int) {
947+
cnt := 0
948+
for {
949+
select {
950+
case logs := <-sink:
951+
cnt += len(logs)
952+
case <-time.NewTimer(5 * time.Second).C:
953+
// new logs timeout
954+
newLogCh <- false
955+
return
956+
}
957+
if cnt == expect {
958+
break
959+
} else if cnt > expect {
960+
// redundant logs received
961+
newLogCh <- false
962+
return
963+
}
964+
}
965+
select {
966+
case <-sink:
967+
// redundant logs received
968+
newLogCh <- false
969+
case <-time.NewTimer(100 * time.Millisecond).C:
970+
newLogCh <- true
971+
}
972+
}
973+
974+
blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
975+
defer blockchain.Stop()
976+
977+
logsCh := make(chan []*types.Log)
978+
blockchain.SubscribeLogsEvent(logsCh)
979+
980+
rmLogsCh := make(chan RemovedLogsEvent)
981+
blockchain.SubscribeRemovedLogsEvent(rmLogsCh)
982+
983+
chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
984+
if i == 1 {
985+
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
986+
if err != nil {
987+
t.Fatalf("failed to create tx: %v", err)
988+
}
989+
gen.AddTx(tx)
990+
}
991+
})
992+
993+
// Spawn a goroutine to receive log events
994+
go listenNewLog(logsCh, 1)
995+
if _, err := blockchain.InsertChain(chain); err != nil {
996+
t.Fatalf("failed to insert chain: %v", err)
997+
}
998+
if !<-newLogCh {
999+
t.Fatalf("failed to receive new log event")
1000+
}
1001+
1002+
// Generate long reorg chain
1003+
forkChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
1004+
if i == 1 {
1005+
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
1006+
if err != nil {
1007+
t.Fatalf("failed to create tx: %v", err)
1008+
}
1009+
gen.AddTx(tx)
1010+
// Higher block difficulty
1011+
gen.OffsetTime(-9)
1012+
}
1013+
})
1014+
1015+
// Spawn a goroutine to receive log events
1016+
go listenNewLog(logsCh, 1)
1017+
if _, err := blockchain.InsertChain(forkChain); err != nil {
1018+
t.Fatalf("failed to insert forked chain: %v", err)
1019+
}
1020+
if !<-newLogCh {
1021+
t.Fatalf("failed to receive new log event")
1022+
}
1023+
// Ensure removedLog events received
1024+
select {
1025+
case ev := <-rmLogsCh:
1026+
if len(ev.Logs) == 0 {
1027+
t.Error("expected logs")
1028+
}
1029+
case <-time.NewTimer(1 * time.Second).C:
1030+
t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
1031+
}
1032+
1033+
newBlocks, _ := GenerateChain(params.TestChainConfig, chain[len(chain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
1034+
go listenNewLog(logsCh, 1)
1035+
if _, err := blockchain.InsertChain(newBlocks); err != nil {
1036+
t.Fatalf("failed to insert forked chain: %v", err)
1037+
}
1038+
// Rebirth logs should omit a newLogEvent
1039+
if !<-newLogCh {
1040+
t.Fatalf("failed to receive new log event")
1041+
}
1042+
// Ensure removedLog events received
1043+
select {
1044+
case ev := <-rmLogsCh:
1045+
if len(ev.Logs) == 0 {
1046+
t.Error("expected logs")
1047+
}
1048+
case <-time.NewTimer(1 * time.Second).C:
1049+
t.Fatal("Timeout. There is no RemovedLogsEvent has been sent.")
1050+
}
1051+
}
1052+
1053+
func TestSideLogRebirth(t *testing.T) {
1054+
var (
1055+
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
1056+
addr1 = crypto.PubkeyToAddress(key1.PublicKey)
1057+
db = ethdb.NewMemDatabase()
1058+
// this code generates a log
1059+
code = common.Hex2Bytes("60606040525b7f24ec1d3ff24c2f6ff210738839dbc339cd45a5294d85c79361016243157aae7b60405180905060405180910390a15b600a8060416000396000f360606040526008565b00")
1060+
gspec = &Genesis{Config: params.TestChainConfig, Alloc: GenesisAlloc{addr1: {Balance: big.NewInt(10000000000000)}}}
1061+
genesis = gspec.MustCommit(db)
1062+
signer = types.NewEIP155Signer(gspec.Config.ChainID)
1063+
newLogCh = make(chan bool)
1064+
)
1065+
1066+
// listenNewLog checks whether the received logs number is equal with expected.
1067+
listenNewLog := func(sink chan []*types.Log, expect int) {
1068+
cnt := 0
1069+
for {
1070+
select {
1071+
case logs := <-sink:
1072+
cnt += len(logs)
1073+
case <-time.NewTimer(5 * time.Second).C:
1074+
// new logs timeout
1075+
newLogCh <- false
1076+
return
1077+
}
1078+
if cnt == expect {
1079+
break
1080+
} else if cnt > expect {
1081+
// redundant logs received
1082+
newLogCh <- false
1083+
return
1084+
}
1085+
}
1086+
select {
1087+
case <-sink:
1088+
// redundant logs received
1089+
newLogCh <- false
1090+
case <-time.NewTimer(100 * time.Millisecond).C:
1091+
newLogCh <- true
1092+
}
1093+
}
1094+
1095+
blockchain, _ := NewBlockChain(db, nil, gspec.Config, ethash.NewFaker(), vm.Config{}, nil)
1096+
defer blockchain.Stop()
1097+
1098+
logsCh := make(chan []*types.Log)
1099+
blockchain.SubscribeLogsEvent(logsCh)
1100+
1101+
chain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
1102+
if i == 1 {
1103+
// Higher block difficulty
1104+
gen.OffsetTime(-9)
1105+
}
1106+
})
1107+
if _, err := blockchain.InsertChain(chain); err != nil {
1108+
t.Fatalf("failed to insert forked chain: %v", err)
1109+
}
1110+
1111+
// Generate side chain with lower difficulty
1112+
sideChain, _ := GenerateChain(params.TestChainConfig, genesis, ethash.NewFaker(), db, 2, func(i int, gen *BlockGen) {
1113+
if i == 1 {
1114+
tx, err := types.SignTx(types.NewContractCreation(gen.TxNonce(addr1), new(big.Int), 1000000, new(big.Int), code), signer, key1)
1115+
if err != nil {
1116+
t.Fatalf("failed to create tx: %v", err)
1117+
}
1118+
gen.AddTx(tx)
1119+
}
1120+
})
1121+
if _, err := blockchain.InsertChain(sideChain); err != nil {
1122+
t.Fatalf("failed to insert forked chain: %v", err)
1123+
}
1124+
1125+
// Generate a new block based on side chain
1126+
newBlocks, _ := GenerateChain(params.TestChainConfig, sideChain[len(sideChain)-1], ethash.NewFaker(), db, 1, func(i int, gen *BlockGen) {})
1127+
go listenNewLog(logsCh, 1)
1128+
if _, err := blockchain.InsertChain(newBlocks); err != nil {
1129+
t.Fatalf("failed to insert forked chain: %v", err)
1130+
}
1131+
// Rebirth logs should omit a newLogEvent
1132+
if !<-newLogCh {
1133+
t.Fatalf("failed to receive new log event")
1134+
}
1135+
}
1136+
9331137
func TestReorgSideEvent(t *testing.T) {
9341138
var (
9351139
db = ethdb.NewMemDatabase()

0 commit comments

Comments
 (0)