Skip to content

Commit 5bcbd60

Browse files
feat: integrates new lockService in sendRawTransaction (#4627)
Signed-off-by: Konstantina Blazhukova <[email protected]> Signed-off-by: konstantinabl <[email protected]> Co-authored-by: Logan Nguyen <[email protected]> Signed-off-by: Konstantina Blazhukova <[email protected]>
1 parent 7c40cff commit 5bcbd60

File tree

12 files changed

+679
-769
lines changed

12 files changed

+679
-769
lines changed

package-lock.json

Lines changed: 197 additions & 713 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/config-service/src/services/globalConfig.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,11 @@ const _CONFIG = {
664664
required: false,
665665
defaultValue: 50,
666666
},
667+
ENABLE_NONCE_ORDERING: {
668+
type: 'boolean',
669+
required: false,
670+
defaultValue: false,
671+
},
667672
USE_MIRROR_NODE_MODULARIZED_SERVICES: {
668673
type: 'boolean',
669674
required: false,

packages/relay/src/lib/eth.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
IBlockService,
2020
ICommonService,
2121
IContractService,
22+
LockService,
2223
TransactionPoolService,
2324
TransactionService,
2425
} from './services';
@@ -126,6 +127,7 @@ export class EthImpl implements Eth {
126127
chain: string,
127128
public readonly cacheService: CacheService,
128129
storage: PendingTransactionStorage,
130+
lockService: LockService,
129131
) {
130132
this.chain = chain;
131133
this.logger = logger;
@@ -146,6 +148,7 @@ export class EthImpl implements Eth {
146148
logger,
147149
mirrorNodeClient,
148150
transactionPoolService,
151+
lockService,
149152
);
150153
this.accountService = new AccountService(
151154
cacheService,

packages/relay/src/lib/relay.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import { DebugImpl } from './debug';
2020
import { RpcMethodDispatcher } from './dispatcher';
2121
import { EthImpl } from './eth';
2222
import { NetImpl } from './net';
23+
import { LockService, LockStrategyFactory } from './services';
2324
import { CacheService } from './services/cacheService/cacheService';
2425
import HAPIService from './services/hapiService/hapiService';
2526
import { HbarLimitService } from './services/hbarLimitService';
@@ -314,7 +315,7 @@ export class Relay {
314315
duration,
315316
);
316317

317-
// Create HAPI service
318+
const lockService = new LockService(LockStrategyFactory.create(this.redisClient, this.logger));
318319
const hapiService = new HAPIService(this.logger, this.register, hbarLimitService);
319320
this.operatorAccountId = hapiService.getOperatorAccountId();
320321

@@ -348,6 +349,7 @@ export class Relay {
348349
chainId,
349350
this.cacheService,
350351
storage,
352+
lockService,
351353
);
352354

353355
// Set up event listeners

packages/relay/src/lib/services/ethService/transactionService/TransactionService.ts

Lines changed: 65 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import { Precheck } from '../../../precheck';
2121
import { ITransactionReceipt, RequestDetails, TypedEvents } from '../../../types';
2222
import { CacheService } from '../../cacheService/cacheService';
2323
import HAPIService from '../../hapiService/hapiService';
24-
import { ICommonService, TransactionPoolService } from '../../index';
24+
import { ICommonService, LockService, TransactionPoolService } from '../../index';
2525
import { ITransactionService } from './ITransactionService';
2626

2727
export class TransactionService implements ITransactionService {
@@ -46,6 +46,13 @@ export class TransactionService implements ITransactionService {
4646
*/
4747
private readonly hapiService: HAPIService;
4848

49+
/**
50+
* The lock service for managing transaction ordering.
51+
* @private
52+
* @readonly
53+
*/
54+
private readonly lockService: LockService;
55+
4956
/**
5057
* Logger instance for logging messages.
5158
* @private
@@ -86,6 +93,7 @@ export class TransactionService implements ITransactionService {
8693
logger: Logger,
8794
mirrorNodeClient: MirrorNodeClient,
8895
transactionPoolService: TransactionPoolService,
96+
lockService: LockService,
8997
) {
9098
this.cacheService = cacheService;
9199
this.chain = chain;
@@ -96,6 +104,7 @@ export class TransactionService implements ITransactionService {
96104
this.mirrorNodeClient = mirrorNodeClient;
97105
this.precheck = new Precheck(mirrorNodeClient, chain, transactionPoolService);
98106
this.transactionPoolService = transactionPoolService;
107+
this.lockService = lockService;
99108
}
100109

101110
/**
@@ -253,36 +262,60 @@ export class TransactionService implements ITransactionService {
253262

254263
const transactionBuffer = Buffer.from(this.prune0x(transaction), 'hex');
255264
const parsedTx = Precheck.parseRawTransaction(transaction);
256-
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
257-
await this.common.getGasPriceInWeibars(requestDetails),
258-
);
265+
let lockSessionKey: string | undefined;
266+
267+
// Acquire lock FIRST - before any side effects or async operations
268+
// This ensures proper nonce ordering for transactions from the same sender
269+
if (parsedTx.from) {
270+
lockSessionKey = await this.lockService.acquireLock(parsedTx.from);
271+
}
259272

260-
await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);
273+
try {
274+
const networkGasPriceInWeiBars = Utils.addPercentageBufferToGasPrice(
275+
await this.common.getGasPriceInWeibars(requestDetails),
276+
);
261277

262-
// Save the transaction to the transaction pool before submitting it to the network
263-
await this.transactionPoolService.saveTransaction(parsedTx.from!, parsedTx);
278+
await this.validateRawTransaction(parsedTx, networkGasPriceInWeiBars, requestDetails);
264279

265-
/**
266-
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled,
267-
* the transaction hash is calculated and returned immediately after passing all prechecks.
268-
* All transaction processing logic is then handled asynchronously in the background.
269-
*/
270-
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
271-
if (useAsyncTxProcessing) {
272-
this.sendRawTransactionProcessor(transactionBuffer, parsedTx, networkGasPriceInWeiBars, requestDetails);
273-
return Utils.computeTransactionHash(transactionBuffer);
274-
}
280+
// Save the transaction to the transaction pool before submitting it to the network
281+
await this.transactionPoolService.saveTransaction(parsedTx.from!, parsedTx);
275282

276-
/**
277-
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled,
278-
* wait for all transaction processing logic to complete before returning the transaction hash.
279-
*/
280-
return await this.sendRawTransactionProcessor(
281-
transactionBuffer,
282-
parsedTx,
283-
networkGasPriceInWeiBars,
284-
requestDetails,
285-
);
283+
/**
284+
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is enabled,
285+
* the transaction hash is calculated and returned immediately after passing all prechecks.
286+
* All transaction processing logic is then handled asynchronously in the background.
287+
*/
288+
const useAsyncTxProcessing = ConfigService.get('USE_ASYNC_TX_PROCESSING');
289+
if (useAsyncTxProcessing) {
290+
// Fire and forget - lock will be released after consensus submission
291+
this.sendRawTransactionProcessor(
292+
transactionBuffer,
293+
parsedTx,
294+
networkGasPriceInWeiBars,
295+
lockSessionKey,
296+
requestDetails,
297+
);
298+
return Utils.computeTransactionHash(transactionBuffer);
299+
}
300+
301+
/**
302+
* Note: If the USE_ASYNC_TX_PROCESSING feature flag is disabled,
303+
* wait for all transaction processing logic to complete before returning the transaction hash.
304+
*/
305+
return await this.sendRawTransactionProcessor(
306+
transactionBuffer,
307+
parsedTx,
308+
networkGasPriceInWeiBars,
309+
lockSessionKey,
310+
requestDetails,
311+
);
312+
} catch (error) {
313+
// Release lock on any error during validation or prechecks
314+
if (lockSessionKey) {
315+
await this.lockService.releaseLock(parsedTx.from!, lockSessionKey);
316+
}
317+
throw error;
318+
}
286319
}
287320

288321
/**
@@ -472,12 +505,14 @@ export class TransactionService implements ITransactionService {
472505
* @param {EthersTransaction} parsedTx - The parsed Ethereum transaction object.
473506
* @param {number} networkGasPriceInWeiBars - The current network gas price in wei bars.
474507
* @param {RequestDetails} requestDetails - Details of the request for logging and tracking purposes.
508+
* @param {string | null} lockSessionKey - The session key for the acquired lock, null if no lock was acquired.
475509
* @returns {Promise<string | JsonRpcError>} A promise that resolves to the transaction hash if successful, or a JsonRpcError if an error occurs.
476510
*/
477511
async sendRawTransactionProcessor(
478512
transactionBuffer: Buffer,
479513
parsedTx: EthersTransaction,
480514
networkGasPriceInWeiBars: number,
515+
lockSessionKey: string | undefined,
481516
requestDetails: RequestDetails,
482517
): Promise<string | JsonRpcError> {
483518
let sendRawTransactionError: any;
@@ -495,6 +530,9 @@ export class TransactionService implements ITransactionService {
495530
requestDetails,
496531
);
497532

533+
if (lockSessionKey) {
534+
await this.lockService.releaseLock(originalCallerAddress, lockSessionKey);
535+
}
498536
// Remove the transaction from the transaction pool after submission
499537
await this.transactionPoolService.removeTransaction(originalCallerAddress, parsedTx.serialized);
500538

packages/relay/src/lib/services/lockService/LocalLockStrategy.ts

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { randomUUID } from 'crypto';
66
import { LRUCache } from 'lru-cache';
77
import { Logger } from 'pino';
88

9+
import { LockStrategy } from '../../types/lock';
910
import { LockService } from './LockService';
1011

1112
/**
@@ -24,7 +25,7 @@ export interface LockState {
2425
* Each unique "address" gets its own mutex to ensure only one session can hold
2526
* the lock at a time. Locks are auto-expiring and stored in an LRU cache.
2627
*/
27-
export class LocalLockStrategy {
28+
export class LocalLockStrategy implements LockStrategy {
2829
/**
2930
* LRU cache of lock states, keyed by address.
3031
*/
@@ -55,12 +56,11 @@ export class LocalLockStrategy {
5556
* @param address - The key representing the resource to lock
5657
* @returns A session key identifying the current lock owner
5758
*/
58-
async acquireLock(address: string): Promise<string> {
59+
async acquireLock(address: string): Promise<string | undefined> {
60+
const sessionKey = randomUUID();
5961
if (this.logger.isLevelEnabled('debug')) {
60-
this.logger.debug(`Acquiring lock for address ${address}.`);
62+
this.logger.debug(`Acquiring lock for address ${address} and sessionkey ${sessionKey}.`);
6163
}
62-
63-
const sessionKey = randomUUID();
6464
const state = this.getOrCreateState(address);
6565

6666
// Acquire the mutex (this will block until available)
@@ -85,16 +85,19 @@ export class LocalLockStrategy {
8585
* @param sessionKey - The session key of the lock holder
8686
*/
8787
async releaseLock(address: string, sessionKey: string): Promise<void> {
88-
const state = this.localLockStates.get(address);
89-
90-
if (this.logger.isLevelEnabled('debug') && state?.acquiredAt) {
91-
const holdTime = Date.now() - state.acquiredAt;
92-
this.logger.debug(`Releasing lock for address ${address} and session key ${sessionKey} held for ${holdTime}ms.`);
93-
}
94-
95-
// Ensure only the lock owner can release
96-
if (state?.sessionKey === sessionKey) {
97-
await this.doRelease(state);
88+
const normalizedAddress = LockService.normalizeAddress(address);
89+
const state = this.localLockStates.get(normalizedAddress);
90+
if (state) {
91+
// Ensure only the lock owner can release
92+
if (state.sessionKey === sessionKey) {
93+
await this.doRelease(state);
94+
if (this.logger.isLevelEnabled('debug')) {
95+
const holdTime = Date.now() - state.acquiredAt!;
96+
this.logger.debug(
97+
`Releasing lock for address ${address} and session key ${sessionKey} held for ${holdTime}ms.`,
98+
);
99+
}
100+
}
98101
}
99102
}
100103

@@ -144,7 +147,8 @@ export class LocalLockStrategy {
144147
* @param sessionKey - The session key to verify ownership before releasing
145148
*/
146149
private async forceReleaseExpiredLock(address: string, sessionKey: string): Promise<void> {
147-
const state = this.localLockStates.get(address);
150+
const normalizedAddress = LockService.normalizeAddress(address);
151+
const state = this.localLockStates.get(normalizedAddress);
148152

149153
if (state?.sessionKey === sessionKey) {
150154
await this.doRelease(state);

packages/relay/src/lib/services/lockService/LockService.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
// SPDX-License-Identifier: Apache-2.0
22

3+
import { ConfigService } from '@hashgraph/json-rpc-config-service/dist/services';
4+
35
import { LockStrategy } from '../../types';
46

57
/**
@@ -28,7 +30,10 @@ export class LockService {
2830
* @param address - The sender address to acquire the lock for.
2931
* @returns A promise that resolves to a unique session key, or null if acquisition fails (fail open).
3032
*/
31-
async acquireLock(address: string): Promise<string | null> {
33+
async acquireLock(address: string): Promise<string | undefined> {
34+
if (!ConfigService.get('ENABLE_NONCE_ORDERING')) {
35+
return;
36+
}
3237
return await this.strategy.acquireLock(address);
3338
}
3439

@@ -40,7 +45,9 @@ export class LockService {
4045
* @param sessionKey - The session key obtained during lock acquisition.
4146
*/
4247
async releaseLock(address: string, sessionKey: string): Promise<void> {
43-
await this.strategy.releaseLock(address, sessionKey);
48+
if (ConfigService.get('ENABLE_NONCE_ORDERING')) {
49+
await this.strategy.releaseLock(address, sessionKey);
50+
}
4451
}
4552

4653
/**

packages/relay/src/lib/services/lockService/RedisLockStrategy.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ export class RedisLockStrategy implements LockStrategy {
3939
* @param address - The sender address to acquire the lock for (will be normalized).
4040
* @returns A promise that resolves to a unique session key upon successful acquisition, or null if acquisition fails (fail open).
4141
*/
42-
async acquireLock(address: string): Promise<string | null> {
42+
async acquireLock(address: string): Promise<string | undefined> {
4343
const sessionKey = this.generateSessionKey();
4444
const lockKey = this.getLockKey(address);
4545
const queueKey = this.getQueueKey(address);
@@ -86,7 +86,7 @@ export class RedisLockStrategy implements LockStrategy {
8686
}
8787
} catch (error) {
8888
this.logger.error(error, `Failed to acquire lock: address=${address}, sessionKey=${sessionKey}. Failing open.`);
89-
return null;
89+
return;
9090
} finally {
9191
// Always remove from queue if we joined it (whether success or failure)
9292
if (joinedQueue) {

packages/relay/src/lib/types/lock.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export interface LockStrategy {
1515
* @param address - The address to acquire the lock for (will be normalized by implementation).
1616
* @returns A promise that resolves to a unique session key upon successful acquisition, or null if acquisition fails (fail open).
1717
*/
18-
acquireLock(address: string): Promise<string | null>;
18+
acquireLock(address: string): Promise<string | undefined>;
1919

2020
/**
2121
* Releases a lock for the specified address.

0 commit comments

Comments
 (0)