Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ All notable changes to the Aptos TypeScript SDK will be captured in this file. T

## Unreleased

- [**Breaking Change**] Adds support to `TransactionWorker` for adding new transactions while the worker is running. Marked as breaking because the semantics of the worker are different, since before it would only submit the original batch of transactions and then do nothing with additional transactions pushed to the worker.

# 1.38.0 (2025-04-02)

- Adds and default implementation of `verifySignatureAsync` to `PublicKey`.
Expand Down
96 changes: 63 additions & 33 deletions examples/typescript/batch_funds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
InputGenerateTransactionPayloadData,
Network,
NetworkToNetworkName,
TransactionWorker,
TransactionWorkerEventsEnum,
UserTransactionResponse,
} from "@aptos-labs/ts-sdk";
Expand All @@ -38,11 +39,13 @@ const aptos = new Aptos(config);
async function main() {
const accountsCount = 2;
const transactionsCount = 10;
const totalTransactions = accountsCount * transactionsCount;

const totalExpectedTransactions = accountsCount * transactionsCount;

const start = Date.now() / 1000; // current time in seconds

console.log("starting...");
console.log("Starting...");

// create senders and recipients accounts
const senders: Account[] = [];
const recipients: Account[] = [];
Expand All @@ -61,9 +64,9 @@ async function main() {
// fund sender accounts
const funds: Array<Promise<UserTransactionResponse>> = [];

for (let i = 0; i < senders.length; i += 1) {
for (const sender of senders) {
funds.push(
aptos.fundAccount({ accountAddress: senders[i].accountAddress.toStringWithoutPrefix(), amount: 10000000000 }),
aptos.fundAccount({ accountAddress: sender.accountAddress.toStringWithoutPrefix(), amount: 10000000000 }),
);
}

Expand All @@ -72,21 +75,19 @@ async function main() {
console.log(`${funds.length} sender accounts funded in ${Date.now() / 1000 - last} seconds`);
last = Date.now() / 1000;

// read sender accounts
// Read sender accounts to check their balances.
const balances: Array<Promise<AccountData>> = [];
for (let i = 0; i < senders.length; i += 1) {
balances.push(aptos.getAccountInfo({ accountAddress: senders[i].accountAddress }));
for (const sender of senders) {
balances.push(aptos.getAccountInfo({ accountAddress: sender.accountAddress }));
}
await Promise.all(balances);

console.log(`${balances.length} sender account balances checked in ${Date.now() / 1000 - last} seconds`);
last = Date.now() / 1000;

// create transactions
// Create transaction payloads.
const payloads: InputGenerateTransactionPayloadData[] = [];
// 100 transactions
for (let j = 0; j < transactionsCount; j += 1) {
// 5 recipients
for (let i = 0; i < recipients.length; i += 1) {
const txn: InputGenerateTransactionPayloadData = {
function: "0x1::aptos_account::transfer",
Expand All @@ -96,33 +97,62 @@ async function main() {
}
}

console.log(`sends ${totalTransactions * senders.length} transactions to ${aptos.config.network}....`);
// emit batch transactions
senders.map((sender) => aptos.transaction.batch.forSingleAccount({ sender, data: payloads }));
console.log(
`Sending ${totalExpectedTransactions * senders.length} (${totalExpectedTransactions} transactions per sender) transactions to ${aptos.config.network}...`,
);

aptos.transaction.batch.on(TransactionWorkerEventsEnum.TransactionSent, async (data) => {
console.log("message:", data.message);
console.log("transaction hash:", data.transactionHash);
});
const transactionWorkers: TransactionWorker[] = [];
for (const sender of senders) {
// Create a transaction worker for each sender.
const transactionWorker = new TransactionWorker(config, sender);
transactionWorkers.push(transactionWorker);

aptos.transaction.batch.on(TransactionWorkerEventsEnum.ExecutionFinish, async (data) => {
// log event output
console.log(data.message);

// verify accounts sequence number
const accounts = senders.map((sender) => aptos.getAccountInfo({ accountAddress: sender.accountAddress }));
const accountsData = await Promise.all(accounts);
accountsData.forEach((accountData) => {
console.log(
`account sequence number is ${(totalTransactions * senders.length) / 2}: ${
accountData.sequence_number === "20"
}`,
);
// Register listeners for certain events.
transactionWorker.on(TransactionWorkerEventsEnum.TransactionSent, async (data) => {
console.log(`Transaction sent. Hash: ${data.transactionHash}. Message: ${data.message}`);
});
transactionWorker.on(TransactionWorkerEventsEnum.TransactionExecuted, async (data) => {
console.log(`Transaction executed. Hash: ${data.transactionHash}. Message: ${data.message}`);
});
// worker finished execution, we can now unsubscribe from event listeners
aptos.transaction.batch.removeAllListeners();
process.exit(0);
transactionWorker.on(TransactionWorkerEventsEnum.TransactionExecutionFailed, async (data) => {
console.log(`Transaction execution failed. Message: ${data.message}`);
});

// Push the payloads to the transaction worker.
transactionWorker.pushMany(payloads.map((payload) => [payload, undefined]));

// Start the transaction worker.
transactionWorker.start();
}

// Wait for all transaction workers to finish, up to 45 seconds.
const timeout = 45 * 1000;
const startTime = Date.now();
await Promise.all(
transactionWorkers.map(async (worker) => {
while (worker.executedTransactions.length < totalExpectedTransactions) {
console.debug("Waiting for transaction worker to finish...");

// Check if we've exceeded the timeout
if (Date.now() - startTime > timeout) {
console.error("Timeout waiting for transaction worker to finish");
worker.stop();
break;
}

await new Promise((resolve) => setTimeout(resolve, 500));
}
}),
);

// Verify the sequence numbers of the accounts.
const accounts = senders.map((sender) => aptos.getAccountInfo({ accountAddress: sender.accountAddress }));
const accountsData = await Promise.all(accounts);
accountsData.forEach((accountData) => {
console.log(`Account sequence number is ${accountData.sequence_number}, it should be ${totalExpectedTransactions}`);
});

process.exit(0);
}

main();
13 changes: 9 additions & 4 deletions src/api/transaction.ts
Original file line number Diff line number Diff line change
Expand Up @@ -623,11 +623,16 @@ export class Transaction {
// TRANSACTION SUBMISSION //

/**
* @deprecated Prefer to use `aptos.transaction.batch.forSingleAccount()`
* @deprecated For a safer, more ergonomic API, consider using `TransactionWorker` directly.
* You can also use `aptos.transaction.batch.forSingleAccount()`.
*
* Batch transactions for a single account by submitting multiple transaction payloads.
* This function is useful for efficiently processing and submitting transactions that do not depend on each other, such as
* batch funding or batch token minting.
* Batch transactions for a single account by submitting multiple transaction
* payloads. This function is useful for efficiently processing and submitting
* transactions that do not depend on each other, such as batch funding or batch token
* minting.
*
* **Warning**: This does not actually use the batch submission API at
* /v1/transactions/batch,it still submits transactions sequentially.
*
* @param args - The arguments for batching transactions.
* @param args.sender - The sender account to sign and submit the transactions.
Expand Down
5 changes: 5 additions & 0 deletions src/api/transactionSubmission/management.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ import { InputGenerateTransactionPayloadData, InputGenerateTransactionOptions }
import { AptosConfig } from "../aptosConfig";
import { Account } from "../../account";

/**
* For a safer, more ergonomic API, consider using {@link TransactionWorker} directly.
*/
export class TransactionManagement extends EventEmitter<TransactionWorkerEvents> {
account!: Account;

Expand Down Expand Up @@ -178,6 +181,8 @@ export class TransactionManagement extends EventEmitter<TransactionWorkerEvents>
}

/**
* For a safer, more ergonomic API, consider using {@link TransactionWorker} directly.
*
* Send batch transactions for a single account.
*
* This function uses a transaction worker that receives payloads to be processed
Expand Down
2 changes: 1 addition & 1 deletion src/transactions/management/accountSequenceNumber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ export class AccountSequenceNumber {
* @group Implementation
* @category Transactions
*/
async nextSequenceNumber(): Promise<bigint | null> {
async nextSequenceNumber(): Promise<bigint> {
/* eslint-disable no-await-in-loop */
while (this.lock) {
await sleep(this.sleepTime);
Expand Down
53 changes: 47 additions & 6 deletions src/transactions/management/asyncQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,31 @@ export class AsyncQueue<T> {
* @category Transactions
*/
enqueue(item: T): void {
this.cancelled = false;
this.enqueueMany([item]);
}

if (this.pendingDequeue.length > 0) {
const promise = this.pendingDequeue.shift();
/**
* Enqueues multiple items to the queue.
*
* @param items - The items to be added to the queue.
* @group Implementation
* @category Transactions
*/
enqueueMany(items: T[]): void {
this.cancelled = false;

promise?.resolve(item);
// Process as many items as we have pending dequeues.
const numItemsToResolveImmediately = Math.min(this.pendingDequeue.length, items.length);

return;
for (let i = 0; i < numItemsToResolveImmediately; i++) {
const promise = this.pendingDequeue.shift();
promise?.resolve(items[i]);
}

this.queue.push(item);
// Add remaining items to the queue.
if (numItemsToResolveImmediately < items.length) {
this.queue.push(...items.slice(numItemsToResolveImmediately));
}
}

/**
Expand All @@ -61,6 +75,33 @@ export class AsyncQueue<T> {
});
}

/**
* Dequeues all items from the queue and returns a promise that resolves to an array
* of items. If the queue is empty, it creates a new promise that will be resolved to
* an array with a single item when an item is enqueued.
*
* @returns Promise<T[]> - A promise that resolves to an array of all items in the queue.
* @group Implementation
* @category Transactions
*/
async dequeueAll(): Promise<T[]> {
if (this.queue.length > 0) {
// Get all items from the queue.
const items = [...this.queue];
// Clear the queue.
this.queue.length = 0;
return Promise.resolve(items);
}

return new Promise<T[]>((resolve, reject) => {
// When an item is added, it will resolve with that single item as an array.
this.pendingDequeue.push({
resolve: (item: T) => resolve([item]),
reject,
});
});
}

/**
* Determine whether the queue is empty.
*
Expand Down
1 change: 1 addition & 0 deletions src/transactions/management/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from "./accountSequenceNumber";
export * from "./transactionWorker";
export { AsyncQueueCancelledError } from "./asyncQueue";
Loading