Skip to content

Commit 0462170

Browse files
feat: index bulks (#57)
1 parent 9fa6f2c commit 0462170

File tree

5 files changed

+507
-170
lines changed

5 files changed

+507
-170
lines changed

schema.graphql

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ type Dataset implements Ressource @entity {
149149
checksum: Bytes!
150150
timestamp: BigInt! # last transfer
151151
usages: [Deal!]! @derivedFrom(field: "dataset")
152+
bulkUsages: [BulkSlice!]! @derivedFrom(field: "datasets")
152153
orders: [DatasetOrder!]! @derivedFrom(field: "dataset")
153154
transfers: [DatasetTransfer!]! @derivedFrom(field: "dataset")
154155
}
@@ -246,7 +247,7 @@ type AppOrder @entity {
246247

247248
type DatasetOrder @entity {
248249
id: ID!
249-
dataset: Dataset!
250+
dataset: Dataset # nullable: off-chain bulk datasetorder could reference non-existing dataset
250251
datasetprice: BigDecimal!
251252
volume: BigInt
252253
tag: Bytes
@@ -333,11 +334,29 @@ type Deal @entity {
333334
schedulerRewardRatio: BigInt!
334335
sponsor: Account!
335336
timestamp: BigInt! # creation
336-
apporder: AppOrder # todo: not available if not broadcasted
337-
datasetorder: DatasetOrder # todo: not available if not broadcasted
338-
workerpoolorder: WorkerpoolOrder # todo: not available if not broadcasted
339-
requestorder: RequestOrder # todo: not available if not broadcasted
337+
apporder: AppOrder
338+
datasetorder: DatasetOrder
339+
workerpoolorder: WorkerpoolOrder
340+
requestorder: RequestOrder
340341
events: [DealEvent!]! @derivedFrom(field: "deal")
342+
bulk: Bulk
343+
}
344+
345+
type Bulk @entity(immutable: true) {
346+
id: ID!
347+
hash: String!
348+
slices: [BulkSlice!]! @derivedFrom(field: "bulk")
349+
deal: Deal! @derivedFrom(field: "bulk")
350+
}
351+
352+
type BulkSlice @entity(immutable: true) {
353+
id: ID!
354+
bulk: Bulk!
355+
hash: String!
356+
task: Task # nullable: task may not be initialized at the time of BulkSlice creation
357+
index: BigInt!
358+
datasets: [Dataset!]!
359+
datasetOrders: [DatasetOrder!]!
341360
}
342361

343362
enum TaskStatus {
@@ -366,6 +385,7 @@ type Task @entity {
366385
rewards: [Reward!]! @derivedFrom(field: "task")
367386
seizes: [Seize!]! @derivedFrom(field: "task")
368387
events: [TaskEvent!]! @derivedFrom(field: "task")
388+
bulkSlice: BulkSlice @derivedFrom(field: "task")
369389
}
370390

371391
enum ContributionStatus {

src/Modules/Bulk.ts

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import {
2+
Address,
3+
BigInt,
4+
Bytes,
5+
dataSource,
6+
DataSourceContext,
7+
DataSourceTemplate,
8+
json,
9+
JSONValueKind,
10+
} from '@graphprotocol/graph-ts';
11+
import { Bulk, BulkSlice } from '../../generated/schema';
12+
import {
13+
computeTaskId,
14+
CONTEXT_BOT_FIRST,
15+
CONTEXT_BOT_SIZE,
16+
CONTEXT_BULK,
17+
CONTEXT_DEAL,
18+
CONTEXT_INDEX,
19+
createBulkOrderID,
20+
createBulkSliceID,
21+
fetchDatasetorder,
22+
isAddressString,
23+
isBytes32String,
24+
isHexString,
25+
isIntegerString,
26+
toRLC,
27+
} from '../utils';
28+
29+
export function handleBulk(content: Bytes): void {
30+
const hash = dataSource.stringParam();
31+
const context = dataSource.context();
32+
const dealId = context.getString(CONTEXT_DEAL);
33+
const botFirst = context.getBigInt(CONTEXT_BOT_FIRST);
34+
const botSize = context.getBigInt(CONTEXT_BOT_SIZE);
35+
36+
const bulkId = dealId;
37+
let bulk = Bulk.load(bulkId);
38+
if (bulk != null) {
39+
// immutable bulk already exists nothing to do
40+
return;
41+
}
42+
bulk = new Bulk(bulkId);
43+
bulk.hash = hash;
44+
45+
const jsonContent = json.try_fromBytes(content);
46+
if (jsonContent.isOk && jsonContent.value.kind == JSONValueKind.ARRAY) {
47+
const contentArray = jsonContent.value.toArray();
48+
49+
for (let i = 0; i < contentArray.length; i++) {
50+
const entry = contentArray[i];
51+
const index = BigInt.fromI32(i);
52+
if (
53+
// exclude slice out of deal bot range
54+
index >= botFirst &&
55+
index < botFirst.plus(botSize) &&
56+
entry.kind == JSONValueKind.STRING
57+
) {
58+
const sliceCid = entry.toString();
59+
let sliceContext = new DataSourceContext();
60+
sliceContext.setString(CONTEXT_BULK, bulkId);
61+
sliceContext.setString(CONTEXT_DEAL, dealId);
62+
sliceContext.setBigInt(CONTEXT_INDEX, index);
63+
DataSourceTemplate.createWithContext('BulkSlice', [sliceCid], sliceContext);
64+
}
65+
}
66+
}
67+
68+
bulk.save();
69+
}
70+
71+
/**
 * File data source handler for a bulk slice file.
 *
 * Decodes `content` as a JSON array of off-chain dataset orders. For each
 * entry whose nine fields (dataset, datasetprice, volume, tag, apprestrict,
 * workerpoolrestrict, requesterrestrict, salt, sign) are all present and
 * well formed, it persists a DatasetOrder entity and records both the order
 * id and the dataset address on an immutable BulkSlice entity bound to the
 * task at `index` of the deal.
 *
 * Context keys (set when the data source is created, see handleBulk):
 *  - CONTEXT_BULK:  id of the parent Bulk entity
 *  - CONTEXT_DEAL:  id of the deal the bulk belongs to
 *  - CONTEXT_INDEX: bag-of-tasks index of this slice
 */
export function handleBulkSlice(content: Bytes): void {
  // CID of this slice file (the template was created with the slice CID)
  const hash = dataSource.stringParam();
  const context = dataSource.context();
  const bulk = context.getString(CONTEXT_BULK);
  const dealId = context.getString(CONTEXT_DEAL);
  const index = context.getBigInt(CONTEXT_INDEX);
  const taskId = computeTaskId(dealId, index);

  // NOTE(review): when computeTaskId returns null the slice is silently
  // dropped (no entity saved) — confirm this is intended in ../utils.
  if (taskId !== null) {
    const bulkSliceId = createBulkSliceID(dealId, index);
    let bulkSlice = BulkSlice.load(bulkSliceId);
    if (bulkSlice != null) {
      // BulkSlice is immutable and already exists — nothing to do
      return;
    }
    bulkSlice = new BulkSlice(bulkSliceId);
    bulkSlice.task = taskId;
    bulkSlice.hash = hash;
    bulkSlice.bulk = bulk;
    bulkSlice.index = index;
    // start with empty collections; filled below as valid orders are found
    bulkSlice.datasets = new Array<string>();
    bulkSlice.datasetOrders = new Array<string>();

    const jsonContent = json.try_fromBytes(content);
    if (jsonContent.isOk && jsonContent.value.kind == JSONValueKind.ARRAY) {
      const datasetOrderArray = jsonContent.value.toArray();

      for (let i = 0; i < datasetOrderArray.length; i++) {
        const datasetOrder = datasetOrderArray[i];
        if (datasetOrder.kind == JSONValueKind.OBJECT) {
          const orderObj = datasetOrder.toObject();

          const datasetEntry = orderObj.getEntry('dataset');
          const datasetPriceEntry = orderObj.getEntry('datasetprice');
          const volumeEntry = orderObj.getEntry('volume');
          const tagEntry = orderObj.getEntry('tag');
          const apprestrictEntry = orderObj.getEntry('apprestrict');
          const workerpoolrestrictEntry = orderObj.getEntry('workerpoolrestrict');
          const requesterrestrictEntry = orderObj.getEntry('requesterrestrict');
          const saltEntry = orderObj.getEntry('salt');
          const signEntry = orderObj.getEntry('sign');
          // check that all entries are present, are JSON strings, and match
          // their expected format (address / integer / bytes32 / hex);
          // any entry failing validation drops the whole order silently
          if (
            datasetEntry != null &&
            datasetEntry.value.kind == JSONValueKind.STRING &&
            isAddressString(datasetEntry.value.toString().toLowerCase()) &&
            datasetPriceEntry != null &&
            datasetPriceEntry.value.kind == JSONValueKind.STRING &&
            isIntegerString(datasetPriceEntry.value.toString()) &&
            volumeEntry != null &&
            volumeEntry.value.kind == JSONValueKind.STRING &&
            isIntegerString(volumeEntry.value.toString()) &&
            tagEntry != null &&
            tagEntry.value.kind == JSONValueKind.STRING &&
            isBytes32String(tagEntry.value.toString()) &&
            apprestrictEntry != null &&
            apprestrictEntry.value.kind == JSONValueKind.STRING &&
            isAddressString(apprestrictEntry.value.toString().toLowerCase()) &&
            workerpoolrestrictEntry != null &&
            workerpoolrestrictEntry.value.kind == JSONValueKind.STRING &&
            isAddressString(workerpoolrestrictEntry.value.toString().toLowerCase()) &&
            requesterrestrictEntry != null &&
            requesterrestrictEntry.value.kind == JSONValueKind.STRING &&
            isAddressString(requesterrestrictEntry.value.toString().toLowerCase()) &&
            saltEntry != null &&
            saltEntry.value.kind == JSONValueKind.STRING &&
            isBytes32String(saltEntry.value.toString()) &&
            signEntry != null &&
            signEntry.value.kind == JSONValueKind.STRING &&
            isHexString(signEntry.value.toString())
          ) {
            // datasetOrderId cannot be orderHash as it could collide with on-chain indexed order
            const datasetOrderId = createBulkOrderID(taskId, BigInt.fromI32(i));

            // addresses are lowercased so ids match the subgraph's convention
            const datasetAddress = datasetEntry.value.toString().toLowerCase();

            // NOTE(review): this shadows the outer `datasetOrder` JSON value
            // declared at the top of the loop — consider renaming for clarity.
            let datasetOrder = fetchDatasetorder(datasetOrderId);
            datasetOrder.dataset = datasetAddress;
            datasetOrder.datasetprice = toRLC(
              BigInt.fromString(datasetPriceEntry.value.toString()),
            );
            datasetOrder.volume = BigInt.fromString(volumeEntry.value.toString());
            datasetOrder.tag = Bytes.fromHexString(tagEntry.value.toString());
            datasetOrder.apprestrict = Address.fromString(
              apprestrictEntry.value.toString().toLowerCase(),
            );
            datasetOrder.workerpoolrestrict = Address.fromString(
              workerpoolrestrictEntry.value.toString().toLowerCase(),
            );
            datasetOrder.requesterrestrict = Address.fromString(
              requesterrestrictEntry.value.toString().toLowerCase(),
            );
            datasetOrder.salt = Bytes.fromHexString(saltEntry.value.toString());
            datasetOrder.sign = Bytes.fromHexString(signEntry.value.toString());
            datasetOrder.save();

            // reassign (not mutate in place) so the store picks up the change
            let datasetOrders = bulkSlice.datasetOrders;
            datasetOrders.push(datasetOrderId);
            bulkSlice.datasetOrders = datasetOrders;

            let datasets = bulkSlice.datasets;
            datasets.push(datasetAddress);
            bulkSlice.datasets = datasets;
          }
        }
      }
    }
    bulkSlice.save();
  }
}

src/Modules/IexecPoco.ts

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
// SPDX-FileCopyrightText: 2020-2025 IEXEC BLOCKCHAIN TECH <[email protected]>
22
// SPDX-License-Identifier: Apache-2.0
33

4-
import { Address, BigInt, dataSource } from '@graphprotocol/graph-ts';
4+
import {
5+
Address,
6+
BigInt,
7+
dataSource,
8+
DataSourceContext,
9+
DataSourceTemplate,
10+
json,
11+
} from '@graphprotocol/graph-ts';
512
const chainName = dataSource.network();
613

714
import {
@@ -34,6 +41,9 @@ import {
3441
} from '../../generated/schema';
3542

3643
import {
44+
CONTEXT_BOT_FIRST,
45+
CONTEXT_BOT_SIZE,
46+
CONTEXT_DEAL,
3747
createContributionID,
3848
createEventID,
3949
fetchAccount,
@@ -94,17 +104,40 @@ export function handleOrdersMatched(event: OrdersMatchedEvent): void {
94104
deal.timestamp = event.block.timestamp;
95105
deal.save();
96106

107+
// if no dataset, check if params include a bulk_cid reference
108+
if (deal.dataset == Address.zero().toHex()) {
109+
const params = json.try_fromString(viewedDeal.params);
110+
if (params.isOk) {
111+
const bulkCid = params.value.toObject().getEntry('bulk_cid');
112+
if (bulkCid) {
113+
// the same bulk may be used by many deals => we use dealid as bulk ID to avoid collisions
114+
const bulkId = event.params.dealid.toHex();
115+
let context = new DataSourceContext();
116+
// Pass onchain data that will be needed in file handlers
117+
context.setString(CONTEXT_DEAL, deal.id);
118+
context.setBigInt(CONTEXT_BOT_FIRST, deal.botFirst);
119+
context.setBigInt(CONTEXT_BOT_SIZE, deal.botSize);
120+
DataSourceTemplate.createWithContext('Bulk', [bulkCid.value.toString()], context);
121+
// bulk may not be indexed, this is not an issue, the model will prune it
122+
deal.bulk = bulkId;
123+
deal.save();
124+
}
125+
}
126+
}
127+
97128
const dataset = deal.dataset;
98129

99130
let apporder = fetchApporder(event.params.appHash.toHex());
100131
apporder.app = deal.app;
101132
apporder.appprice = deal.appPrice;
102133
apporder.save();
103134

104-
let datasetorder = fetchDatasetorder(event.params.datasetHash.toHex());
105-
if (dataset) datasetorder.dataset = dataset;
106-
datasetorder.datasetprice = deal.datasetPrice;
107-
datasetorder.save();
135+
if (dataset != Address.zero().toHex()) {
136+
let datasetorder = fetchDatasetorder(event.params.datasetHash.toHex());
137+
if (dataset) datasetorder.dataset = dataset;
138+
datasetorder.datasetprice = deal.datasetPrice;
139+
datasetorder.save();
140+
}
108141

109142
let workerpoolorder = fetchWorkerpoolorder(event.params.workerpoolHash.toHex());
110143
workerpoolorder.workerpool = deal.workerpool;

0 commit comments

Comments
 (0)