Skip to content

Commit 0eda5d3

Browse files
fix: refactor bulk indexation to avoid collisions between on-chain and off-chain handlers
1 parent 77d22e8 commit 0eda5d3

File tree

4 files changed

+179
-120
lines changed

4 files changed

+179
-120
lines changed

schema.graphql

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,7 @@ type Dataset implements Ressource @entity {
149149
checksum: Bytes!
150150
timestamp: BigInt! # last transfer
151151
usages: [Deal!]! @derivedFrom(field: "dataset")
152+
bulkUsages: [BulkSlice!]! @derivedFrom(field: "datasets")
152153
orders: [DatasetOrder!]! @derivedFrom(field: "dataset")
153154
transfers: [DatasetTransfer!]! @derivedFrom(field: "dataset")
154155
}
@@ -344,18 +345,20 @@ type Deal @entity {
344345
type Bulk @entity(immutable: true) {
345346
id: ID!
346347
hash: String!
347-
content: String!
348348
slices: [BulkSlice!] @derivedFrom(field: "bulk")
349-
deals: [Deal!]! @derivedFrom(field: "bulk")
349+
deal: Deal! @derivedFrom(field: "bulk")
350+
content: String! # raw content (is it useful?)
350351
}
351352

352353
type BulkSlice @entity(immutable: true) {
353354
id: ID!
355+
bulk: Bulk!
354356
hash: String!
355-
content: String!
356-
bulk: Bulk!
357+
task: Task # task may not be initialized at the time of BulkSlice creation
357358
index: BigInt!
358-
datasetOrders: [DatasetOrder!]
359+
datasets: [Dataset!]!
360+
datasetOrders: [DatasetOrder!]!
361+
content: String! # raw content (is it useful?)
359362
}
360363

361364
enum TaskStatus {
@@ -384,6 +387,7 @@ type Task @entity {
384387
rewards: [Reward!]! @derivedFrom(field: "task")
385388
seizes: [Seize!]! @derivedFrom(field: "task")
386389
events: [TaskEvent!]! @derivedFrom(field: "task")
390+
bulkSlice: BulkSlice @derivedFrom(field: "task")
387391
}
388392

389393
enum ContributionStatus {

src/Modules/Bulk.ts

Lines changed: 133 additions & 109 deletions
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,16 @@ import {
1010
} from '@graphprotocol/graph-ts';
1111
import { Bulk, BulkSlice } from '../../generated/schema';
1212
import {
13+
computeTaskId,
14+
CONTEXT_BOT_FIRST,
15+
CONTEXT_BOT_SIZE,
1316
CONTEXT_BULK,
17+
CONTEXT_DEAL,
1418
CONTEXT_DOMAIN_SEPARATOR_HASH,
1519
CONTEXT_INDEX,
16-
CONTEXT_REQUESTHASH,
20+
createBulkOrderID,
1721
createBulkSliceID,
1822
fetchDatasetorder,
19-
hashDatasetOrder,
2023
isAddressString,
2124
isBytes32String,
2225
isHexString,
@@ -27,18 +30,19 @@ import {
2730
export function handleBulk(content: Bytes): void {
2831
const hash = dataSource.stringParam();
2932
const context = dataSource.context();
30-
const requestorder = context.getString(CONTEXT_REQUESTHASH);
3133
const domainSeparator = context.getBytes(CONTEXT_DOMAIN_SEPARATOR_HASH);
34+
const dealId = context.getString(CONTEXT_DEAL);
35+
const botFirst = context.getBigInt(CONTEXT_BOT_FIRST);
36+
const botSize = context.getBigInt(CONTEXT_BOT_SIZE);
3237

33-
// multiple deals using same requestorder use the same bulk, we use requestorderHash as bulk ID
34-
const bulkid = requestorder;
38+
const bulkId = dealId;
3539

36-
let bulk = Bulk.load(bulkid);
40+
let bulk = Bulk.load(bulkId);
3741
if (bulk != null) {
3842
// immutable bulk already exists nothing to do
3943
return;
4044
}
41-
bulk = new Bulk(bulkid);
45+
bulk = new Bulk(bulkId);
4246
bulk.hash = hash;
4347
bulk.content = content.toString();
4448

@@ -51,13 +55,20 @@ export function handleBulk(content: Bytes): void {
5155

5256
if (isIntegerString(entry.key)) {
5357
const index = BigInt.fromString(entry.key);
54-
if (entry.value.kind == JSONValueKind.OBJECT) {
58+
if (
59+
// exclude slices outside of the deal bot range
60+
index >= botFirst &&
61+
index < botFirst.plus(botSize) &&
62+
entry.value.kind == JSONValueKind.OBJECT
63+
) {
5564
let sliceCid = entry.value.toObject().getEntry('orders');
5665
if (sliceCid != null && sliceCid.value.kind == JSONValueKind.STRING) {
5766
let sliceContext = new DataSourceContext();
58-
sliceContext.setString(CONTEXT_BULK, bulkid);
67+
sliceContext.setString(CONTEXT_BULK, bulkId);
68+
sliceContext.setString(CONTEXT_DEAL, dealId);
5969
sliceContext.setBigInt(CONTEXT_INDEX, index);
6070
sliceContext.setBytes(CONTEXT_DOMAIN_SEPARATOR_HASH, domainSeparator);
71+
6172
DataSourceTemplate.createWithContext(
6273
'BulkSlice',
6374
[sliceCid.value.toString()],
@@ -76,113 +87,126 @@ export function handleBulkSlice(content: Bytes): void {
7687
const hash = dataSource.stringParam();
7788
const context = dataSource.context();
7889
const bulk = context.getString(CONTEXT_BULK);
90+
const dealId = context.getString(CONTEXT_DEAL);
7991
const index = context.getBigInt(CONTEXT_INDEX);
80-
const domainSeparator = context.getBytes(CONTEXT_DOMAIN_SEPARATOR_HASH);
92+
const taskId = computeTaskId(dealId, index);
8193

82-
const bulkSliceId = createBulkSliceID(bulk, index);
94+
if (taskId !== null) {
95+
const bulkSliceId = createBulkSliceID(dealId, index);
96+
let bulkSlice = BulkSlice.load(bulkSliceId);
97+
if (bulkSlice != null) {
98+
// immutable bulk slice already exists nothing to do
99+
return;
100+
}
101+
bulkSlice = new BulkSlice(bulkSliceId);
102+
bulkSlice.task = taskId;
103+
bulkSlice.hash = hash;
104+
bulkSlice.bulk = bulk;
105+
bulkSlice.index = index;
106+
bulkSlice.content = content.toString();
107+
bulkSlice.datasets = new Array<string>();
108+
bulkSlice.datasetOrders = new Array<string>();
83109

84-
let bulkSlice = BulkSlice.load(bulkSliceId);
85-
if (bulkSlice != null) {
86-
// immutable bulk slice already exists nothing to do
87-
return;
88-
}
89-
bulkSlice = new BulkSlice(createBulkSliceID(bulk, index));
90-
bulkSlice.hash = hash;
91-
bulkSlice.bulk = bulk;
92-
bulkSlice.index = index;
93-
bulkSlice.content = content.toString();
110+
const jsonContent = json.try_fromBytes(content);
111+
if (jsonContent.isOk && jsonContent.value.kind == JSONValueKind.ARRAY) {
112+
const datasetOrderArray = jsonContent.value.toArray();
94113

95-
const jsonContent = json.try_fromBytes(content);
96-
if (jsonContent.isOk && jsonContent.value.kind == JSONValueKind.ARRAY) {
97-
const datasetOrderArray = jsonContent.value.toArray();
98-
for (let i = 0; i < datasetOrderArray.length; i++) {
99-
const datasetOrder = datasetOrderArray[i];
100-
if (datasetOrder.kind == JSONValueKind.OBJECT) {
101-
const orderObj = datasetOrder.toObject();
102-
103-
const datasetEntry = orderObj.getEntry('dataset');
104-
const datasetPriceEntry = orderObj.getEntry('datasetprice');
105-
const volumeEntry = orderObj.getEntry('volume');
106-
const tagEntry = orderObj.getEntry('tag');
107-
const apprestrictEntry = orderObj.getEntry('apprestrict');
108-
const workerpoolrestrictEntry = orderObj.getEntry('workerpoolrestrict');
109-
const requesterrestrictEntry = orderObj.getEntry('requesterrestrict');
110-
const saltEntry = orderObj.getEntry('salt');
111-
const signEntry = orderObj.getEntry('sign');
112-
// check that all entries are present and valid
113-
if (
114-
datasetEntry != null &&
115-
datasetEntry.value.kind == JSONValueKind.STRING &&
116-
isAddressString(datasetEntry.value.toString().toLowerCase()) &&
117-
datasetPriceEntry != null &&
118-
datasetPriceEntry.value.kind == JSONValueKind.STRING &&
119-
isIntegerString(datasetPriceEntry.value.toString()) &&
120-
volumeEntry != null &&
121-
volumeEntry.value.kind == JSONValueKind.STRING &&
122-
isIntegerString(volumeEntry.value.toString()) &&
123-
tagEntry != null &&
124-
tagEntry.value.kind == JSONValueKind.STRING &&
125-
isBytes32String(tagEntry.value.toString()) &&
126-
apprestrictEntry != null &&
127-
apprestrictEntry.value.kind == JSONValueKind.STRING &&
128-
isAddressString(apprestrictEntry.value.toString().toLowerCase()) &&
129-
workerpoolrestrictEntry != null &&
130-
workerpoolrestrictEntry.value.kind == JSONValueKind.STRING &&
131-
isAddressString(workerpoolrestrictEntry.value.toString().toLowerCase()) &&
132-
requesterrestrictEntry != null &&
133-
requesterrestrictEntry.value.kind == JSONValueKind.STRING &&
134-
isAddressString(requesterrestrictEntry.value.toString().toLowerCase()) &&
135-
saltEntry != null &&
136-
saltEntry.value.kind == JSONValueKind.STRING &&
137-
isBytes32String(saltEntry.value.toString()) &&
138-
signEntry != null &&
139-
signEntry.value.kind == JSONValueKind.STRING &&
140-
isHexString(signEntry.value.toString())
141-
) {
142-
// compute order hash with domain separator from contract
143-
const orderHash = hashDatasetOrder(
144-
Address.fromString(datasetEntry.value.toString()),
145-
BigInt.fromString(datasetPriceEntry.value.toString()),
146-
BigInt.fromString(volumeEntry.value.toString()),
147-
Bytes.fromHexString(tagEntry.value.toString()),
148-
Address.fromString(apprestrictEntry.value.toString()),
149-
Address.fromString(workerpoolrestrictEntry.value.toString()),
150-
Address.fromString(requesterrestrictEntry.value.toString()),
151-
Bytes.fromHexString(saltEntry.value.toString()),
152-
domainSeparator,
153-
);
154-
155-
// store dataset order
156-
let datasetOrder = fetchDatasetorder(orderHash.toHex());
157-
datasetOrder.dataset = datasetEntry.value.toString().toLowerCase();
158-
datasetOrder.datasetprice = toRLC(
159-
BigInt.fromString(datasetPriceEntry.value.toString()),
160-
);
161-
datasetOrder.volume = BigInt.fromString(volumeEntry.value.toString());
162-
datasetOrder.tag = Bytes.fromHexString(tagEntry.value.toString());
163-
datasetOrder.apprestrict = Address.fromString(
164-
apprestrictEntry.value.toString().toLowerCase(),
165-
);
166-
datasetOrder.workerpoolrestrict = Address.fromString(
167-
workerpoolrestrictEntry.value.toString().toLowerCase(),
168-
);
169-
datasetOrder.requesterrestrict = Address.fromString(
170-
requesterrestrictEntry.value.toString().toLowerCase(),
171-
);
172-
datasetOrder.salt = Bytes.fromHexString(saltEntry.value.toString());
173-
datasetOrder.sign = Bytes.fromHexString(signEntry.value.toString());
174-
datasetOrder.save();
175-
176-
let datasetOrders = bulkSlice.datasetOrders;
177-
if (datasetOrders == null) {
178-
datasetOrders = new Array<string>();
114+
for (let i = 0; i < datasetOrderArray.length; i++) {
115+
const datasetOrder = datasetOrderArray[i];
116+
if (datasetOrder.kind == JSONValueKind.OBJECT) {
117+
const orderObj = datasetOrder.toObject();
118+
119+
const datasetEntry = orderObj.getEntry('dataset');
120+
const datasetPriceEntry = orderObj.getEntry('datasetprice');
121+
const volumeEntry = orderObj.getEntry('volume');
122+
const tagEntry = orderObj.getEntry('tag');
123+
const apprestrictEntry = orderObj.getEntry('apprestrict');
124+
const workerpoolrestrictEntry = orderObj.getEntry('workerpoolrestrict');
125+
const requesterrestrictEntry = orderObj.getEntry('requesterrestrict');
126+
const saltEntry = orderObj.getEntry('salt');
127+
const signEntry = orderObj.getEntry('sign');
128+
// check that all entries are present and valid
129+
if (
130+
datasetEntry != null &&
131+
datasetEntry.value.kind == JSONValueKind.STRING &&
132+
isAddressString(datasetEntry.value.toString().toLowerCase()) &&
133+
datasetPriceEntry != null &&
134+
datasetPriceEntry.value.kind == JSONValueKind.STRING &&
135+
isIntegerString(datasetPriceEntry.value.toString()) &&
136+
volumeEntry != null &&
137+
volumeEntry.value.kind == JSONValueKind.STRING &&
138+
isIntegerString(volumeEntry.value.toString()) &&
139+
tagEntry != null &&
140+
tagEntry.value.kind == JSONValueKind.STRING &&
141+
isBytes32String(tagEntry.value.toString()) &&
142+
apprestrictEntry != null &&
143+
apprestrictEntry.value.kind == JSONValueKind.STRING &&
144+
isAddressString(apprestrictEntry.value.toString().toLowerCase()) &&
145+
workerpoolrestrictEntry != null &&
146+
workerpoolrestrictEntry.value.kind == JSONValueKind.STRING &&
147+
isAddressString(workerpoolrestrictEntry.value.toString().toLowerCase()) &&
148+
requesterrestrictEntry != null &&
149+
requesterrestrictEntry.value.kind == JSONValueKind.STRING &&
150+
isAddressString(requesterrestrictEntry.value.toString().toLowerCase()) &&
151+
saltEntry != null &&
152+
saltEntry.value.kind == JSONValueKind.STRING &&
153+
isBytes32String(saltEntry.value.toString()) &&
154+
signEntry != null &&
155+
signEntry.value.kind == JSONValueKind.STRING &&
156+
isHexString(signEntry.value.toString())
157+
) {
158+
// datasetOrderId cannot be orderHash as it could collide with on-chain indexed order
159+
const datasetOrderId = createBulkOrderID(taskId, BigInt.fromI32(i));
160+
161+
const datasetAddress = datasetEntry.value.toString().toLowerCase();
162+
163+
let datasetOrder = fetchDatasetorder(datasetOrderId);
164+
datasetOrder.dataset = datasetAddress;
165+
datasetOrder.datasetprice = toRLC(
166+
BigInt.fromString(datasetPriceEntry.value.toString()),
167+
);
168+
datasetOrder.volume = BigInt.fromString(volumeEntry.value.toString());
169+
datasetOrder.tag = Bytes.fromHexString(tagEntry.value.toString());
170+
datasetOrder.apprestrict = Address.fromString(
171+
apprestrictEntry.value.toString().toLowerCase(),
172+
);
173+
datasetOrder.workerpoolrestrict = Address.fromString(
174+
workerpoolrestrictEntry.value.toString().toLowerCase(),
175+
);
176+
datasetOrder.requesterrestrict = Address.fromString(
177+
requesterrestrictEntry.value.toString().toLowerCase(),
178+
);
179+
datasetOrder.salt = Bytes.fromHexString(saltEntry.value.toString());
180+
datasetOrder.sign = Bytes.fromHexString(signEntry.value.toString());
181+
182+
// todo: it may be useful to keep on order entity?
183+
// compute order hash with domain separator from contract
184+
// const domainSeparator = context.getBytes(CONTEXT_DOMAIN_SEPARATOR_HASH);
185+
// const orderHash = hashDatasetOrder(
186+
// Address.fromString(datasetEntry.value.toString()),
187+
// BigInt.fromString(datasetPriceEntry.value.toString()),
188+
// BigInt.fromString(volumeEntry.value.toString()),
189+
// Bytes.fromHexString(tagEntry.value.toString()),
190+
// Address.fromString(apprestrictEntry.value.toString()),
191+
// Address.fromString(workerpoolrestrictEntry.value.toString()),
192+
// Address.fromString(requesterrestrictEntry.value.toString()),
193+
// Bytes.fromHexString(saltEntry.value.toString()),
194+
// domainSeparator,
195+
// );
196+
// order.hash = orderHash;
197+
datasetOrder.save();
198+
199+
let datasetOrders = bulkSlice.datasetOrders;
200+
datasetOrders.push(datasetOrderId);
201+
bulkSlice.datasetOrders = datasetOrders;
202+
203+
let datasets = bulkSlice.datasets;
204+
datasets.push(datasetAddress);
205+
bulkSlice.datasets = datasets;
179206
}
180-
datasetOrders.push(orderHash.toHex());
181-
bulkSlice.datasetOrders = datasetOrders;
182207
}
183208
}
184209
}
210+
bulkSlice.save();
185211
}
186-
187-
bulkSlice.save();
188212
}

src/Modules/IexecPoco.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,10 @@ import {
4242
} from '../../generated/schema';
4343

4444
import {
45+
CONTEXT_BOT_FIRST,
46+
CONTEXT_BOT_SIZE,
47+
CONTEXT_DEAL,
4548
CONTEXT_DOMAIN_SEPARATOR_HASH,
46-
CONTEXT_REQUESTHASH,
4749
createContributionID,
4850
createEventID,
4951
fetchAccount,
@@ -110,14 +112,16 @@ export function handleOrdersMatched(event: OrdersMatchedEvent): void {
110112
const bulkCid = params.value.toObject().getEntry('bulk_cid');
111113
if (bulkCid) {
112114
// each deal has its own bulk keyed by its bot range => we use dealid as bulk ID
113-
const bulkId = event.params.requestHash.toHex();
115+
const bulkId = event.params.dealid.toHex();
114116
// create the bulk if not existing yet
115117
const indexedBulk = Bulk.load(bulkId);
116118
if (!indexedBulk) {
117119
let context = new DataSourceContext();
118-
context.setString(CONTEXT_REQUESTHASH, bulkId);
119-
// pass the domainSeparator to the template, as it cannot be retrieved from the contract in the template
120+
// Pass onchain data that will be needed in file handlers
120121
const domainSeparator = contract.eip712domain_separator();
122+
context.setString(CONTEXT_DEAL, deal.id);
123+
context.setBigInt(CONTEXT_BOT_FIRST, deal.botFirst);
124+
context.setBigInt(CONTEXT_BOT_SIZE, deal.botSize);
121125
context.setBytes(CONTEXT_DOMAIN_SEPARATOR_HASH, domainSeparator);
122126
DataSourceTemplate.createWithContext('Bulk', [bulkCid.value.toString()], context);
123127
}

0 commit comments

Comments
 (0)