Skip to content

Commit f433541

Browse files
Merge pull request #4012 from OriginTrail/improvment/add-timeout-to-ask-query
Add ASK timeout to blazgraph query
2 parents 5096587 + fc87600 commit f433541

File tree

6 files changed

+191
-51
lines changed

6 files changed

+191
-51
lines changed

config/config.json

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@
9595
"query": 60000,
9696
"get": 10000,
9797
"batchGet": 10000,
98-
"insert": 300000
98+
"insert": 300000,
99+
"ask": 10000
99100
},
100101
"implementation": {
101102
"ot-blazegraph": {
@@ -349,7 +350,8 @@
349350
"query": 60000,
350351
"get": 10000,
351352
"batchGet": 10000,
352-
"insert": 300000
353+
"insert": 300000,
354+
"ask": 10000
353355
},
354356
"implementation": {
355357
"ot-blazegraph": {
@@ -567,7 +569,8 @@
567569
"query": 60000,
568570
"get": 10000,
569571
"batchGet": 10000,
570-
"insert": 300000
572+
"insert": 300000,
573+
"ask": 10000
571574
},
572575
"implementation": {
573576
"ot-blazegraph": {
@@ -776,7 +779,8 @@
776779
"query": 60000,
777780
"get": 10000,
778781
"batchGet": 10000,
779-
"insert": 300000
782+
"insert": 300000,
783+
"ask": 10000
780784
},
781785
"implementation": {
782786
"ot-blazegraph": {
@@ -991,7 +995,8 @@
991995
"query": 60000,
992996
"get": 10000,
993997
"batchGet": 10000,
994-
"insert": 300000
998+
"insert": 300000,
999+
"ask": 10000
9951000
},
9961001
"implementation": {
9971002
"ot-blazegraph": {

src/commands/protocols/get/receiver/v1.0.0/v1-0-0-handle-batch-get-request-command.js

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ class HandleBatchGetRequestCommand extends HandleProtocolMessageCommand {
3030
const { operationId, blockchain, includeMetadata } = commandData;
3131
let { uals, tokenIds } = commandData;
3232

33+
console.time(`HandleBatchGetRequestCommand [PREPARE]: ${operationId} ${uals.length}`);
3334
await this.operationIdService.updateOperationIdStatus(
3435
operationId,
3536
blockchain,
@@ -57,11 +58,16 @@ class HandleBatchGetRequestCommand extends HandleProtocolMessageCommand {
5758
}
5859
}
5960

61+
console.timeEnd(`HandleBatchGetRequestCommand [PREPARE]: ${operationId} ${uals.length}`);
62+
63+
console.time(`HandleBatchGetRequestCommand [PROCESSING]: ${operationId} ${uals.length}`);
64+
6065
const assertionPromise = this.tripleStoreService.getAssertionsInBatch(
6166
TRIPLE_STORE_REPOSITORY.DKG,
6267
uals,
6368
tokenIds,
6469
TRIPLES_VISIBILITY.PUBLIC,
70+
operationId,
6571
);
6672

6773
promises.push(assertionPromise);
@@ -81,6 +87,10 @@ class HandleBatchGetRequestCommand extends HandleProtocolMessageCommand {
8187
...(includeMetadata && metadata && { metadata }),
8288
};
8389

90+
console.timeEnd(`HandleBatchGetRequestCommand [PROCESSING]: ${operationId} ${uals.length}`);
91+
92+
console.time(`HandleBatchGetRequestCommand [RESPONSE]: ${operationId} ${uals.length}`);
93+
8494
if (assertions?.length) {
8595
await this.operationIdService.updateOperationIdStatus(
8696
operationId,
@@ -89,6 +99,8 @@ class HandleBatchGetRequestCommand extends HandleProtocolMessageCommand {
8999
);
90100
}
91101

102+
console.timeEnd(`HandleBatchGetRequestCommand [RESPONSE]: ${operationId} ${uals.length}`);
103+
92104
return { messageType: NETWORK_MESSAGE_TYPES.RESPONSES.ACK, messageData: responseData };
93105
}
94106

src/commands/protocols/get/sender/batch-get-command.js

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@ class BatchGetCommand extends Command {
6868
paranetNodesAccessPolicy,
6969
} = command.data;
7070

71+
console.time(`BatchGetCommand [PREPARE]: ${operationId} ${uals.length}`);
72+
7173
await this.operationIdService.updateOperationIdStatus(
7274
operationId,
7375
blockchain,
@@ -88,6 +90,8 @@ class BatchGetCommand extends Command {
8890

8991
const { isValid, errorMessage } = await this.validateUALs(operationId, blockchain, uals);
9092

93+
console.timeEnd(`BatchGetCommand [PREPARE]: ${operationId} ${uals.length}`);
94+
9195
if (!isValid) {
9296
await this.handleError(
9397
operationId,
@@ -98,6 +102,8 @@ class BatchGetCommand extends Command {
98102
return Command.empty();
99103
}
100104

105+
console.time(`BatchGetCommand [NETWORK]: ${operationId} ${uals.length}`);
106+
101107
const currentPeerId = this.networkModuleManager.getPeerId().toB58String();
102108
// let paranetId;
103109
const repository = TRIPLE_STORE_REPOSITORIES.DKG;
@@ -127,6 +133,10 @@ class BatchGetCommand extends Command {
127133
OPERATION_ID_STATUS.BATCH_GET.BATCH_GET_LOCAL_START,
128134
);
129135

136+
console.timeEnd(`BatchGetCommand [NETWORK]: ${operationId} ${uals.length}`);
137+
138+
console.time(`BatchGetCommand [TOKEN_IDS]: ${operationId} ${uals.length}`);
139+
130140
const tokenIds = {};
131141

132142
const tokenIdPromises = uals.map(async (ual) => {
@@ -149,19 +159,28 @@ class BatchGetCommand extends Command {
149159

150160
await Promise.all(tokenIdPromises);
151161

162+
console.timeEnd(`BatchGetCommand [TOKEN_IDS]: ${operationId} ${uals.length}`);
163+
164+
console.time(`BatchGetCommand [LOCAL_BATCH_GET]: ${operationId} ${uals.length}`);
165+
152166
const promises = [];
153167
const assertionPromise = this.tripleStoreService.getAssertionsInBatch(
154168
TRIPLE_STORE_REPOSITORY.DKG,
155169
uals,
156170
tokenIds,
157171
TRIPLES_VISIBILITY.PUBLIC,
172+
operationId,
158173
);
159174
promises.push(assertionPromise);
160175

161176
const [batchAssertions] = await Promise.all(promises);
162177

163178
const finalResult = { local: [], remote: {}, metadata: {} };
164179

180+
console.timeEnd(`BatchGetCommand [LOCAL_BATCH_GET]: ${operationId} ${uals.length}`);
181+
182+
console.time(`BatchGetCommand [LOCAL_BATCH_GET_VALIDATE]: ${operationId} ${uals.length}`);
183+
165184
const localGetResultValid = await this.validateBatchResponse(
166185
batchAssertions,
167186
blockchain,
@@ -178,6 +197,12 @@ class BatchGetCommand extends Command {
178197
(ual) => !localGetResultValid[ual],
179198
);
180199

200+
console.timeEnd(
201+
`BatchGetCommand [LOCAL_BATCH_GET_VALIDATE]: ${operationId} ${uals.length}`,
202+
);
203+
204+
console.time(`BatchGetCommand [LOCAL]: ${operationId} ${uals.length}`);
205+
181206
ualPresentLocally.forEach((ual) => {
182207
finalResult.local.push(ual);
183208
delete tokenIds[ual];
@@ -198,6 +223,8 @@ class BatchGetCommand extends Command {
198223
return Command.empty();
199224
}
200225

226+
console.timeEnd(`BatchGetCommand [LOCAL]: ${operationId} ${uals.length}`);
227+
201228
await this.operationIdService.updateOperationIdStatus(
202229
operationId,
203230
blockchain,
@@ -210,6 +237,8 @@ class BatchGetCommand extends Command {
210237
OPERATION_ID_STATUS.BATCH_GET.BATCH_GET_FIND_SHARD_START,
211238
);
212239

240+
console.time(`BatchGetCommand [FIND_SHARD]: ${operationId} ${uals.length}`);
241+
213242
let nodesInfo = [];
214243
// if (paranetNodesAccessPolicy === PARANET_ACCESS_POLICY.PERMISSIONED) {
215244
// const onChainNodes = await this.blockchainModuleManager.getPermissionedNodes(
@@ -247,12 +276,16 @@ class BatchGetCommand extends Command {
247276
return Command.empty();
248277
}
249278

279+
console.timeEnd(`BatchGetCommand [FIND_SHARD]: ${operationId} ${uals.length}`);
280+
250281
await this.operationIdService.updateOperationIdStatus(
251282
operationId,
252283
blockchain,
253284
OPERATION_ID_STATUS.BATCH_GET.BATCH_GET_FIND_SHARD_END,
254285
);
255286

287+
console.time(`BatchGetCommand [NETWORK]: ${operationId} ${uals.length}`);
288+
256289
let index = 0;
257290
let commandCompleted = false;
258291

@@ -280,12 +313,21 @@ class BatchGetCommand extends Command {
280313
// eslint-disable-next-line no-loop-func
281314
const messagePromises = batch.map(async (node) => {
282315
try {
316+
console.time(
317+
`BatchGetCommand [NETWORK_SEND_MESSAGE]: ${operationId} ${uals.length} ${node.id}`,
318+
);
283319
const result = await this.sendMessage(node, operationId, message);
320+
console.timeEnd(
321+
`BatchGetCommand [NETWORK_SEND_MESSAGE]: ${operationId} ${uals.length} ${node.id}`,
322+
);
284323

285324
if (commandCompleted || !result.success) {
286325
return;
287326
}
288327

328+
console.time(
329+
`BatchGetCommand [NETWORK_VALIDATE_RESPONSE]: ${operationId} ${uals.length} ${node.id}`,
330+
);
289331
const validationResult = await this.validateBatchResponse(
290332
result.responseData.assertions,
291333
blockchain,
@@ -294,6 +336,9 @@ class BatchGetCommand extends Command {
294336
finalResult,
295337
[OPERATION_ID_STATUS.GET.GET_END, OPERATION_ID_STATUS.COMPLETED],
296338
);
339+
console.timeEnd(
340+
`BatchGetCommand [NETWORK_VALIDATE_RESPONSE]: ${operationId} ${uals.length} ${node.id}`,
341+
);
297342

298343
if (commandCompleted) {
299344
return;
@@ -312,12 +357,18 @@ class BatchGetCommand extends Command {
312357

313358
if (hasReachedThreshold() && !commandCompleted) {
314359
commandCompleted = true;
360+
console.time(
361+
`BatchGetCommand [NETWORK_MARK_AS_COMPLETED]: ${operationId} ${uals.length} ${node.id}`,
362+
);
315363
await this.operationService.markOperationAsCompleted(
316364
operationId,
317365
blockchain,
318366
finalResult,
319367
[OPERATION_ID_STATUS.GET.GET_END, OPERATION_ID_STATUS.COMPLETED],
320368
);
369+
console.timeEnd(
370+
`BatchGetCommand [NETWORK_MARK_AS_COMPLETED]: ${operationId} ${uals.length} ${node.id}`,
371+
);
321372
}
322373
} catch (err) {
323374
this.logger.warn(`Node ${node.id} failed: ${err.message}`);
@@ -357,6 +408,8 @@ class BatchGetCommand extends Command {
357408
);
358409
}
359410

411+
console.timeEnd(`BatchGetCommand [NETWORK]: ${operationId} ${uals.length}`);
412+
360413
return Command.empty();
361414
}
362415

src/modules/triple-store/implementation/ot-blazegraph/ot-blazegraph.js

Lines changed: 52 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,11 @@ class OtBlazegraph extends OtTripleStore {
8383
return result ? JSON.parse(result) : [];
8484
}
8585

86+
async ask(repository, query, timeout = 10000) {
87+
const result = await this._executeQuery(repository, query, MEDIA_TYPES.JSON, timeout);
88+
return result ? JSON.parse(result).boolean : false;
89+
}
90+
8691
async _executeQuery(repository, query, mediaType, timeout) {
8792
const result = await axios.post(this.repositories[repository].sparqlEndpoint, query, {
8893
headers: {
@@ -93,48 +98,55 @@ class OtBlazegraph extends OtTripleStore {
9398
});
9499
let response;
95100
if (mediaType === MEDIA_TYPES.JSON) {
96-
const { bindings } = result.data.results;
97-
98-
let output = '[\n';
99-
100-
bindings.forEach((binding, bindingIndex) => {
101-
let string = ' {\n';
102-
103-
const keys = Object.keys(binding);
104-
105-
keys.forEach((key, index) => {
106-
let value = '';
107-
const entry = binding[key];
108-
109-
if (entry.datatype) {
110-
// e.g., "\"6900000\"^^http://www.w3.org/2001/XMLSchema#integer"
111-
const literal = `"${entry.value}"^^${entry.datatype}`;
112-
value = JSON.stringify(literal);
113-
} else if (entry['xml:lang']) {
114-
// e.g., "\"text here\"@en"
115-
const literal = `"${entry.value}"@${entry['xml:lang']}`;
116-
value = JSON.stringify(literal);
117-
} else if (entry.type === 'uri') {
118-
// URIs should be escaped and quoted directly
119-
value = JSON.stringify(entry.value);
120-
} else {
121-
// For plain literals, wrap in quotes and stringify
122-
const literal = `"${entry.value}"`;
123-
value = JSON.stringify(literal);
124-
}
125-
126-
const isLast = index === keys.length - 1;
127-
string += ` "${key}": ${value}${isLast ? '' : ','}\n`;
101+
// Check if this is an ASK query by looking for the boolean property
102+
if (result.data.boolean !== undefined) {
103+
// This is an ASK query response
104+
response = JSON.stringify(result.data);
105+
} else {
106+
// This is a SELECT query response
107+
const { bindings } = result.data.results;
108+
109+
let output = '[\n';
110+
111+
bindings.forEach((binding, bindingIndex) => {
112+
let string = ' {\n';
113+
114+
const keys = Object.keys(binding);
115+
116+
keys.forEach((key, index) => {
117+
let value = '';
118+
const entry = binding[key];
119+
120+
if (entry.datatype) {
121+
// e.g., "\"6900000\"^^http://www.w3.org/2001/XMLSchema#integer"
122+
const literal = `"${entry.value}"^^${entry.datatype}`;
123+
value = JSON.stringify(literal);
124+
} else if (entry['xml:lang']) {
125+
// e.g., "\"text here\"@en"
126+
const literal = `"${entry.value}"@${entry['xml:lang']}`;
127+
value = JSON.stringify(literal);
128+
} else if (entry.type === 'uri') {
129+
// URIs should be escaped and quoted directly
130+
value = JSON.stringify(entry.value);
131+
} else {
132+
// For plain literals, wrap in quotes and stringify
133+
const literal = `"${entry.value}"`;
134+
value = JSON.stringify(literal);
135+
}
136+
137+
const isLast = index === keys.length - 1;
138+
string += ` "${key}": ${value}${isLast ? '' : ','}\n`;
139+
});
140+
141+
const isLastBinding = bindingIndex === bindings.length - 1;
142+
string += ` }${isLastBinding ? '\n' : ',\n'}`;
143+
144+
output += string;
128145
});
129146

130-
const isLastBinding = bindingIndex === bindings.length - 1;
131-
string += ` }${isLastBinding ? '\n' : ',\n'}`;
132-
133-
output += string;
134-
});
135-
136-
output += ']';
137-
response = output;
147+
output += ']';
148+
response = output;
149+
}
138150
} else {
139151
response = result.data;
140152
}

0 commit comments

Comments
 (0)