11"""
2- This is the multicaller module. TODO: BETTER NAME
3-
4- TODO-MIKE: What exactly does this do and is it a bona fide config module?
2+ MultiCaller class
53
64---
75(c) Copyright Bprotocol foundation 2023-24.
86All rights reserved.
97Licensed under MIT.
108"""
11- from functools import partial
12- from typing import List , Callable , ContextManager , Any , Dict
9+ from typing import Any , List , Dict
1310
14- import web3
1511from eth_abi import decode
16- from web3 import Web3
12+ from web3 . contract . contract import ContractFunction
1713
1814from fastlane_bot .data .abi import MULTICALL_ABI
1915
2016
21- def cast (typ , val ):
22- """Cast a value to a type.
23-
24- This returns the value unchanged. To the type checker this
25- signals that the return value has the designated type, but at
26- runtime we intentionally don't check anything (we want this
27- to be as fast as possible).
28- """
29- return val
30-
31-
3217def collapse_if_tuple (abi : Dict [str , Any ]) -> str :
3318 """
3419 Converts a tuple from a dict to a parenthesized list of its types.
@@ -46,141 +31,38 @@ def collapse_if_tuple(abi: Dict[str, Any]) -> str:
4631 ... )
4732 '(address,uint256,bytes)'
4833 """
49- typ = abi ["type" ]
50- if not isinstance (typ , str ):
51- raise TypeError (
52- "The 'type' must be a string, but got %r of type %s" % (typ , type (typ ))
53- )
54- elif not typ .startswith ("tuple" ):
55- return typ
56-
57- delimited = "," .join (collapse_if_tuple (c ) for c in abi ["components" ])
58- # Whatever comes after "tuple" is the array dims. The ABI spec states that
59- # this will have the form "", "[]", or "[k]".
60- array_dim = typ [5 :]
61- collapsed = "({}){}" .format (delimited , array_dim )
62-
63- return collapsed
64-
65-
66- def get_output_types_from_abi (abi : List [Dict [str , Any ]], function_name : str ) -> List [str ]:
67- """
68- Get the output types from an ABI.
69-
70- Parameters
71- ----------
72- abi : List[Dict[str, Any]]
73- The ABI
74- function_name : str
75- The function name
76-
77- Returns
78- -------
79- List[str]
80- The output types
81-
82- """
83- for item in abi :
84- if item ['type' ] == 'function' and item ['name' ] == function_name :
85- return [collapse_if_tuple (cast (Dict [str , Any ], item )) for item in item ['outputs' ]]
86- raise ValueError (f"No function named { function_name } found in ABI." )
34+ if abi ["type" ].startswith ("tuple" ):
35+ delimited = "," .join (collapse_if_tuple (c ) for c in abi ["components" ])
36+ return "({}){}" .format (delimited , abi ["type" ][len ("tuple" ):])
37+ return abi ["type" ]
8738
8839
89- class ContractMethodWrapper :
90- """
91- Wraps a contract method to be used with multicall.
92- """
93- __DATE__ = "2022-09-26"
94- __VERSION__ = "0.0.2"
95-
96- def __init__ (self , original_method , multicaller ):
97- self .original_method = original_method
98- self .multicaller = multicaller
99-
100- def __call__ (self , * args , ** kwargs ):
101- contract_call = self .original_method (* args , ** kwargs )
102- self .multicaller .add_call (contract_call )
103- return contract_call
104-
105-
106- class MultiCaller (ContextManager ):
40+ class MultiCaller :
10741 """
10842 Context manager for multicalls.
10943 """
11044 __DATE__ = "2022-09-26"
11145 __VERSION__ = "0.0.2"
11246
47+ def __init__ (self , web3 : Any , multicall_contract_address : str ):
48+ self .multicall_contract = web3 .eth .contract (abi = MULTICALL_ABI , address = multicall_contract_address )
49+ self .contract_calls : List [ContractFunction ] = []
50+ self .output_types_list : List [List [str ]] = []
11351
114- def __init__ (self , contract : web3 .contract .Contract ,
115- web3 : Web3 ,
116- block_identifier : Any = 'latest' , multicall_address = "0x5BA1e12693Dc8F9c48aAD8770482f4739bEeD696" ):
117- self ._contract_calls : List [Callable ] = []
118- self .contract = contract
119- self .block_identifier = block_identifier
120- self .web3 = web3
121- self .MULTICALL_CONTRACT_ADDRESS = self .web3 .to_checksum_address (multicall_address )
122-
123- def __enter__ (self ) -> 'MultiCaller' :
124- return self
125-
126- def __exit__ (self , exc_type , exc_val , exc_tb ):
127- pass
128-
129- def add_call (self , fn : Callable , * args , ** kwargs ) -> None :
130- self ._contract_calls .append (partial (fn , * args , ** kwargs ))
131-
132- def multicall (self ) -> List [Any ]:
133- calls_for_aggregate = []
134- output_types_list = []
135- _calls_for_aggregate = {}
136- _output_types_list = {}
137- for fn in self ._contract_calls :
138- fn_name = str (fn ).split ('functools.partial(<Function ' )[1 ].split ('>' )[0 ]
139- output_types = get_output_types_from_abi (self .contract .abi , fn_name )
140- if fn_name in _calls_for_aggregate :
141- _calls_for_aggregate [fn_name ].append ({
142- 'target' : self .contract .address ,
143- 'callData' : fn ()._encode_transaction_data ()
144- })
145- _output_types_list [fn_name ].append (output_types )
146- else :
147- _calls_for_aggregate [fn_name ] = [{
148- 'target' : self .contract .address ,
149- 'callData' : fn ()._encode_transaction_data ()
150- }]
151- _output_types_list [fn_name ] = [output_types ]
152-
153- for fn_list in _calls_for_aggregate .keys ():
154- calls_for_aggregate += (_calls_for_aggregate [fn_list ])
155- output_types_list += (_output_types_list [fn_list ])
156-
157- encoded_data = self .web3 .eth .contract (
158- abi = MULTICALL_ABI ,
159- address = self .MULTICALL_CONTRACT_ADDRESS
160- ).functions .aggregate (calls_for_aggregate ).call (block_identifier = self .block_identifier )
161-
162- if not isinstance (encoded_data , list ):
163- raise TypeError (f"Expected encoded_data to be a list, got { type (encoded_data )} instead." )
164-
165- encoded_data = encoded_data [1 ]
166- decoded_data_list = []
167- for output_types , encoded_output in zip (output_types_list , encoded_data ):
168- decoded_data = decode (output_types , encoded_output )
169- decoded_data_list .append (decoded_data )
170-
171- return_data = [i [0 ] for i in decoded_data_list if len (i ) == 1 ]
172- return_data += [i [1 ] for i in decoded_data_list if len (i ) > 1 ]
52+ def add_call (self , call : ContractFunction ):
53+ self .contract_calls .append ({'target' : call .address , 'callData' : call ._encode_transaction_data ()})
54+ self .output_types_list .append ([collapse_if_tuple (item ) for item in call .abi ['outputs' ]])
17355
174- # Handling for Bancor POL - combine results into a Tuple
175- if "tokenPrice" in _calls_for_aggregate and "amountAvailableForTrading" in _calls_for_aggregate :
176- new_return = []
177- returned_items = int (len (return_data ))
178- total_pools = int (returned_items / 2 )
179- assert returned_items % 2 == 0 , f"[multicaller.py multicall] non-even number of returned calls for Bancor POL { returned_items } "
180- total_pools = int (total_pools )
56+ def run_calls (self , block_identifier : Any = 'latest' ) -> List [Any ]:
57+ encoded_data = self .multicall_contract .functions .tryAggregate (
58+ False ,
59+ self .contract_calls
60+ ).call (block_identifier = block_identifier )
18161
182- for idx in range (total_pools ):
183- new_return .append ((return_data [idx ][0 ], return_data [idx ][1 ], return_data [idx + total_pools ]))
184- return_data = new_return
62+ result_list = [
63+ decode (output_types , encoded_output [1 ]) if encoded_output [0 ] else (None ,)
64+ for output_types , encoded_output in zip (self .output_types_list , encoded_data )
65+ ]
18566
186- return return_data
67+ # Convert every single-value tuple into a single value
68+ return [result if len (result ) > 1 else result [0 ] for result in result_list ]
0 commit comments