diff --git a/.github/workflows/indexer-agent-image.yml b/.github/workflows/indexer-agent-image.yml index e36f6ba5d..f64db6441 100644 --- a/.github/workflows/indexer-agent-image.yml +++ b/.github/workflows/indexer-agent-image.yml @@ -1,6 +1,7 @@ name: Indexer Agent Image on: + workflow_dispatch: push: branches: - main diff --git a/.github/workflows/indexer-cli-image.yml b/.github/workflows/indexer-cli-image.yml index c3b36fb83..4ded8c3dc 100644 --- a/.github/workflows/indexer-cli-image.yml +++ b/.github/workflows/indexer-cli-image.yml @@ -1,6 +1,7 @@ name: Indexer CLI Image on: + workflow_dispatch: push: branches: - main diff --git a/packages/indexer-agent/src/agent.ts b/packages/indexer-agent/src/agent.ts index 093144cb8..0dc389dca 100644 --- a/packages/indexer-agent/src/agent.ts +++ b/packages/indexer-agent/src/agent.ts @@ -709,17 +709,15 @@ export class Agent { return } - await this.multiNetworks.mapNetworkMapped( - activeAllocations, - async ({ network, operator }, activeAllocations: Allocation[]) => { - if (network.specification.indexerOptions.enableDips) { - await operator.dipsManager!.acceptPendingProposals( - activeAllocations, - ) - await operator.dipsManager!.collectAgreementPayments() + await this.multiNetworks.map(async ({ network, operator }) => { + if (network.specification.indexerOptions.enableDips) { + if (!operator.dipsManager) { + throw new Error('DipsManager is not available') } - }, - ) + + await operator.dipsManager.collectAgreementPayments() + } + }) }, ) } diff --git a/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts b/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts index d3bb6fbb4..a9b75f60b 100644 --- a/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts +++ b/packages/indexer-agent/src/db/migrations/24-add-dips-schema.ts @@ -16,11 +16,13 @@ export async function up({ context }: Context): Promise { // 1. Add 'dips' to the IndexingRules.decisionBasis enum. // Skipped on fresh DBs — sequelize.sync() will create the enum already // including 'dips' from the model definition. Existing prod DBs need this - // ALTER to add the value to a pre-existing enum type. + // ALTER to add the value to a pre-existing enum type. IF NOT EXISTS keeps + // the migration idempotent against DBs that already have the value from + // the now-deleted migration 19-add-dips-to-decision-basis. if (await queryInterface.tableExists('IndexingRules')) { logger.info(`Adding 'dips' to enum_IndexingRules_decisionBasis`) await queryInterface.sequelize.query( - `ALTER TYPE "enum_IndexingRules_decisionBasis" ADD VALUE 'dips'`, + `ALTER TYPE "enum_IndexingRules_decisionBasis" ADD VALUE IF NOT EXISTS 'dips'`, ) } else { logger.debug( diff --git a/packages/indexer-common/src/indexer-management/allocations.ts b/packages/indexer-common/src/indexer-management/allocations.ts index 5449a8e2f..d19e50047 100644 --- a/packages/indexer-common/src/indexer-management/allocations.ts +++ b/packages/indexer-common/src/indexer-management/allocations.ts @@ -175,6 +175,8 @@ export class AllocationManager { this, this.pendingRcaModel, ) + this.dipsManager.startProposalAcceptanceLoop() + this.dipsManager.startAllocationSweepLoop() } } diff --git a/packages/indexer-common/src/indexer-management/monitor.ts b/packages/indexer-common/src/indexer-management/monitor.ts index 6de5f7e2f..aa793de97 100644 --- a/packages/indexer-common/src/indexer-management/monitor.ts +++ b/packages/indexer-common/src/indexer-management/monitor.ts @@ -63,11 +63,16 @@ export class NetworkMonitor { if (!this.indexingPaymentsSubgraph) { return false } + // Only `Accepted` blocks unallocation. Once the agreement is canceled + // (by payer or by us), the on-chain agreement is already gone, so + // protecting the allocation only strands it. The agent's separate + // collectAgreementPayments loop handles any final collect on + // `CanceledByPayer` agreements independently of allocation lifetime. const result = await this.indexingPaymentsSubgraph.checkedQuery( gql` query indexingAgreements($allocationId: Bytes!) { indexingAgreements( - where: { allocationId: $allocationId, state_in: [Accepted, CanceledByPayer] } + where: { allocationId: $allocationId, state: Accepted } first: 1 ) { id diff --git a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts index bcd33115d..23cf4bf4a 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/accept-proposals.test.ts @@ -4,9 +4,12 @@ import { PendingRcaConsumer } from '../pending-rca-consumer' import { DecodedRcaProposal } from '../types' import { Allocation, + AllocationManager, AllocationStatus, IndexerManagementModels, + IndexingDecisionBasis, Network, + SubgraphIdentifierType, } from '@graphprotocol/indexer-common' let logger: Logger @@ -108,10 +111,17 @@ function createMockModels() { findOne: jest.fn().mockResolvedValue(null), findAll: jest.fn().mockResolvedValue([]), destroy: jest.fn().mockResolvedValue(1), + upsert: jest.fn().mockResolvedValue([{ id: 1 }, true]), }, } as unknown as IndexerManagementModels } +function createMockParent() { + return { + matchingRuleExists: jest.fn().mockResolvedValue(false), + } as unknown as AllocationManager +} + function createMockNetwork() { return { contracts: { @@ -168,11 +178,18 @@ function createDipsManager( network: Network, models: IndexerManagementModels, consumer: PendingRcaConsumer, + parent: AllocationManager = createMockParent(), + offerMonitor?: { offerExists: jest.Mock }, ): DipsManager { + const graphNode = { ensure: jest.fn().mockResolvedValue(undefined) } // eslint-disable-next-line @typescript-eslint/no-explicit-any - const dm = new DipsManager(logger, models, network, {} as any, null, {} as any) - // eslint-disable-next-line @typescript-eslint/no-explicit-any + const dm = new DipsManager(logger, models, network, graphNode as any, parent, {} as any) +// eslint-disable-next-line @typescript-eslint/no-explicit-any ;(dm as any).pendingRcaConsumer = consumer + if (offerMonitor !== undefined) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + ;(dm as any).offerMonitor = offerMonitor + } return dm } @@ -583,4 +600,172 @@ describe('DipsManager.acceptPendingProposals', () => { expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) }) }) + + describe('rule creation ordering (race condition fix)', () => { + test('upserts the DIPS indexing rule before broadcasting acceptIndexingAgreement', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtx', + status: 1, + }) + + const dm = createDipsManager(network, models, consumer) + + await dm.acceptPendingProposals([allocation]) + + const upsertOrder = (models.IndexingRule.upsert as jest.Mock).mock + .invocationCallOrder[0] + const executeOrder = (network.transactionManager.executeTransaction as jest.Mock) + .mock.invocationCallOrder[0] + + expect(upsertOrder).toBeDefined() + expect(executeOrder).toBeDefined() + expect(upsertOrder).toBeLessThan(executeOrder) + }) + + test('skips rule upsert and rejects proposal when deployment is blocklisted', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + ;(consumer.getPendingProposalsForDeployment as jest.Mock).mockResolvedValue([]) + const models = createMockModels() + ;(models.IndexingRule.findAll as jest.Mock).mockResolvedValue([ + { + identifier: proposal.subgraphDeploymentId.ipfsHash, + identifierType: SubgraphIdentifierType.DEPLOYMENT, + decisionBasis: IndexingDecisionBasis.NEVER, + }, + ]) + const network = createMockNetwork() + + const dm = createDipsManager(network, models, consumer) + + await dm.acceptPendingProposals([allocation]) + + expect(consumer.markRejected).toHaveBeenCalledWith( + proposal.id, + 'deployment blocklisted', + ) + expect(models.IndexingRule.upsert).not.toHaveBeenCalled() + expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled() + }) + + test('skips rule upsert when parent reports a matching rule already exists', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtx', + status: 1, + }) + + const parent = { + matchingRuleExists: jest.fn().mockResolvedValue(true), + } as unknown as AllocationManager + + const dm = createDipsManager(network, models, consumer, parent) + + await dm.acceptPendingProposals([allocation]) + + expect(models.IndexingRule.upsert).not.toHaveBeenCalled() + expect(network.transactionManager.executeTransaction).toHaveBeenCalled() + expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) + }) + }) + + describe('offer-existence gate', () => { + test('stays pending when offer absent and deadline > now + safety margin', async () => { + const proposal = createMockProposal({ + // 5 minutes from now — well beyond the 30s safety margin + deadline: BigInt(Math.floor(Date.now() / 1000) + 300), + }) + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + const offerMonitor = { offerExists: jest.fn().mockResolvedValue(false) } + + const dm = createDipsManager(network, models, consumer, undefined, offerMonitor) + + await dm.acceptPendingProposals([allocation]) + + expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id) + expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled() + expect(consumer.markRejected).not.toHaveBeenCalled() + expect(consumer.markAccepted).not.toHaveBeenCalled() + }) + + test('marks rejected when offer absent and deadline within safety margin', async () => { + const proposal = createMockProposal({ + // 10 seconds from now — inside the 30s safety margin + deadline: BigInt(Math.floor(Date.now() / 1000) + 10), + }) + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + const offerMonitor = { offerExists: jest.fn().mockResolvedValue(false) } + + const dm = createDipsManager(network, models, consumer, undefined, offerMonitor) + + await dm.acceptPendingProposals([allocation]) + + expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id) + expect(consumer.markRejected).toHaveBeenCalledWith( + proposal.id, + 'offer_never_landed', + ) + expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled() + }) + + test('proceeds to acceptIndexingAgreement when offer is present', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtxhash', + status: 1, + }) + const offerMonitor = { offerExists: jest.fn().mockResolvedValue(true) } + + const dm = createDipsManager(network, models, consumer, undefined, offerMonitor) + + await dm.acceptPendingProposals([allocation]) + + expect(offerMonitor.offerExists).toHaveBeenCalledWith(proposal.id) + expect(network.transactionManager.executeTransaction).toHaveBeenCalled() + expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) + }) + + test('bypasses gate when indexingPaymentsSubgraph is not configured', async () => { + const proposal = createMockProposal() + const allocation = createMockAllocation() + const consumer = createMockConsumer([proposal]) + const models = createMockModels() + const network = createMockNetwork() + ;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({ + hash: '0xtxhash', + status: 1, + }) + + // No offerMonitor passed — DipsManager constructor sets it to null + // because createMockNetwork() does not define indexingPaymentsSubgraph. + const dm = createDipsManager(network, models, consumer) + // eslint-disable-next-line @typescript-eslint/no-explicit-any + expect((dm as any).offerMonitor).toBeNull() + + await dm.acceptPendingProposals([allocation]) + + expect(network.transactionManager.executeTransaction).toHaveBeenCalled() + expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id) + }) + }) }) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts new file mode 100644 index 000000000..f978bec08 --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/__tests__/offer-monitor.test.ts @@ -0,0 +1,56 @@ +import { createLogger } from '@graphprotocol/common-ts' +import { OfferMonitor } from '../offer-monitor' + +const logger = createLogger({ + name: 'OfferMonitor.test', + async: false, + level: 'error', +}) + +describe('OfferMonitor', () => { + it('converts UUID-format agreement ids to bytes16 hex before querying', async () => { + const query = jest.fn().mockResolvedValue({ data: { offer: { id: '0xabc' } } }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92') + + expect(exists).toBe(true) + expect(query).toHaveBeenCalledTimes(1) + expect(query.mock.calls[0][1]).toEqual({ + id: '0xbea99452e465e9d98a792356edcc7e92', + }) + }) + + it('passes through already-hex ids unchanged (lowercased)', async () => { + const query = jest.fn().mockResolvedValue({ data: { offer: { id: '0xabc' } } }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + await monitor.offerExists('0xBEA99452E465E9D98A792356EDCC7E92') + + expect(query.mock.calls[0][1]).toEqual({ + id: '0xbea99452e465e9d98a792356edcc7e92', + }) + }) + + it('returns false when the subgraph reports the offer is missing', async () => { + const query = jest.fn().mockResolvedValue({ data: { offer: null } }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92') + + expect(exists).toBe(false) + }) + + it('treats subgraph errors as transient (not yet on-chain)', async () => { + const query = jest.fn().mockResolvedValue({ error: new Error('subgraph hiccup') }) + const subgraph = { query } as never + const monitor = new OfferMonitor(logger, subgraph) + + const exists = await monitor.offerExists('bea99452-e465-e9d9-8a79-2356edcc7e92') + + expect(exists).toBe(false) + }) +}) diff --git a/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts index c2bd732bc..980be336b 100644 --- a/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts +++ b/packages/indexer-common/src/indexing-fees/__tests__/pending-rca-consumer.test.ts @@ -42,7 +42,7 @@ function encodeTestPayload(overrides?: { [ { subgraphDeploymentId: TEST_DEPLOYMENT_BYTES32, - version: 1n, + version: 0n, terms: termsEncoded, }, ], diff --git a/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts new file mode 100644 index 000000000..3e6026949 --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/__tests__/sweep-allocations.test.ts @@ -0,0 +1,194 @@ +import { createLogger, SubgraphDeploymentID } from '@graphprotocol/common-ts' +import { DipsManager } from '../dips' +import { + IndexerManagementModels, + Network, + AllocationManager, +} from '@graphprotocol/indexer-common' + +const logger = createLogger({ + name: 'DipsManager.sweep.test', + async: false, + level: 'error', +}) + +// SubgraphDeploymentID maps an IPFS hash to a known bytes32; we capture +// that mapping for the subgraph fixture and use the same hash on the rule. +const TEST_DEPLOYMENT_IPFS = 'QmPdbQaRCMhgouSZSW3sHZxU3M8KwcngWASvreAexzmmrh' +const RESOLVED_BYTES32 = new SubgraphDeploymentID(TEST_DEPLOYMENT_IPFS).bytes32 +const OTHER_DEPLOYMENT_IPFS = 'QmTzQ1JRkWErjk39mryYw2WVaphAZNAREyMchXzYQ7c15W' + +function nowSeconds(): number { + return Math.floor(Date.now() / 1000) +} + +function createMockModels(rules: Array<{ id: number; identifier: string }>) { + const destroy = jest.fn().mockResolvedValue(1) + return { + models: { + IndexingRule: { + findAll: jest.fn().mockResolvedValue(rules), + destroy, + }, + } as unknown as IndexerManagementModels, + destroy, + } +} + +function createMockNetwork( + subgraphResult: unknown, + indexerAddress = '0x5555555555555555555555555555555555555555', +) { + const query = jest.fn().mockResolvedValue(subgraphResult) + return { + network: { + specification: { + indexerOptions: { + address: indexerAddress, + dipsCollectionTarget: 1, + }, + networkIdentifier: 'eip155:1337', + }, + indexingPaymentsSubgraph: { query }, + } as unknown as Network, + query, + } +} + +function createDipsManager( + network: Network, + models: IndexerManagementModels, +): DipsManager { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + return new DipsManager( + logger, + models, + network, + {} as any, // eslint-disable-line @typescript-eslint/no-explicit-any + {} as AllocationManager, + {} as any, // eslint-disable-line @typescript-eslint/no-explicit-any + ) +} + +describe('DipsManager.sweepDipsAllocations', () => { + test('removes a dips rule that has no matching Accepted agreement', async () => { + const { models, destroy } = createMockModels([ + { id: 42, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: nowSeconds() } }, + // Indexer has no accepted agreements at all + indexingAgreements: [], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).toHaveBeenCalledTimes(1) + expect(destroy).toHaveBeenCalledWith({ where: { id: 42 } }) + }) + + test('keeps a dips rule whose deployment has an Accepted agreement', async () => { + const { models, destroy } = createMockModels([ + { id: 7, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: nowSeconds() } }, + indexingAgreements: [ + { + id: '0xagreement1', + subgraphDeploymentId: RESOLVED_BYTES32, + }, + ], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).not.toHaveBeenCalled() + }) + + test('disables only the unbacked rule, leaves backed rules intact', async () => { + const { models, destroy } = createMockModels([ + { id: 1, identifier: TEST_DEPLOYMENT_IPFS }, + { id: 2, identifier: OTHER_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: nowSeconds() } }, + indexingAgreements: [ + { + id: '0xagreement1', + // Only backs the first rule + subgraphDeploymentId: RESOLVED_BYTES32, + }, + ], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).toHaveBeenCalledTimes(1) + expect(destroy).toHaveBeenCalledWith({ where: { id: 2 } }) + }) + + test('skips the sweep when subgraph block timestamp is stale', async () => { + const { models, destroy } = createMockModels([ + { id: 99, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + // Timestamp 10 minutes behind wall clock — beyond the 300s threshold + const stale = nowSeconds() - 600 + const { network } = createMockNetwork({ + data: { + _meta: { block: { timestamp: stale } }, + indexingAgreements: [], + }, + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + // Stale subgraph: do not act on its data + expect(destroy).not.toHaveBeenCalled() + }) + + test('skips the sweep when subgraph query fails', async () => { + const { models, destroy } = createMockModels([ + { id: 5, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const { network } = createMockNetwork({ + error: new Error('connection refused'), + }) + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).not.toHaveBeenCalled() + }) + + test('is a no-op when indexingPaymentsSubgraph is not configured', async () => { + const { models, destroy } = createMockModels([ + { id: 1, identifier: TEST_DEPLOYMENT_IPFS }, + ]) + const network = { + specification: { + indexerOptions: { + address: '0x5555555555555555555555555555555555555555', + dipsCollectionTarget: 1, + }, + networkIdentifier: 'eip155:1337', + }, + indexingPaymentsSubgraph: null, + } as unknown as Network + + const dm = createDipsManager(network, models) + await dm.sweepDipsAllocations() + + expect(destroy).not.toHaveBeenCalled() + }) +}) diff --git a/packages/indexer-common/src/indexing-fees/dips.ts b/packages/indexer-common/src/indexing-fees/dips.ts index c9e4837d5..83e19556d 100644 --- a/packages/indexer-common/src/indexing-fees/dips.ts +++ b/packages/indexer-common/src/indexing-fees/dips.ts @@ -7,6 +7,7 @@ import { import { Allocation, AllocationManager, + AllocationStatus, GraphNode, IndexerManagementModels, IndexingDecisionBasis, @@ -15,8 +16,11 @@ import { SubgraphIdentifierType, upsertIndexingRule, } from '@graphprotocol/indexer-common' +import gql from 'graphql-tag' +import pMap from 'p-map' import { PendingRcaProposal } from '../indexer-management/models/pending-rca-proposal' +import { OfferMonitor } from './offer-monitor' import { PendingRcaConsumer } from './pending-rca-consumer' import { DecodedRcaProposal } from './types' import { tryParseCustomError } from '../utils' @@ -28,13 +32,33 @@ import { SubgraphIndexingAgreement, } from './agreement-monitor' import { CollectionTracker } from './collection-tracker' +import { sequentialTimerMap } from '../sequential-timer' +const DIPS_ACCEPTANCE_INTERVAL = 5_000 // POIs are computed against a recent-but-not-tip block to avoid reorg edge cases. const RECENT_BLOCK_OFFSET = 10 +const DIPS_SWEEP_INTERVAL = 60_000 +// If the indexing-payments-subgraph is more than this many seconds behind +// wall-clock, treat its data as unreliable and skip the sweep this tick. +// Normal indexing lag should never approach this; anything older indicates +// the subgraph is broken / paused / disconnected. +const DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS = 300 +// Per-tick parallelism cap. Proposals target distinct agreementIds and the +// wallet's nonce queue serialises submissions, so concurrent processProposal +// calls are safe; small enough that a stuck call doesn't head-of-line others. +const DIPS_ACCEPT_CONCURRENCY = 4 +// When the offer hasn't landed on-chain yet, keep retrying until the RCA +// deadline is within this window. Inside the window, give up cleanly so +// reassessment can pick a replacement before the deadline lapses. +const OFFER_GATE_DEADLINE_SAFETY_MARGIN_SECONDS = 30n + +const elapsedMs = (start: bigint): number => + Number(process.hrtime.bigint() - start) / 1_000_000 export class DipsManager { declare pendingRcaConsumer: PendingRcaConsumer declare collectionTracker: CollectionTracker + declare offerMonitor: OfferMonitor | null constructor( private logger: Logger, private models: IndexerManagementModels, @@ -44,6 +68,13 @@ export class DipsManager { pendingRcaModel: typeof PendingRcaProposal, ) { this.pendingRcaConsumer = new PendingRcaConsumer(this.logger, pendingRcaModel) + + // Null when no indexing-payments-subgraph is configured; processProposal + // skips the offer-existence gate in that case. + this.offerMonitor = this.network.indexingPaymentsSubgraph + ? new OfferMonitor(this.logger, this.network.indexingPaymentsSubgraph) + : null + this.collectionTracker = new CollectionTracker( this.network.specification.indexerOptions.dipsCollectionTarget, ) @@ -140,6 +171,15 @@ export class DipsManager { return } + // Deploy must precede the on-chain allocation: reconcile reads + // graph_node.indexingStatus, and an undefined status triggers + // failsHealthCheck → spurious unallocate. Idempotent; graph-node + // dedupes redundant calls across proposals sharing a deployment. + await this.graphNode.ensure( + `indexer-agent/${deploymentId.ipfsHash.slice(-10)}`, + deploymentId, + ) + const { amount } = await this.getDipsAllocationAmount(deploymentId) this.logger.info( `Creating DIPS indexing rule for deployment ${deploymentId.toString()}`, @@ -264,6 +304,17 @@ export class DipsManager { activeAllocations: Allocation[], ): Promise { const now = BigInt(Math.floor(Date.now() / 1000)) + const t0 = process.hrtime.bigint() + const phases: Record = {} + const logSummary = (outcome: string) => { + this.logger.info('processProposal completed', { + proposalId: proposal.id, + deployment: proposal.subgraphDeploymentId.ipfsHash, + outcome, + phases, + totalMs: elapsedMs(t0), + }) + } if (proposal.deadline <= now) { this.logger.info('Rejecting proposal: deadline expired', { @@ -273,18 +324,91 @@ export class DipsManager { }) await consumer.markRejected(proposal.id, 'deadline_expired') await this.cleanupDipsRule(consumer, proposal) + logSummary('rejected_deadline_expired') return } + // Create the dips rule eagerly here rather than leaving it to the reconcile + // loop: the accept tx can confirm and clear the pending row before the next + // reconcile tick, which would leave the rule uncreated and graph-node never + // told to deploy the subgraph. + const tRule = process.hrtime.bigint() + const allDeploymentRules = await this.models.IndexingRule.findAll({ + where: { identifierType: SubgraphIdentifierType.DEPLOYMENT }, + }) + const blocklisted = allDeploymentRules.find((r) => + this.isOnChainOptOutRule(r, proposal.subgraphDeploymentId), + ) + if (blocklisted) { + this.logger.info( + `Blocklisted deployment ${proposal.subgraphDeploymentId.toString()}, rejecting proposal ${ + proposal.id + }`, + ) + await consumer.markRejected(proposal.id, 'deployment blocklisted') + phases.ruleMs = elapsedMs(tRule) + logSummary('rejected_blocklisted') + return + } + await this.upsertDipsRuleFor(proposal.subgraphDeploymentId, { + allocationLifetime: Math.max( + Number(proposal.minSecondsPerCollection), + Number(proposal.maxSecondsPerCollection), + ), + }) + phases.ruleMs = elapsedMs(tRule) + + // Gate accept on the on-chain offer existing. If dipper's offer() tx was + // evicted (nonce collision, gas spike), rcaOffers is empty and + // acceptIndexingAgreement reverts with RecurringCollectorInvalidSigner — + // a transient state, retry next tick. Inside the safety margin, give up + // so reassessment can pick a replacement before the deadline lapses. + if (this.offerMonitor) { + const tOffer = process.hrtime.bigint() + const offerOnChain = await this.offerMonitor.offerExists(proposal.id) + phases.offerMs = elapsedMs(tOffer) + if (!offerOnChain) { + if (proposal.deadline > now + OFFER_GATE_DEADLINE_SAFETY_MARGIN_SECONDS) { + this.logger.debug( + 'Offer not yet on-chain, waiting for next acceptance-loop tick', + { + proposalId: proposal.id, + deadline: proposal.deadline.toString(), + now: now.toString(), + }, + ) + logSummary('waiting_for_offer') + return + } + this.logger.warn( + 'Offer never landed on-chain within the RCA deadline, rejecting proposal', + { + proposalId: proposal.id, + deadline: proposal.deadline.toString(), + now: now.toString(), + }, + ) + await consumer.markRejected(proposal.id, 'offer_never_landed') + await this.cleanupDipsRule(consumer, proposal) + logSummary('rejected_offer_never_landed') + return + } + } + const allocation = activeAllocations.find( (a) => a.subgraphDeployment.id.bytes32 === proposal.subgraphDeploymentId.bytes32, ) + const tAccept = process.hrtime.bigint() if (allocation) { await this.acceptWithExistingAllocation(consumer, proposal, allocation) } else { await this.acceptWithNewAllocation(consumer, proposal, activeAllocations) } + phases.acceptMs = elapsedMs(tAccept) + // The accept helpers swallow errors via handleAcceptError; per-outcome + // log lines from inside them tell the actual story. + logSummary('accept_attempted') } private async acceptWithExistingAllocation( @@ -555,82 +679,89 @@ export class DipsManager { async collectAgreementPayments(): Promise { const logger = this.logger.child({ function: 'collectAgreementPayments' }) - const indexerAddress = this.network.specification.indexerOptions.address + try { + const indexerAddress = this.network.specification.indexerOptions.address - if (!this.network.indexingPaymentsSubgraph) { - logger.warn( - 'Indexing payments subgraph not configured, skipping agreement collection', + if (!this.network.indexingPaymentsSubgraph) { + logger.warn( + 'Indexing payments subgraph not configured, skipping agreement collection', + ) + return + } + const agreements = await fetchCollectableAgreements( + this.network.indexingPaymentsSubgraph, + indexerAddress, ) - return - } - const agreements = await fetchCollectableAgreements( - this.network.indexingPaymentsSubgraph, - indexerAddress, - ) - - if (agreements.length === 0) { - logger.debug('No collectable agreements found') - return - } - - // Cancel any agreements whose deployments are blocklisted - await this.cancelBlocklistedAgreements(agreements) - // Use chain timestamp for consistency with contract timing and subgraph data - const blockNumber = await this.network.networkProvider.getBlockNumber() - const block = await this.network.networkProvider.getBlock(blockNumber) - const nowSeconds = block ? Number(block.timestamp) : Math.floor(Date.now() / 1000) + if (agreements.length === 0) { + logger.debug('No collectable agreements found') + return + } - // Sync tracker state from subgraph data - for (const agreement of agreements) { - this.collectionTracker.track(agreement.id, { - lastCollectedAt: Number(agreement.lastCollectionAt), - minSecondsPerCollection: agreement.minSecondsPerCollection, - maxSecondsPerCollection: agreement.maxSecondsPerCollection, - }) - } + // Cancel any agreements whose deployments are blocklisted + await this.cancelBlocklistedAgreements(agreements) - const readyIds = this.collectionTracker.getReadyAgreements(nowSeconds) - if (readyIds.length === 0) { - logger.debug('No agreements ready for collection', { - total: agreements.length, - }) - return - } + // Use chain timestamp for consistency with contract timing and subgraph data + const blockNumber = await this.network.networkProvider.getBlockNumber() + const block = await this.network.networkProvider.getBlock(blockNumber) + const nowSeconds = block ? Number(block.timestamp) : Math.floor(Date.now() / 1000) + + // Sync tracker state from subgraph data + for (const agreement of agreements) { + this.collectionTracker.track(agreement.id, { + lastCollectedAt: Number(agreement.lastCollectionAt), + minSecondsPerCollection: agreement.minSecondsPerCollection, + maxSecondsPerCollection: agreement.maxSecondsPerCollection, + }) + } - logger.info( - `${readyIds.length} of ${agreements.length} agreement(s) ready for collection`, - ) + const readyIds = this.collectionTracker.getReadyAgreements(nowSeconds) + if (readyIds.length === 0) { + logger.debug('No agreements ready for collection', { + total: agreements.length, + }) + return + } - const readyAgreements = agreements.filter((a) => readyIds.includes(a.id)) + logger.info( + `${readyIds.length} of ${agreements.length} agreement(s) ready for collection`, + ) - for (const agreement of readyAgreements) { - try { - const result = await this.tryCollectAgreement(agreement, blockNumber, logger) - if (result === 'collected') { - this.collectionTracker.updateAfterCollection(agreement.id, nowSeconds) - this.cleanupFinishedAgreement(agreement, nowSeconds, logger) + const readyAgreements = agreements.filter((a) => readyIds.includes(a.id)) + + for (const agreement of readyAgreements) { + try { + const result = await this.tryCollectAgreement(agreement, blockNumber, logger) + if (result === 'collected') { + this.collectionTracker.updateAfterCollection(agreement.id, nowSeconds) + this.cleanupFinishedAgreement(agreement, nowSeconds, logger) + } + // 'paused' / 'unauthorized' are pre-flight checks; no on-chain attempt was + // made, so don't bump the tracker. Next tick will retry immediately. + } catch (err) { + const isDeterministic = this.isDeterministicError(err) + const errorDetail = isDeterministic + ? tryParseCustomError(err) + : err instanceof Error + ? err.message + : String(err) + // Throttle the retry so we don't hammer the chain on every poll cycle. + // Deterministic errors during collection are typically recoverable + // (subgraph sync, allocation reconcile, provision changes), so we + // don't auto-cancel; we just slow down. + this.collectionTracker.markAttempted(agreement.id, nowSeconds) + logger.warn('Failed to collect agreement, will retry after throttle', { + agreementId: agreement.id, + error: errorDetail, + deterministic: isDeterministic, + }) } - // 'paused' / 'unauthorized' are pre-flight checks; no on-chain attempt was - // made, so don't bump the tracker. Next tick will retry immediately. - } catch (err) { - const isDeterministic = this.isDeterministicError(err) - const errorDetail = isDeterministic - ? tryParseCustomError(err) - : err instanceof Error - ? err.message - : String(err) - // Throttle the retry so we don't hammer the chain on every poll cycle. - // Deterministic errors during collection are typically recoverable - // (subgraph sync, allocation reconcile, provision changes), so we - // don't auto-cancel; we just slow down. - this.collectionTracker.markAttempted(agreement.id, nowSeconds) - logger.warn('Failed to collect agreement, will retry after throttle', { - agreementId: agreement.id, - error: errorDetail, - deterministic: isDeterministic, - }) } + } catch (err) { + // Catch outer fetch failures (subgraph query, RPC getBlockNumber/getBlock, + // cancelBlocklistedAgreements) so a transient failure skips this tick rather + // than aborting the entire reconcile cycle for every network. + logger.warn('Skipping DIPs collection tick due to fetch failure', { err }) } } @@ -749,11 +880,40 @@ export class DipsManager { proposal: DecodedRcaProposal, error: unknown, ): Promise { + // ABI-level mismatches are deterministic; retrying for the full RCA + // deadline only burns the budget. Mark rejected immediately so dipper + // reassessment can pick a working candidate. + const abiMismatchReason = this.classifyAbiMismatch(error) + if (abiMismatchReason !== null) { + const callException = error as { code?: string; message?: string } + this.logger.warn('Rejecting proposal: ABI mismatch (non-recoverable)', { + proposalId: proposal.id, + deployment: proposal.subgraphDeploymentId.ipfsHash, + reason: abiMismatchReason, + ethersCode: callException.code ?? null, + errorMessage: callException.message ?? null, + }) + await consumer.markRejected(proposal.id, abiMismatchReason) + await this.cleanupDipsRule(consumer, proposal) + return + } + if (this.isDeterministicError(error)) { const parsedError = tryParseCustomError(error) + const callException = error as { + reason?: string + data?: string + message?: string + transaction?: { to?: string; data?: string } + } this.logger.warn('Rejecting proposal: deterministic contract error', { proposalId: proposal.id, + deployment: proposal.subgraphDeploymentId.ipfsHash, error: parsedError, + revertReason: callException.reason ?? null, + revertData: callException.data ?? null, + errorMessage: callException.message ?? null, + contractTarget: callException.transaction?.to ?? null, }) await consumer.markRejected(proposal.id, String(parsedError)) await this.cleanupDipsRule(consumer, proposal) @@ -765,6 +925,20 @@ export class DipsManager { } } + private classifyAbiMismatch(error: unknown): string | null { + const typedError = error as { code?: string; operation?: string } + if ( + typedError?.code === 'UNSUPPORTED_OPERATION' && + typedError?.operation === 'fragment' + ) { + return 'abi_fragment_mismatch' + } + if (typedError?.code === 'INVALID_ARGUMENT') { + return 'abi_invalid_argument' + } + return null + } + private isDeterministicError(error: unknown): boolean { const typedError = error as { code?: string } return typedError?.code === 'CALL_EXCEPTION' @@ -811,4 +985,216 @@ export class DipsManager { const { deployments } = await this.getDipsTargetDeployments() return deployments } + startProposalAcceptanceLoop() { + if (!this.pendingRcaConsumer) { + this.logger.debug('No pending RCA consumer configured, skipping acceptance loop') + return + } + const consumer = this.pendingRcaConsumer + + sequentialTimerMap( + { + logger: this.logger, + milliseconds: DIPS_ACCEPTANCE_INTERVAL, + }, + async () => { + const proposals = await consumer.getPendingProposals() + if (proposals.length === 0) { + return + } + + this.logger.info('Processing pending RCA proposals for on-chain acceptance', { + count: proposals.length, + concurrency: DIPS_ACCEPT_CONCURRENCY, + }) + + const activeAllocations = await this.network.networkMonitor.allocations( + AllocationStatus.ACTIVE, + ) + + // Run up to DIPS_ACCEPT_CONCURRENCY proposals in parallel. Each + // processProposal call targets a distinct agreementId and has no + // shared mutable state with the others. Per-proposal failures are + // already isolated by handleAcceptError; the explicit try/catch + // here defends against any unexpected throw escaping that. + await pMap( + proposals, + async (proposal) => { + try { + await this.processProposal(consumer, proposal, activeAllocations) + } catch (error) { + this.logger.error('Unexpected error processing proposal', { + proposalId: proposal.id, + error, + }) + } + }, + { concurrency: DIPS_ACCEPT_CONCURRENCY, stopOnError: false }, + ) + }, + { + onError: (err) => { + this.logger.error('Failed to process pending RCA proposals', { err }) + }, + }, + ) + } + + /** + * Query the indexing-payments-subgraph for the agent's accepted agreements + * and the subgraph's current chain timestamp. Used by the allocation + * sweep to verify that each `dips`-basis indexing rule has a paying + * agreement backing it. + */ + async fetchAcceptedAgreementsForSelf(): Promise<{ + deployments: Set + blockTimestamp: number | null + }> { + if (!this.network.indexingPaymentsSubgraph) { + return { deployments: new Set(), blockTimestamp: null } + } + const indexer = this.network.specification.indexerOptions.address.toLowerCase() + const result = await this.network.indexingPaymentsSubgraph.query( + gql` + query selfAgreements($indexer: String!) { + _meta { + block { + timestamp + } + } + indexingAgreements(where: { indexer: $indexer, state: Accepted }, first: 1000) { + id + subgraphDeploymentId + } + } + `, + { indexer }, + ) + if (result.error) { + throw new Error(`indexing-payments query failed: ${result.error}`) + } + const data = result.data ?? {} + const deployments = new Set( + (data.indexingAgreements ?? []).map((a: { subgraphDeploymentId: string }) => + a.subgraphDeploymentId.toLowerCase(), + ), + ) + const blockTimestamp = data._meta?.block?.timestamp ?? null + return { deployments, blockTimestamp } + } + + /** + * Reconcile local `dips`-basis indexing rules against the + * indexing-payments-subgraph. Each rule represents a deployment the + * agent allocated to as part of a DIPs agreement. If the subgraph + * cannot confirm an Accepted agreement for that deployment, the rule + * is stale (the agent is allocated without payment, e.g. because + * dipper marked the agreement Expired or the original on-chain accept + * never linked back). The rule is deleted; the agent's normal + * reconciliation closes the allocation through its existing path. + * + * The subgraph block timestamp is checked first: if the subgraph is + * far behind wall-clock, the sweep is skipped this tick so we never + * disable rules based on stale data. + */ + async sweepDipsAllocations(): Promise { + if (!this.network.indexingPaymentsSubgraph) { + return + } + const logger = this.logger.child({ function: 'sweepDipsAllocations' }) + + let acceptedDeployments: Set + let blockTimestamp: number | null + try { + const result = await this.fetchAcceptedAgreementsForSelf() + acceptedDeployments = result.deployments + blockTimestamp = result.blockTimestamp + } catch (err) { + logger.warn('Skipping DIPs allocation sweep: subgraph query failed', { + err, + }) + return + } + + if (blockTimestamp === null) { + logger.warn( + 'Skipping DIPs allocation sweep: indexing-payments subgraph returned no _meta timestamp', + ) + return + } + + const nowSeconds = Math.floor(Date.now() / 1000) + const lag = nowSeconds - Number(blockTimestamp) + if (lag > DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS) { + logger.warn('Skipping DIPs allocation sweep: indexing-payments subgraph is stale', { + subgraphTimestamp: blockTimestamp, + nowSeconds, + lagSeconds: lag, + thresholdSeconds: DIPS_SWEEP_STALENESS_THRESHOLD_SECONDS, + }) + return + } + + const dipsRules = await this.models.IndexingRule.findAll({ + where: { + decisionBasis: IndexingDecisionBasis.DIPS, + identifierType: SubgraphIdentifierType.DEPLOYMENT, + }, + }) + + let removed = 0 + for (const rule of dipsRules) { + const deploymentBytes32 = new SubgraphDeploymentID(rule.identifier).bytes32 + const deploymentLower = deploymentBytes32.toLowerCase() + if (acceptedDeployments.has(deploymentLower)) { + continue + } + logger.warn( + 'Removing DIPs indexing rule with no backing agreement in indexing-payments-subgraph', + { + deployment: rule.identifier, + subgraphTimestamp: blockTimestamp, + }, + ) + await this.models.IndexingRule.destroy({ where: { id: rule.id } }) + removed += 1 + } + + if (removed > 0) { + logger.info('DIPs allocation sweep removed stale rules', { + removed, + rulesChecked: dipsRules.length, + acceptedAgreements: acceptedDeployments.size, + }) + } else { + logger.debug('DIPs allocation sweep: all dips rules backed', { + rulesChecked: dipsRules.length, + acceptedAgreements: acceptedDeployments.size, + }) + } + } + + startAllocationSweepLoop() { + if (!this.network.indexingPaymentsSubgraph) { + this.logger.debug( + 'No indexing-payments-subgraph configured, skipping DIPs allocation sweep loop', + ) + return + } + + sequentialTimerMap( + { + logger: this.logger, + milliseconds: DIPS_SWEEP_INTERVAL, + }, + async () => { + await this.sweepDipsAllocations() + }, + { + onError: (err) => { + this.logger.error('DIPs allocation sweep tick failed', { err }) + }, + }, + ) + } } diff --git a/packages/indexer-common/src/indexing-fees/index.ts b/packages/indexer-common/src/indexing-fees/index.ts index 0f8dba604..eb03983ff 100644 --- a/packages/indexer-common/src/indexing-fees/index.ts +++ b/packages/indexer-common/src/indexing-fees/index.ts @@ -1,3 +1,4 @@ export * from './dips' +export * from './offer-monitor' export * from './types' export * from './pending-rca-consumer' diff --git a/packages/indexer-common/src/indexing-fees/offer-monitor.ts b/packages/indexer-common/src/indexing-fees/offer-monitor.ts new file mode 100644 index 000000000..e6b95f9b9 --- /dev/null +++ b/packages/indexer-common/src/indexing-fees/offer-monitor.ts @@ -0,0 +1,61 @@ +import { Logger } from '@graphprotocol/common-ts' +import gql from 'graphql-tag' +import { SubgraphClient } from '../subgraph-client' + +const OFFER_EXISTS_QUERY = gql` + query offerExists($id: ID!) { + offer(id: $id) { + id + } + } +` + +// The pending_rca_proposals table stores the agreementId as a UUID +// (`bea99452-e465-e9d9-8a79-2356edcc7e92`); the subgraph keys Offer +// entities by the on-chain bytes16 hex (`0xbea99452...92`). +function toBytes16Id(agreementId: string): string { + if (agreementId.startsWith('0x')) { + return agreementId.toLowerCase() + } + return `0x${agreementId.replace(/-/g, '').toLowerCase()}` +} + +/** + * Checks the indexing-payments-subgraph for the presence of an `Offer` + * entity. Used by the DIPs accept path to gate `acceptIndexingAgreement` + * on dipper's `offer()` tx having landed on-chain; without this gate the + * contract reverts with `RecurringCollectorInvalidSigner` whenever the + * agent's poll beats dipper's submission. + * + * Subgraph errors are treated as "not yet" (transient) — better to wait + * one more tick than to false-positive a rejection on a momentary + * subgraph hiccup. + */ +export class OfferMonitor { + constructor( + private readonly logger: Logger, + private readonly subgraph: SubgraphClient, + ) {} + + async offerExists(agreementId: string): Promise { + try { + const result = await this.subgraph.query(OFFER_EXISTS_QUERY, { + id: toBytes16Id(agreementId), + }) + if (result.error) { + this.logger.debug('Offer existence check failed (will retry on next tick)', { + agreementId, + err: result.error, + }) + return false + } + return Boolean(result.data?.offer) + } catch (err) { + this.logger.debug('Offer existence check threw (will retry on next tick)', { + agreementId, + err, + }) + return false + } + } +} diff --git a/packages/indexer-common/src/network-specification.ts b/packages/indexer-common/src/network-specification.ts index 5e941d4b3..868b1d8f7 100644 --- a/packages/indexer-common/src/network-specification.ts +++ b/packages/indexer-common/src/network-specification.ts @@ -153,6 +153,10 @@ export const ProtocolSubgraphs = z networkSubgraph: Subgraph, epochSubgraph: Subgraph, tapSubgraph: OptionalSubgraph, + // Source of truth for on-chain RCA offers. The DIPs accept path + // queries this before calling acceptIndexingAgreement so the + // contract's rcaOffers check doesn't revert on a race where the + // offer tx hasn't landed yet. indexingPaymentsSubgraph: OptionalSubgraph, }) .strict()