diff --git a/console-jest.config.js b/console-jest.config.js index 2f3268a45..c933afc6e 100644 --- a/console-jest.config.js +++ b/console-jest.config.js @@ -1,3 +1,3 @@ -import console from "console"; +import "reflect-metadata"; -global.console = console; +globalThis.console = console; diff --git a/package-lock.json b/package-lock.json index 239dab20e..cd389e5d2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -44,7 +44,7 @@ "ts-jest": "^29.0.5", "typedoc": "^0.26.11", "typedoc-plugin-frontmatter": "1.0.0", - "typedoc-plugin-inline-sources": "1.2.0", + "typedoc-plugin-inline-sources": "^1.2.0", "typedoc-plugin-markdown": "4.2.10", "typescript": "5.1" } diff --git a/package.json b/package.json index a27f72dc1..70441d8ea 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,7 @@ "ts-jest": "^29.0.5", "typedoc": "^0.26.11", "typedoc-plugin-frontmatter": "1.0.0", - "typedoc-plugin-inline-sources": "1.2.0", + "typedoc-plugin-inline-sources": "^1.2.0", "typedoc-plugin-markdown": "4.2.10", "typescript": "5.1" }, diff --git a/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts b/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts index 4f991d012..9aded3518 100644 --- a/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts +++ b/packages/indexer/src/api/GeneratedResolverFactoryGraphqlModule.ts @@ -90,6 +90,7 @@ export class GeneratedResolverFactoryGraphqlModule extends ResolverFactoryGraphq public async initializePrismaClient() { // setup the prisma client and feed it to the server, // since this is necessary for the returned resolvers to work + const prismaClient = new PrismaClient({ // datasourceUrl: 'postgresql://admin:password@localhost:5433/protokit-indexer?schema=public' }); diff --git a/packages/persistance/prisma/migrations/20251222105633_pending_l1_transactions/migration.sql b/packages/persistance/prisma/migrations/20251222105633_pending_l1_transactions/migration.sql new file mode 100644 index 000000000..b98317704 --- /dev/null +++ b/packages/persistance/prisma/migrations/20251222105633_pending_l1_transactions/migration.sql @@ -0,0 +1,44 @@ +/* + Warnings: + + - You are about to drop the column `settlementTransactionHash` on the `Batch` table. All the data in the column will be lost. + - The primary key for the `Settlement` table will be changed. If it partially fails, the table could be left without primary key constraint. + - You are about to drop the column `transactionHash` on the `Settlement` table. All the data in the column will be lost. + - Added the required column `transactionId` to the `Settlement` table without a default value. This is not possible if the table is not empty. + +*/ +-- DropForeignKey +ALTER TABLE "Batch" DROP CONSTRAINT "Batch_settlementTransactionHash_fkey"; + +-- AlterTable +ALTER TABLE "Batch" DROP COLUMN "settlementTransactionHash", +ADD COLUMN "settlementTransactionId" TEXT; + +-- AlterTable +ALTER TABLE "Settlement" DROP CONSTRAINT "Settlement_pkey", +DROP COLUMN "transactionHash", +ADD COLUMN "transactionId" TEXT NOT NULL, +ADD CONSTRAINT "Settlement_pkey" PRIMARY KEY ("transactionId"); + +-- CreateTable +CREATE TABLE "PendingL1Transaction" ( + "id" TEXT NOT NULL, + "sender" TEXT NOT NULL, + "nonce" INTEGER NOT NULL, + "attempts" INTEGER NOT NULL, + "status" VARCHAR(32) NOT NULL, + "transaction" JSON NOT NULL, + "lastError" TEXT, + "sentAt" TIMESTAMP(3), + + CONSTRAINT "PendingL1Transaction_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "PendingL1Transaction_sender_nonce_key" ON "PendingL1Transaction"("sender", "nonce"); + +-- AddForeignKey +ALTER TABLE "Batch" ADD CONSTRAINT "Batch_settlementTransactionId_fkey" FOREIGN KEY ("settlementTransactionId") REFERENCES "Settlement"("transactionId") ON DELETE SET NULL ON UPDATE CASCADE; + +-- AddForeignKey +ALTER TABLE "Settlement" ADD CONSTRAINT "Settlement_transactionId_fkey" FOREIGN KEY ("transactionId") REFERENCES "PendingL1Transaction"("id") ON DELETE RESTRICT ON UPDATE CASCADE; diff --git a/packages/persistance/prisma/schema.prisma b/packages/persistance/prisma/schema.prisma index d0ed1472e..9f985e67e 100644 --- a/packages/persistance/prisma/schema.prisma +++ b/packages/persistance/prisma/schema.prisma @@ -105,8 +105,8 @@ model Batch { blocks Block[] - settlementTransactionHash String? - settlement Settlement? @relation(fields: [settlementTransactionHash], references: [transactionHash]) + settlementTransactionId String? + settlement Settlement? @relation(fields: [settlementTransactionId], references: [transactionId]) } model BlockResult { @@ -123,11 +123,11 @@ model BlockResult { } model Settlement { - // transaction String - transactionHash String @id + transactionId String @id promisedMessagesHash String - batches Batch[] + batches Batch[] + pendingL1Transaction PendingL1Transaction? @relation(fields: [transactionId], references: [id]) } model IncomingMessageBatchTransaction { @@ -148,3 +148,21 @@ model IncomingMessageBatch { messages IncomingMessageBatchTransaction[] } + +model PendingL1Transaction { + id String @id @default(cuid()) + sender String + nonce Int + attempts Int + status String @db.VarChar(32) + transaction Json @db.Json + hash String? + lastError String? + sentAt DateTime? + queuedAt DateTime? + nextActionAt DateTime? + + settlement Settlement? + + @@unique([sender, nonce]) +} diff --git a/packages/persistance/src/PrismaDatabaseConnection.ts b/packages/persistance/src/PrismaDatabaseConnection.ts index 84440ca1c..5e0453962 100644 --- a/packages/persistance/src/PrismaDatabaseConnection.ts +++ b/packages/persistance/src/PrismaDatabaseConnection.ts @@ -13,6 +13,7 @@ import { PrismaBlockStorage } from "./services/prisma/PrismaBlockStorage"; import { PrismaSettlementStorage } from "./services/prisma/PrismaSettlementStorage"; import { PrismaMessageStorage } from "./services/prisma/PrismaMessageStorage"; import { PrismaTransactionStorage } from "./services/prisma/PrismaTransactionStorage"; +import { PrismaPendingL1TransactionStorage } from "./services/prisma/PrismaPendingL1TransactionStorage"; export interface PrismaDatabaseConfig { // Either object-based config or connection string @@ -82,6 +83,9 @@ export class PrismaDatabaseConnection transactionStorage: { useClass: PrismaTransactionStorage, }, + pendingL1TransactionStorage: { + useClass: PrismaPendingL1TransactionStorage, + }, }; } @@ -97,6 +101,7 @@ export class PrismaDatabaseConnection "IncomingMessageBatch", "IncomingMessageBatchTransaction", "LinkedLeaf", + "PendingL1Transaction", ]; await this.prismaClient.$transaction( diff --git a/packages/persistance/src/index.ts b/packages/persistance/src/index.ts index 788de3d57..88b5e7df1 100644 --- a/packages/persistance/src/index.ts +++ b/packages/persistance/src/index.ts @@ -7,6 +7,7 @@ export * from "./services/prisma/PrismaBatchStore"; export * from "./services/prisma/PrismaSettlementStorage"; export * from "./services/prisma/PrismaMessageStorage"; export * from "./services/prisma/PrismaTransactionStorage"; +export * from "./services/prisma/PrismaPendingL1TransactionStorage"; export * from "./services/prisma/mappers/BatchMapper"; export * from "./services/prisma/mappers/BlockMapper"; export * from "./services/prisma/mappers/FieldMapper"; diff --git a/packages/persistance/src/services/prisma/PrismaPendingL1TransactionStorage.ts b/packages/persistance/src/services/prisma/PrismaPendingL1TransactionStorage.ts new file mode 100644 index 000000000..123047815 --- /dev/null +++ b/packages/persistance/src/services/prisma/PrismaPendingL1TransactionStorage.ts @@ -0,0 +1,96 @@ +import { inject, injectable } from "tsyringe"; +import { + PendingL1TransactionRecord, + PendingL1TransactionStatus, + PendingL1TransactionStorage, +} from "@proto-kit/sequencer"; + +import type { PrismaConnection } from "../../PrismaDatabaseConnection"; + +import { PendingL1TransactionMapper } from "./mappers/PendingL1TransactionMapper"; + +@injectable() +export class PrismaPendingL1TransactionStorage + implements PendingL1TransactionStorage +{ + private readonly mapper = new PendingL1TransactionMapper(); + + public constructor( + @inject("Database") private readonly connection: PrismaConnection + ) {} + + public async queue( + record: Omit + ): Promise { + const { prismaClient } = this.connection; + const status: PendingL1TransactionStatus = "queued"; + const txnRecord = await prismaClient.pendingL1Transaction.create({ + data: this.mapper.mapOut({ ...record, status }), + }); + return txnRecord.id; + } + + public async update( + id: string, + updates: Partial> + ): Promise { + const { prismaClient } = this.connection; + await prismaClient.pendingL1Transaction.update({ + where: { id }, + data: { + ...(updates.attempts !== undefined && { attempts: updates.attempts }), + ...(updates.status !== undefined && { status: updates.status }), + ...(updates.transaction !== undefined && { + transaction: updates.transaction.toJSON(), + }), + ...(updates.hash !== undefined && { hash: updates.hash }), + ...(updates.lastError !== undefined && { + lastError: updates.lastError, + }), + ...(updates.sentAt !== undefined && { sentAt: updates.sentAt }), + ...(updates.queuedAt !== undefined && { queuedAt: updates.queuedAt }), + ...(updates.nextActionAt !== undefined && { + nextActionAt: updates.nextActionAt, + }), + }, + }); + } + + public async delete(id: string): Promise { + const { prismaClient } = this.connection; + await prismaClient.pendingL1Transaction.delete({ + where: { + id, + }, + }); + } + + public async findById( + id: string + ): Promise { + const { prismaClient } = this.connection; + const record = await prismaClient.pendingL1Transaction.findUnique({ + where: { + id, + }, + }); + if (!record) { + return undefined; + } + return this.mapper.mapIn(record); + } + + public async findByStatuses( + statuses: PendingL1TransactionStatus[] + ): Promise { + const { prismaClient } = this.connection; + const rows = await prismaClient.pendingL1Transaction.findMany({ + where: { + status: { + in: statuses, + }, + }, + }); + return rows.map((record) => this.mapper.mapIn(record)); + } +} diff --git a/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts b/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts index e6b5addcb..47c5f3c31 100644 --- a/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts +++ b/packages/persistance/src/services/prisma/PrismaSettlementStorage.ts @@ -18,7 +18,7 @@ export class PrismaSettlementStorage implements SettlementStorage { const batch = await prismaClient.batch.findFirst({ where: { - settlementTransactionHash: { + settlementTransactionId: { not: null, }, }, diff --git a/packages/persistance/src/services/prisma/mappers/BatchMapper.ts b/packages/persistance/src/services/prisma/mappers/BatchMapper.ts index 490c6cda4..e69f40fcb 100644 --- a/packages/persistance/src/services/prisma/mappers/BatchMapper.ts +++ b/packages/persistance/src/services/prisma/mappers/BatchMapper.ts @@ -21,7 +21,7 @@ export class BatchMapper const batch: PrismaBatch = { proof: input.proof, height: input.height, - settlementTransactionHash: null, + settlementTransactionId: null, }; return [batch, []]; } diff --git a/packages/persistance/src/services/prisma/mappers/PendingL1TransactionMapper.ts b/packages/persistance/src/services/prisma/mappers/PendingL1TransactionMapper.ts new file mode 100644 index 000000000..dc7d3110c --- /dev/null +++ b/packages/persistance/src/services/prisma/mappers/PendingL1TransactionMapper.ts @@ -0,0 +1,43 @@ +import { PendingL1Transaction, Prisma } from "@prisma/client"; +import { + PendingL1TransactionRecord, + PendingL1TransactionStatus, +} from "@proto-kit/sequencer"; +import { Mina } from "o1js"; + +export class PendingL1TransactionMapper { + public mapIn(input: PendingL1Transaction): PendingL1TransactionRecord { + return { + id: input.id, + sender: input.sender, + nonce: input.nonce, + attempts: input.attempts, + status: input.status as PendingL1TransactionStatus, + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + transaction: Mina.Transaction.fromJSON(input.transaction as any), + hash: input.hash ?? undefined, + lastError: input.lastError ?? undefined, + sentAt: input.sentAt ?? undefined, + queuedAt: input.queuedAt ?? undefined, + nextActionAt: input.nextActionAt ?? undefined, + }; + } + + public mapOut( + input: Omit & { id?: string } + ): Prisma.PendingL1TransactionCreateInput { + return { + id: input.id ?? undefined, + sender: input.sender, + nonce: input.nonce, + attempts: input.attempts, + status: input.status, + transaction: input.transaction.toJSON(), + hash: input.hash ?? null, + lastError: input.lastError ?? null, + sentAt: input.sentAt ?? null, + queuedAt: input.queuedAt ?? null, + nextActionAt: input.nextActionAt ?? null, + }; + } +} diff --git a/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts b/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts index 5f923e34f..33979555c 100644 --- a/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts +++ b/packages/persistance/src/services/prisma/mappers/SettlementMapper.ts @@ -12,7 +12,7 @@ export class SettlementMapper const [settlement, batches] = input; return { batches, - transactionHash: settlement.transactionHash, + transactionId: settlement.transactionId, promisedMessagesHash: settlement.promisedMessagesHash, }; } @@ -21,7 +21,7 @@ export class SettlementMapper return [ { promisedMessagesHash: input.promisedMessagesHash, - transactionHash: input.transactionHash, + transactionId: input.transactionId, }, input.batches, ]; diff --git a/packages/sequencer/src/index.ts b/packages/sequencer/src/index.ts index a0fee0179..b6f9ff8bf 100644 --- a/packages/sequencer/src/index.ts +++ b/packages/sequencer/src/index.ts @@ -11,7 +11,6 @@ export * from "./sequencer/builder/Closeable"; export * from "./worker/flow/Flow"; export * from "./worker/flow/Task"; export * from "./worker/flow/JSONTaskSerializer"; -// export * from "./worker/queue/BullQueue"; export * from "./worker/queue/TaskQueue"; export * from "./worker/queue/LocalTaskQueue"; export * from "./worker/queue/ListenerList"; @@ -71,6 +70,7 @@ export * from "./storage/repositories/BlockStorage"; export * from "./storage/repositories/SettlementStorage"; export * from "./storage/repositories/MessageStorage"; export * from "./storage/repositories/TransactionStorage"; +export * from "./storage/repositories/PendingL1TransactionStorage"; export * from "./storage/inmemory/InMemoryDatabase"; export * from "./storage/inmemory/InMemoryAsyncMerkleTreeStore"; export * from "./storage/inmemory/InMemoryBlockStorage"; @@ -78,6 +78,7 @@ export * from "./storage/inmemory/InMemoryBatchStorage"; export * from "./storage/inmemory/InMemorySettlementStorage"; export * from "./storage/inmemory/InMemoryMessageStorage"; export * from "./storage/inmemory/InMemoryTransactionStorage"; +export * from "./storage/inmemory/InMemoryPendingL1TransactionStorage"; export * from "./storage/StorageDependencyFactory"; export * from "./storage/Database"; export * from "./storage/DatabasePruneModule"; @@ -108,6 +109,10 @@ export * from "./settlement/permissions/BaseLayerContractPermissions"; export * from "./settlement/permissions/ProvenSettlementPermissions"; export * from "./settlement/permissions/SignedSettlementPermissions"; export * from "./settlement/tasks/SettlementProvingTask"; +export * from "./settlement/transactions/L1TransactionRetryStrategy"; +export * from "./settlement/transactions/DefaultL1TransactionRetryStrategy"; +export * from "./settlement/transactions/L1TransactionDispatcher"; +export * from "./settlement/transactions/TxStatusWaiter"; export * from "./settlement/transactions/MinaTransactionSender"; export * from "./settlement/transactions/MinaTransactionSimulator"; export * from "./settlement/transactions/MinaSimulationService"; diff --git a/packages/sequencer/src/settlement/BridgingModule.ts b/packages/sequencer/src/settlement/BridgingModule.ts index a575c5e3a..114525f00 100644 --- a/packages/sequencer/src/settlement/BridgingModule.ts +++ b/packages/sequencer/src/settlement/BridgingModule.ts @@ -50,7 +50,6 @@ import groupBy from "lodash/groupBy"; // eslint-disable-next-line import/no-extraneous-dependencies import truncate from "lodash/truncate"; -import { FeeStrategy } from "../protocol/baselayer/fees/FeeStrategy"; import type { MinaBaseLayer } from "../protocol/baselayer/MinaBaseLayer"; import { AsyncLinkedLeafStore } from "../state/async/AsyncLinkedLeafStore"; import { CachedLinkedLeafStore } from "../state/lmt/CachedLinkedLeafStore"; @@ -113,8 +112,6 @@ export class BridgingModule extends SequencerModule { private readonly outgoingMessageCollector: OutgoingMessageCollector, @inject("AsyncLinkedLeafStore") private readonly linkedLeafStore: AsyncLinkedLeafStore, - @inject("FeeStrategy") - private readonly feeStrategy: FeeStrategy, @inject("BaseLayer") private readonly baseLayer: MinaBaseLayer, @inject("SettlementSigner") private readonly signer: MinaSigner, @inject("TransactionSender") @@ -249,7 +246,6 @@ export class BridgingModule extends SequencerModule { sender: feepayer, nonce: nonce, memo: `Deploy token bridge for ${truncate(tokenId.toString(), { length: 6 })}`, - fee: this.feeStrategy.getFee(), }, async () => { AccountUpdate.fundNewAccount(feepayer, 1); @@ -512,7 +508,6 @@ export class BridgingModule extends SequencerModule { sender: feepayer, // eslint-disable-next-line no-plusplus nonce: nonce++, - fee: this.feeStrategy.getFee(), memo: "pull state root", }, async () => { @@ -630,7 +625,6 @@ export class BridgingModule extends SequencerModule { sender: feepayer, // eslint-disable-next-line no-plusplus nonce: nonce++, - fee: this.feeStrategy.getFee(), memo: "roll up actions", }, async () => { diff --git a/packages/sequencer/src/settlement/SettlementModule.ts b/packages/sequencer/src/settlement/SettlementModule.ts index 4ab3439d3..532fdd35c 100644 --- a/packages/sequencer/src/settlement/SettlementModule.ts +++ b/packages/sequencer/src/settlement/SettlementModule.ts @@ -26,6 +26,7 @@ import type { MinaBaseLayer } from "../protocol/baselayer/MinaBaseLayer"; import { Batch, SettleableBatch } from "../storage/model/Batch"; import { Settlement } from "../storage/model/Settlement"; import { SettlementStorage } from "../storage/repositories/SettlementStorage"; +import { FeeStrategy } from "../protocol/baselayer/fees/FeeStrategy"; import { SettlementUtils } from "./utils/SettlementUtils"; import type { BridgingModule } from "./BridgingModule"; @@ -38,6 +39,7 @@ import { AddressRegistry, InMemoryAddressRegistry, } from "./interactions/AddressRegistry"; +import { DefaultL1TransactionRetryStrategy } from "./transactions/DefaultL1TransactionRetryStrategy"; export type SettlementModuleConfig = { addresses?: { @@ -69,6 +71,8 @@ export class SettlementModule @inject("SettlementSigner") private readonly signer: MinaSigner, @inject("Sequencer") private readonly parentContainer: ModuleContainerLike, + @inject("FeeStrategy") + private readonly feeStrategy: FeeStrategy, @inject("AddressRegistry") private readonly addressRegistry: AddressRegistry, private readonly argsRegistry: ContractArgsRegistry @@ -82,6 +86,9 @@ export class SettlementModule AddressRegistry: { useClass: InMemoryAddressRegistry, }, + L1TransactionRetryStrategy: { + useClass: DefaultL1TransactionRetryStrategy, + }, }; } diff --git a/packages/sequencer/src/settlement/interactions/bridging/BridgingSettlementInteraction.ts b/packages/sequencer/src/settlement/interactions/bridging/BridgingSettlementInteraction.ts index ee8b74983..37055e6e1 100644 --- a/packages/sequencer/src/settlement/interactions/bridging/BridgingSettlementInteraction.ts +++ b/packages/sequencer/src/settlement/interactions/bridging/BridgingSettlementInteraction.ts @@ -112,7 +112,7 @@ export class BridgingSettlementInteraction implements SettleInteraction { signingWithSignatureCheck: this.signer.getContractAddresses(), }); - const { hash: transactionHash } = + const { transactionId } = await this.transactionSender.proveAndSendTransaction(tx, "included"); log.info("Settlement transaction sent and included"); @@ -120,7 +120,7 @@ export class BridgingSettlementInteraction implements SettleInteraction { return { batches: [batch.height], promisedMessagesHash: latestSequenceStateHash.toString(), - transactionHash, + transactionId, }; } } diff --git a/packages/sequencer/src/settlement/interactions/vanilla/VanillaSettlementInteraction.ts b/packages/sequencer/src/settlement/interactions/vanilla/VanillaSettlementInteraction.ts index 096438ab5..ce835bf7b 100644 --- a/packages/sequencer/src/settlement/interactions/vanilla/VanillaSettlementInteraction.ts +++ b/packages/sequencer/src/settlement/interactions/vanilla/VanillaSettlementInteraction.ts @@ -104,7 +104,7 @@ export class VanillaSettlementInteraction implements SettleInteraction { signingWithSignatureCheck: this.signer.getContractAddresses(), }); - const { hash: transactionHash } = + const { transactionId } = await this.transactionSender.proveAndSendTransaction(tx, "included"); log.info("Settlement transaction sent and included"); @@ -112,7 +112,7 @@ export class VanillaSettlementInteraction implements SettleInteraction { return { batches: [batch.height], promisedMessagesHash: latestSequenceStateHash.toString(), - transactionHash, + transactionId, }; } } diff --git a/packages/sequencer/src/settlement/transactions/DefaultL1TransactionRetryStrategy.ts b/packages/sequencer/src/settlement/transactions/DefaultL1TransactionRetryStrategy.ts new file mode 100644 index 000000000..6234beac2 --- /dev/null +++ b/packages/sequencer/src/settlement/transactions/DefaultL1TransactionRetryStrategy.ts @@ -0,0 +1,81 @@ +import { noop } from "@proto-kit/common"; +import { Transaction, UInt64 } from "o1js"; +import { inject } from "tsyringe"; + +import { + sequencerModule, + SequencerModule, +} from "../../sequencer/builder/SequencerModule"; +import { PendingL1TransactionRecord } from "../../storage/repositories/PendingL1TransactionStorage"; +import { FeeStrategy } from "../../protocol/baselayer/fees/FeeStrategy"; + +import { L1TransactionRetryStrategy } from "./L1TransactionRetryStrategy"; + +export type TransactionRetryConfig = { + maxAttempts?: number; + feeMultiplier?: number; + maxFee?: number; + retryDelayMs?: number; +}; + +const DEFAULT_RETRY_CONFIG: Required = { + maxAttempts: 3, + feeMultiplier: 1.1, + maxFee: 10 * 1e9, + retryDelayMs: 60 * 1000, // 1 minute +}; + +@sequencerModule() +export class DefaultL1TransactionRetryStrategy + extends SequencerModule + implements L1TransactionRetryStrategy +{ + public constructor( + @inject("FeeStrategy") private readonly feeStrategy: FeeStrategy + ) { + super(); + } + + private get retryConfig(): Required { + return { + maxAttempts: this.config.maxAttempts ?? DEFAULT_RETRY_CONFIG.maxAttempts, + feeMultiplier: + this.config.feeMultiplier ?? DEFAULT_RETRY_CONFIG.feeMultiplier, + maxFee: this.config.maxFee ?? DEFAULT_RETRY_CONFIG.maxFee, + retryDelayMs: + this.config.retryDelayMs ?? DEFAULT_RETRY_CONFIG.retryDelayMs, + }; + } + + public async start(): Promise { + noop(); + } + + public async shouldRetry( + record: PendingL1TransactionRecord + ): Promise { + return record.attempts < this.retryConfig.maxAttempts; + } + + public getRetryDelayMs(): number { + return this.retryConfig.retryDelayMs; + } + + public async prepareRetryTransaction( + record: PendingL1TransactionRecord + ): Promise> { + const tx = record.transaction; + const currentFee = tx.transaction.feePayer.body.fee; + const newFee = UInt64.from(this.bumpFee(Number(currentFee.toBigInt()))); + await tx.setFee(newFee); + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return tx as Transaction; + } + + private bumpFee(currentFee: number): number { + const { feeMultiplier, maxFee } = this.retryConfig; + const baseFee = this.feeStrategy.getFee(); + const bumped = Number(currentFee) * feeMultiplier; + return Math.floor(Math.min(Math.max(bumped, baseFee), maxFee)); + } +} diff --git a/packages/sequencer/src/settlement/transactions/L1TransactionDispatcher.ts b/packages/sequencer/src/settlement/transactions/L1TransactionDispatcher.ts new file mode 100644 index 000000000..0e890d1a3 --- /dev/null +++ b/packages/sequencer/src/settlement/transactions/L1TransactionDispatcher.ts @@ -0,0 +1,285 @@ +import { Transaction } from "o1js"; +import { inject, injectable } from "tsyringe"; +import { log } from "@proto-kit/common"; + +import { + PendingL1TransactionRecord, + PendingL1TransactionStorage, +} from "../../storage/repositories/PendingL1TransactionStorage"; +import { MinaSigner } from "../MinaSigner"; + +import { L1TransactionRetryStrategy } from "./L1TransactionRetryStrategy"; +import { TxStatusWaiter } from "./TxStatusWaiter"; +import { checkZkappTransactionStatus } from "./ZkappTransactionStatus"; + +export interface DispatcherConfig { + pollIntervalMs?: number; + statusCheckIntervalMs?: number; + inclusionTimeoutMs?: number; +} + +const DEFAULT_CONFIG: Required = { + pollIntervalMs: 5000, + statusCheckIntervalMs: 5000, + inclusionTimeoutMs: 10 * 60 * 1000, +}; + +@injectable() +export class L1TransactionDispatcher { + private pollingTimeout?: NodeJS.Timeout; + + /** + * Serialize all dispatcher work (polling ticks and sender-level wakeups) to avoid + * concurrent send attempts of the same queued transaction. + */ + private workInFlight?: Promise; + + public constructor( + @inject("PendingL1TransactionStorage") + private readonly pendingStorage: PendingL1TransactionStorage, + @inject("L1TransactionRetryStrategy") + private readonly retryStrategy: L1TransactionRetryStrategy, + @inject("SettlementSigner") private readonly signer: MinaSigner, + private readonly waiter: TxStatusWaiter, + private readonly dispatcherConfig: DispatcherConfig = {} + ) {} + + private get config(): Required { + return { + pollIntervalMs: + this.dispatcherConfig.pollIntervalMs ?? DEFAULT_CONFIG.pollIntervalMs, + statusCheckIntervalMs: + this.dispatcherConfig.statusCheckIntervalMs ?? + DEFAULT_CONFIG.statusCheckIntervalMs, + inclusionTimeoutMs: + this.dispatcherConfig.inclusionTimeoutMs ?? + DEFAULT_CONFIG.inclusionTimeoutMs, + }; + } + + public start() { + this.startPolling(); + } + + public async stop(): Promise { + if (this.pollingTimeout !== undefined) { + clearTimeout(this.pollingTimeout); + this.pollingTimeout = undefined; + } + } + + public requestDispatch(sender: string) { + void this.enqueueWork(async () => { + const txs = await this.pendingStorage.findByStatuses(["queued", "sent"]); + await this.processSender(sender, txs); + }); + } + + private startPolling() { + const poll = async () => { + try { + await this.enqueueWork(async () => { + await this.tick(); + }); + } catch (e) { + log.error("Error in L1TransactionDispatcher polling loop", e); + } finally { + this.pollingTimeout = setTimeout(poll, this.config.pollIntervalMs); + this.pollingTimeout.unref?.(); + } + }; + this.pollingTimeout = setTimeout(poll, this.config.pollIntervalMs); + this.pollingTimeout.unref?.(); + } + + private async enqueueWork(work: () => Promise): Promise { + const previous = this.workInFlight ?? Promise.resolve(); + const next = previous.then(work, work); + this.workInFlight = next.finally(() => { + if (this.workInFlight === next) { + this.workInFlight = undefined; + } + }); + return await this.workInFlight; + } + + private async tick(): Promise { + const now = new Date(); + const pendingTransactions = ( + await this.pendingStorage.findByStatuses(["queued", "sent"]) + ).filter((r) => (r.nextActionAt ?? now) <= now); + + const bySender: Record = {}; + for (const tx of pendingTransactions) { + (bySender[tx.sender] ??= []).push(tx); + } + + for (const sender of Object.keys(bySender)) { + // eslint-disable-next-line no-await-in-loop + await this.processSender(sender, bySender[sender]); + } + } + + private async processSender( + sender: string, + txsInput: PendingL1TransactionRecord[] + ): Promise { + // Sort in ascending order of nonce + const txsSorted = txsInput + .sort((a, b) => a.nonce - b.nonce) + .filter((r) => r.sender === sender); + + // Send queued txs in ascending nonce order. Then check sent txs. + for (const record of txsSorted) { + if (record.status === "queued") { + // eslint-disable-next-line no-await-in-loop + await this.sendQueuedTransaction(record.id); + } else if (record.status === "sent") { + // eslint-disable-next-line no-await-in-loop + await this.checkSentTransaction(record.id); + } + } + } + + private async sendQueuedTransaction(txId: string): Promise { + const record = await this.pendingStorage.findById(txId); + if (!record) return; + if (record.status !== "queued") return; + + await this.sendTransaction(record); + } + + private async sendTransaction( + record: Omit< + PendingL1TransactionRecord, + "status" | "sentAt" | "lastError" | "nextActionAt" | "hash" + > + ): Promise { + const tx = record.transaction; + try { + const pendingTx = await tx.send(); + const now = new Date(); + + await this.pendingStorage.update(record.id, { + status: "sent", + attempts: record.attempts + 1, + sentAt: now, + transaction: tx, + hash: pendingTx.hash, + lastError: undefined, + nextActionAt: new Date( + now.getTime() + this.config.statusCheckIntervalMs + ), + }); + log.info( + `Sent L1 transaction ${pendingTx.hash} for nonce ${record.nonce} (Attempt ${record.attempts + 1})` + ); + this.waiter.notifySent(record.id, pendingTx.hash); + } catch (error) { + log.error( + `Failed to send transaction ${record.sender}:${record.nonce}`, + error + ); + await this.pendingStorage.update(record.id, { + status: "failed", + lastError: error instanceof Error ? error.message : String(error), + }); + this.waiter.notifyFailed(record.id, error); + } + } + + private async checkSentTransaction(txId: string): Promise { + const record = await this.pendingStorage.findById(txId); + if (!record) return; + if (record.status !== "sent") return; + + if (record.hash === undefined || record.hash.length === 0) { + await this.retryTransaction(record); + return; + } + + // Single status check (no long blocking loops) + const result = await checkZkappTransactionStatus(record.hash); + if (result.success) { + await this.pendingStorage.update(record.id, { + status: "included", + }); + this.waiter.notifyIncluded(record.id, record.hash); + return; + } else if (result.failureReason) { + log.error(`Transaction ${record.hash} failed`, result.failureReason); + await this.retryTransaction(record); + return; + } else if (!result.success) { + // recheck status after a timeout + const now = new Date(); + const sentAt = record.sentAt?.getTime() ?? 0; + const elapsed = now.getTime() - sentAt; + if (elapsed < this.config.inclusionTimeoutMs) { + await this.pendingStorage.update(record.id, { + nextActionAt: new Date( + now.getTime() + this.config.statusCheckIntervalMs + ), + }); + return; + } + } + + // if the transaction is not included after the inclusionTimeoutMs, retry the transaction + await this.retryTransaction(record); + } + + private async retryTransaction( + record: PendingL1TransactionRecord + ): Promise { + const latest = await this.pendingStorage.findById(record.id); + if (!latest) return; + if (latest.status === "included" || latest.status === "failed") return; + + const shouldRetry = await this.retryStrategy.shouldRetry(latest); + if (!shouldRetry) { + await this.pendingStorage.update(latest.id, { + status: "failed", + lastError: latest.lastError ?? "Max attempts reached", + }); + this.waiter.notifyFailed( + latest.id, + new Error(`Max attempts reached for ${latest.sender}:${latest.nonce}`) + ); + return; + } + + try { + // If the strategy wants a delay, schedule using nextActionAt + const delayMs = this.retryStrategy.getRetryDelayMs?.(latest) ?? 0; + const now = new Date(); + const earliestRetryAt = + latest.sentAt !== undefined + ? new Date(latest.sentAt.getTime() + delayMs) + : now; + if (earliestRetryAt > now) { + await this.pendingStorage.update(latest.id, { + nextActionAt: earliestRetryAt, + }); + return; + } + + const retryTx = await this.retryStrategy.prepareRetryTransaction(latest); + const signedRetryTx = this.signer.signTx( + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + retryTx as Transaction + ); + await this.sendTransaction({ + ...latest, + transaction: signedRetryTx, + }); + } catch (error) { + log.error(`Failed to retry ${latest.sender}:${latest.nonce}`, error); + await this.pendingStorage.update(latest.id, { + status: "failed", + lastError: error instanceof Error ? error.message : String(error), + }); + this.waiter.notifyFailed(latest.id, error); + } + } +} diff --git a/packages/sequencer/src/settlement/transactions/L1TransactionRetryStrategy.ts b/packages/sequencer/src/settlement/transactions/L1TransactionRetryStrategy.ts new file mode 100644 index 000000000..9b974d8be --- /dev/null +++ b/packages/sequencer/src/settlement/transactions/L1TransactionRetryStrategy.ts @@ -0,0 +1,17 @@ +import { Transaction } from "o1js"; + +import { PendingL1TransactionRecord } from "../../storage/repositories/PendingL1TransactionStorage"; + +export interface L1TransactionRetryStrategy { + shouldRetry(record: PendingL1TransactionRecord): Promise; + + /** + * Optional retry delay. If provided, the dispatcher will schedule + * the next retry after the delay from the last sentAt time. + */ + getRetryDelayMs?(record: PendingL1TransactionRecord): number; + + prepareRetryTransaction( + record: PendingL1TransactionRecord + ): Promise>; +} diff --git a/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts b/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts index a594d85c0..634e757dd 100644 --- a/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts +++ b/packages/sequencer/src/settlement/transactions/MinaTransactionSender.ts @@ -1,146 +1,122 @@ -import { fetchAccount, Mina, PublicKey, Transaction } from "o1js"; +import { fetchAccount, PublicKey, Transaction, UInt64 } from "o1js"; import { inject, injectable } from "tsyringe"; -import { - EventEmitter, - EventsRecord, - EventListenable, - log, - ReplayingSingleUseEventEmitter, - filterNonUndefined, -} from "@proto-kit/common"; +import { filterNonUndefined, log } from "@proto-kit/common"; import type { MinaBaseLayer } from "../../protocol/baselayer/MinaBaseLayer"; +import { PendingL1TransactionStorage } from "../../storage/repositories/PendingL1TransactionStorage"; import { FlowCreator } from "../../worker/flow/Flow"; import { SettlementProvingTask, TransactionTaskResult, } from "../tasks/SettlementProvingTask"; +import { FeeStrategy } from "../../protocol/baselayer/fees/FeeStrategy"; +import { MinaSigner } from "../MinaSigner"; +import { closeable, Closeable } from "../../sequencer/builder/Closeable"; import { MinaTransactionSimulator } from "./MinaTransactionSimulator"; +import { L1TransactionDispatcher } from "./L1TransactionDispatcher"; +import { TxStatusWaiter, WaitableTxStatus } from "./TxStatusWaiter"; -type SenderKey = string; - -export interface TxEvents extends EventsRecord { - sent: [{ hash: string }]; - included: [{ hash: string }]; - rejected: [any]; -} - -export type TxSendResult = - Input extends "none" ? void : { hash: string }; +export type TxSendResult< + Input extends "sent" | "included" | "queued" | "none", +> = Input extends "none" ? void : { transactionId: string }; @injectable() -export class MinaTransactionSender { - private txStatusEmitters: Record> = {}; - - // TODO Persist all of that - private txQueue: Record = {}; - - private txIdCursor: number = 0; - - private cache: { tx: Transaction; id: number }[] = []; - +@closeable() +export class MinaTransactionSender implements Closeable { public constructor( private readonly creator: FlowCreator, private readonly provingTask: SettlementProvingTask, private readonly simulator: MinaTransactionSimulator, - @inject("BaseLayer") private readonly baseLayer: MinaBaseLayer - ) {} + @inject("BaseLayer") private readonly baseLayer: MinaBaseLayer, + @inject("PendingL1TransactionStorage") + private readonly pendingStorage: PendingL1TransactionStorage, + @inject("SettlementSigner") private readonly signer: MinaSigner, + @inject("FeeStrategy") private readonly feeStrategy: FeeStrategy, + private readonly dispatcher: L1TransactionDispatcher, + private readonly waiter: TxStatusWaiter + ) { + this.dispatcher.start(); + } public async getNextNonce(sender: PublicKey): Promise { const account = await this.simulator.getAccount(sender); return parseInt(account.nonce.toString(), 10); } - private async trySendCached({ - tx, - id, - }: { - tx: Transaction; - id: number; - }): Promise { - const feePayer = tx.transaction.feePayer.body; - const sender = feePayer.publicKey.toBase58(); - const senderQueue = this.txQueue[sender]; - - const sendable = senderQueue.at(0) === Number(feePayer.nonce.toString()); - if (sendable) { - const txId = await tx.send(); - - const statusEmitter = this.txStatusEmitters[id]; - log.info(`Sent L1 transaction ${txId.hash}`); - statusEmitter.emit("sent", { hash: txId.hash }); - - txId.wait().then( - (included) => { - log.info(`L1 transaction ${included.hash} has been included`); - statusEmitter.emit("included", { hash: included.hash }); - }, - (error) => { - log.info("Waiting on L1 transaction threw and error", error); - statusEmitter.emit("rejected", error); - } - ); - - senderQueue.pop(); - return txId; - } - return undefined; - } - - private async resolveCached(): Promise { - const indizesToRemove: number[] = []; - for (let i = 0; i < this.cache.length; i++) { - // eslint-disable-next-line no-await-in-loop - const result = await this.trySendCached(this.cache[i]); - if (result !== undefined) { - indizesToRemove.push(i); - } - } - this.cache = this.cache.filter( - (ignored, index) => !indizesToRemove.includes(index) + /** + * sets the fee, signs the transaction, proves it, and sends it to the network. + */ + public async signProveAndSendTransaction< + Wait extends "sent" | "included" | "queued" | "none", + >( + transaction: Transaction, + signers: PublicKey[], + waitOnStatus: Wait + ): Promise> { + const unsignedTx = await transaction.setFee( + UInt64.from(this.feeStrategy.getFee()) ); - return indizesToRemove.length; - } - - private async sendOrQueue( - tx: Transaction - ): Promise> { - // eslint-disable-next-line no-plusplus - const id = this.txIdCursor++; - this.cache.push({ tx, id }); - const eventEmitter = new ReplayingSingleUseEventEmitter(); - this.txStatusEmitters[id] = eventEmitter; - - let removedLastIteration = 0; - do { - // eslint-disable-next-line no-await-in-loop - removedLastIteration = await this.resolveCached(); - } while (removedLastIteration > 0); - - // This altered return type only exposes listening-related functions and erases the rest - return eventEmitter; + const signedTx = this.signer.signTx( + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + unsignedTx as Transaction, + { pubKeys: signers } + ); + return await this.proveAndSendTransaction(signedTx, waitOnStatus); } + /** + * Submit a transaction to be proven, queued and dispatched to L1. + */ public async proveAndSendTransaction< - Wait extends "sent" | "included" | "none", + Wait extends "sent" | "included" | "queued" | "none", >( - transaction: Transaction, + transaction: Transaction, waitOnStatus: Wait ): Promise> { const { publicKey, nonce } = transaction.transaction.feePayer.body; + const sender = publicKey.toBase58(); + const nonceNum = Number(nonce.toString()); + const result = await this.proveTransaction(transaction, sender, nonceNum); - log.debug( - `Proving tx from sender ${publicKey.toBase58()} nonce ${nonce.toString()}` - ); + log.debug("Tx proving complete, queueing for sending"); - // Add Transaction to sender's queue - (this.txQueue[publicKey.toBase58()] ??= []).push(Number(nonce.toString())); + const now = new Date(); + const txnId = await this.pendingStorage.queue({ + sender, + nonce: nonceNum, + attempts: 0, + transaction: result.transaction, + queuedAt: now, + nextActionAt: now, + }); - const flow = this.creator.createFlow( - `tx-${publicKey.toBase58()}-${nonce.toString()}`, - {} - ); + this.dispatcher.requestDispatch(sender); + + if (waitOnStatus === "queued") { + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return { transactionId: txnId } as TxSendResult; + } + + if (waitOnStatus === "sent" || waitOnStatus === "included") { + const desired: WaitableTxStatus = waitOnStatus; + await this.waiter.waitFor(txnId, desired); + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return { transactionId: txnId } as TxSendResult; + } + + // eslint-disable-next-line @typescript-eslint/consistent-type-assertions + return undefined as TxSendResult; + } + + private async proveTransaction( + transaction: Transaction, + sender: string, + nonceNum: number + ): Promise { + log.debug(`Proving tx from sender ${sender} nonce ${nonceNum}`); + + const flow = this.creator.createFlow(`tx-${sender}-${nonceNum}`, {}); const accounts = await Promise.all( transaction.transaction.accountUpdates.map( @@ -149,12 +125,9 @@ export class MinaTransactionSender { ) ); - // Load accounts await this.simulator.getAccounts(transaction); await this.simulator.applyTransaction(transaction); - log.trace("Applied transaction to local simulated ledger"); - const { network } = this.baseLayer.config; const graphql = network.type === "local" ? undefined : network.graphql; @@ -178,33 +151,10 @@ export class MinaTransactionSender { } ); - const result = await resultPromise; - - log.debug("Tx proving complete, queueing for sending"); + return await resultPromise; + } - log.trace(result.transaction.toPretty()); - - const txStatus = await this.sendOrQueue(result.transaction); - - if (waitOnStatus !== "none") { - const waitInstruction: "sent" | "included" = waitOnStatus; - const hash = await new Promise>( - (resolve, reject) => { - txStatus.on(waitInstruction, (txSendResult) => { - log.info(`Tx ${txSendResult.hash} included`); - resolve(txSendResult); - }); - txStatus.on("rejected", (error) => { - reject(error); - }); - } - ); - - // Yeah that's not super clean, but couldn't figure out a better way tbh - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - return hash as TxSendResult; - } - // eslint-disable-next-line @typescript-eslint/consistent-type-assertions - return undefined as TxSendResult; + public async close(): Promise { + await this.dispatcher.stop(); } } diff --git a/packages/sequencer/src/settlement/transactions/TxStatusWaiter.ts b/packages/sequencer/src/settlement/transactions/TxStatusWaiter.ts new file mode 100644 index 000000000..74c8402f8 --- /dev/null +++ b/packages/sequencer/src/settlement/transactions/TxStatusWaiter.ts @@ -0,0 +1,127 @@ +import { inject, injectable } from "tsyringe"; +import { + EventsRecord, + ReplayingSingleUseEventEmitter, +} from "@proto-kit/common"; + +import { + PendingL1TransactionRecord, + PendingL1TransactionStorage, +} from "../../storage/repositories/PendingL1TransactionStorage"; + +export interface TxLifecycleEvents extends EventsRecord { + sent: [{ hash: string }]; + included: [{ hash: string }]; + failed: [{ error: unknown }]; +} + +export type WaitableTxStatus = "sent" | "included"; + +export interface WaitForTxOptions { + timeoutMs?: number; +} + +@injectable() +export class TxStatusWaiter { + private readonly emitters = new Map< + string, + ReplayingSingleUseEventEmitter + >(); + + public constructor( + @inject("PendingL1TransactionStorage") + private readonly pendingStorage: PendingL1TransactionStorage + ) {} + + private getEmitter( + txId: string + ): ReplayingSingleUseEventEmitter { + let emitter = this.emitters.get(txId); + if (!emitter) { + emitter = new ReplayingSingleUseEventEmitter(); + this.emitters.set(txId, emitter); + } + return emitter; + } + + public notifySent(txId: string, hash: string) { + this.getEmitter(txId).emit("sent", { hash }); + } + + public notifyIncluded(txId: string, hash: string) { + this.getEmitter(txId).emit("included", { hash }); + this.emitters.delete(txId); + } + + public notifyFailed(txId: string, error: unknown) { + this.getEmitter(txId).emit("failed", { error }); + this.emitters.delete(txId); + } + + private static isSatisfied( + record: PendingL1TransactionRecord, + desired: WaitableTxStatus + ): boolean { + if (desired === "sent") { + return record.status === "sent" || record.status === "included"; + } + return record.status === "included"; + } + + private static toError(record: PendingL1TransactionRecord): Error { + return new Error(record.lastError ?? "L1 transaction failed"); + } + + public async waitFor( + txId: string, + desiredStatus: WaitableTxStatus, + options: WaitForTxOptions = {} + ): Promise { + const initial = await this.pendingStorage.findById(txId); + if (!initial) { + throw new Error(`Unknown pending L1 transaction id ${txId}`); + } + if (initial.status === "failed") { + throw TxStatusWaiter.toError(initial); + } + if (TxStatusWaiter.isSatisfied(initial, desiredStatus)) { + return; + } + + const emitter = this.getEmitter(txId); + const eventPromise = new Promise((resolve, reject) => { + emitter.on(desiredStatus, () => resolve()); + emitter.on("failed", ({ error }) => { + reject(error instanceof Error ? error : new Error(String(error))); + }); + }); + + // if the status change happened between the first DB read and listener registration + const afterSubscribe = await this.pendingStorage.findById(txId); + if (!afterSubscribe) { + throw new Error(`Unknown pending L1 transaction id ${txId}`); + } + if (afterSubscribe.status === "failed") { + throw TxStatusWaiter.toError(afterSubscribe); + } + if (TxStatusWaiter.isSatisfied(afterSubscribe, desiredStatus)) { + return; + } + + if (options.timeoutMs !== undefined) { + const timeoutPromise = new Promise((_, reject) => { + const t = setTimeout(() => { + reject( + new Error( + `Timeout waiting for ${txId} to reach status ${desiredStatus}` + ) + ); + }, options.timeoutMs); + t.unref?.(); + }); + await Promise.race([eventPromise, timeoutPromise]); + } else { + await eventPromise; + } + } +} diff --git a/packages/sequencer/src/settlement/transactions/ZkappTransactionStatus.ts b/packages/sequencer/src/settlement/transactions/ZkappTransactionStatus.ts new file mode 100644 index 000000000..5d22b8c29 --- /dev/null +++ b/packages/sequencer/src/settlement/transactions/ZkappTransactionStatus.ts @@ -0,0 +1,8 @@ +/** + * Test seam for unit tests. + * + * We re-export `o1js.checkZkappTransaction` under a local module path so Jest can + * reliably mock it in ESM mode via `jest.unstable_mockModule(...)` without + * mocking `o1js` itself. + */ +export { checkZkappTransaction as checkZkappTransactionStatus } from "o1js"; diff --git a/packages/sequencer/src/storage/StorageDependencyFactory.ts b/packages/sequencer/src/storage/StorageDependencyFactory.ts index e53d3b0c9..c7c13319a 100644 --- a/packages/sequencer/src/storage/StorageDependencyFactory.ts +++ b/packages/sequencer/src/storage/StorageDependencyFactory.ts @@ -11,6 +11,7 @@ import { AsyncMerkleTreeStore } from "../state/async/AsyncMerkleTreeStore"; import { BatchStorage } from "./repositories/BatchStorage"; import { BlockQueue, BlockStorage } from "./repositories/BlockStorage"; import { MessageStorage } from "./repositories/MessageStorage"; +import { PendingL1TransactionStorage } from "./repositories/PendingL1TransactionStorage"; import { SettlementStorage } from "./repositories/SettlementStorage"; import { TransactionStorage } from "./repositories/TransactionStorage"; @@ -28,6 +29,7 @@ export interface StorageDependencyMinimumDependencies extends DependencyRecord { messageStorage: DependencyDeclaration; settlementStorage: DependencyDeclaration; transactionStorage: DependencyDeclaration; + pendingL1TransactionStorage: DependencyDeclaration; } export interface StorageDependencyFactory extends DependencyFactory { diff --git a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts index 5406d539f..7e1e2d323 100644 --- a/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts +++ b/packages/sequencer/src/storage/inmemory/InMemoryDatabase.ts @@ -16,6 +16,7 @@ import { InMemoryMessageStorage } from "./InMemoryMessageStorage"; import { InMemorySettlementStorage } from "./InMemorySettlementStorage"; import { InMemoryTransactionStorage } from "./InMemoryTransactionStorage"; import { InMemoryAsyncMerkleTreeStore } from "./InMemoryAsyncMerkleTreeStore"; +import { InMemoryPendingL1TransactionStorage } from "./InMemoryPendingL1TransactionStorage"; @sequencerModule() @closeable() @@ -55,6 +56,9 @@ export class InMemoryDatabase extends SequencerModule implements Database { transactionStorage: { useClass: InMemoryTransactionStorage, }, + pendingL1TransactionStorage: { + useClass: InMemoryPendingL1TransactionStorage, + }, }; } diff --git a/packages/sequencer/src/storage/inmemory/InMemoryPendingL1TransactionStorage.ts b/packages/sequencer/src/storage/inmemory/InMemoryPendingL1TransactionStorage.ts new file mode 100644 index 000000000..83bd034e8 --- /dev/null +++ b/packages/sequencer/src/storage/inmemory/InMemoryPendingL1TransactionStorage.ts @@ -0,0 +1,56 @@ +import { + PendingL1TransactionRecord, + PendingL1TransactionStatus, + PendingL1TransactionStorage, +} from "../repositories/PendingL1TransactionStorage"; + +export class InMemoryPendingL1TransactionStorage + implements PendingL1TransactionStorage +{ + // Key: sender:nonce + private store = new Map(); + + public async queue( + record: Omit + ): Promise { + const key = Math.random().toString(36).substring(2, 15); + this.store.set(key, { + ...record, + id: key, + status: "queued", + }); + return key; + } + + public async update( + id: string, + updates: Partial> + ): Promise { + const existing = this.store.get(id); + if (existing === undefined) { + return; + } + this.store.set(id, { + ...existing, + ...updates, + }); + } + + public async delete(id: string): Promise { + this.store.delete(id); + } + + public async findById( + id: string + ): Promise { + return this.store.get(id); + } + + public async findByStatuses( + statuses: PendingL1TransactionStatus[] + ): Promise { + return Array.from(this.store.values()).filter((record) => + statuses.includes(record.status) + ); + } +} diff --git a/packages/sequencer/src/storage/model/Settlement.ts b/packages/sequencer/src/storage/model/Settlement.ts index f3b91ab73..ea8c807f0 100644 --- a/packages/sequencer/src/storage/model/Settlement.ts +++ b/packages/sequencer/src/storage/model/Settlement.ts @@ -1,5 +1,5 @@ export interface Settlement { - transactionHash: string; + transactionId: string; promisedMessagesHash: string; batches: number[]; } diff --git a/packages/sequencer/src/storage/repositories/PendingL1TransactionStorage.ts b/packages/sequencer/src/storage/repositories/PendingL1TransactionStorage.ts new file mode 100644 index 000000000..535b4d40f --- /dev/null +++ b/packages/sequencer/src/storage/repositories/PendingL1TransactionStorage.ts @@ -0,0 +1,48 @@ +import { Transaction } from "o1js"; + +export type PendingL1TransactionStatus = + | "queued" + | "sent" + | "included" + | "failed"; + +export interface PendingL1TransactionRecord { + id: string; + sender: string; + nonce: number; + attempts: number; + status: PendingL1TransactionStatus; + transaction: Transaction; + hash?: string; + lastError?: string; + sentAt?: Date; + /** + * When the transaction entered the durable queue (after proving). + * Optional for storages that don't persist it yet. + */ + queuedAt?: Date; + /** + * When the dispatcher should next act on this transaction. + * Optional for storages that don't persist it yet. + */ + nextActionAt?: Date; +} + +export interface PendingL1TransactionStorage { + queue( + record: Omit + ): Promise; + + update( + id: string, + updates: Partial> + ): Promise; + + delete(id: string): Promise; + + findById(id: string): Promise; + + findByStatuses( + statuses: PendingL1TransactionStatus[] + ): Promise; +} diff --git a/packages/sequencer/test/TestingSequencer.ts b/packages/sequencer/test/TestingSequencer.ts index 41715c40a..8f9f9f9fe 100644 --- a/packages/sequencer/test/TestingSequencer.ts +++ b/packages/sequencer/test/TestingSequencer.ts @@ -15,6 +15,7 @@ import { SequencerStartupModule, } from "../src"; import { ConstantFeeStrategy } from "../src/protocol/baselayer/fees/ConstantFeeStrategy"; +import { DefaultL1TransactionRetryStrategy } from "../src/settlement/transactions/DefaultL1TransactionRetryStrategy"; export interface DefaultTestingSequencerModules extends SequencerModulesRecord { Database: typeof InMemoryDatabase; @@ -27,6 +28,7 @@ export interface DefaultTestingSequencerModules extends SequencerModulesRecord { TaskQueue: typeof LocalTaskQueue; FeeStrategy: typeof ConstantFeeStrategy; SequencerStartupModule: typeof SequencerStartupModule; + L1TransactionRetryStrategy: typeof DefaultL1TransactionRetryStrategy; } export function testingSequencerModules< @@ -52,6 +54,7 @@ export function testingSequencerModules< TaskQueue: LocalTaskQueue, FeeStrategy: ConstantFeeStrategy, SequencerStartupModule, + L1TransactionRetryStrategy: DefaultL1TransactionRetryStrategy, } satisfies DefaultTestingSequencerModules; return { diff --git a/packages/sequencer/test/integration/BlockProduction-test.ts b/packages/sequencer/test/integration/BlockProduction-test.ts index 71c4172f9..7f1d2847a 100644 --- a/packages/sequencer/test/integration/BlockProduction-test.ts +++ b/packages/sequencer/test/integration/BlockProduction-test.ts @@ -150,6 +150,7 @@ export function testBlockProduction< TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Runtime: { Balance: {}, diff --git a/packages/sequencer/test/integration/BlockProductionSize.test.ts b/packages/sequencer/test/integration/BlockProductionSize.test.ts index 109b576c5..7e177ff7e 100644 --- a/packages/sequencer/test/integration/BlockProductionSize.test.ts +++ b/packages/sequencer/test/integration/BlockProductionSize.test.ts @@ -80,6 +80,7 @@ describe("block limit", () => { LocalTaskWorkerModule: VanillaTaskWorkerModules.defaultConfig(), BaseLayer: {}, TaskQueue: {}, + L1TransactionRetryStrategy: {}, FeeStrategy: {}, SequencerStartupModule: {}, }, diff --git a/packages/sequencer/test/integration/Mempool.test.ts b/packages/sequencer/test/integration/Mempool.test.ts index d5aec4037..238564d72 100644 --- a/packages/sequencer/test/integration/Mempool.test.ts +++ b/packages/sequencer/test/integration/Mempool.test.ts @@ -104,6 +104,7 @@ describe.each([["InMemory", InMemoryDatabase]])( BaseLayer: {}, TaskQueue: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Protocol: Protocol.defaultConfig(), }); diff --git a/packages/sequencer/test/integration/Proven.test.ts b/packages/sequencer/test/integration/Proven.test.ts index bca7f53af..1c9b23808 100644 --- a/packages/sequencer/test/integration/Proven.test.ts +++ b/packages/sequencer/test/integration/Proven.test.ts @@ -100,6 +100,7 @@ describe.skip("Proven", () => { TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, BaseLayer: { network: { type: "local", diff --git a/packages/sequencer/test/integration/StorageIntegration.test.ts b/packages/sequencer/test/integration/StorageIntegration.test.ts index 16f17bbe0..00f61fb90 100644 --- a/packages/sequencer/test/integration/StorageIntegration.test.ts +++ b/packages/sequencer/test/integration/StorageIntegration.test.ts @@ -100,6 +100,7 @@ describe.each([["InMemory", InMemoryDatabase]])( TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Protocol: Protocol.defaultConfig(), }); diff --git a/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts b/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts index e34b72af6..14515d75b 100644 --- a/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts +++ b/packages/sequencer/test/protocol/production/sequencing/atomic-block-production.test.ts @@ -58,6 +58,7 @@ describe("atomic block production", () => { TaskQueue: {}, FeeStrategy: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, }, Runtime: { Balance: {}, diff --git a/packages/sequencer/test/settlement/MinaTransactionSender.test.ts b/packages/sequencer/test/settlement/MinaTransactionSender.test.ts new file mode 100644 index 000000000..a93f3d870 --- /dev/null +++ b/packages/sequencer/test/settlement/MinaTransactionSender.test.ts @@ -0,0 +1,260 @@ +/* eslint-disable @typescript-eslint/no-unsafe-assignment */ +import "reflect-metadata"; +import { jest } from "@jest/globals"; + +import type { PendingL1TransactionStorage } from "../../src/storage/repositories/PendingL1TransactionStorage"; +import { InMemoryPendingL1TransactionStorage } from "../../src/storage/inmemory/InMemoryPendingL1TransactionStorage"; + +let MinaTransactionSender: any; +let L1TransactionDispatcher: any; +let TxStatusWaiter: any; +let checkZkappTransactionStatus: any; + +beforeAll(async () => { + // Capture the mock in this closure so we don't need an extra import just to access it. + checkZkappTransactionStatus = jest.fn(); + + // L1TransactionDispatcher imports checkZkappTransactionStatus directly, so we need to mock + // the module BEFORE importing MinaTransactionSender (ESM). + jest.unstable_mockModule( + "../../src/settlement/transactions/ZkappTransactionStatus", + () => ({ + checkZkappTransactionStatus, + }) + ); + ({ MinaTransactionSender } = await import( + "../../src/settlement/transactions/MinaTransactionSender" + )); + ({ L1TransactionDispatcher } = await import( + "../../src/settlement/transactions/L1TransactionDispatcher" + )); + ({ TxStatusWaiter } = await import( + "../../src/settlement/transactions/TxStatusWaiter" + )); +}); + +type SenderFixture = { + sender: any; + retryStrategy: any; +}; + +function makeSender( + pendingStorage: PendingL1TransactionStorage, + dispatcherConfig: { + pollIntervalMs?: number; + statusCheckIntervalMs?: number; + inclusionTimeoutMs?: number; + } = { + pollIntervalMs: 5, + statusCheckIntervalMs: 5, + } +): SenderFixture { + const flowCreator = { + createFlow: jest.fn(() => ({ + withFlow: async (fn: any) => + await new Promise((resolve, reject) => { + fn(resolve, reject); + }), + pushTask: async ( + _task: any, + params: { transaction: any }, + onResult: (result: any) => Promise + ) => { + await onResult({ transaction: params.transaction }); + }, + })), + } as any; + + const provingTask = {} as any; + + const simulator = { + getAccount: jest.fn(async () => ({ nonce: { toString: () => "0" } })), + getAccounts: jest.fn(async () => undefined), + applyTransaction: jest.fn(async () => undefined), + } as any; + + const baseLayer = { config: { network: { type: "local" } } } as any; + + const retryStrategy = { + shouldRetry: jest.fn(async () => true), + prepareRetryTransaction: jest.fn(async (record: any) => record.transaction), + } as any; + + const signer = { signTx: jest.fn((tx: any) => tx) } as any; + const feeStrategy = { getFee: () => 0 } as any; + + const waiter = new TxStatusWaiter(pendingStorage as any); + const dispatcher = new L1TransactionDispatcher( + pendingStorage as any, + retryStrategy as any, + signer as any, + waiter, + dispatcherConfig + ); + const sender = new MinaTransactionSender( + flowCreator, + provingTask, + simulator, + baseLayer, + pendingStorage as any, + signer, + feeStrategy, + dispatcher, + waiter + ); + return { sender: sender as typeof MinaTransactionSender, retryStrategy }; +} + +function makeTx({ senderBase58 = "S", nonce = 0, hash = "TX_HASH" } = {}) { + const tx: any = { + // used by proveAndSendTransaction() before proving + transaction: { + feePayer: { + body: { + publicKey: { toBase58: () => senderBase58 }, + nonce: { toString: () => String(nonce) }, + }, + }, + accountUpdates: [], + }, + setFee: jest.fn(async () => tx), + send: jest.fn(async () => ({ hash })), + toPretty: () => "", + }; + + return { tx }; +} + +describe("MinaTransactionSender (unit)", () => { + beforeEach(() => { + jest.resetAllMocks(); + }); + + it("proveAndSendTransaction should immediately send if there is no lower-nonce pending tx, and resolve 'sent'", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const { sender } = makeSender(pendingStorage); + checkZkappTransactionStatus.mockResolvedValue({ success: false }); + + const { tx } = makeTx({ senderBase58: "S", nonce: 0, hash: "H1" }); + + try { + const { transactionId } = await sender.proveAndSendTransaction( + tx, + "sent" + ); + + const record = await pendingStorage.findById(transactionId as string); + + expect(record?.status).toBe("sent"); + expect(record?.hash).toBe("H1"); + expect(record?.attempts).toBe(1); + expect(tx.send).toHaveBeenCalledTimes(1); + } finally { + await sender.close(); + } + }); + + it("proveAndSendTransaction should transition sent -> included once the transaction is included", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const { sender } = makeSender(pendingStorage); + checkZkappTransactionStatus.mockResolvedValueOnce({ success: true }); + + const { tx } = makeTx({ senderBase58: "S", nonce: 0, hash: "H2" }); + try { + const { transactionId } = await sender.proveAndSendTransaction( + tx, + "included" + ); + const record = await pendingStorage.findById(transactionId as string); + expect(record?.status).toBe("included"); + } finally { + await sender.close(); + } + }); + + it("should send multiple nonce-increasing transactions for the same sender", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const { sender } = makeSender(pendingStorage); + checkZkappTransactionStatus.mockResolvedValue({ success: false }); + + const tx0 = makeTx({ senderBase58: "S", nonce: 0, hash: "H0" }); + const tx1 = makeTx({ senderBase58: "S", nonce: 1, hash: "H1" }); + + try { + const sent0 = await sender.proveAndSendTransaction(tx0.tx, "sent"); + const sent1 = await sender.proveAndSendTransaction(tx1.tx, "sent"); + + expect(tx0.tx.send).toHaveBeenCalledTimes(1); + expect(tx1.tx.send).toHaveBeenCalledTimes(1); + + const r0 = await pendingStorage.findById(sent0.transactionId as string); + const r1 = await pendingStorage.findById(sent1.transactionId as string); + expect(r0).toBeDefined(); + expect(r1).toBeDefined(); + } finally { + await sender.close(); + } + }); + + it("should retry a transaction if first attempt fails", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const { sender } = makeSender(pendingStorage, { + pollIntervalMs: 5, + statusCheckIntervalMs: 0, + inclusionTimeoutMs: 0, + }); + + const { tx } = makeTx({ senderBase58: "S", nonce: 0, hash: "H-R1" }); + + (tx as any).send = jest + .fn() + .mockImplementationOnce(async () => ({ hash: "H-R1" })) + .mockImplementationOnce(async () => ({ hash: "H-R2" })); + + checkZkappTransactionStatus + .mockResolvedValueOnce({ success: false }) + .mockResolvedValueOnce({ success: true }); + + try { + const { transactionId } = await sender.proveAndSendTransaction( + tx, + "included" + ); + const record = await pendingStorage.findById(transactionId as string); + + expect((tx as any).send).toHaveBeenCalledTimes(2); + expect(record?.status).toBe("included"); + expect(record?.hash).toBe("H-R2"); + expect(record?.attempts).toBeGreaterThanOrEqual(2); + } finally { + await sender.close(); + } + }); + + it("should stop retrying when shouldRetry returns false", async () => { + const pendingStorage: PendingL1TransactionStorage = + new InMemoryPendingL1TransactionStorage(); + const { sender, retryStrategy } = makeSender(pendingStorage, { + pollIntervalMs: 5, + statusCheckIntervalMs: 0, + inclusionTimeoutMs: 0, + }); + + // Force "no retry" + retryStrategy.shouldRetry = jest.fn(async () => false); + + checkZkappTransactionStatus.mockResolvedValueOnce({ success: false }); + const { tx } = makeTx({ senderBase58: "S", nonce: 0, hash: "H-F" }); + try { + const promise = sender.proveAndSendTransaction(tx, "included"); + await expect(promise).rejects.toBeDefined(); + } finally { + await sender.close(); + } + }); +}); +/* eslint-enable @typescript-eslint/no-unsafe-assignment */ diff --git a/packages/sequencer/test/settlement/Settlement.ts b/packages/sequencer/test/settlement/Settlement.ts index 0b89e8871..798e0900a 100644 --- a/packages/sequencer/test/settlement/Settlement.ts +++ b/packages/sequencer/test/settlement/Settlement.ts @@ -61,11 +61,11 @@ import { import { BlockProofSerializer } from "../../src/protocol/production/tasks/serializers/BlockProofSerializer"; import { testingSequencerModules } from "../TestingSequencer"; import { createTransaction } from "../integration/utils"; -import { FeeStrategy } from "../../src/protocol/baselayer/fees/FeeStrategy"; import { BridgingModule } from "../../src/settlement/BridgingModule"; import { FungibleTokenContractModule } from "../../src/settlement/utils/FungibleTokenContractModule"; import { FungibleTokenAdminContractModule } from "../../src/settlement/utils/FungibleTokenAdminContractModule"; import { MinaNetworkUtils } from "../../src/protocol/baselayer/network-utils/MinaNetworkUtils"; +import { DefaultL1TransactionRetryStrategy } from "../../src/settlement/transactions/DefaultL1TransactionRetryStrategy"; import { Balances, BalancesKey } from "./mocks/Balances"; import { WithdrawalMessageProcessor, Withdrawals } from "./mocks/Withdrawals"; @@ -109,8 +109,6 @@ export const settlementTestFn = ( let blockQueue: BlockQueue; let userPublicKey: PublicKey; - let feeStrategy: FeeStrategy; - let blockSerializer: BlockProofSerializer; const bridgedTokenId = @@ -133,6 +131,7 @@ export const settlementTestFn = ( SettlementModule: SettlementModule, BridgingModule: BridgingModule, SettlementSigner: InMemoryMinaSigner, + L1TransactionRetryStrategy: DefaultL1TransactionRetryStrategy, }, { SettlementProvingTask, @@ -192,6 +191,7 @@ export const settlementTestFn = ( SettlementModule: {}, BridgingModule: {}, SequencerStartupModule: {}, + L1TransactionRetryStrategy: {}, TaskQueue: { simulatedDuration: 0, @@ -283,7 +283,6 @@ export const settlementTestFn = ( "BlockTrigger" ); blockQueue = appChain.sequencer.resolve("BlockQueue") as BlockQueue; - feeStrategy = appChain.sequencer.resolve("FeeStrategy") as FeeStrategy; blockSerializer = appChain.sequencer.dependencyContainer.resolve(BlockProofSerializer); @@ -386,7 +385,6 @@ export const settlementTestFn = ( sender: sequencerKey.toPublicKey(), memo: "Deploy custom token", nonce: nonceCounter++, - fee: feeStrategy.getFee(), }, async () => { AccountUpdate.fundNewAccount(sequencerKey.toPublicKey(), 3); @@ -455,7 +453,6 @@ export const settlementTestFn = ( sender: sequencerKey.toPublicKey(), memo: "Mint custom token", nonce: nonceCounter++, - fee: feeStrategy.getFee(), }, async () => { AccountUpdate.fundNewAccount(sequencerKey.toPublicKey(), 1); @@ -771,12 +768,10 @@ export const settlementTestFn = ( const amount = BigInt(1e9 * 10); - const fee = feeStrategy.getFee(); const tx = await Mina.transaction( { sender: userKey.toPublicKey(), nonce: user0Nonce++, - fee, memo: "Redeem withdrawal", }, async () => { @@ -819,7 +814,7 @@ export const settlementTestFn = ( ).balance.toBigInt(); // tx fee - const minaFees = BigInt(fee); + const minaFees = BigInt(tx.transaction.feePayer.body.fee.toString()); expect((balanceAfter - balanceBefore).toString()).toBe( (amount - (tokenConfig === undefined ? minaFees : 0n)).toString() diff --git a/test/tsconfig.json b/test/tsconfig.json new file mode 100644 index 000000000..881a2c585 --- /dev/null +++ b/test/tsconfig.json @@ -0,0 +1,15 @@ +{ + "extends": "../tsconfig.json", + "compilerOptions": { + "noEmit": true, + "types": ["node", "jest"], + "module": "ES2022", + "moduleResolution": "node" + }, + "include": [ + "../packages/*/src/**/*.ts", + "../packages/*/test/**/*.ts", + "../packages/*/test-integration/**/*.ts" + ] +} +