Skip to content

Commit 10ed107

Browse files
committed
Merge pull request #1899 from obscuren/mipmap-bloom
core, eth/filters, miner, xeth: Optimised log filtering
2 parents c5ef2af + 6dc1478 commit 10ed107

File tree

14 files changed

+729
-123
lines changed

14 files changed

+729
-123
lines changed

core/blockchain.go

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -654,10 +654,17 @@ func (self *BlockChain) InsertChain(chain types.Blocks) (int, error) {
654654
events = append(events, ChainEvent{block, block.Hash(), logs})
655655

656656
// This puts transactions in a extra db for rpc
657-
PutTransactions(self.chainDb, block, block.Transactions())
657+
if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
658+
return i, err
659+
}
658660
// store the receipts
659-
PutReceipts(self.chainDb, receipts)
660-
661+
if err := PutReceipts(self.chainDb, receipts); err != nil {
662+
return i, err
663+
}
664+
// Write map map bloom filters
665+
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
666+
return i, err
667+
}
661668
case SideStatTy:
662669
if glog.V(logger.Detail) {
663670
glog.Infof("inserted forked block #%d (TD=%v) (%d TXs %d UNCs) (%x...). Took %v\n", block.Number(), block.Difficulty(), len(block.Transactions()), len(block.Uncles()), block.Hash().Bytes()[0:4], time.Since(bstart))
@@ -743,8 +750,18 @@ func (self *BlockChain) reorg(oldBlock, newBlock *types.Block) error {
743750
// insert the block in the canonical way, re-writing history
744751
self.insert(block)
745752
// write canonical receipts and transactions
746-
PutTransactions(self.chainDb, block, block.Transactions())
747-
PutReceipts(self.chainDb, GetBlockReceipts(self.chainDb, block.Hash()))
753+
if err := PutTransactions(self.chainDb, block, block.Transactions()); err != nil {
754+
return err
755+
}
756+
receipts := GetBlockReceipts(self.chainDb, block.Hash())
757+
// write receipts
758+
if err := PutReceipts(self.chainDb, receipts); err != nil {
759+
return err
760+
}
761+
// Write map map bloom filters
762+
if err := WriteMipmapBloom(self.chainDb, block.NumberU64(), receipts); err != nil {
763+
return err
764+
}
748765

749766
addedTxs = append(addedTxs, block.Transactions()...)
750767
}

core/chain_makers.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,15 @@ func (b *BlockGen) AddTx(tx *types.Transaction) {
105105
b.receipts = append(b.receipts, receipt)
106106
}
107107

108+
// AddUncheckedReceipts forcefully adds a receipts to the block without a
109+
// backing transaction.
110+
//
111+
// AddUncheckedReceipts will cause consensus failures when used during real
112+
// chain processing. This is best used in conjuction with raw block insertion.
113+
func (b *BlockGen) AddUncheckedReceipt(receipt *types.Receipt) {
114+
b.receipts = append(b.receipts, receipt)
115+
}
116+
108117
// TxNonce returns the next valid transaction nonce for the
109118
// account at addr. It panics if the account does not exist.
110119
func (b *BlockGen) TxNonce(addr common.Address) uint64 {

core/chain_util.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ package core
1818

1919
import (
2020
"bytes"
21+
"encoding/binary"
22+
"fmt"
2123
"math/big"
2224

2325
"github.com/ethereum/go-ethereum/common"
@@ -42,6 +44,9 @@ var (
4244

4345
ExpDiffPeriod = big.NewInt(100000)
4446
blockHashPre = []byte("block-hash-") // [deprecated by eth/63]
47+
48+
mipmapPre = []byte("mipmap-log-bloom-")
49+
MIPMapLevels = []uint64{1000000, 500000, 100000, 50000, 1000}
4550
)
4651

4752
// CalcDifficulty is the difficulty adjustment algorithm. It returns
@@ -346,3 +351,42 @@ func GetBlockByHashOld(db ethdb.Database, hash common.Hash) *types.Block {
346351
}
347352
return (*types.Block)(&block)
348353
}
354+
355+
// returns a formatted MIP mapped key by adding prefix, canonical number and level
356+
//
357+
// ex. fn(98, 1000) = (prefix || 1000 || 0)
358+
func mipmapKey(num, level uint64) []byte {
359+
lkey := make([]byte, 8)
360+
binary.BigEndian.PutUint64(lkey, level)
361+
key := new(big.Int).SetUint64(num / level * level)
362+
363+
return append(mipmapPre, append(lkey, key.Bytes()...)...)
364+
}
365+
366+
// WriteMapmapBloom writes each address included in the receipts' logs to the
367+
// MIP bloom bin.
368+
func WriteMipmapBloom(db ethdb.Database, number uint64, receipts types.Receipts) error {
369+
batch := db.NewBatch()
370+
for _, level := range MIPMapLevels {
371+
key := mipmapKey(number, level)
372+
bloomDat, _ := db.Get(key)
373+
bloom := types.BytesToBloom(bloomDat)
374+
for _, receipt := range receipts {
375+
for _, log := range receipt.Logs() {
376+
bloom.Add(log.Address.Big())
377+
}
378+
}
379+
batch.Put(key, bloom.Bytes())
380+
}
381+
if err := batch.Write(); err != nil {
382+
return fmt.Errorf("mipmap write fail for: %d: %v", number, err)
383+
}
384+
return nil
385+
}
386+
387+
// GetMipmapBloom returns a bloom filter using the number and level as input
388+
// parameters. For available levels see MIPMapLevels.
389+
func GetMipmapBloom(db ethdb.Database, number, level uint64) types.Bloom {
390+
bloomDat, _ := db.Get(mipmapKey(number, level))
391+
return types.BytesToBloom(bloomDat)
392+
}

core/chain_util_test.go

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,15 @@ package core
1818

1919
import (
2020
"encoding/json"
21+
"io/ioutil"
2122
"math/big"
2223
"os"
2324
"testing"
2425

2526
"github.com/ethereum/go-ethereum/common"
2627
"github.com/ethereum/go-ethereum/core/types"
28+
"github.com/ethereum/go-ethereum/core/vm"
29+
"github.com/ethereum/go-ethereum/crypto"
2730
"github.com/ethereum/go-ethereum/crypto/sha3"
2831
"github.com/ethereum/go-ethereum/ethdb"
2932
"github.com/ethereum/go-ethereum/rlp"
@@ -318,3 +321,112 @@ func TestHeadStorage(t *testing.T) {
318321
t.Fatalf("Head block hash mismatch: have %v, want %v", entry, blockFull.Hash())
319322
}
320323
}
324+
325+
func TestMipmapBloom(t *testing.T) {
326+
db, _ := ethdb.NewMemDatabase()
327+
328+
receipt1 := new(types.Receipt)
329+
receipt1.SetLogs(vm.Logs{
330+
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
331+
&vm.Log{Address: common.BytesToAddress([]byte("address"))},
332+
})
333+
receipt2 := new(types.Receipt)
334+
receipt2.SetLogs(vm.Logs{
335+
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
336+
&vm.Log{Address: common.BytesToAddress([]byte("address1"))},
337+
})
338+
339+
WriteMipmapBloom(db, 1, types.Receipts{receipt1})
340+
WriteMipmapBloom(db, 2, types.Receipts{receipt2})
341+
342+
for _, level := range MIPMapLevels {
343+
bloom := GetMipmapBloom(db, 2, level)
344+
if !bloom.Test(new(big.Int).SetBytes([]byte("address1"))) {
345+
t.Error("expected test to be included on level:", level)
346+
}
347+
}
348+
349+
// reset
350+
db, _ = ethdb.NewMemDatabase()
351+
receipt := new(types.Receipt)
352+
receipt.SetLogs(vm.Logs{
353+
&vm.Log{Address: common.BytesToAddress([]byte("test"))},
354+
})
355+
WriteMipmapBloom(db, 999, types.Receipts{receipt1})
356+
357+
receipt = new(types.Receipt)
358+
receipt.SetLogs(vm.Logs{
359+
&vm.Log{Address: common.BytesToAddress([]byte("test 1"))},
360+
})
361+
WriteMipmapBloom(db, 1000, types.Receipts{receipt})
362+
363+
bloom := GetMipmapBloom(db, 1000, 1000)
364+
if bloom.TestBytes([]byte("test")) {
365+
t.Error("test should not have been included")
366+
}
367+
}
368+
369+
func TestMipmapChain(t *testing.T) {
370+
dir, err := ioutil.TempDir("", "mipmap")
371+
if err != nil {
372+
t.Fatal(err)
373+
}
374+
defer os.RemoveAll(dir)
375+
376+
var (
377+
db, _ = ethdb.NewLDBDatabase(dir, 16)
378+
key1, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
379+
addr = crypto.PubkeyToAddress(key1.PublicKey)
380+
addr2 = common.BytesToAddress([]byte("jeff"))
381+
382+
hash1 = common.BytesToHash([]byte("topic1"))
383+
)
384+
defer db.Close()
385+
386+
genesis := WriteGenesisBlockForTesting(db, GenesisAccount{addr, big.NewInt(1000000)})
387+
chain := GenerateChain(genesis, db, 1010, func(i int, gen *BlockGen) {
388+
var receipts types.Receipts
389+
switch i {
390+
case 1:
391+
receipt := types.NewReceipt(nil, new(big.Int))
392+
receipt.SetLogs(vm.Logs{
393+
&vm.Log{
394+
Address: addr,
395+
Topics: []common.Hash{hash1},
396+
},
397+
})
398+
gen.AddUncheckedReceipt(receipt)
399+
receipts = types.Receipts{receipt}
400+
case 1000:
401+
receipt := types.NewReceipt(nil, new(big.Int))
402+
receipt.SetLogs(vm.Logs{&vm.Log{Address: addr2}})
403+
gen.AddUncheckedReceipt(receipt)
404+
receipts = types.Receipts{receipt}
405+
406+
}
407+
408+
// store the receipts
409+
err := PutReceipts(db, receipts)
410+
if err != nil {
411+
t.Fatal(err)
412+
}
413+
WriteMipmapBloom(db, uint64(i+1), receipts)
414+
})
415+
for _, block := range chain {
416+
WriteBlock(db, block)
417+
if err := WriteCanonicalHash(db, block.Hash(), block.NumberU64()); err != nil {
418+
t.Fatalf("failed to insert block number: %v", err)
419+
}
420+
if err := WriteHeadBlockHash(db, block.Hash()); err != nil {
421+
t.Fatalf("failed to insert block number: %v", err)
422+
}
423+
if err := PutBlockReceipts(db, block, block.Receipts()); err != nil {
424+
t.Fatal("error writing block receipts:", err)
425+
}
426+
}
427+
428+
bloom := GetMipmapBloom(db, 0, 1000)
429+
if bloom.TestBytes(addr2[:]) {
430+
t.Error("address was included in bloom and should not have")
431+
}
432+
}

core/transaction_util.go

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package core
1818

1919
import (
20+
"fmt"
21+
2022
"github.com/ethereum/go-ethereum/common"
2123
"github.com/ethereum/go-ethereum/core/types"
2224
"github.com/ethereum/go-ethereum/ethdb"
@@ -32,22 +34,16 @@ var (
3234
)
3335

3436
// PutTransactions stores the transactions in the given database
35-
func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) {
36-
batch := new(leveldb.Batch)
37-
_, batchWrite := db.(*ethdb.LDBDatabase)
37+
func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactions) error {
38+
batch := db.NewBatch()
3839

3940
for i, tx := range block.Transactions() {
4041
rlpEnc, err := rlp.EncodeToBytes(tx)
4142
if err != nil {
42-
glog.V(logger.Debug).Infoln("Failed encoding tx", err)
43-
return
43+
return fmt.Errorf("failed encoding tx: %v", err)
4444
}
4545

46-
if batchWrite {
47-
batch.Put(tx.Hash().Bytes(), rlpEnc)
48-
} else {
49-
db.Put(tx.Hash().Bytes(), rlpEnc)
50-
}
46+
batch.Put(tx.Hash().Bytes(), rlpEnc)
5147

5248
var txExtra struct {
5349
BlockHash common.Hash
@@ -59,22 +55,16 @@ func PutTransactions(db ethdb.Database, block *types.Block, txs types.Transactio
5955
txExtra.Index = uint64(i)
6056
rlpMeta, err := rlp.EncodeToBytes(txExtra)
6157
if err != nil {
62-
glog.V(logger.Debug).Infoln("Failed encoding tx meta data", err)
63-
return
58+
return fmt.Errorf("failed encoding tx meta data: %v", err)
6459
}
6560

66-
if batchWrite {
67-
batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
68-
} else {
69-
db.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
70-
}
61+
batch.Put(append(tx.Hash().Bytes(), 0x0001), rlpMeta)
7162
}
7263

73-
if db, ok := db.(*ethdb.LDBDatabase); ok {
74-
if err := db.LDB().Write(batch, nil); err != nil {
75-
glog.V(logger.Error).Infoln("db write err:", err)
76-
}
64+
if err := batch.Write(); err != nil {
65+
return fmt.Errorf("failed writing tx to db: %v", err)
7766
}
67+
return nil
7868
}
7969

8070
func DeleteTransaction(db ethdb.Database, txHash common.Hash) {

core/types/bloom9.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package types
1818

1919
import (
20+
"fmt"
2021
"math/big"
2122

2223
"github.com/ethereum/go-ethereum/common"
@@ -28,6 +29,46 @@ type bytesBacked interface {
2829
Bytes() []byte
2930
}
3031

32+
const bloomLength = 256
33+
34+
type Bloom [bloomLength]byte
35+
36+
func BytesToBloom(b []byte) Bloom {
37+
var bloom Bloom
38+
bloom.SetBytes(b)
39+
return bloom
40+
}
41+
42+
func (b *Bloom) SetBytes(d []byte) {
43+
if len(b) < len(d) {
44+
panic(fmt.Sprintf("bloom bytes too big %d %d", len(b), len(d)))
45+
}
46+
47+
copy(b[bloomLength-len(d):], d)
48+
}
49+
50+
func (b *Bloom) Add(d *big.Int) {
51+
bin := new(big.Int).SetBytes(b[:])
52+
bin.Or(bin, bloom9(d.Bytes()))
53+
b.SetBytes(bin.Bytes())
54+
}
55+
56+
func (b Bloom) Big() *big.Int {
57+
return common.Bytes2Big(b[:])
58+
}
59+
60+
func (b Bloom) Bytes() []byte {
61+
return b[:]
62+
}
63+
64+
func (b Bloom) Test(test *big.Int) bool {
65+
return BloomLookup(b, test)
66+
}
67+
68+
func (b Bloom) TestBytes(test []byte) bool {
69+
return b.Test(common.BytesToBig(test))
70+
}
71+
3172
func CreateBloom(receipts Receipts) Bloom {
3273
bin := new(big.Int)
3374
for _, receipt := range receipts {

0 commit comments

Comments
 (0)