4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -1,6 +1,6 @@
{
"name": "origintrail_node",
"version": "8.2.3",
"version": "8.2.4",
"description": "OTNode V8",
"main": "index.js",
"type": "module",
24 changes: 22 additions & 2 deletions src/commands/cleaners/operation-id-cleaner-command.js
@@ -3,6 +3,7 @@ import {
BYTES_IN_KILOBYTE,
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
OPERATION_ID_STATUS,
COMMAND_PRIORITY,
} from '../../constants/constants.js';
@@ -23,14 +24,33 @@ class OperationIdCleanerCommand extends Command {
* @param command
*/
async execute() {
let memoryBytes = 0;
let fileBytes = 0;
try {
memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes();
} catch (error) {
this.logger.warn(`Unable to read memory cache footprint: ${error.message}`);
}
try {
fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes();
} catch (error) {
this.logger.warn(`Unable to read file cache footprint: ${error.message}`);
}
const bytesInMegabyte = 1024 * 1024;
this.logger.debug(
`Operation cache footprint before cleanup: memory=${(
memoryBytes / bytesInMegabyte
).toFixed(2)}MB, files=${(fileBytes / bytesInMegabyte).toFixed(2)}MB`,
);

this.logger.debug('Starting command for removal of expired cache files');
const timeToBeDeleted = Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS;
await this.repositoryModuleManager.removeOperationIdRecord(timeToBeDeleted, [
OPERATION_ID_STATUS.COMPLETED,
OPERATION_ID_STATUS.FAILED,
]);
let removed = await this.operationIdService.removeExpiredOperationIdMemoryCache(
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
);
if (removed) {
this.logger.debug(
@@ -68,7 +88,7 @@ class OperationIdCleanerCommand extends Command {
default(map) {
const command = {
name: 'operationIdCleanerCommand',
period: OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
period: OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
data: {},
transactional: false,
priority: COMMAND_PRIORITY.LOWEST,
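Note: pieced together from the visible additions and the unit test at the bottom of this diff, the resulting execute() flow looks roughly like the condensed sketch below. This is an approximation, not the literal file contents; the footprint debug log, warn-on-failure handling, and removed-entry messages are omitted. Note also that the command's default period drops from 24 h to 1 h in the hunk above, so the cleaner now runs hourly even though file caches and DB records keep their 24 h TTL.

// Condensed sketch of the post-change cleanup flow (approximate, see note above)
async execute() {
    // 1. Measure the current cache footprint for the debug log (failures only warn).
    const memoryBytes = this.operationIdService.getOperationIdMemoryCacheSizeBytes();
    const fileBytes = await this.operationIdService.getOperationIdFileCacheSizeBytes();

    // 2. Remove COMPLETED/FAILED operation records older than 24 h from the repository.
    await this.repositoryModuleManager.removeOperationIdRecord(
        Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
        [OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED],
    );

    // 3. Memory-cached operation data now expires after 1 h...
    await this.operationIdService.removeExpiredOperationIdMemoryCache(
        OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
    );

    // 4. ...while file caches keep the 24 h TTL and the existing removal batch limit.
    await this.operationIdService.removeExpiredOperationIdFileCache(
        OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
        OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
    );
}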
5 changes: 5 additions & 0 deletions src/constants/constants.js
@@ -722,6 +722,11 @@ export const EXPECTED_TRANSACTION_ERRORS = {
* operation id command cleanup interval time 24h
*/
export const OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000;
/**
* @constant {number} OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS -
* operation id memory cleanup interval time 1h
*/
export const OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS = 60 * 60 * 1000;
/**
* @constant {number} FINALIZED_COMMAND_CLEANUP_TIME_MILLS - Command cleanup interval time
* finalized commands command cleanup interval time 24h
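Note: for quick reference, the two intervals in milliseconds (plain arithmetic, nothing assumed beyond the constants above):

// OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS  = 60 * 60 * 1000      = 3,600,000 ms  (1 h)
// OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS = 24 * 60 * 60 * 1000 = 86,400,000 ms (24 h)
// Memory-cached operation data therefore expires 24x sooner than file caches and DB records.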
27 changes: 27 additions & 0 deletions src/service/operation-id-service.js
@@ -150,6 +150,33 @@ class OperationIdService {
delete this.memoryCachedHandlersData[operationId];
}

getOperationIdMemoryCacheSizeBytes() {
let total = 0;
for (const operationId in this.memoryCachedHandlersData) {
const { data } = this.memoryCachedHandlersData[operationId];
total += Buffer.from(JSON.stringify(data)).byteLength;
}
return total;
}

async getOperationIdFileCacheSizeBytes() {
const cacheFolderPath = this.fileService.getOperationIdCachePath();
const cacheFolderExists = await this.fileService.pathExists(cacheFolderPath);
if (!cacheFolderExists) return 0;

const fileList = await this.fileService.readDirectory(cacheFolderPath);
const sizeResults = await Promise.allSettled(
fileList.map((fileName) =>
this.fileService
.stat(path.join(cacheFolderPath, fileName))
.then((stats) => stats.size),
),
);
return sizeResults
.filter((res) => res.status === 'fulfilled')
.reduce((acc, res) => acc + res.value, 0);
}

async removeExpiredOperationIdMemoryCache(expiredTimeout) {
const now = Date.now();
let deleted = 0;
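Note: the new size accessors are read-only helpers. getOperationIdFileCacheSizeBytes uses Promise.allSettled so that a cache file deleted between readDirectory and stat (for instance by a concurrent cleanup) simply drops out of the sum instead of failing the whole measurement, and getOperationIdMemoryCacheSizeBytes assumes each cached entry carries a JSON-serialisable data property. A minimal usage sketch mirroring the cleaner command's logging; the surrounding operationIdService variable is illustrative only:

// Hypothetical diagnostics snippet (variable names are illustrative)
const memoryBytes = operationIdService.getOperationIdMemoryCacheSizeBytes();
const fileBytes = await operationIdService.getOperationIdFileCacheSizeBytes();
const MB = 1024 * 1024;
console.log(
    `operation-id caches: memory=${(memoryBytes / MB).toFixed(2)}MB, files=${(fileBytes / MB).toFixed(2)}MB`,
);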
24 changes: 17 additions & 7 deletions src/service/operation-service.js
@@ -58,21 +58,31 @@ class OperationService {
return operationIdStatuses;
}

async markOperationAsCompleted(operationId, blockchain, responseData, endStatuses) {
async markOperationAsCompleted(
operationId,
blockchain,
responseData,
endStatuses,
options = {},
) {
const { reuseExistingCache = false } = options;
this.logger.info(`Finalizing ${this.operationName} for operationId: ${operationId}`);

await this.repositoryModuleManager.updateOperationStatus(
this.operationName,
operationId,
OPERATION_STATUS.COMPLETED,
);

if (responseData === null) {
await this.operationIdService.removeOperationIdCache(operationId);
} else {
await this.operationIdService.cacheOperationIdDataToMemory(operationId, responseData);
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
if (!reuseExistingCache) {
await this.operationIdService.cacheOperationIdDataToFile(operationId, responseData);
}
}

await this.repositoryModuleManager.updateOperationStatus(
this.operationName,
operationId,
OPERATION_STATUS.COMPLETED,
);
for (let i = 0; i < endStatuses.length; i += 1) {
const status = endStatuses[i];
const response = {
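Note: the new options argument keeps the signature backwards compatible. Existing four-argument callers still refresh both the memory and the file cache, while a caller whose payload is already on disk can skip rewriting the file, as the publish-service change below does. A minimal sketch of both call shapes; the responseData value here is illustrative:

// Default behaviour: refresh both the in-memory and the on-disk cache.
await this.markOperationAsCompleted(operationId, blockchain, responseData, this.completedStatuses);

// The payload already sits in the file cache, so only the in-memory copy is refreshed.
await this.markOperationAsCompleted(operationId, blockchain, responseData, this.completedStatuses, {
    reuseExistingCache: true,
});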
5 changes: 4 additions & 1 deletion src/service/publish-service.js
@@ -79,11 +79,14 @@ class PublishService extends OperationService {
`[PUBLISH] Minimum replication reached for operationId: ${operationId}, ` +
`datasetRoot: ${datasetRoot}, completed: ${completedNumber}/${minAckResponses}`,
);
const cachedData =
(await this.operationIdService.getCachedOperationIdData(operationId)) || null;
await this.markOperationAsCompleted(
operationId,
blockchain,
null,
cachedData,
this.completedStatuses,
{ reuseExistingCache: true },
);
await this.repositoryModuleManager.updateMinAcksReached(operationId, true);
this.logResponsesSummary(completedNumber, failedNumber);
100 changes: 100 additions & 0 deletions test/unit/commands/operation-id-cleaner-command.test.js
@@ -0,0 +1,100 @@
import { describe, it, beforeEach, afterEach } from 'mocha';
import { expect } from 'chai';
import sinon from 'sinon';

import OperationIdCleanerCommand from '../../../src/commands/cleaners/operation-id-cleaner-command.js';
import {
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
OPERATION_ID_STATUS,
} from '../../../src/constants/constants.js';

describe('OperationIdCleanerCommand', () => {
let clock;
let operationIdService;
let repositoryModuleManager;
let logger;
let command;

beforeEach(() => {
clock = sinon.useFakeTimers(new Date('2023-01-01T00:00:00Z').getTime());

operationIdService = {
getOperationIdMemoryCacheSizeBytes: sinon.stub().returns(1024),
getOperationIdFileCacheSizeBytes: sinon.stub().resolves(2048),
removeExpiredOperationIdMemoryCache: sinon.stub().resolves(512),
removeExpiredOperationIdFileCache: sinon.stub().resolves(3),
};

repositoryModuleManager = {
removeOperationIdRecord: sinon.stub().resolves(),
};

logger = {
debug: sinon.spy(),
info: sinon.spy(),
warn: sinon.spy(),
error: sinon.spy(),
};

command = new OperationIdCleanerCommand({
logger,
repositoryModuleManager,
operationIdService,
fileService: {},
});
});

afterEach(() => {
clock.restore();
});

it('cleans memory with 1h TTL and files with 24h TTL while reporting footprint', async () => {
await command.execute();

expect(operationIdService.getOperationIdMemoryCacheSizeBytes.calledOnce).to.be.true;
expect(operationIdService.getOperationIdFileCacheSizeBytes.calledOnce).to.be.true;

expect(
repositoryModuleManager.removeOperationIdRecord.calledWith(
Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
[OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED],
),
).to.be.true;

expect(
operationIdService.removeExpiredOperationIdMemoryCache.calledWith(
OPERATION_ID_MEMORY_CLEANUP_TIME_MILLS,
),
).to.be.true;

expect(
operationIdService.removeExpiredOperationIdFileCache.calledWith(
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
),
).to.be.true;

expect(logger.debug.called).to.be.true;
});

it('handles missing memory cache gracefully', async () => {
operationIdService.getOperationIdMemoryCacheSizeBytes.throws(new Error('no memory cache'));
await command.execute();

expect(
repositoryModuleManager.removeOperationIdRecord.calledWith(
Date.now() - OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
[OPERATION_ID_STATUS.COMPLETED, OPERATION_ID_STATUS.FAILED],
),
).to.be.true;

expect(
operationIdService.removeExpiredOperationIdFileCache.calledWith(
OPERATION_ID_COMMAND_CLEANUP_TIME_MILLS,
OPERATION_ID_FILES_FOR_REMOVAL_MAX_NUMBER,
),
).to.be.true;
});
});
64 changes: 64 additions & 0 deletions test/unit/service/operation-id-service-cache.test.js
@@ -0,0 +1,64 @@
import { describe, it, beforeEach, afterEach } from 'mocha';
import { expect } from 'chai';
import fs from 'fs/promises';
import path from 'path';
import os from 'os';
import OperationIdService from '../../../src/service/operation-id-service.js';

describe('OperationIdService file cache cleanup', () => {
let tmpDir;
let service;

beforeEach(async () => {
tmpDir = await fs.mkdtemp(path.join(os.tmpdir(), 'opid-cache-'));
const now = Date.now();

// Older than TTL (2 hours)
const oldFile = path.join(tmpDir, 'old.json');
await fs.writeFile(oldFile, '{}');
await fs.utimes(
oldFile,
new Date(now - 2 * 60 * 60 * 1000),
new Date(now - 2 * 60 * 60 * 1000),
);

// Newer than TTL (10 minutes)
const newFile = path.join(tmpDir, 'new.json');
await fs.writeFile(newFile, '{}');
await fs.utimes(newFile, new Date(now - 10 * 60 * 1000), new Date(now - 10 * 60 * 1000));

const fileService = {
getOperationIdCachePath: () => tmpDir,
async pathExists(p) {
try {
await fs.stat(p);
return true;
} catch {
return false;
}
},
readDirectory: (p) => fs.readdir(p),
stat: (p) => fs.stat(p),
removeFile: (p) => fs.rm(p, { force: true }),
};

service = new OperationIdService({
logger: { debug: () => {}, warn: () => {}, error: () => {} },
fileService,
repositoryModuleManager: {},
eventEmitter: { emit: () => {} },
});
});

afterEach(async () => {
await fs.rm(tmpDir, { recursive: true, force: true });
});

it('removes only files older than TTL', async () => {
const deleted = await service.removeExpiredOperationIdFileCache(60 * 60 * 1000, 10);
const remainingFiles = await fs.readdir(tmpDir);

expect(deleted).to.equal(1);
expect(remainingFiles).to.deep.equal(['new.json']);
});
});