Skip to content

Commit 87ceab8

Browse files
authored
Merge pull request #4078 from OriginTrail/feature/improve-publish-success-rate
[fix] don't remove cache on complete, instead use scheduled cleaners
2 parents 43d08f2 + eef936b commit 87ceab8

File tree

9 files changed

+242
-13
lines changed

9 files changed

+242
-13
lines changed

package-lock.json

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "origintrail_node",
3-
"version": "8.2.3",
3+
"version": "8.2.4",
44
"description": "OTNode V8",
55
"main": "index.js",
66
"type": "module",

src/commands/cleaners/operation-id-cleaner-command.js

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
BYTES_IN_KILOBYTE,
44
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
55
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
6+
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
67
OPERATION_ID_STATUS,
78
COMMAND_PRIORITY,
89
} from '../../constants/constants.js';
@@ -23,14 +24,33 @@ class OperationIdCleanerCommand extends Command {
2324
* @param command
2425
*/
2526
async execute() {
27+
let memoryBytes = 0;
28+
let fileBytes = 0;
29+
try {
30+
memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes();
31+
} catch (error) {
32+
this.logger.warn(`Unable to read memory cache footprint: ${error.message}`);
33+
}
34+
try {
35+
fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes();
36+
} catch (error) {
37+
this.logger.warn(`Unable to read file cache footprint: ${error.message}`);
38+
}
39+
const bytesInMegabyte = 1024 * 1024;
40+
this.logger.debug(
41+
`Operation cache footprint before cleanup: memory=${(
42+
memoryBytes / bytesInMegabyte
43+
).toFixed(2)}MB, files=${(fileBytes / bytesInMegabyte).toFixed(2)}MB`,
44+
);
45+
2646
this.logger.debug('Starting command for removal of expired cache files');
2747
const timeToBeDeleted = Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS;
2848
await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [
2949
OPERATION_ID_STATUS.COMPLETED,
3050
OPERATION_ID_STATUS.FAILED,
3151
]);
3252
let removed = await this.operationIdService.removeExpiredOperationIdMemoryCache(
33-
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
53+
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
3454
);
3555
if (removed) {
3656
this.logger.debug(
@@ -68,7 +88,7 @@ class OperationIdCleanerCommand extends Command {
6888
default(map) {
6989
const command = {
7090
name: 'operationIdCleanerCommand',
71-
period: OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
91+
period: OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
7292
data: {},
7393
transactional: false,
7494
priority: COMMAND_PRIORITY.LOWEST,

src/constants/constants.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -722,6 +722,11 @@ export const EXPECTED_TRANSACTION_ERRORS = {
722722
* operation id command cleanup interval time 24h
723723
*/
724724
export const OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000;
725+
/**
726+
* @constant {number} OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS -
727+
* operation id memory cleanup interval time 1h
728+
*/
729+
export const OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS = 60 * 60 * 1000;
725730
/**
726731
* @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time
727732
* finalized commands command cleanup interval time 24h

src/service/operation-id-service.js

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,33 @@ class OperationIdService {
150150
delete this.memoryCachedHandlersData[operationId];
151151
}
152152

153+
getOperationIdMemoryCacheSizeBytes() {
154+
let total = 0;
155+
for (const operationId in this.memoryCachedHandlersData) {
156+
const { data } = this.memoryCachedHandlersData[operationId];
157+
total += Buffer.from(JSON.stringify(data)).byteLength;
158+
}
159+
return total;
160+
}
161+
162+
async getOperationIdFileCacheSizeBytes() {
163+
const cacheFolderPath = this.fileService.getOperationIdCachePath();
164+
const cacheFolderExists = await this.fileService.pathExists(cacheFolderPath);
165+
if (!cacheFolderExists) return 0;
166+
167+
const fileList = await this.fileService.readDirectory(cacheFolderPath);
168+
const sizeResults = await Promise.allSettled(
169+
fileList.map((fileName) =>
170+
this.fileService
171+
.stat(path.join(cacheFolderPath, fileName))
172+
.then((stats) => stats.size),
173+
),
174+
);
175+
return sizeResults
176+
.filter((res) => res.status === 'fulfilled')
177+
.reduce((acc, res) => acc + res.value, 0);
178+
}
179+
153180
async removeExpiredOperationIdMemoryCache(expiredTimeout) {
154181
const now = Date.now();
155182
let deleted = 0;

src/service/operation-service.js

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -58,21 +58,31 @@ class OperationService {
5858
return operationIdStatuses;
5959
}
6060

61-
async markOperationAsCompleted(operationId, blockchain, responseData, endStatuses) {
61+
async markOperationAsCompleted(
62+
operationId,
63+
blockchain,
64+
responseData,
65+
endStatuses,
66+
options = {},
67+
) {
68+
const { reuseExistingCache = false } = options;
6269
this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`);
6370

71+
await this.repositoryModuleManager.updateOperationStatus(
72+
this.operationName,
73+
operationId,
74+
OPERATION_STATUS.COMPLETED,
75+
);
76+
6477
if (responseData === null) {
6578
await this.operationIdService.removeOperationIdCache(operationId);
6679
} else {
6780
await this.operationIdService.cacheOperationIdDataToMemory(operationId, responseData);
68-
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
81+
if (!reuseExistingCache) {
82+
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
83+
}
6984
}
7085

71-
await this.repositoryModuleManager.updateOperationStatus(
72-
this.operationName,
73-
operationId,
74-
OPERATION_STATUS.COMPLETED,
75-
);
7686
for (let i = 0; i < endStatuses.length; i += 1) {
7787
const status = endStatuses[i];
7888
const response = {

src/service/publish-service.js

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,14 @@ class PublishService extends OperationService {
7979
`[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` +
8080
`datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`,
8181
);
82+
const cachedData =
83+
(await this.operationIdService.getCachedOperationIdData(operationId)) || null;
8284
await this.markOperationAsCompleted(
8385
operationId,
8486
blockchain,
85-
null,
87+
cachedData,
8688
this.completedStatuses,
89+
{ reuseExistingCache: true },
8790
);
8891
await this.repositoryModuleManager.updateMinAcksReached(operationId, true);
8992
this.logResponsesSummary(completedNumber, failedNumber);
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
import { describe, it, beforeEach, afterEach } from 'mocha';
2+
import { expect } from 'chai';
3+
import sinon from 'sinon';
4+
5+
import OperationIdCleanerCommand from '../../../src/commands/cleaners/operation-id-cleaner-command.js';
6+
import {
7+
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
8+
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
9+
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
10+
OPERATION_ID_STATUS,
11+
} from '../../../src/constants/constants.js';
12+
13+
describe('OperationIdCleanerCommand', () => {
14+
let clock;
15+
let operationIdService;
16+
let repositoryModuleManager;
17+
let logger;
18+
let command;
19+
20+
beforeEach(() => {
21+
clock = sinon.useFakeTimers(new Date('2023-01-01T00:00:00Z').getTime());
22+
23+
operationIdService = {
24+
getOperationIdMemoryCacheSizeBytes: sinon.stub().returns(1024),
25+
getOperationIdFileCacheSizeBytes: sinon.stub().resolves(2048),
26+
removeExpiredOperationIdMemoryCache: sinon.stub().resolves(512),
27+
removeExpiredOperationIdFileCache: sinon.stub().resolves(3),
28+
};
29+
30+
repositoryModuleManager = {
31+
removeOperationIdRecord: sinon.stub().resolves(),
32+
};
33+
34+
logger = {
35+
debug: sinon.spy(),
36+
info: sinon.spy(),
37+
warn: sinon.spy(),
38+
error: sinon.spy(),
39+
};
40+
41+
command = new OperationIdCleanerCommand({
42+
logger,
43+
repositoryModuleManager,
44+
operationIdService,
45+
fileService: {},
46+
});
47+
});
48+
49+
afterEach(() => {
50+
clock.restore();
51+
});
52+
53+
it('cleans memory with 1h TTL and files with 24h TTL while reporting footprint', async () => {
54+
await command.execute();
55+
56+
expect(operationIdService.getOperationIdMemoryCacheSizeBytes.calledOnce).to.be.true;
57+
expect(operationIdService.getOperationIdFileCacheSizeBytes.calledOnce).to.be.true;
58+
59+
expect(
60+
repositoryModuleManager.removeOperationIdRecord.calledWith(
61+
Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
62+
[OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED],
63+
),
64+
).to.be.true;
65+
66+
expect(
67+
operationIdService.removeExpiredOperationIdMemoryCache.calledWith(
68+
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
69+
),
70+
).to.be.true;
71+
72+
expect(
73+
operationIdService.removeExpiredOperationIdFileCache.calledWith(
74+
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
75+
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
76+
),
77+
).to.be.true;
78+
79+
expect(logger.debug.called).to.be.true;
80+
});
81+
82+
it('handles missing memory cache gracefully', async () => {
83+
operationIdService.getOperationIdMemoryCacheSizeBytes.throws(new Error('no memory cache'));
84+
await command.execute();
85+
86+
expect(
87+
repositoryModuleManager.removeOperationIdRecord.calledWith(
88+
Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
89+
[OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED],
90+
),
91+
).to.be.true;
92+
93+
expect(
94+
operationIdService.removeExpiredOperationIdFileCache.calledWith(
95+
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
96+
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
97+
),
98+
).to.be.true;
99+
});
100+
});
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
import { describe, it, beforeEach, afterEach } from 'mocha';
2+
import { expect } from 'chai';
3+
import fs from 'fs/promises';
4+
import path from 'path';
5+
import os from 'os';
6+
import OperationIdService from '../../../src/service/operation-id-service.js';
7+
8+
describe('OperationIdService file cache cleanup', () => {
9+
let tmpDir;
10+
let service;
11+
12+
beforeEach(async () => {
13+
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'opid-cache-'));
14+
const now = Date.now();
15+
16+
// Older than TTL (2 hours)
17+
const oldFile = path.join(tmpDir, 'old.json');
18+
await fs.writeFile(oldFile, '{}');
19+
await fs.utimes(
20+
oldFile,
21+
new Date(now - 2 * 60 * 60 * 1000),
22+
new Date(now - 2 * 60 * 60 * 1000),
23+
);
24+
25+
// Newer than TTL (10 minutes)
26+
const newFile = path.join(tmpDir, 'new.json');
27+
await fs.writeFile(newFile, '{}');
28+
await fs.utimes(newFile, new Date(now - 10 * 60 * 1000), new Date(now - 10 * 60 * 1000));
29+
30+
const fileService = {
31+
getOperationIdCachePath: () => tmpDir,
32+
async pathExists(p) {
33+
try {
34+
await fs.stat(p);
35+
return true;
36+
} catch {
37+
return false;
38+
}
39+
},
40+
readDirectory: (p) => fs.readdir(p),
41+
stat: (p) => fs.stat(p),
42+
removeFile: (p) => fs.rm(p, { force: true }),
43+
};
44+
45+
service = new OperationIdService({
46+
logger: { debug: () => {}, warn: () => {}, error: () => {} },
47+
fileService,
48+
repositoryModuleManager: {},
49+
eventEmitter: { emit: () => {} },
50+
});
51+
});
52+
53+
afterEach(async () => {
54+
await fs.rm(tmpDir, { recursive: true, force: true });
55+
});
56+
57+
it('removes only files older than TTL', async () => {
58+
const deleted = await service.removeExpiredOperationIdFileCache(60 * 60 * 1000, 10);
59+
const remainingFiles = await fs.readdir(tmpDir);
60+
61+
expect(deleted).to.equal(1);
62+
expect(remainingFiles).to.deep.equal(['new.json']);
63+
});
64+
});

0 commit comments

Comments
 (0)