Skip to content

Commit f2d1eca

Browse files
authored
feat: requiredBlockNumber callback parameter in TheGraphClient (#3238)
`TheGraphClient#queryEntities` callback now has extra parameter which delivers the information about required block number. We already had a poller in TheGraphClient, but it seems that that approach doesn't always work (maybe the latest block number and the actual query are be fetched from different indexers, maybe because there is a load balancer in The Graph). Also modified Autostaker to use that feature in the queries. ## Future improvements - Implement the support also in `TheGraphClient#queryEntity` (which doesn't currently use callbacks)
1 parent a61d4fc commit f2d1eca

File tree

4 files changed

+87
-62
lines changed

4 files changed

+87
-62
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@ Changes before Tatum release are not documented in this file.
1212

1313
#### Added
1414

15-
- Transaction timeouts in Autostaker (https://github.com/streamr-dev/network/pull/3236)
15+
Autostaker changes:
16+
- transaction timeouts (https://github.com/streamr-dev/network/pull/3236)
17+
- queries filter by required block number (https://github.com/streamr-dev/network/pull/3238)
1618

1719
#### Changed
1820

packages/node/src/plugins/autostaker/AutostakerPlugin.ts

Lines changed: 65 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -236,31 +236,33 @@ export class AutostakerPlugin extends Plugin<AutostakerPluginConfig> {
236236
stakes: Map<SponsorshipID, WeiAmount>,
237237
streamrClient: StreamrClient
238238
): Promise<Map<SponsorshipID, SponsorshipConfig>> {
239-
const queryResult = streamrClient.getTheGraphClient().queryEntities<SponsorshipQueryResultItem>((lastId: string, pageSize: number) => {
240-
// TODO add support spnsorships which have non-zero minimumStakingPeriodSeconds (i.e. implement some loggic in the
241-
// payoutPropotionalStrategy so that we ensure that unstaking doesn't happen too soon)
242-
return {
243-
query: `
244-
{
245-
sponsorships (
246-
where: {
247-
projectedInsolvency_gt: ${Math.floor(Date.now() / 1000)}
248-
minimumStakingPeriodSeconds: "0"
249-
minOperators_lte: ${this.pluginConfig.maxAcceptableMinOperatorCount}
250-
totalPayoutWeiPerSec_gte: "${MIN_SPONSORSHIP_TOTAL_PAYOUT_PER_SECOND.toString()}"
251-
id_gt: "${lastId}"
252-
},
253-
first: ${pageSize}
254-
) {
255-
id
256-
totalPayoutWeiPerSec
257-
operatorCount
258-
maxOperators
239+
const queryResult = streamrClient.getTheGraphClient()
240+
.queryEntities<SponsorshipQueryResultItem>((lastId: string, pageSize: number, requiredBlockNumber: number) => {
241+
// TODO add support spnsorships which have non-zero minimumStakingPeriodSeconds (i.e. implement some loggic in the
242+
// payoutPropotionalStrategy so that we ensure that unstaking doesn't happen too soon)
243+
return {
244+
query: `
245+
{
246+
sponsorships (
247+
where: {
248+
projectedInsolvency_gt: ${Math.floor(Date.now() / 1000)}
249+
minimumStakingPeriodSeconds: "0"
250+
minOperators_lte: ${this.pluginConfig.maxAcceptableMinOperatorCount}
251+
totalPayoutWeiPerSec_gte: "${MIN_SPONSORSHIP_TOTAL_PAYOUT_PER_SECOND.toString()}"
252+
id_gt: "${lastId}"
253+
},
254+
first: ${pageSize}
255+
block: { number_gte: ${requiredBlockNumber} }
256+
) {
257+
id
258+
totalPayoutWeiPerSec
259+
operatorCount
260+
maxOperators
261+
}
259262
}
260-
}
261-
`
262-
}
263-
})
263+
`
264+
}
265+
})
264266
const sponsorships = await collect(queryResult)
265267
const hasAcceptableOperatorCount = (item: SponsorshipQueryResultItem) => {
266268
if (stakes.has(item.id)) {
@@ -279,50 +281,54 @@ export class AutostakerPlugin extends Plugin<AutostakerPluginConfig> {
279281
}
280282

281283
private async getMyCurrentStakes(streamrClient: StreamrClient): Promise<Map<SponsorshipID, WeiAmount>> {
282-
const queryResult = streamrClient.getTheGraphClient().queryEntities<StakeQueryResultItem>((lastId: string, pageSize: number) => {
283-
return {
284-
query: `
285-
{
286-
stakes (
287-
where: {
288-
operator: "${this.pluginConfig.operatorContractAddress.toLowerCase()}",
289-
id_gt: "${lastId}"
290-
},
291-
first: ${pageSize}
292-
) {
293-
id
294-
sponsorship {
284+
const queryResult = streamrClient.getTheGraphClient()
285+
.queryEntities<StakeQueryResultItem>((lastId: string, pageSize: number, requiredBlockNumber: number) => {
286+
return {
287+
query: `
288+
{
289+
stakes (
290+
where: {
291+
operator: "${this.pluginConfig.operatorContractAddress.toLowerCase()}",
292+
id_gt: "${lastId}"
293+
},
294+
first: ${pageSize}
295+
block: { number_gte: ${requiredBlockNumber} }
296+
) {
295297
id
298+
sponsorship {
299+
id
300+
}
301+
amountWei
296302
}
297-
amountWei
298303
}
299-
}
300-
`
301-
}
302-
})
304+
`
305+
}
306+
})
303307
const stakes = await collect(queryResult)
304308
return new Map(stakes.map((stake) => [stake.sponsorship.id, BigInt(stake.amountWei) ]))
305309
}
306310

307311
private async getUndelegationQueueAmount(streamrClient: StreamrClient): Promise<WeiAmount> {
308-
const queryResult = streamrClient.getTheGraphClient().queryEntities<UndelegationQueueQueryResultItem>((lastId: string, pageSize: number) => {
309-
return {
310-
query: `
311-
{
312-
queueEntries (
313-
where: {
314-
operator: "${this.pluginConfig.operatorContractAddress.toLowerCase()}",
315-
id_gt: "${lastId}"
316-
},
317-
first: ${pageSize}
318-
) {
319-
id
320-
amount
312+
const queryResult = streamrClient.getTheGraphClient()
313+
.queryEntities<UndelegationQueueQueryResultItem>((lastId: string, pageSize: number, requiredBlockNumber: number) => {
314+
return {
315+
query: `
316+
{
317+
queueEntries (
318+
where: {
319+
operator: "${this.pluginConfig.operatorContractAddress.toLowerCase()}",
320+
id_gt: "${lastId}"
321+
},
322+
first: ${pageSize}
323+
block: { number_gte: ${requiredBlockNumber} }
324+
) {
325+
id
326+
amount
327+
}
321328
}
322-
}
323-
`
324-
}
325-
})
329+
`
330+
}
331+
})
326332
const entries = await collect(queryResult)
327333
return sum(entries.map((entry) => BigInt(entry.amount)))
328334
}

packages/utils/src/TheGraphClient.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ export class TheGraphClient {
5151
}
5252

5353
async* queryEntities<T extends { id: string }>(
54-
createQuery: (lastId: string, pageSize: number) => GraphQLQuery,
54+
createQuery: (lastId: string, pageSize: number, requiredBlockNumber: number) => GraphQLQuery,
5555
/*
5656
* For simple queries there is one root level property, e.g. "streams" or "permissions"
5757
* which contain array of items. If the query contains more than one root level property
@@ -68,7 +68,7 @@ export class TheGraphClient {
6868
let lastResultSet: T[] | undefined
6969
do {
7070
const lastId = (lastResultSet !== undefined) ? lastResultSet[lastResultSet.length - 1].id : ''
71-
const query = createQuery(lastId, pageSize)
71+
const query = createQuery(lastId, pageSize, this.requiredBlockNumber)
7272
const response = await this.sendQuery(query)
7373
const items: T[] = parseItems(response)
7474
yield* items

packages/utils/test/TheGraphClient.test.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ class EmulatedTheGraphIndex {
3636
}
3737
}
3838

39+
const nextValue = async <T>(source: AsyncIterator<T>): Promise<T | undefined> => {
40+
const item = source.next()
41+
return (await item).value
42+
}
43+
3944
describe('TheGraphClient', () => {
4045

4146
let theGraphIndex: EmulatedTheGraphIndex
@@ -183,4 +188,16 @@ describe('TheGraphClient', () => {
183188
foo: 'result-7'
184189
})
185190
})
191+
192+
it('required block number in callback', async () => {
193+
theGraphIndex.start()
194+
const callback1 = jest.fn().mockReturnValue(MOCK_QUERY)
195+
await nextValue(client.queryEntities(callback1))
196+
expect(callback1).toHaveBeenCalledWith('', expect.any(Number), 0)
197+
198+
client.updateRequiredBlockNumber(2)
199+
const callback2 = jest.fn().mockReturnValue(MOCK_QUERY)
200+
await nextValue(client.queryEntities(callback2))
201+
expect(callback2).toHaveBeenCalledWith('', expect.any(Number), 2)
202+
})
186203
})

0 commit comments

Comments
 (0)