Skip to content

Commit 6a6a1f3

Browse files
authored
add rpc function to fetch transactions by hashes (#202)
### TL;DR Added support for fetching transactions by hash through the RPC client. ### What changed? - Added a new `RawTransaction` type alias in the common package - Generalized the `RPCFetchBatch` function to `RPCFetchSingleBatch` to work with any key type, not just block numbers - Added `GetTransactionParams` function to support transaction hash parameters - Implemented `GetTransactions` method in the RPC client to fetch transactions by hash - Added `SerializeTransactions` function to convert raw transaction data to the internal format - Added `Close` method to the `IRPCClient` interface and updated the mock accordingly ### How to test? 1. Fetch transactions using the new method: ```go client := rpc.NewClient(...) transactions := client.GetTransactions([]string{"0x123...", "0x456..."}) ``` 2. Verify that transaction data is properly serialized and contains the expected fields 3. Test with both valid and invalid transaction hashes to ensure error handling works correctly ### Why make this change? This change enables direct fetching of transactions by hash, which is useful for scenarios where we need to retrieve specific transactions without fetching entire blocks. This improves efficiency when only transaction data is needed and the containing block is not relevant.
2 parents 210ebd3 + 54a6fcd commit 6a6a1f3

File tree

9 files changed

+175
-58
lines changed

9 files changed

+175
-58
lines changed

internal/common/transaction.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/rs/zerolog/log"
1212
)
1313

