Skip to content

Commit e1170dd

Browse files
committed
problem: ERC20 Balancer subsription checks balancer on each block, even if no transfer happened
solution: use ERC20 Logs to determine changes to ERC20 balance
1 parent e51f486 commit e1170dd

File tree

6 files changed

+264
-62
lines changed

6 files changed

+264
-62
lines changed

src/main/kotlin/io/emeraldpay/dshackle/rpc/TrackERC20Address.kt

Lines changed: 39 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,30 @@
1+
/**
2+
* Copyright (c) 2021 EmeraldPay, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package io.emeraldpay.dshackle.rpc
217

318
import io.emeraldpay.api.proto.BlockchainOuterClass
419
import io.emeraldpay.api.proto.Common
520
import io.emeraldpay.dshackle.SilentException
621
import io.emeraldpay.dshackle.config.TokensConfig
7-
import io.emeraldpay.dshackle.upstream.Head
822
import io.emeraldpay.dshackle.upstream.MultistreamHolder
9-
import io.emeraldpay.dshackle.upstream.Selector
23+
import io.emeraldpay.dshackle.upstream.ethereum.ERC20Balance
1024
import io.emeraldpay.dshackle.upstream.ethereum.EthereumMultistream
11-
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
12-
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
1325
import io.emeraldpay.etherjar.domain.Address
26+
import io.emeraldpay.etherjar.domain.EventId
1427
import io.emeraldpay.etherjar.erc20.ERC20Token
15-
import io.emeraldpay.etherjar.hex.Hex32
16-
import io.emeraldpay.etherjar.hex.HexQuantity
1728
import io.emeraldpay.grpc.BlockchainType
1829
import io.emeraldpay.grpc.Chain
1930
import org.slf4j.LoggerFactory
@@ -35,6 +46,8 @@ class TrackERC20Address(
3546
private val log = LoggerFactory.getLogger(TrackERC20Address::class.java)
3647
}
3748

49+
var erc20Balance: ERC20Balance = ERC20Balance()
50+
3851
private val ethereumAddresses = EthereumAddresses()
3952
private val tokens: MutableMap<TokenId, TokenDefinition> = HashMap()
4053

@@ -72,13 +85,30 @@ class TrackERC20Address(
7285
val chain = Chain.byId(request.asset.chainValue)
7386
val asset = request.asset.code.lowercase(Locale.getDefault())
7487
val tokenDefinition = tokens[TokenId(chain, asset)] ?: return Flux.empty()
75-
val head = multistreamHolder.getUpstream(chain)?.getHead()?.getFlux() ?: Flux.empty()
88+
val logs = getUpstream(chain)
89+
.getSubscribe().logs
90+
.start(
91+
listOf(tokenDefinition.token.contract),
92+
listOf(EventId.fromSignature("Transfer", "address", "address", "uint256"))
93+
)
7694

7795
return ethereumAddresses.extract(request.address)
7896
.map { TrackedAddress(chain, it, tokenDefinition.token, tokenDefinition.name) }
7997
.flatMap { addr ->
8098
val current = getBalance(addr)
81-
val updates = head.flatMap { getBalance(addr) }
99+
100+
val updates = logs
101+
.filter {
102+
it.topics.size >= 3 && (Address.extract(it.topics[1]) == addr.address || Address.extract(it.topics[2]) == addr.address)
103+
}
104+
.distinctUntilChanged {
105+
// check it once per block
106+
it.blockHash
107+
}
108+
.flatMap {
109+
// make sure we use actual balance, don't trust event blindly
110+
getBalance(addr)
111+
}
82112
Flux.concat(current, updates)
83113
.distinctUntilChanged()
84114
.map { addr.withBalance(it) }
@@ -88,23 +118,7 @@ class TrackERC20Address(
88118

89119
fun getBalance(addr: TrackedAddress): Mono<BigInteger> {
90120
val upstream = getUpstream(addr.chain)
91-
return upstream
92-
.getDirectApi(Selector.empty)
93-
.flatMap { api ->
94-
api.read(prepareEthCall(addr.token, addr.address, upstream.getHead()))
95-
.flatMap(JsonRpcResponse::requireStringResult)
96-
.map {
97-
Hex32.from(it).asQuantity().value
98-
}
99-
}
100-
}
101-
102-
fun prepareEthCall(token: ERC20Token, target: Address, head: Head): JsonRpcRequest {
103-
val call = token
104-
.readBalanceOf(target)
105-
.toJson()
106-
val height = head.getCurrentHeight()?.let { HexQuantity.from(it).toHex() } ?: "latest"
107-
return JsonRpcRequest("eth_call", listOf(call, height))
121+
return erc20Balance.getBalance(upstream, addr.token, addr.address)
108122
}
109123

110124
fun getUpstream(chain: Chain): EthereumMultistream {
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/**
2+
* Copyright (c) 2021 EmeraldPay, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package io.emeraldpay.dshackle.upstream.ethereum
17+
18+
import io.emeraldpay.dshackle.upstream.Head
19+
import io.emeraldpay.dshackle.upstream.Selector
20+
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
21+
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
22+
import io.emeraldpay.etherjar.domain.Address
23+
import io.emeraldpay.etherjar.erc20.ERC20Token
24+
import io.emeraldpay.etherjar.hex.Hex32
25+
import io.emeraldpay.etherjar.hex.HexQuantity
26+
import org.slf4j.LoggerFactory
27+
import reactor.core.publisher.Flux
28+
import reactor.core.publisher.Mono
29+
import java.math.BigInteger
30+
31+
/**
32+
* Query for a ERC20 token balance for an address
33+
*/
34+
open class ERC20Balance {
35+
36+
companion object {
37+
private val log = LoggerFactory.getLogger(ERC20Balance::class.java)
38+
}
39+
40+
open fun getBalance(upstreams: EthereumMultistream, token: ERC20Token, address: Address): Mono<BigInteger> {
41+
return upstreams
42+
// use only up-to-date upstreams
43+
.getApiSource(Selector.HeightMatcher(upstreams.getHead().getCurrentHeight() ?: 0))
44+
.let { Flux.from(it) }
45+
.flatMap {
46+
getBalance(it.cast(EthereumUpstream::class.java), token, address)
47+
}
48+
.next()
49+
}
50+
51+
open fun getBalance(upstream: EthereumUpstream, token: ERC20Token, address: Address): Mono<BigInteger> {
52+
return upstream
53+
.getApi()
54+
.read(prepareEthCall(token, address, upstream.getHead()))
55+
.flatMap(JsonRpcResponse::requireStringResult)
56+
.map { Hex32.from(it).asQuantity().value }
57+
}
58+
59+
fun prepareEthCall(token: ERC20Token, target: Address, head: Head): JsonRpcRequest {
60+
val call = token
61+
.readBalanceOf(target)
62+
.toJson()
63+
val height = head.getCurrentHeight()?.let { HexQuantity.from(it).toHex() } ?: "latest"
64+
return JsonRpcRequest("eth_call", listOf(call, height))
65+
}
66+
}

