Skip to content

Commit d393c2d

Browse files
committed
common: limit graph-node status query to actions' deploymentIDs
1 parent 0a5690a commit d393c2d

File tree

3 files changed

+122
-10
lines changed

3 files changed

+122
-10
lines changed

packages/indexer-common/src/graph-node.ts

Lines changed: 109 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import { BlockPointer, ChainIndexingStatus, IndexingStatus } from './types'
1212
import pRetry, { Options } from 'p-retry'
1313
import axios, { AxiosInstance } from 'axios'
1414
import fetch from 'isomorphic-fetch'
15+
import { Action } from './indexer-management'
1516

1617
interface indexNode {
1718
id: string
@@ -103,9 +104,15 @@ export class GraphNode {
103104
await pRetry(
104105
async () => {
105106
const deployments = await this.subgraphDeployments()
106-
this.logger.info(`Successfully connected to indexing status API`, {
107-
currentDeployments: deployments.map((deployment) => deployment.display),
108-
})
107+
if (deployments.length < 100) {
108+
this.logger.info(`Successfully connected to indexing status API`, {
109+
currentDeployments: deployments.map((deployment) => deployment.display),
110+
})
111+
} else {
112+
this.logger.info(`Successfully connected to indexing status API`, {
113+
currentDeploymentCount: deployments.length,
114+
})
115+
}
109116
},
110117
{
111118
retries: 10,
@@ -148,10 +155,98 @@ export class GraphNode {
148155
)
149156
}
150157

158+
public async subgraphDeploymentAssignmentsForAllocateActions(
159+
subgraphStatus: SubgraphStatus,
160+
actions: Action[],
161+
): Promise<SubgraphDeploymentAssignment[]> {
162+
const deploymentIDs = actions.map((action) => action.deploymentID)
163+
164+
const nodeOnlyResult = await this.status
165+
.query(
166+
gql`
167+
query indexingStatuses($subgraphs: [String!]!) {
168+
indexingStatuses(subgraphs: $subgraphs) {
169+
subgraphDeployment: subgraph
170+
node
171+
}
172+
}
173+
`,
174+
{ subgraphs: deploymentIDs },
175+
)
176+
.toPromise()
177+
178+
if (nodeOnlyResult.error) {
179+
throw nodeOnlyResult.error
180+
}
181+
182+
const withAssignments: string[] = nodeOnlyResult.data.indexingStatuses
183+
.filter(
184+
(result: { node: string | null }) =>
185+
result.node !== null && result.node !== undefined,
186+
)
187+
.map((result: { subgraphDeployment: string }) => result.subgraphDeployment)
188+
189+
const result = await this.status
190+
.query(
191+
gql`
192+
query indexingStatuses($subgraphs: [String!]!) {
193+
indexingStatuses(subgraphs: $subgraphs) {
194+
subgraphDeployment: subgraph
195+
node
196+
paused
197+
}
198+
}
199+
`,
200+
{ subgraphs: withAssignments },
201+
)
202+
.toPromise()
203+
204+
if (result.error) {
205+
throw result.error
206+
}
207+
208+
if (!result.data.indexingStatuses || result.data.length === 0) {
209+
this.logger.warn(`No 'indexingStatuses' data returned from index nodes`, {
210+
data: result.data,
211+
})
212+
return []
213+
}
214+
215+
type QueryResult = {
216+
subgraphDeployment: string
217+
node: string | undefined
218+
paused: boolean | undefined
219+
}
220+
221+
const results = result.data.indexingStatuses
222+
.filter((status: QueryResult) => {
223+
if (subgraphStatus === SubgraphStatus.ACTIVE) {
224+
return (
225+
status.paused === false ||
226+
(status.paused === undefined && status.node !== 'removed')
227+
)
228+
} else if (subgraphStatus === SubgraphStatus.PAUSED) {
229+
return status.node === 'removed' || status.paused === true
230+
} else if (subgraphStatus === SubgraphStatus.ALL) {
231+
return true
232+
}
233+
})
234+
.map((status: QueryResult) => {
235+
return {
236+
id: new SubgraphDeploymentID(status.subgraphDeployment),
237+
node: status.node,
238+
paused: status.paused ?? status.node === 'removed',
239+
}
240+
})
241+
242+
return results
243+
}
244+
151245
public async subgraphDeploymentsAssignments(
152246
subgraphStatus: SubgraphStatus,
153247
): Promise<SubgraphDeploymentAssignment[]> {
154248
try {
249+
const startTimeMs = Date.now()
155250
this.logger.debug('Fetch subgraph deployment assignments')
156251

157252
// FIXME: remove this initial check for just node when graph-node releases
@@ -170,6 +265,10 @@ export class GraphNode {
170265
)
171266
.toPromise()
172267

268+
this.logger.debug(
269+
`Fetch subgraph deployment assignments took ${Date.now() - startTimeMs}ms`,
270+
)
271+
173272
if (nodeOnlyResult.error) {
174273
throw nodeOnlyResult.error
175274
}
@@ -214,7 +313,7 @@ export class GraphNode {
214313
paused: boolean | undefined
215314
}
216315

217-
return result.data.indexingStatuses
316+
const results = result.data.indexingStatuses
218317
.filter((status: QueryResult) => {
219318
if (subgraphStatus === SubgraphStatus.ACTIVE) {
220319
return (
@@ -234,6 +333,12 @@ export class GraphNode {
234333
paused: status.paused ?? status.node === 'removed',
235334
}
236335
})
336+
this.logger.debug(
337+
`Fetching mapped subgraph deployment ${results.length} assignments took ${
338+
Date.now() - startTimeMs
339+
}ms`,
340+
)
341+
return results
237342
} catch (error) {
238343
const err = indexerError(IndexerErrorCode.IE018, error)
239344
this.logger.error(`Failed to query indexing status API`, { err })

packages/indexer-common/src/indexer-management/actions.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,14 @@ export class ActionManager {
236236
this.logger.warn('Previous batch action execution is still in progress')
237237
return this.executeBatchActionsPromise
238238
}
239-
this.executeBatchActionsPromise = this.executeApprovedActionsInner(network)
240-
const updatedActions = await this.executeBatchActionsPromise
241-
this.executeBatchActionsPromise = undefined
239+
240+
let updatedActions: Action[] = []
241+
try {
242+
this.executeBatchActionsPromise = this.executeApprovedActionsInner(network)
243+
updatedActions = await this.executeBatchActionsPromise
244+
} finally {
245+
this.executeBatchActionsPromise = undefined
246+
}
242247
return updatedActions
243248
}
244249

packages/indexer-common/src/indexer-management/allocations.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -322,9 +322,11 @@ export class AllocationManager {
322322
logger.info('Ensure subgraph deployments are deployed before we allocate to them', {
323323
allocateActions,
324324
})
325-
const currentAssignments = await this.graphNode.subgraphDeploymentsAssignments(
326-
SubgraphStatus.ALL,
327-
)
325+
const currentAssignments =
326+
await this.graphNode.subgraphDeploymentAssignmentsForAllocateActions(
327+
SubgraphStatus.ALL,
328+
actions,
329+
)
328330
await pMap(
329331
allocateActions,
330332
async (action: Action) =>

0 commit comments

Comments
 (0)