14+
type RawTransaction = map[string]interface{}
15+
1416
type Transaction struct {
1517
ChainId *big.Int `json:"chain_id" ch:"chain_id" swaggertype:"string"`
1618
Hash string `json:"hash" ch:"hash"`

internal/common/utils.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ import (
77
"strings"
88
)
99

10-
func BigIntSliceToChunks(values []*big.Int, chunkSize int) [][]*big.Int {
10+
func SliceToChunks[T any](values []T, chunkSize int) [][]T {
1111
if chunkSize >= len(values) || chunkSize <= 0 {
12-
return [][]*big.Int{values}
12+
return [][]T{values}
1313
}
14-
var chunks [][]*big.Int
14+
var chunks [][]T
1515
for i := 0; i < len(values); i += chunkSize {
1616
end := i + chunkSize
1717
if end > len(values) {

internal/orchestrator/reorg_handler.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ func (rh *ReorgHandler) getNewBlocksByNumber(blockHeaders []common.BlockHeader)
230230
blockNumbers = append(blockNumbers, header.Number)
231231
}
232232
blockCount := len(blockNumbers)
233-
chunks := common.BigIntSliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)
233+
chunks := common.SliceToChunks(blockNumbers, rh.rpc.GetBlocksPerRequest().Blocks)
234234

235235
var wg sync.WaitGroup
236236
resultsCh := make(chan []rpc.GetBlocksResult, len(chunks))

internal/rpc/batcher.go

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package rpc
22

33
import (
44
"context"
5-
"math/big"
65
"sync"
76
"time"
87

@@ -11,28 +10,28 @@ import (
1110
"github.com/thirdweb-dev/indexer/internal/common"
1211
)
1312

14-
type RPCFetchBatchResult[T any] struct {
15-
BlockNumber *big.Int
16-
Error error
17-
Result T
13+
type RPCFetchBatchResult[K any, T any] struct {
14+
Key K
15+
Error error
16+
Result T
1817
}
1918

20-
func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize int, batchDelay int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] {
21-
if len(blockNumbers) <= batchSize {
22-
return RPCFetchBatch[T](rpc, blockNumbers, method, argsFunc)
19+
func RPCFetchInBatches[K any, T any](rpc *Client, keys []K, batchSize int, batchDelay int, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
20+
if len(keys) <= batchSize {
21+
return RPCFetchSingleBatch[K, T](rpc, keys, method, argsFunc)
2322
}
24-
chunks := common.BigIntSliceToChunks(blockNumbers, batchSize)
23+
chunks := common.SliceToChunks[K](keys, batchSize)
2524

26-
log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(blockNumbers), len(chunks), batchSize)
25+
log.Debug().Msgf("Fetching %s for %d blocks in %d chunks of max %d requests", method, len(keys), len(chunks), batchSize)
2726

2827
var wg sync.WaitGroup
29-
resultsCh := make(chan []RPCFetchBatchResult[T], len(chunks))
28+
resultsCh := make(chan []RPCFetchBatchResult[K, T], len(chunks))
3029

3130
for _, chunk := range chunks {
3231
wg.Add(1)
33-
go func(chunk []*big.Int) {
32+
go func(chunk []K) {
3433
defer wg.Done()
35-
resultsCh <- RPCFetchBatch[T](rpc, chunk, method, argsFunc)
34+
resultsCh <- RPCFetchSingleBatch[K, T](rpc, chunk, method, argsFunc)
3635
if batchDelay > 0 {
3736
time.Sleep(time.Duration(batchDelay) * time.Millisecond)
3837
}
@@ -43,25 +42,23 @@ func RPCFetchInBatches[T any](rpc *Client, blockNumbers []*big.Int, batchSize in
4342
close(resultsCh)
4443
}()
4544

46-
results := make([]RPCFetchBatchResult[T], 0, len(blockNumbers))
45+
results := make([]RPCFetchBatchResult[K, T], 0, len(keys))
4746
for batchResults := range resultsCh {
4847
results = append(results, batchResults...)
4948
}
5049

5150
return results
5251
}
5352

54-
func RPCFetchBatch[T any](rpc *Client, blockNumbers []*big.Int, method string, argsFunc func(*big.Int) []interface{}) []RPCFetchBatchResult[T] {
55-
batch := make([]gethRpc.BatchElem, len(blockNumbers))
56-
results := make([]RPCFetchBatchResult[T], len(blockNumbers))
53+
func RPCFetchSingleBatch[K any, T any](rpc *Client, keys []K, method string, argsFunc func(K) []interface{}) []RPCFetchBatchResult[K, T] {
54+
batch := make([]gethRpc.BatchElem, len(keys))
55+
results := make([]RPCFetchBatchResult[K, T], len(keys))
5756

58-
for i, blockNum := range blockNumbers {
59-
results[i] = RPCFetchBatchResult[T]{
60-
BlockNumber: blockNum,
61-
}
57+
for i, key := range keys {
58+
results[i] = RPCFetchBatchResult[K, T]{Key: key}
6259
batch[i] = gethRpc.BatchElem{
6360
Method: method,
64-
Args: argsFunc(blockNum),
61+
Args: argsFunc(key),
6562
Result: new(T),
6663
}
6764
}

internal/rpc/params.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,10 @@ func GetBlockWithTransactionsParams(blockNum *big.Int) []interface{} {
1010
return []interface{}{hexutil.EncodeBig(blockNum), true}
1111
}
1212

13+
func GetTransactionParams(txHash string) []interface{} {
14+
return []interface{}{txHash}
15+
}
16+
1317
func GetBlockWithoutTransactionsParams(blockNum *big.Int) []interface{} {
1418
return []interface{}{hexutil.EncodeBig(blockNum), false}
1519
}

internal/rpc/rpc.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ type GetBlocksResult struct {
2727
Data common.Block
2828
}
2929

30+
type GetTransactionsResult struct {
31+
Error error
32+
Data common.Transaction
33+
}
34+
3035
type BlocksPerRequestConfig struct {
3136
Blocks int
3237
Logs int
@@ -37,13 +42,15 @@ type BlocksPerRequestConfig struct {
3742
type IRPCClient interface {
3843
GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult
3944
GetBlocks(blockNumbers []*big.Int) []GetBlocksResult
45+
GetTransactions(txHashes []string) []GetTransactionsResult
4046
GetLatestBlockNumber() (*big.Int, error)
4147
GetChainID() *big.Int
4248
GetURL() string
4349
GetBlocksPerRequest() BlocksPerRequestConfig
4450
IsWebsocket() bool
4551
SupportsTraceBlock() bool
4652
HasCode(address string) (bool, error)
53+
Close()
4754
}
4855

4956
type Client struct {
@@ -213,28 +220,28 @@ func (rpc *Client) setChainID() error {
213220

214221
func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
215222
var wg sync.WaitGroup
216-
var blocks []RPCFetchBatchResult[common.RawBlock]
217-
var logs []RPCFetchBatchResult[common.RawLogs]
218-
var traces []RPCFetchBatchResult[common.RawTraces]
219-
var receipts []RPCFetchBatchResult[common.RawReceipts]
223+
var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]
224+
var logs []RPCFetchBatchResult[*big.Int, common.RawLogs]
225+
var traces []RPCFetchBatchResult[*big.Int, common.RawTraces]
226+
var receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]
220227
wg.Add(2)
221228

222229
go func() {
223230
defer wg.Done()
224-
result := RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
231+
result := RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithTransactionsParams)
225232
blocks = result
226233
}()
227234

228235
if rpc.supportsBlockReceipts {
229236
go func() {
230237
defer wg.Done()
231-
result := RPCFetchInBatches[common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
238+
result := RPCFetchInBatches[*big.Int, common.RawReceipts](rpc, blockNumbers, rpc.blocksPerRequest.Receipts, config.Cfg.RPC.BlockReceipts.BatchDelay, "eth_getBlockReceipts", GetBlockReceiptsParams)
232239
receipts = result
233240
}()
234241
} else {
235242
go func() {
236243
defer wg.Done()
237-
result := RPCFetchInBatches[common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
244+
result := RPCFetchInBatches[*big.Int, common.RawLogs](rpc, blockNumbers, rpc.blocksPerRequest.Logs, config.Cfg.RPC.Logs.BatchDelay, "eth_getLogs", GetLogsParams)
238245
logs = result
239246
}()
240247
}
@@ -243,7 +250,7 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
243250
wg.Add(1)
244251
go func() {
245252
defer wg.Done()
246-
result := RPCFetchInBatches[common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
253+
result := RPCFetchInBatches[*big.Int, common.RawTraces](rpc, blockNumbers, rpc.blocksPerRequest.Traces, config.Cfg.RPC.Traces.BatchDelay, "trace_block", TraceBlockParams)
247254
traces = result
248255
}()
249256
}
@@ -255,19 +262,34 @@ func (rpc *Client) GetFullBlocks(blockNumbers []*big.Int) []GetFullBlockResult {
255262

256263
func (rpc *Client) GetBlocks(blockNumbers []*big.Int) []GetBlocksResult {
257264
var wg sync.WaitGroup
258-
var blocks []RPCFetchBatchResult[common.RawBlock]
265+
var blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]
259266

260267
wg.Add(1)
261268

262269
go func() {
263270
defer wg.Done()
264-
blocks = RPCFetchBatch[common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
271+
blocks = RPCFetchSingleBatch[*big.Int, common.RawBlock](rpc, blockNumbers, "eth_getBlockByNumber", GetBlockWithoutTransactionsParams)
265272
}()
266273
wg.Wait()
267274

268275
return SerializeBlocks(rpc.chainID, blocks)
269276
}
270277

278+
func (rpc *Client) GetTransactions(txHashes []string) []GetTransactionsResult {
279+
var wg sync.WaitGroup
280+
var transactions []RPCFetchBatchResult[string, common.RawTransaction]
281+
282+
wg.Add(1)
283+
284+
go func() {
285+
defer wg.Done()
286+
transactions = RPCFetchSingleBatch[string, common.RawTransaction](rpc, txHashes, "eth_getTransactionByHash", GetTransactionParams)
287+
}()
288+
wg.Wait()
289+
290+
return SerializeTransactions(rpc.chainID, transactions)
291+
}
292+
271293
func (rpc *Client) GetLatestBlockNumber() (*big.Int, error) {
272294
blockNumber, err := rpc.EthClient.BlockNumber(context.Background())
273295
if err != nil {

internal/rpc/serializer.go

Lines changed: 33 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
"github.com/thirdweb-dev/indexer/internal/common"
1212
)
1313

14-
func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock], logs []RPCFetchBatchResult[common.RawLogs], traces []RPCFetchBatchResult[common.RawTraces], receipts []RPCFetchBatchResult[common.RawReceipts]) []GetFullBlockResult {
14+
func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock], logs []RPCFetchBatchResult[*big.Int, common.RawLogs], traces []RPCFetchBatchResult[*big.Int, common.RawTraces], receipts []RPCFetchBatchResult[*big.Int, common.RawReceipts]) []GetFullBlockResult {
1515
if blocks == nil {
1616
return []GetFullBlockResult{}
1717
}
@@ -21,46 +21,46 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R
2121
rawReceiptsMap := mapBatchResultsByBlockNumber[common.RawReceipts](receipts)
2222
rawTracesMap := mapBatchResultsByBlockNumber[common.RawTraces](traces)
2323

24-
for _, rawBlock := range blocks {
24+
for _, rawBlockData := range blocks {
2525
result := GetFullBlockResult{
26-
BlockNumber: rawBlock.BlockNumber,
26+
BlockNumber: rawBlockData.Key,
2727
}
28-
if rawBlock.Result == nil {
29-
log.Warn().Err(rawBlock.Error).Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String())
30-
result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlock.Error)
28+
if rawBlockData.Result == nil {
29+
log.Warn().Err(rawBlockData.Error).Msgf("Received a nil block result for block %s.", rawBlockData.Key.String())
30+
result.Error = fmt.Errorf("received a nil block result from RPC. %v", rawBlockData.Error)
3131
results = append(results, result)
3232
continue
3333
}
3434

35-
if rawBlock.Error != nil {
36-
result.Error = rawBlock.Error
35+
if rawBlockData.Error != nil {
36+
result.Error = rawBlockData.Error
3737
results = append(results, result)
3838
continue
3939
}
4040

41-
result.Data.Block = serializeBlock(chainId, rawBlock.Result)
41+
result.Data.Block = serializeBlock(chainId, rawBlockData.Result)
4242
blockTimestamp := result.Data.Block.Timestamp
4343

44-
if rawReceipts, exists := rawReceiptsMap[rawBlock.BlockNumber.String()]; exists {
44+
if rawReceipts, exists := rawReceiptsMap[rawBlockData.Key.String()]; exists {
4545
if rawReceipts.Error != nil {
4646
result.Error = rawReceipts.Error
4747
} else {
4848
result.Data.Logs = serializeLogsFromReceipts(chainId, rawReceipts.Result, result.Data.Block)
49-
result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result)
49+
result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, &rawReceipts.Result)
5050
}
5151
} else {
52-
if rawLogs, exists := rawLogsMap[rawBlock.BlockNumber.String()]; exists {
52+
if rawLogs, exists := rawLogsMap[rawBlockData.Key.String()]; exists {
5353
if rawLogs.Error != nil {
5454
result.Error = rawLogs.Error
5555
} else {
5656
result.Data.Logs = serializeLogs(chainId, rawLogs.Result, result.Data.Block)
57-
result.Data.Transactions = serializeTransactions(chainId, rawBlock.Result["transactions"].([]interface{}), blockTimestamp, nil)
57+
result.Data.Transactions = serializeTransactions(chainId, rawBlockData.Result["transactions"].([]interface{}), blockTimestamp, nil)
5858
}
5959
}
6060
}
6161

6262
if result.Error == nil {
63-
if rawTraces, exists := rawTracesMap[rawBlock.BlockNumber.String()]; exists {
63+
if rawTraces, exists := rawTracesMap[rawBlockData.Key.String()]; exists {
6464
if rawTraces.Error != nil {
6565
result.Error = rawTraces.Error
6666
} else {
@@ -75,26 +75,26 @@ func SerializeFullBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.R
7575
return results
7676
}
7777

78-
func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[T]) map[string]*RPCFetchBatchResult[T] {
78+
func mapBatchResultsByBlockNumber[T any](results []RPCFetchBatchResult[*big.Int, T]) map[string]*RPCFetchBatchResult[*big.Int, T] {
7979
if results == nil {
80-
return make(map[string]*RPCFetchBatchResult[T], 0)
80+
return make(map[string]*RPCFetchBatchResult[*big.Int, T], 0)
8181
}
82-
resultsMap := make(map[string]*RPCFetchBatchResult[T], len(results))
82+
resultsMap := make(map[string]*RPCFetchBatchResult[*big.Int, T], len(results))
8383
for _, result := range results {
84-
resultsMap[result.BlockNumber.String()] = &result
84+
resultsMap[result.Key.String()] = &result
8585
}
8686
return resultsMap
8787
}
8888

89-
func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[common.RawBlock]) []GetBlocksResult {
89+
func SerializeBlocks(chainId *big.Int, blocks []RPCFetchBatchResult[*big.Int, common.RawBlock]) []GetBlocksResult {
9090
results := make([]GetBlocksResult, 0, len(blocks))
9191

9292
for _, rawBlock := range blocks {
9393
result := GetBlocksResult{
94-
BlockNumber: rawBlock.BlockNumber,
94+
BlockNumber: rawBlock.Key,
9595
}
9696
if rawBlock.Result == nil {
97-
log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.BlockNumber.String())
97+
log.Warn().Msgf("Received a nil block result for block %s.", rawBlock.Key.String())
9898
result.Error = fmt.Errorf("received a nil block result from RPC")
9999
results = append(results, result)
100100
continue
@@ -473,3 +473,15 @@ func interfaceToJsonString(value interface{}) string {
473473
}
474474
return string(jsonString)
475475
}
476+
477+
func SerializeTransactions(chainId *big.Int, transactions []RPCFetchBatchResult[string, common.RawTransaction]) []GetTransactionsResult {
478+
results := make([]GetTransactionsResult, 0, len(transactions))
479+
for _, transaction := range transactions {
480+
result := GetTransactionsResult{
481+
Error: transaction.Error,
482+
Data: serializeTransaction(chainId, transaction.Result, time.Time{}, nil),
483+
}
484+
results = append(results, result)
485+
}
486+
return results
487+
}

internal/worker/worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func NewWorker(rpc rpc.IRPCClient) *Worker {
2424

2525
func (w *Worker) Run(blockNumbers []*big.Int) []rpc.GetFullBlockResult {
2626
blockCount := len(blockNumbers)
27-
chunks := common.BigIntSliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)
27+
chunks := common.SliceToChunks(blockNumbers, w.rpc.GetBlocksPerRequest().Blocks)
2828

2929
var wg sync.WaitGroup
3030
resultsCh := make(chan []rpc.GetFullBlockResult, len(chunks))

0 commit comments

Comments
 (0)