Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "8.2.2",
"version": "8.2.3",
"description": "OTNode V8",
"main": "index.js",
"type": "module",
Expand Down
17 changes: 10 additions & 7 deletions src/commands/protocols/publish/publish-finalization-command.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class PublishFinalizationCommand extends Command {
const { id, publishOperationId, merkleRoot, byteSize } = eventData;
const { blockchain, contractAddress } = event;
const operationId = this.operationIdService.generateId();
const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH_FINALIZATION.PUBLISH_FINALIZATION_START,
operationId,
Expand Down Expand Up @@ -70,8 +72,10 @@ class PublishFinalizationCommand extends Command {
cachedMerkleRoot = result.merkleRoot;
assertion = result.assertion;
publisherPeerId = result.remotePeerId;
} catch (error) {
this.logger.error(`Failed to read cached publish data: ${error.message}`); // TODO: Make this log more descriptive
} catch (_error) {
this.logger.warn(
`[Cache] Failed to read cached publish data for UAL ${ual} (publishOperationId: ${publishOperationId}, txHash: ${txHash}, operationId: ${operationId})`,
);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.FAILED,
operationId,
Expand All @@ -81,8 +85,6 @@ class PublishFinalizationCommand extends Command {
return Command.empty();
}

const ual = this.ualService.deriveUAL(blockchain, contractAddress, id);

try {
await this.validatePublishData(merkleRoot, cachedMerkleRoot, byteSize, assertion, ual);
} catch (e) {
Expand Down Expand Up @@ -185,23 +187,24 @@ class PublishFinalizationCommand extends Command {

async readWithRetries(publishOperationId) {
let attempt = 0;
const datasetPath = this.fileService.getPendingStorageDocumentPath(publishOperationId);

while (attempt < MAX_RETRIES_READ_CACHED_PUBLISH_DATA) {
try {
const datasetPath =
this.fileService.getPendingStorageDocumentPath(publishOperationId);
// eslint-disable-next-line no-await-in-loop
const cachedData = await this.fileService.readFile(datasetPath, true);
return cachedData;
} catch (error) {
attempt += 1;

// eslint-disable-next-line no-await-in-loop
await new Promise((resolve) => {
setTimeout(resolve, RETRY_DELAY_READ_CACHED_PUBLISH_DATA);
});
}
}
this.logger.warn(
`[Cache] Exhausted retries reading cached publish data (publishOperationId: ${publishOperationId}, path: ${datasetPath}).`,
);
// TODO: Mark this operation as failed
throw new Error('Failed to read cached publish data');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,12 +177,24 @@ class OtBlazegraph extends OtTripleStore {
}

async queryVoid(repository, query, timeout) {
return axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
try {
return await axios.post(this.repositories[repository].sparqlEndpoint, query, {
headers: {
'Content-Type': 'application/sparql-update; charset=UTF-8',
'X-BIGDATA-MAX-QUERY-MILLIS': timeout,
},
});
} catch (error) {
const status = error?.response?.status;
const dataSnippet =
typeof error?.response?.data === 'string' ? error.response.data.slice(0, 200) : '';
this.logger.error(
`[OtBlazegraph.queryVoid] Update failed for ${repository} (status: ${status}): ${
error.message
}${dataSnippet ? ` | data: ${dataSnippet}` : ''}`,
);
throw error;
}
}

async deleteRepository(repository) {
Expand Down
62 changes: 36 additions & 26 deletions src/service/publish-service.js
Original file line number Diff line number Diff line change
Expand Up @@ -69,32 +69,42 @@ class PublishService extends OperationService {
// }

// 2. Check if all responses have been received
if (totalResponses === numberOfFoundNodes) {
// 2.1 If minimum replication is reached, mark the operation as completed
if (completedNumber >= minAckResponses) {
await this.markOperationAsCompleted(
operationId,
blockchain,
null,
this.completedStatuses,
);
await this.repositoryModuleManager.updateMinAcksReached(operationId, true);
this.logResponsesSummary(completedNumber, failedNumber);
}
// 2.2 Otherwise, mark as failed
else {
await this.markOperationAsFailed(
operationId,
blockchain,
'Not replicated to enough nodes!',
this.errorType,
);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_FAILED,
operationId,
);
this.logResponsesSummary(completedNumber, failedNumber);
}
// 2.1 If minimum replication is reached, mark the operation as completed

const record = await this.operationIdService.getOperationIdRecord(operationId);
if (record?.minAcksReached) return;

if (completedNumber >= minAckResponses) {
this.logger.info(
`[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` +
`datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`,
);
await this.markOperationAsCompleted(
operationId,
blockchain,
null,
this.completedStatuses,
);
await this.repositoryModuleManager.updateMinAcksReached(operationId, true);
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Race condition in minimum replication check

The new logic introduces a TOCTOU (time-of-check-to-time-of-use) race condition. Multiple concurrent responses can all read record?.minAcksReached as false before any of them updates it to true. Since getOperationIdRecord and updateMinAcksReached are not executed atomically (they're outside the mutex), concurrent responses that exceed minAckResponses may all proceed to call markOperationAsCompleted, potentially completing the same operation multiple times. The previous logic avoided this by only acting when totalResponses === numberOfFoundNodes, ensuring exactly one caller would reach the decision point.

Fix in Cursor Fix in Web

this.logResponsesSummary(completedNumber, failedNumber);
}
// 2.2 Otherwise, mark as failed
else if (totalResponses === numberOfFoundNodes) {
this.logger.warn(
`[PUBLISH] Failed for operationId: ${operationId}, ` +
`only ${completedNumber}/${minAckResponses} nodes responded successfully`,
);
await this.markOperationAsFailed(
operationId,
blockchain,
'Not replicated to enough nodes!',
this.errorType,
);
this.operationIdService.emitChangeEvent(
OPERATION_ID_STATUS.PUBLISH.PUBLISH_FAILED,
operationId,
);
this.logResponsesSummary(completedNumber, failedNumber);
}
// else {
// // 3. Not all responses have arrived yet.
Expand Down
Loading