Skip to content

Commit 919567a

Browse files
committed
chore: implement dipper service client
1 parent 8bc0d18 commit 919567a

File tree

6 files changed

+379
-14
lines changed

6 files changed

+379
-14
lines changed

packages/indexer-common/package.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,11 @@
2222
"clean": "rm -rf ./node_modules ./dist ./tsconfig.tsbuildinfo"
2323
},
2424
"dependencies": {
25+
"@bufbuild/protobuf": "2.2.3",
2526
"@graphprotocol/common-ts": "2.0.11",
2627
"@graphprotocol/cost-model": "0.1.18",
28+
"@graphprotocol/dips-proto": "0.2.0",
29+
"@grpc/grpc-js": "^1.12.6",
2730
"@semiotic-labs/tap-contracts-bindings": "^1.2.1",
2831
"@thi.ng/heaps": "1.2.38",
2932
"@types/lodash.clonedeep": "^4.5.7",
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
import { Client, credentials } from '@grpc/grpc-js'
2+
import { UnaryCallback } from '@grpc/grpc-js/build/src/client'
3+
import { DipperServiceClientImpl } from '@graphprotocol/dips-proto/generated/gateway'
4+
import { Wallet } from 'ethers'
5+
import {
6+
_TypedDataEncoder,
7+
arrayify,
8+
defaultAbiCoder,
9+
recoverAddress,
10+
} from 'ethers/lib/utils'
11+
import { toAddress } from '@graphprotocol/common-ts'
12+
13+
type RpcImpl = (service: string, method: string, data: Uint8Array) => Promise<Uint8Array>
14+
15+
interface Rpc {
16+
request: RpcImpl
17+
}
18+
19+
export const domainSalt =
20+
'0xb4632c657c26dce5d4d7da1d65bda185b14ff8f905ddbb03ea0382ed06c5ef28'
21+
export const chainId = 0xa4b1 // 42161
22+
export const cancelAgreementDomain = {
23+
name: 'Graph Protocol Indexing Agreement Cancellation',
24+
version: '0',
25+
chainId: chainId,
26+
salt: domainSalt,
27+
}
28+
export const cancelAgreementTypes = {
29+
CancellationRequest: [{ name: 'agreement_id', type: 'bytes16' }],
30+
}
31+
32+
export const collectPaymentsDomain = {
33+
name: 'Graph Protocol Indexing Agreement Collection',
34+
version: '0',
35+
chainId: chainId,
36+
salt: domainSalt,
37+
}
38+
export const collectPaymentsTypes = {
39+
CollectionRequest: [
40+
{ name: 'agreement_id', type: 'bytes16' },
41+
{ name: 'allocation_id', type: 'address' },
42+
{ name: 'entity_count', type: 'uint64' },
43+
],
44+
}
45+
46+
export const createSignedCancellationRequest = async (
47+
agreementId: string,
48+
wallet: Wallet,
49+
): Promise<Uint8Array> => {
50+
const signature = await wallet._signTypedData(
51+
cancelAgreementDomain,
52+
cancelAgreementTypes,
53+
{ agreement_id: agreementId },
54+
)
55+
return arrayify(
56+
defaultAbiCoder.encode(['tuple(bytes16)', 'bytes'], [[agreementId], signature]),
57+
)
58+
}
59+
60+
export const createSignedCollectionRequest = async (
61+
agreementId: string,
62+
allocationId: string,
63+
entityCount: number,
64+
wallet: Wallet,
65+
): Promise<Uint8Array> => {
66+
const signature = await wallet._signTypedData(
67+
collectPaymentsDomain,
68+
collectPaymentsTypes,
69+
{ agreement_id: agreementId, allocation_id: allocationId, entity_count: entityCount },
70+
)
71+
return arrayify(
72+
defaultAbiCoder.encode(
73+
['tuple(bytes16, address, uint64)', 'bytes'],
74+
[[agreementId, allocationId, entityCount], signature],
75+
),
76+
)
77+
}
78+
79+
export const decodeTapReceipt = (receipt: Uint8Array, verifyingContract: string) => {
80+
const [message, signature] = defaultAbiCoder.decode(
81+
['tuple(address,uint64,uint64,uint128)', 'bytes'],
82+
receipt,
83+
)
84+
85+
const [allocationId, timestampNs, nonce, value] = message
86+
87+
// Recover the signer address from the signature
88+
// compute the EIP-712 digest of the message
89+
const domain = {
90+
name: 'TAP',
91+
version: '1',
92+
chainId: chainId,
93+
verifyingContract,
94+
}
95+
96+
const types = {
97+
Receipt: [
98+
{ name: 'allocation_id', type: 'address' },
99+
{ name: 'timestamp_ns', type: 'uint64' },
100+
{ name: 'nonce', type: 'uint64' },
101+
{ name: 'value', type: 'uint128' },
102+
],
103+
}
104+
105+
const digest = _TypedDataEncoder.hash(domain, types, {
106+
allocation_id: allocationId,
107+
timestamp_ns: timestampNs,
108+
nonce: nonce,
109+
value: value,
110+
})
111+
const signerAddress = recoverAddress(digest, signature)
112+
return {
113+
allocation_id: allocationId,
114+
signer_address: toAddress(signerAddress),
115+
signature: signature,
116+
timestamp_ns: timestampNs,
117+
nonce: nonce,
118+
value: value,
119+
}
120+
}
121+
122+
export const createRpc = (url: string): Rpc => {
123+
const client = new Client(url, credentials.createInsecure())
124+
const request: RpcImpl = (service, method, data) => {
125+
// Conventionally in gRPC, the request path looks like
126+
// "package.names.ServiceName/MethodName",
127+
// we therefore construct such a string
128+
const path = `/${service}/${method}`
129+
130+
return new Promise((resolve, reject) => {
131+
// makeUnaryRequest transmits the result (and error) with a callback
132+
// transform this into a promise!
133+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
134+
const resultCallback: UnaryCallback<any> = (err, res) => {
135+
if (err) {
136+
return reject(err)
137+
}
138+
resolve(res)
139+
}
140+
// eslint-disable-next-line @typescript-eslint/no-explicit-any
141+
function passThrough(argument: any) {
142+
return argument
143+
}
144+
145+
// Using passThrough as the deserialize functions
146+
client.makeUnaryRequest(
147+
path,
148+
(d) => Buffer.from(d),
149+
passThrough,
150+
data,
151+
resultCallback,
152+
)
153+
})
154+
}
155+
156+
return { request }
157+
}
158+
159+
export const createDipperServiceClient = (url: string) => {
160+
const rpc = createRpc(url)
161+
return new DipperServiceClientImpl(rpc)
162+
}

packages/indexer-common/src/indexing-fees/dips.ts

Lines changed: 77 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,35 @@ import {
1111
} from '@graphprotocol/indexer-common'
1212
import { Op } from 'sequelize'
1313

14+
import {
15+
createDipperServiceClient,
16+
createSignedCancellationRequest,
17+
createSignedCollectionRequest,
18+
decodeTapReceipt,
19+
} from './dipper-service-client'
20+
import {
21+
CollectPaymentStatus,
22+
DipperServiceClientImpl,
23+
} from '@graphprotocol/dips-proto/generated/gateway'
24+
import { IndexingAgreement } from '../indexer-management/models/indexing-agreement'
25+
1426
export class DipsManager {
27+
private dipperServiceClient: DipperServiceClientImpl
28+
1529
constructor(
1630
private logger: Logger,
1731
private models: IndexerManagementModels,
1832
private graphNode: GraphNode,
1933
private network: Network,
2034
private parent: AllocationManager | null,
21-
) {}
35+
) {
36+
if (!this.network.specification.indexerOptions.dipperEndpoint) {
37+
throw new Error('dipperEndpoint is not set')
38+
}
39+
this.dipperServiceClient = createDipperServiceClient(
40+
this.network.specification.indexerOptions.dipperEndpoint,
41+
)
42+
}
2243
// Cancel an agreement associated to an allocation if it exists
2344
async tryCancelAgreement(allocationId: string) {
2445
const agreement = await this.models.IndexingAgreement.findOne({
@@ -28,8 +49,22 @@ export class DipsManager {
2849
},
2950
})
3051
if (agreement) {
31-
// TODO use dips-proto to cancel agreement via grpc
32-
// Mark the agreement as cancelled
52+
try {
53+
const cancellation = await createSignedCancellationRequest(
54+
agreement.id,
55+
this.network.wallet,
56+
)
57+
await this.dipperServiceClient.CancelAgreement({
58+
version: 1,
59+
signedCancellation: cancellation,
60+
})
61+
62+
// Mark the agreement as cancelled
63+
agreement.cancelled_at = new Date()
64+
await agreement.save()
65+
} catch (error) {
66+
this.logger.error(`Error cancelling agreement ${agreement.id}`, { error })
67+
}
3368
}
3469
}
3570
// Update the current and last allocation ids for an agreement if it exists
@@ -62,18 +97,50 @@ export class DipsManager {
6297
})
6398
for (const agreement of outstandingAgreements) {
6499
if (agreement.last_allocation_id) {
65-
await this.tryCollectPayment(agreement.last_allocation_id)
100+
await this.tryCollectPayment(agreement)
66101
} else {
67102
// This should never happen as we check for this in the query
68103
this.logger.error(`Agreement ${agreement.id} has no last allocation id`)
69104
}
70105
}
71106
}
72-
async tryCollectPayment(lastAllocationId: string) {
73-
// TODO: use dips-proto to collect payment via grpc
74-
75-
// TODO: store the receipt in the database
76-
// (tap-agent will take care of aggregating it into a RAV)
107+
async tryCollectPayment(agreement: IndexingAgreement) {
108+
if (!agreement.last_allocation_id) {
109+
this.logger.error(`Agreement ${agreement.id} has no last allocation id`)
110+
return
111+
}
112+
const entityCount = 0 // TODO: get entity count from graph node
113+
const collection = await createSignedCollectionRequest(
114+
agreement.id,
115+
agreement.last_allocation_id,
116+
entityCount,
117+
this.network.wallet,
118+
)
119+
try {
120+
const response = await this.dipperServiceClient.CollectPayment({
121+
version: 1,
122+
signedCollection: collection,
123+
})
124+
if (response.status === CollectPaymentStatus.ACCEPT) {
125+
if (!this.network.tapCollector) {
126+
throw new Error('TapCollector not initialized')
127+
}
128+
// Store the tap receipt in the database
129+
const tapReceipt = decodeTapReceipt(
130+
response.tapReceipt,
131+
this.network.tapCollector?.tapContracts.tapVerifier.address,
132+
)
133+
await this.network.queryFeeModels.scalarTapReceipts.create(tapReceipt)
134+
} else {
135+
this.logger.error(`Error collecting payment for agreement ${agreement.id}`, {
136+
error: response.status,
137+
})
138+
}
139+
} catch (error) {
140+
this.logger.error(`Error collecting payment for agreement ${agreement.id}`, {
141+
error,
142+
})
143+
}
77144

78145
// Mark the agreement as having had a payment collected
79146
await this.models.IndexingAgreement.update(
@@ -82,7 +149,7 @@ export class DipsManager {
82149
},
83150
{
84151
where: {
85-
last_allocation_id: lastAllocationId,
152+
id: agreement.id,
86153
},
87154
},
88155
)

packages/indexer-common/src/network.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ export class Network {
5252
specification: spec.NetworkSpecification
5353
paused: Eventual<boolean>
5454
isOperator: Eventual<boolean>
55-
55+
queryFeeModels: QueryFeeModels
5656
private constructor(
5757
logger: Logger,
5858
contracts: NetworkContracts,
@@ -66,6 +66,7 @@ export class Network {
6666
specification: spec.NetworkSpecification,
6767
paused: Eventual<boolean>,
6868
isOperator: Eventual<boolean>,
69+
queryFeeModels: QueryFeeModels,
6970
) {
7071
this.logger = logger
7172
this.contracts = contracts
@@ -79,6 +80,7 @@ export class Network {
7980
this.specification = specification
8081
this.paused = paused
8182
this.isOperator = isOperator
83+
this.queryFeeModels = queryFeeModels
8284
}
8385

8486
static async create(
@@ -345,6 +347,7 @@ export class Network {
345347
specification,
346348
paused,
347349
isOperator,
350+
queryFeeModels,
348351
)
349352
}
350353

packages/indexer-common/src/query-fees/models.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,20 @@ export interface ScalarTapReceiptsAttributes {
1313
value: bigint
1414
error_log?: string
1515
}
16+
export interface ScalarTapReceiptsCreationAttributes {
17+
allocation_id: Address
18+
signer_address: Address
19+
signature: Uint8Array
20+
timestamp_ns: bigint
21+
nonce: bigint
22+
value: bigint
23+
}
24+
1625
export class ScalarTapReceipts
17-
extends Model<ScalarTapReceiptsAttributes>
26+
extends Model<ScalarTapReceiptsAttributes, ScalarTapReceiptsCreationAttributes>
1827
implements ScalarTapReceiptsAttributes
1928
{
20-
public id!: number
29+
public id!: CreationOptional<number>
2130
public allocation_id!: Address
2231
public signer_address!: Address
2332
public signature!: Uint8Array

0 commit comments

Comments
 (0)