|
8 | 8 | } from "../../db/wallets/walletNonce"; |
9 | 9 | import { getConfig } from "../../utils/cache/getConfig"; |
10 | 10 | import { getChain } from "../../utils/chain"; |
| 11 | +import { prettifyError } from "../../utils/error"; |
11 | 12 | import { logger } from "../../utils/logger"; |
12 | 13 | import { redis } from "../../utils/redis/redis"; |
13 | 14 | import { thirdwebClient } from "../../utils/sdk"; |
@@ -40,78 +41,49 @@ export const initNonceResyncWorker = async () => { |
40 | 41 | */ |
41 | 42 | const handler: Processor<any, void, string> = async (job: Job<string>) => { |
42 | 43 | const sentNoncesKeys = await redis.keys("nonce-sent*"); |
43 | | - job.log(`Found ${sentNoncesKeys.length} nonce-sent* keys`); |
| 44 | + if (sentNoncesKeys.length === 0) { |
| 45 | + job.log("No active wallets."); |
| 46 | + return; |
| 47 | + } |
44 | 48 |
|
45 | 49 | for (const sentNonceKey of sentNoncesKeys) { |
46 | | - const { chainId, walletAddress } = splitSentNoncesKey(sentNonceKey); |
47 | | - |
48 | | - const rpcRequest = getRpcClient({ |
49 | | - client: thirdwebClient, |
50 | | - chain: await getChain(chainId), |
51 | | - }); |
| 50 | + try { |
| 51 | + const { chainId, walletAddress } = splitSentNoncesKey(sentNonceKey); |
52 | 52 |
|
53 | | - const [transactionCount, lastUsedNonceDb] = await Promise.all([ |
54 | | - eth_getTransactionCount(rpcRequest, { |
55 | | - address: walletAddress, |
56 | | - blockTag: "latest", |
57 | | - }), |
58 | | - inspectNonce(chainId, walletAddress), |
59 | | - ]); |
60 | | - |
61 | | - if (Number.isNaN(transactionCount)) { |
62 | | - job.log( |
63 | | - `Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`, |
64 | | - ); |
65 | | - logger({ |
66 | | - level: "error", |
67 | | - message: `[nonceResyncWorker] Received invalid onchain transaction count for ${walletAddress}: ${transactionCount}`, |
68 | | - service: "worker", |
| 53 | + const rpcRequest = getRpcClient({ |
| 54 | + client: thirdwebClient, |
| 55 | + chain: await getChain(chainId), |
69 | 56 | }); |
70 | | - continue; |
71 | | - } |
| 57 | + const lastUsedNonceOnchain = |
| 58 | + (await eth_getTransactionCount(rpcRequest, { |
| 59 | + address: walletAddress, |
| 60 | + blockTag: "latest", |
| 61 | + })) - 1; |
| 62 | + const lastUsedNonceDb = await inspectNonce(chainId, walletAddress); |
72 | 63 |
|
73 | | - const lastUsedNonceOnchain = transactionCount - 1; |
74 | | - |
75 | | - job.log( |
76 | | - `${walletAddress} last used onchain nonce: ${lastUsedNonceOnchain} and last used db nonce: ${lastUsedNonceDb}`, |
77 | | - ); |
78 | | - logger({ |
79 | | - level: "debug", |
80 | | - message: `[nonceResyncWorker] last used onchain nonce: ${transactionCount} and last used db nonce: ${lastUsedNonceDb}`, |
81 | | - service: "worker", |
82 | | - }); |
83 | | - |
84 | | - // If the last used nonce onchain is the same as or ahead of the last used nonce in the db, |
85 | | - // There is no need to resync the nonce. |
86 | | - if (lastUsedNonceOnchain >= lastUsedNonceDb) { |
87 | | - job.log(`No need to resync nonce for ${walletAddress}`); |
88 | | - logger({ |
89 | | - level: "debug", |
90 | | - message: `[nonceResyncWorker] No need to resync nonce for ${walletAddress}`, |
91 | | - service: "worker", |
92 | | - }); |
93 | | - continue; |
94 | | - } |
| 64 | + // Recycle all nonces between (onchain nonce, db nonce] if they aren't in-flight ("sent nonce"). |
| 65 | + const recycled: number[] = []; |
| 66 | + for ( |
| 67 | + let nonce = lastUsedNonceOnchain + 1; |
| 68 | + nonce <= lastUsedNonceDb; |
| 69 | + nonce++ |
| 70 | + ) { |
| 71 | + const exists = await isSentNonce(chainId, walletAddress, nonce); |
| 72 | + if (!exists) { |
| 73 | + await recycleNonce(chainId, walletAddress, nonce); |
| 74 | + recycled.push(nonce); |
| 75 | + } |
| 76 | + } |
95 | 77 |
|
96 | | - // for each nonce between last used db nonce and last used onchain nonce |
97 | | - // check if nonce exists in nonce-sent set |
98 | | - // if it does not exist, recycle it |
99 | | - for ( |
100 | | - let _nonce = lastUsedNonceOnchain + 1; |
101 | | - _nonce < lastUsedNonceDb; |
102 | | - _nonce++ |
103 | | - ) { |
104 | | - const exists = await isSentNonce(chainId, walletAddress, _nonce); |
| 78 | + const message = `wallet=${chainId}:${walletAddress} lastUsedNonceOnchain=${lastUsedNonceOnchain} lastUsedNonceDb=${lastUsedNonceDb}, recycled=${recycled.join(",")}`; |
| 79 | + job.log(message); |
| 80 | + logger({ level: "debug", service: "worker", message }); |
| 81 | + } catch (error) { |
105 | 82 | logger({ |
106 | | - level: "debug", |
107 | | - message: `[nonceResyncWorker] nonce ${_nonce} exists in nonce-sent set: ${exists}`, |
| 83 | + level: "error", |
| 84 | + message: `[nonceResyncWorker] ${prettifyError(error)}`, |
108 | 85 | service: "worker", |
109 | 86 | }); |
110 | | - |
111 | | - // If nonce does not exist in nonce-sent set, recycle it |
112 | | - if (!exists) { |
113 | | - await recycleNonce(chainId, walletAddress, _nonce); |
114 | | - } |
115 | 87 | } |
116 | 88 | } |
117 | 89 | }; |
0 commit comments