@@ -7,7 +7,10 @@ import {
7
7
IndexingDecisionBasis ,
8
8
IndexingRuleAttributes ,
9
9
Network ,
10
+ QueryFeeModels ,
11
+ sequentialTimerMap ,
10
12
SubgraphIdentifierType ,
13
+ TapCollector ,
11
14
upsertIndexingRule ,
12
15
} from '@graphprotocol/indexer-common'
13
16
import { Op } from 'sequelize'
@@ -23,6 +26,10 @@ import {
23
26
GatewayDipsServiceClientImpl ,
24
27
} from '@graphprotocol/dips-proto/generated/gateway'
25
28
import { IndexingAgreement } from '../indexer-management/models/indexing-agreement'
29
+ import { NetworkSpecification } from '../network-specification'
30
+ import { Wallet } from 'ethers'
31
+
32
+ const DIPS_COLLECTION_INTERVAL = 60_000
26
33
27
34
export class DipsManager {
28
35
private gatewayDipsServiceClient : GatewayDipsServiceClientImpl
@@ -86,9 +93,109 @@ export class DipsManager {
86
93
await agreement . save ( )
87
94
}
88
95
}
96
+ async ensureAgreementRules ( ) {
97
+ if ( ! this . parent ) {
98
+ this . logger . error (
99
+ 'DipsManager has no parent AllocationManager, cannot ensure agreement rules' ,
100
+ )
101
+ return
102
+ }
103
+ // Get all the indexing agreements that are not cancelled
104
+ const indexingAgreements = await this . models . IndexingAgreement . findAll ( {
105
+ where : {
106
+ cancelled_at : null ,
107
+ } ,
108
+ } )
109
+ // For each agreement, check that there is an indexing rule to always
110
+ // allocate to the agreement's subgraphDeploymentId, and if not, create one
111
+ for ( const agreement of indexingAgreements ) {
112
+ const subgraphDeploymentID = new SubgraphDeploymentID (
113
+ agreement . subgraph_deployment_id ,
114
+ )
115
+ // If there is not yet an indexingRule that deems this deployment worth allocating to, make one
116
+ if ( ! ( await this . parent . matchingRuleExists ( this . logger , subgraphDeploymentID ) ) ) {
117
+ this . logger . debug ( `Creating indexing rule for agreement ${ agreement . id } ` )
118
+ const indexingRule = {
119
+ identifier : agreement . subgraph_deployment_id ,
120
+ allocationAmount : formatGRT (
121
+ this . network . specification . indexerOptions . dipsAllocationAmount ,
122
+ ) ,
123
+ identifierType : SubgraphIdentifierType . DEPLOYMENT ,
124
+ decisionBasis : IndexingDecisionBasis . ALWAYS ,
125
+ protocolNetwork : this . network . specification . networkIdentifier ,
126
+ autoRenewal : true ,
127
+ allocationLifetime : Math . max (
128
+ Number ( agreement . min_epochs_per_collection ) ,
129
+ Number ( agreement . max_epochs_per_collection ) -
130
+ this . network . specification . indexerOptions . dipsEpochsMargin ,
131
+ ) ,
132
+ } as Partial < IndexingRuleAttributes >
133
+
134
+ await upsertIndexingRule ( this . logger , this . models , indexingRule )
135
+ }
136
+ }
137
+ }
138
+ }
139
+
140
+ export class DipsCollector {
141
+ private gatewayDipsServiceClient : GatewayDipsServiceClientImpl
142
+ constructor (
143
+ private logger : Logger ,
144
+ private managementModels : IndexerManagementModels ,
145
+ private queryFeeModels : QueryFeeModels ,
146
+ private specification : NetworkSpecification ,
147
+ private tapCollector : TapCollector ,
148
+ private wallet : Wallet ,
149
+ ) {
150
+ if ( ! this . specification . indexerOptions . dipperEndpoint ) {
151
+ throw new Error ( 'dipperEndpoint is not set' )
152
+ }
153
+ this . gatewayDipsServiceClient = createGatewayDipsServiceClient (
154
+ this . specification . indexerOptions . dipperEndpoint ,
155
+ )
156
+ }
157
+
158
+ static create (
159
+ logger : Logger ,
160
+ managementModels : IndexerManagementModels ,
161
+ queryFeeModels : QueryFeeModels ,
162
+ specification : NetworkSpecification ,
163
+ tapCollector : TapCollector ,
164
+ wallet : Wallet ,
165
+ ) {
166
+ const collector = new DipsCollector (
167
+ logger ,
168
+ managementModels ,
169
+ queryFeeModels ,
170
+ specification ,
171
+ tapCollector ,
172
+ wallet ,
173
+ )
174
+ collector . startCollectionLoop ( )
175
+ return collector
176
+ }
177
+
178
+ startCollectionLoop ( ) {
179
+ sequentialTimerMap (
180
+ {
181
+ logger : this . logger ,
182
+ milliseconds : DIPS_COLLECTION_INTERVAL ,
183
+ } ,
184
+ async ( ) => {
185
+ this . logger . debug ( 'Running DIPS payment collection loop' )
186
+ await this . collectAllPayments ( )
187
+ } ,
188
+ {
189
+ onError : ( err ) => {
190
+ this . logger . error ( 'Failed to collect DIPS payments' , { err } )
191
+ } ,
192
+ } ,
193
+ )
194
+ }
195
+
89
196
// Collect payments for all outstanding agreements
90
197
async collectAllPayments ( ) {
91
- const outstandingAgreements = await this . models . IndexingAgreement . findAll ( {
198
+ const outstandingAgreements = await this . managementModels . IndexingAgreement . findAll ( {
92
199
where : {
93
200
last_payment_collected_at : null ,
94
201
last_allocation_id : {
@@ -110,27 +217,27 @@ export class DipsManager {
110
217
agreement . id ,
111
218
agreement . last_allocation_id ,
112
219
entityCount ,
113
- this . network . wallet ,
220
+ this . wallet ,
114
221
)
115
222
try {
116
223
const response = await this . gatewayDipsServiceClient . CollectPayment ( {
117
224
version : 1 ,
118
225
signedCollection : collection ,
119
226
} )
120
227
if ( response . status === CollectPaymentStatus . ACCEPT ) {
121
- if ( ! this . network . tapCollector ) {
228
+ if ( ! this . tapCollector ) {
122
229
throw new Error ( 'TapCollector not initialized' )
123
230
}
124
231
// Store the tap receipt in the database
125
232
const tapReceipt = decodeTapReceipt (
126
233
response . tapReceipt ,
127
- this . network . tapCollector ?. tapContracts . tapVerifier . address ,
234
+ this . tapCollector ?. tapContracts . tapVerifier . address ,
128
235
)
129
236
// TODO: check that the signer of the TAP receipt is a signer
130
237
// on the corresponding escrow account for the payer (sender) of the
131
238
// indexing agreement
132
239
const escrowSender = await getEscrowSenderForSigner (
133
- this . network . tapCollector ?. tapSubgraph ,
240
+ this . tapCollector ?. tapSubgraph ,
134
241
tapReceipt . signer_address ,
135
242
)
136
243
if ( escrowSender !== agreement . payer ) {
@@ -139,9 +246,9 @@ export class DipsManager {
139
246
'Signer of TAP receipt is not a signer on the indexing agreement' ,
140
247
)
141
248
}
142
- await this . network . queryFeeModels . scalarTapReceipts . create ( tapReceipt )
249
+ await this . queryFeeModels . scalarTapReceipts . create ( tapReceipt )
143
250
// Mark the agreement as having had a payment collected
144
- await this . models . IndexingAgreement . update (
251
+ await this . managementModels . IndexingAgreement . update (
145
252
{
146
253
last_payment_collected_at : new Date ( ) ,
147
254
} ,
@@ -160,46 +267,4 @@ export class DipsManager {
160
267
} )
161
268
}
162
269
}
163
- async ensureAgreementRules ( ) {
164
- if ( ! this . parent ) {
165
- this . logger . error (
166
- 'DipsManager has no parent AllocationManager, cannot ensure agreement rules' ,
167
- )
168
- return
169
- }
170
- // Get all the indexing agreements that are not cancelled
171
- const indexingAgreements = await this . models . IndexingAgreement . findAll ( {
172
- where : {
173
- cancelled_at : null ,
174
- } ,
175
- } )
176
- // For each agreement, check that there is an indexing rule to always
177
- // allocate to the agreement's subgraphDeploymentId, and if not, create one
178
- for ( const agreement of indexingAgreements ) {
179
- const subgraphDeploymentID = new SubgraphDeploymentID (
180
- agreement . subgraph_deployment_id ,
181
- )
182
- // If there is not yet an indexingRule that deems this deployment worth allocating to, make one
183
- if ( ! ( await this . parent . matchingRuleExists ( this . logger , subgraphDeploymentID ) ) ) {
184
- this . logger . debug ( `Creating indexing rule for agreement ${ agreement . id } ` )
185
- const indexingRule = {
186
- identifier : agreement . subgraph_deployment_id ,
187
- allocationAmount : formatGRT (
188
- this . network . specification . indexerOptions . dipsAllocationAmount ,
189
- ) ,
190
- identifierType : SubgraphIdentifierType . DEPLOYMENT ,
191
- decisionBasis : IndexingDecisionBasis . ALWAYS ,
192
- protocolNetwork : this . network . specification . networkIdentifier ,
193
- autoRenewal : true ,
194
- allocationLifetime : Math . max (
195
- Number ( agreement . min_epochs_per_collection ) ,
196
- Number ( agreement . max_epochs_per_collection ) -
197
- this . network . specification . indexerOptions . dipsEpochsMargin ,
198
- ) ,
199
- } as Partial < IndexingRuleAttributes >
200
-
201
- await upsertIndexingRule ( this . logger , this . models , indexingRule )
202
- }
203
- }
204
- }
205
270
}
0 commit comments