src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/EthereumSubscribe.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ open class EthereumSubscribe(
1717
}
1818

1919
private val newHeads = ConnectNewHeads(upstream)
20-
private val logs = ConnectLogs(upstream)
20+
open val logs = ConnectLogs(upstream)
2121
private val syncing = ConnectSyncing(upstream)
2222

2323
@Suppress("UNCHECKED_CAST")

src/main/kotlin/io/emeraldpay/dshackle/upstream/ethereum/subscribe/ConnectLogs.kt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.slf4j.LoggerFactory
2424
import reactor.core.publisher.Flux
2525
import java.util.function.Function
2626

27-
class ConnectLogs(
27+
open class ConnectLogs(
2828
upstream: EthereumMultistream,
2929
private val connectBlockUpdates: ConnectBlockUpdates,
3030
) {
@@ -44,7 +44,7 @@ class ConnectLogs(
4444
return produceLogs.produce(connectBlockUpdates.connect())
4545
}
4646

47-
fun start(addresses: List<Address>, topics: List<Hex32>): Flux<LogMessage> {
47+
open fun start(addresses: List<Address>, topics: List<Hex32>): Flux<LogMessage> {
4848
// shortcut to the whole output if we don't have any filters
4949
if (addresses.isEmpty() && topics.isEmpty()) {
5050
return start()

src/test/groovy/io/emeraldpay/dshackle/rpc/TrackERC20AddressSpec.groovy

Lines changed: 98 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -6,20 +6,31 @@ import io.emeraldpay.dshackle.config.TokensConfig
66
import io.emeraldpay.dshackle.test.EthereumUpstreamMock
77
import io.emeraldpay.dshackle.test.MultistreamHolderMock
88
import io.emeraldpay.dshackle.test.ReaderMock
9-
import io.emeraldpay.dshackle.upstream.Multistream
109
import io.emeraldpay.dshackle.upstream.MultistreamHolder
11-
import io.emeraldpay.dshackle.reader.Reader
10+
import io.emeraldpay.dshackle.upstream.ethereum.ERC20Balance
11+
import io.emeraldpay.dshackle.upstream.ethereum.EthereumMultistream
12+
import io.emeraldpay.dshackle.upstream.ethereum.EthereumSubscribe
1213
import io.emeraldpay.dshackle.upstream.ethereum.EthereumUpstream
14+
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.ConnectLogs
15+
import io.emeraldpay.dshackle.upstream.ethereum.subscribe.json.LogMessage
1316
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcRequest
1417
import io.emeraldpay.dshackle.upstream.rpcclient.JsonRpcResponse
18+
import io.emeraldpay.etherjar.domain.BlockHash
19+
import io.emeraldpay.etherjar.domain.TransactionId
20+
import io.emeraldpay.etherjar.hex.Hex32
1521
import io.emeraldpay.grpc.Chain
1622
import io.emeraldpay.etherjar.domain.Address
1723
import io.emeraldpay.etherjar.erc20.ERC20Token
1824
import io.emeraldpay.etherjar.hex.HexData
1925
import io.emeraldpay.etherjar.rpc.json.TransactionCallJson
26+
import reactor.core.publisher.Flux
2027
import reactor.core.publisher.Mono
28+
import reactor.test.StepVerifier
29+
import spock.lang.Ignore
2130
import spock.lang.Specification
2231

32+
import java.time.Duration
33+
2334
class TrackERC20AddressSpec extends Specification {
2435

2536
def "Init with single token"() {
@@ -121,38 +132,6 @@ class TrackERC20AddressSpec extends Specification {
121132
supportSai
122133
}
123134

124-
def "Gets balance from upstream"() {
125-
setup:
126-
ReaderMock api = new ReaderMock()
127-
.with(
128-
new JsonRpcRequest("eth_call", [
129-
new TransactionCallJson().tap { json ->
130-
json.setTo(Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9"))
131-
json.setData(HexData.from("0x70a0823100000000000000000000000016c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b"))
132-
},
133-
"latest"
134-
]),
135-
JsonRpcResponse.ok('"0x0000000000000000000000000000000000000000000000000000001f28d72868"')
136-
)
137-
138-
EthereumUpstream upstream = new EthereumUpstreamMock(Chain.ETHEREUM, api)
139-
MultistreamHolder ups = new MultistreamHolderMock(Chain.ETHEREUM, upstream)
140-
TrackERC20Address track = new TrackERC20Address(ups, new TokensConfig([]))
141-
142-
TrackERC20Address.TrackedAddress address = new TrackERC20Address.TrackedAddress(
143-
Chain.ETHEREUM,
144-
Address.from("0x16c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b"),
145-
new ERC20Token(Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")),
146-
"test",
147-
BigInteger.valueOf(1234)
148-
)
149-
when:
150-
def act = track.getBalance(address).block()
151-
152-
then:
153-
act.toLong() == 0x1f28d72868
154-
}
155-
156135
def "Builds response"() {
157136
setup:
158137
TrackERC20Address track = new TrackERC20Address(Stub(MultistreamHolder), new TokensConfig([]))
@@ -175,4 +154,89 @@ class TrackERC20AddressSpec extends Specification {
175154
.setBalance("1234")
176155
.build()
177156
}
157+
158+
def "Check balance when event happens"() {
159+
setup:
160+
def events = [
161+
new LogMessage(
162+
Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9"),
163+
BlockHash.from("0x0c0d2969c843d0b61fbab1b2302cf24d6681b2ae0a140a3c2908990d048f7631"),
164+
13668750,
165+
HexData.from("0x0000000000000000000000000000000000000000000000000000000048f2fc7b"),
166+
1,
167+
[
168+
Hex32.from("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef"),
169+
Hex32.from("0x000000000000000000000000b02f1329d6a6acef07a763258f8509c2847a0a3e"),
170+
Hex32.from("0x00000000000000000000000016c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b")
171+
],
172+
TransactionId.from("0x5a7898e27120575c33d3d0179af3b6353c7268bbad4255df079ed26b743a21a5"),
173+
1,
174+
false
175+
)
176+
]
177+
def logs = Mock(ConnectLogs) {
178+
1 * start(
179+
[Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")],
180+
[Hex32.from("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef")]
181+
) >> { args ->
182+
println("ConnectLogs.start $args")
183+
Flux.fromIterable(events)
184+
}
185+
}
186+
def sub = Mock(EthereumSubscribe) {
187+
1 * getLogs() >> logs
188+
}
189+
def up = Mock(EthereumMultistream) {
190+
1 * getSubscribe() >> sub
191+
_ * cast(EthereumMultistream) >> { args ->
192+
it
193+
}
194+
}
195+
def mup = Mock(MultistreamHolder) {
196+
_ * getUpstream(Chain.ETHEREUM) >> up
197+
}
198+
TokensConfig tokens = new TokensConfig([
199+
new TokensConfig.Token().tap {
200+
id = "test"
201+
blockchain = Chain.ETHEREUM
202+
name = "TEST"
203+
type = TokensConfig.Type.ERC20
204+
address = Address.from("0x54EedeAC495271d0F6B175474E89094C44Da98b9")
205+
}
206+
])
207+
TrackERC20Address track = new TrackERC20Address(mup, tokens)
208+
track.init()
209+
track.erc20Balance = Mock(ERC20Balance) {
210+
2 * it.getBalance(_, _, _) >>> [
211+
Mono.just(100000.toBigInteger()),
212+
Mono.just(150000.toBigInteger())
213+
]
214+
}
215+
def request = BlockchainOuterClass.BalanceRequest.newBuilder()
216+
.setAddress(
217+
Common.AnyAddress.newBuilder()
218+
.setAddressSingle(Common.SingleAddress.newBuilder().setAddress("0x16c15c65ad00b6dfbcc2cb8a7b6c2d0103a3883b"))
219+
)
220+
.setAsset(
221+
Common.Asset.newBuilder()
222+
.setChain(Common.ChainRef.CHAIN_ETHEREUM)
223+
.setCode("TEST")
224+
)
225+
.build()
226+
when:
227+
def act = track.subscribe(request)
228+
229+
then:
230+
StepVerifier.create(act)
231+
.expectNextMatches {
232+
println("Received: $it")
233+
it.getBalance() == "100000"
234+
}
235+
.expectNextMatches {
236+
println("Received: $it")
237+
it.getBalance() == "150000"
238+
}
239+
.expectComplete()
240+
.verify(Duration.ofSeconds(1))
241+
}
178242
}

0 commit comments

Comments
 (0)