diff --git a/package-lock.json b/package-lock.json index ff0ea70537..8248b91a7a 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "origintrail_node", - "version": "8.2.3", + "version": "8.2.4", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "origintrail_node", - "version": "8.2.3", + "version": "8.2.4", "license": "ISC", "dependencies": { "@comunica/query-sparql": "^4.0.2", diff --git a/package.json b/package.json index e4813f2734..7cc8b8bf19 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "origintrail_node", - "version": "8.2.3", + "version": "8.2.4", "description": "OTNode V8", "main": "index.js", "type": "module", diff --git a/src/commands/cleaners/operation-id-cleaner-command.js b/src/commands/cleaners/operation-id-cleaner-command.js index a604c6430e..9935321bf5 100644 --- a/src/commands/cleaners/operation-id-cleaner-command.js +++ b/src/commands/cleaners/operation-id-cleaner-command.js @@ -3,6 +3,7 @@ import { BYTES_IN_KILOBYTE, OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, OPERATION_ID_STATUS, COMMAND_PRIORITY, } from '../../constants/constants.js'; @@ -23,6 +24,25 @@ class OperationIdCleanerCommand extends Command { * @param command */ async execute() { + let memoryBytes = 0; + let fileBytes = 0; + try { + memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes(); + } catch (error) { + this.logger.warn(`Unable to read memory cache footprint: ${error.message}`); + } + try { + fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes(); + } catch (error) { + this.logger.warn(`Unable to read file cache footprint: ${error.message}`); + } + const bytesInMegabyte = 1024 * 1024; + this.logger.debug( + `Operation cache footprint before cleanup: memory=${( + memoryBytes / bytesInMegabyte + ).toFixed(2)}MB, files=${(fileBytes / bytesInMegabyte).toFixed(2)}MB`, + ); + this.logger.debug('Starting command for removal of expired cache files'); const timeToBeDeleted = Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS; await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [ @@ -30,7 +50,7 @@ class OperationIdCleanerCommand extends Command { OPERATION_ID_STATUS.FAILED, ]); let removed = await this.operationIdService.removeExpiredOperationIdMemoryCache( - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, ); if (removed) { this.logger.debug( @@ -68,7 +88,7 @@ class OperationIdCleanerCommand extends Command { default(map) { const command = { name: 'operationIdCleanerCommand', - period: OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + period: OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, data: {}, transactional: false, priority: COMMAND_PRIORITY.LOWEST, diff --git a/src/constants/constants.js b/src/constants/constants.js index e96c96b647..560bd08735 100644 --- a/src/constants/constants.js +++ b/src/constants/constants.js @@ -722,6 +722,11 @@ export const EXPECTED_TRANSACTION_ERRORS = { * operation id command cleanup interval time 24h */ export const OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000; +/** + * @constant {number} OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS - + * operation id memory cleanup interval time 1h + */ +export const OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS = 60 * 60 * 1000; /** * @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time * finalized commands command cleanup interval time 24h diff --git a/src/service/operation-id-service.js b/src/service/operation-id-service.js index b959699e21..1f91780c65 100644 --- a/src/service/operation-id-service.js +++ b/src/service/operation-id-service.js @@ -150,6 +150,33 @@ class OperationIdService { delete this.memoryCachedHandlersData[operationId]; } + getOperationIdMemoryCacheSizeBytes() { + let total = 0; + for (const operationId in this.memoryCachedHandlersData) { + const { data } = this.memoryCachedHandlersData[operationId]; + total += Buffer.from(JSON.stringify(data)).byteLength; + } + return total; + } + + async getOperationIdFileCacheSizeBytes() { + const cacheFolderPath = this.fileService.getOperationIdCachePath(); + const cacheFolderExists = await this.fileService.pathExists(cacheFolderPath); + if (!cacheFolderExists) return 0; + + const fileList = await this.fileService.readDirectory(cacheFolderPath); + const sizeResults = await Promise.allSettled( + fileList.map((fileName) => + this.fileService + .stat(path.join(cacheFolderPath, fileName)) + .then((stats) => stats.size), + ), + ); + return sizeResults + .filter((res) => res.status === 'fulfilled') + .reduce((acc, res) => acc + res.value, 0); + } + async removeExpiredOperationIdMemoryCache(expiredTimeout) { const now = Date.now(); let deleted = 0; diff --git a/src/service/operation-service.js b/src/service/operation-service.js index 64d7a0b314..10d84b2f48 100644 --- a/src/service/operation-service.js +++ b/src/service/operation-service.js @@ -58,21 +58,31 @@ class OperationService { return operationIdStatuses; } - async markOperationAsCompleted(operationId, blockchain, responseData, endStatuses) { + async markOperationAsCompleted( + operationId, + blockchain, + responseData, + endStatuses, + options = {}, + ) { + const { reuseExistingCache = false } = options; this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`); + await this.repositoryModuleManager.updateOperationStatus( + this.operationName, + operationId, + OPERATION_STATUS.COMPLETED, + ); + if (responseData === null) { await this.operationIdService.removeOperationIdCache(operationId); } else { await this.operationIdService.cacheOperationIdDataToMemory(operationId, responseData); - await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData); + if (!reuseExistingCache) { + await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData); + } } - await this.repositoryModuleManager.updateOperationStatus( - this.operationName, - operationId, - OPERATION_STATUS.COMPLETED, - ); for (let i = 0; i < endStatuses.length; i += 1) { const status = endStatuses[i]; const response = { diff --git a/src/service/publish-service.js b/src/service/publish-service.js index 8851f8403e..1e6046240d 100644 --- a/src/service/publish-service.js +++ b/src/service/publish-service.js @@ -79,11 +79,14 @@ class PublishService extends OperationService { `[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` + `datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`, ); + const cachedData = + (await this.operationIdService.getCachedOperationIdData(operationId)) || null; await this.markOperationAsCompleted( operationId, blockchain, - null, + cachedData, this.completedStatuses, + { reuseExistingCache: true }, ); await this.repositoryModuleManager.updateMinAcksReached(operationId, true); this.logResponsesSummary(completedNumber, failedNumber); diff --git a/test/unit/commands/operation-id-cleaner-command.test.js b/test/unit/commands/operation-id-cleaner-command.test.js new file mode 100644 index 0000000000..c62734034c --- /dev/null +++ b/test/unit/commands/operation-id-cleaner-command.test.js @@ -0,0 +1,100 @@ +import { describe, it, beforeEach, afterEach } from 'mocha'; +import { expect } from 'chai'; +import sinon from 'sinon'; + +import OperationIdCleanerCommand from '../../../src/commands/cleaners/operation-id-cleaner-command.js'; +import { + OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, + OPERATION_ID_STATUS, +} from '../../../src/constants/constants.js'; + +describe('OperationIdCleanerCommand', () => { + let clock; + let operationIdService; + let repositoryModuleManager; + let logger; + let command; + + beforeEach(() => { + clock = sinon.useFakeTimers(new Date('2023-01-01T00:00:00Z').getTime()); + + operationIdService = { + getOperationIdMemoryCacheSizeBytes: sinon.stub().returns(1024), + getOperationIdFileCacheSizeBytes: sinon.stub().resolves(2048), + removeExpiredOperationIdMemoryCache: sinon.stub().resolves(512), + removeExpiredOperationIdFileCache: sinon.stub().resolves(3), + }; + + repositoryModuleManager = { + removeOperationIdRecord: sinon.stub().resolves(), + }; + + logger = { + debug: sinon.spy(), + info: sinon.spy(), + warn: sinon.spy(), + error: sinon.spy(), + }; + + command = new OperationIdCleanerCommand({ + logger, + repositoryModuleManager, + operationIdService, + fileService: {}, + }); + }); + + afterEach(() => { + clock.restore(); + }); + + it('cleans memory with 1h TTL and files with 24h TTL while reporting footprint', async () => { + await command.execute(); + + expect(operationIdService.getOperationIdMemoryCacheSizeBytes.calledOnce).to.be.true; + expect(operationIdService.getOperationIdFileCacheSizeBytes.calledOnce).to.be.true; + + expect( + repositoryModuleManager.removeOperationIdRecord.calledWith( + Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + [OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED], + ), + ).to.be.true; + + expect( + operationIdService.removeExpiredOperationIdMemoryCache.calledWith( + OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS, + ), + ).to.be.true; + + expect( + operationIdService.removeExpiredOperationIdFileCache.calledWith( + OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, + ), + ).to.be.true; + + expect(logger.debug.called).to.be.true; + }); + + it('handles missing memory cache gracefully', async () => { + operationIdService.getOperationIdMemoryCacheSizeBytes.throws(new Error('no memory cache')); + await command.execute(); + + expect( + repositoryModuleManager.removeOperationIdRecord.calledWith( + Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + [OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED], + ), + ).to.be.true; + + expect( + operationIdService.removeExpiredOperationIdFileCache.calledWith( + OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS, + OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER, + ), + ).to.be.true; + }); +}); diff --git a/test/unit/service/operation-id-service-cache.test.js b/test/unit/service/operation-id-service-cache.test.js new file mode 100644 index 0000000000..43b40c219f --- /dev/null +++ b/test/unit/service/operation-id-service-cache.test.js @@ -0,0 +1,64 @@ +import { describe, it, beforeEach, afterEach } from 'mocha'; +import { expect } from 'chai'; +import fs from 'fs/promises'; +import path from 'path'; +import os from 'os'; +import OperationIdService from '../../../src/service/operation-id-service.js'; + +describe('OperationIdService file cache cleanup', () => { + let tmpDir; + let service; + + beforeEach(async () => { + tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'opid-cache-')); + const now = Date.now(); + + // Older than TTL (2 hours) + const oldFile = path.join(tmpDir, 'old.json'); + await fs.writeFile(oldFile, '{}'); + await fs.utimes( + oldFile, + new Date(now - 2 * 60 * 60 * 1000), + new Date(now - 2 * 60 * 60 * 1000), + ); + + // Newer than TTL (10 minutes) + const newFile = path.join(tmpDir, 'new.json'); + await fs.writeFile(newFile, '{}'); + await fs.utimes(newFile, new Date(now - 10 * 60 * 1000), new Date(now - 10 * 60 * 1000)); + + const fileService = { + getOperationIdCachePath: () => tmpDir, + async pathExists(p) { + try { + await fs.stat(p); + return true; + } catch { + return false; + } + }, + readDirectory: (p) => fs.readdir(p), + stat: (p) => fs.stat(p), + removeFile: (p) => fs.rm(p, { force: true }), + }; + + service = new OperationIdService({ + logger: { debug: () => {}, warn: () => {}, error: () => {} }, + fileService, + repositoryModuleManager: {}, + eventEmitter: { emit: () => {} }, + }); + }); + + afterEach(async () => { + await fs.rm(tmpDir, { recursive: true, force: true }); + }); + + it('removes only files older than TTL', async () => { + const deleted = await service.removeExpiredOperationIdFileCache(60 * 60 * 1000, 10); + const remainingFiles = await fs.readdir(tmpDir); + + expect(deleted).to.equal(1); + expect(remainingFiles).to.deep.equal(['new.json']); + }); +});