Skip to content

Commit 88030ed

Browse files
committed
Add support for continually adding more txns to TransactionWorker, make it more extensible
1 parent 8844a55 commit 88030ed

File tree

10 files changed

+399
-167
lines changed

10 files changed

+399
-167
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ All notable changes to the Aptos TypeScript SDK will be captured in this file. T
44

55
## Unreleased
66

7+
- [**Breaking Change**] Adds support to `TransactionWorker` for adding new transactions while the worker is running. Marked as breaking because the semantics of the worker are different, since before it would only submit the original batch of transactions and then do nothing with additional transactions pushed to the worker.
8+
79
# 1.38.0 (2025-04-02)
810

911
- Adds and default implementation of `verifySignatureAsync` to `PublicKey`.

examples/typescript/batch_funds.ts

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import {
2626
InputGenerateTransactionPayloadData,
2727
Network,
2828
NetworkToNetworkName,
29+
TransactionWorker,
2930
TransactionWorkerEventsEnum,
3031
UserTransactionResponse,
3132
} from "@aptos-labs/ts-sdk";
@@ -38,11 +39,13 @@ const aptos = new Aptos(config);
3839
async function main() {
3940
const accountsCount = 2;
4041
const transactionsCount = 10;
41-
const totalTransactions = accountsCount * transactionsCount;
42+
43+
const totalExpectedTransactions = accountsCount * transactionsCount;
4244

4345
const start = Date.now() / 1000; // current time in seconds
4446

45-
console.log("starting...");
47+
console.log("Starting...");
48+
4649
// create senders and recipients accounts
4750
const senders: Account[] = [];
4851
const recipients: Account[] = [];
@@ -61,9 +64,9 @@ async function main() {
6164
// fund sender accounts
6265
const funds: Array<Promise<UserTransactionResponse>> = [];
6366

64-
for (let i = 0; i < senders.length; i += 1) {
67+
for (const sender of senders) {
6568
funds.push(
66-
aptos.fundAccount({ accountAddress: senders[i].accountAddress.toStringWithoutPrefix(), amount: 10000000000 }),
69+
aptos.fundAccount({ accountAddress: sender.accountAddress.toStringWithoutPrefix(), amount: 10000000000 }),
6770
);
6871
}
6972

@@ -72,21 +75,19 @@ async function main() {
7275
console.log(`${funds.length} sender accounts funded in ${Date.now() / 1000 - last} seconds`);
7376
last = Date.now() / 1000;
7477

75-
// read sender accounts
78+
// Read sender accounts to check their balances.
7679
const balances: Array<Promise<AccountData>> = [];
77-
for (let i = 0; i < senders.length; i += 1) {
78-
balances.push(aptos.getAccountInfo({ accountAddress: senders[i].accountAddress }));
80+
for (const sender of senders) {
81+
balances.push(aptos.getAccountInfo({ accountAddress: sender.accountAddress }));
7982
}
8083
await Promise.all(balances);
8184

8285
console.log(`${balances.length} sender account balances checked in ${Date.now() / 1000 - last} seconds`);
8386
last = Date.now() / 1000;
8487

85-
// create transactions
88+
// Create transaction payloads.
8689
const payloads: InputGenerateTransactionPayloadData[] = [];
87-
// 100 transactions
8890
for (let j = 0; j < transactionsCount; j += 1) {
89-
// 5 recipients
9091
for (let i = 0; i < recipients.length; i += 1) {
9192
const txn: InputGenerateTransactionPayloadData = {
9293
function: "0x1::aptos_account::transfer",
@@ -96,33 +97,62 @@ async function main() {
9697
}
9798
}
9899

99-
console.log(`sends ${totalTransactions * senders.length} transactions to ${aptos.config.network}....`);
100-
// emit batch transactions
101-
senders.map((sender) => aptos.transaction.batch.forSingleAccount({ sender, data: payloads }));
100+
console.log(
101+
`Sending ${totalExpectedTransactions * senders.length} (${totalExpectedTransactions} transactions per sender) transactions to ${aptos.config.network}...`,
102+
);
102103

103-
aptos.transaction.batch.on(TransactionWorkerEventsEnum.TransactionSent, async (data) => {
104-
console.log("message:", data.message);
105-
console.log("transaction hash:", data.transactionHash);
106-
});
104+
const transactionWorkers: TransactionWorker[] = [];
105+
for (const sender of senders) {
106+
// Create a transaction worker for each sender.
107+
const transactionWorker = new TransactionWorker(config, sender);
108+
transactionWorkers.push(transactionWorker);
107109

108-
aptos.transaction.batch.on(TransactionWorkerEventsEnum.ExecutionFinish, async (data) => {
109-
// log event output
110-
console.log(data.message);
111-
112-
// verify accounts sequence number
113-
const accounts = senders.map((sender) => aptos.getAccountInfo({ accountAddress: sender.accountAddress }));
114-
const accountsData = await Promise.all(accounts);
115-
accountsData.forEach((accountData) => {
116-
console.log(
117-
`account sequence number is ${(totalTransactions * senders.length) / 2}: ${
118-
accountData.sequence_number === "20"
119-
}`,
120-
);
110+
// Register listeners for certain events.
111+
transactionWorker.on(TransactionWorkerEventsEnum.TransactionSent, async (data) => {
112+
console.log(`Transaction sent. Hash: ${data.transactionHash}. Message: ${data.message}`);
113+
});
114+
transactionWorker.on(TransactionWorkerEventsEnum.TransactionExecuted, async (data) => {
115+
console.log(`Transaction executed. Hash: ${data.transactionHash}. Message: ${data.message}`);
121116
});
122-
// worker finished execution, we can now unsubscribe from event listeners
123-
aptos.transaction.batch.removeAllListeners();
124-
process.exit(0);
117+
transactionWorker.on(TransactionWorkerEventsEnum.TransactionExecutionFailed, async (data) => {
118+
console.log(`Transaction execution failed. Message: ${data.message}`);
119+
});
120+
121+
// Push the payloads to the transaction worker.
122+
transactionWorker.pushMany(payloads.map((payload) => [payload, undefined]));
123+
124+
// Start the transaction worker.
125+
transactionWorker.start();
126+
}
127+
128+
// Wait for all transaction workers to finish, up to 45 seconds.
129+
const timeout = 45 * 1000;
130+
const startTime = Date.now();
131+
await Promise.all(
132+
transactionWorkers.map(async (worker) => {
133+
while (worker.executedTransactions.length < totalExpectedTransactions) {
134+
console.debug("Waiting for transaction worker to finish...");
135+
136+
// Check if we've exceeded the timeout
137+
if (Date.now() - startTime > timeout) {
138+
console.error("Timeout waiting for transaction worker to finish");
139+
worker.stop();
140+
break;
141+
}
142+
143+
await new Promise((resolve) => setTimeout(resolve, 500));
144+
}
145+
}),
146+
);
147+
148+
// Verify the sequence numbers of the accounts.
149+
const accounts = senders.map((sender) => aptos.getAccountInfo({ accountAddress: sender.accountAddress }));
150+
const accountsData = await Promise.all(accounts);
151+
accountsData.forEach((accountData) => {
152+
console.log(`Account sequence number is ${accountData.sequence_number}, it should be ${totalExpectedTransactions}`);
125153
});
154+
155+
process.exit(0);
126156
}
127157

128158
main();

src/api/transaction.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -623,11 +623,16 @@ export class Transaction {
623623
// TRANSACTION SUBMISSION //
624624

625625
/**
626-
* @deprecated Prefer to use `aptos.transaction.batch.forSingleAccount()`
626+
* @deprecated For a safer, more ergonomic API, consider using `TransactionWorker` directly.
627+
* You can also use `aptos.transaction.batch.forSingleAccount()`.
627628
*
628-
* Batch transactions for a single account by submitting multiple transaction payloads.
629-
* This function is useful for efficiently processing and submitting transactions that do not depend on each other, such as
630-
* batch funding or batch token minting.
629+
* Batch transactions for a single account by submitting multiple transaction
630+
* payloads. This function is useful for efficiently processing and submitting
631+
* transactions that do not depend on each other, such as batch funding or batch token
632+
* minting.
633+
*
634+
* **Warning**: This does not actually use the batch submission API at
635+
* /v1/transactions/batch,it still submits transactions sequentially.
631636
*
632637
* @param args - The arguments for batching transactions.
633638
* @param args.sender - The sender account to sign and submit the transactions.

src/api/transactionSubmission/management.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import { InputGenerateTransactionPayloadData, InputGenerateTransactionOptions }
44
import { AptosConfig } from "../aptosConfig";
55
import { Account } from "../../account";
66

7+
/**
8+
* For a safer, more ergonomic API, consider using {@link TransactionWorker} directly.
9+
*/
710
export class TransactionManagement extends EventEmitter<TransactionWorkerEvents> {
811
account!: Account;
912

@@ -178,6 +181,8 @@ export class TransactionManagement extends EventEmitter<TransactionWorkerEvents>
178181
}
179182

180183
/**
184+
* For a safer, more ergonomic API, consider using {@link TransactionWorker} directly.
185+
*
181186
* Send batch transactions for a single account.
182187
*
183188
* This function uses a transaction worker that receives payloads to be processed

src/transactions/management/accountSequenceNumber.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ export class AccountSequenceNumber {
110110
* @group Implementation
111111
* @category Transactions
112112
*/
113-
async nextSequenceNumber(): Promise<bigint | null> {
113+
async nextSequenceNumber(): Promise<bigint> {
114114
/* eslint-disable no-await-in-loop */
115115
while (this.lock) {
116116
await sleep(this.sleepTime);

src/transactions/management/asyncQueue.ts

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,31 @@ export class AsyncQueue<T> {
3030
* @category Transactions
3131
*/
3232
enqueue(item: T): void {
33-
this.cancelled = false;
33+
this.enqueueMany([item]);
34+
}
3435

35-
if (this.pendingDequeue.length > 0) {
36-
const promise = this.pendingDequeue.shift();
36+
/**
37+
* Enqueues multiple items to the queue.
38+
*
39+
* @param items - The items to be added to the queue.
40+
* @group Implementation
41+
* @category Transactions
42+
*/
43+
enqueueMany(items: T[]): void {
44+
this.cancelled = false;
3745

38-
promise?.resolve(item);
46+
// Process as many items as we have pending dequeues.
47+
const numItemsToResolveImmediately = Math.min(this.pendingDequeue.length, items.length);
3948

40-
return;
49+
for (let i = 0; i < numItemsToResolveImmediately; i++) {
50+
const promise = this.pendingDequeue.shift();
51+
promise?.resolve(items[i]);
4152
}
4253

43-
this.queue.push(item);
54+
// Add remaining items to the queue.
55+
if (numItemsToResolveImmediately < items.length) {
56+
this.queue.push(...items.slice(numItemsToResolveImmediately));
57+
}
4458
}
4559

4660
/**
@@ -61,6 +75,33 @@ export class AsyncQueue<T> {
6175
});
6276
}
6377

78+
/**
79+
* Dequeues all items from the queue and returns a promise that resolves to an array
80+
* of items. If the queue is empty, it creates a new promise that will be resolved to
81+
* an array with a single item when an item is enqueued.
82+
*
83+
* @returns Promise<T[]> - A promise that resolves to an array of all items in the queue.
84+
* @group Implementation
85+
* @category Transactions
86+
*/
87+
async dequeueAll(): Promise<T[]> {
88+
if (this.queue.length > 0) {
89+
// Get all items from the queue.
90+
const items = [...this.queue];
91+
// Clear the queue.
92+
this.queue.length = 0;
93+
return Promise.resolve(items);
94+
}
95+
96+
return new Promise<T[]>((resolve, reject) => {
97+
// When an item is added, it will resolve with that single item as an array.
98+
this.pendingDequeue.push({
99+
resolve: (item: T) => resolve([item]),
100+
reject,
101+
});
102+
});
103+
}
104+
64105
/**
65106
* Determine whether the queue is empty.
66107
*
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
export * from "./accountSequenceNumber";
22
export * from "./transactionWorker";
3+
export { AsyncQueueCancelledError } from "./asyncQueue";

0 commit comments

Comments
 (0)