112 changes: 98 additions & 14 deletions packages/mongodb-log-writer/src/mongo-log-manager.spec.ts
@@ -1,6 +1,7 @@
import { MongoLogManager, mongoLogId } from '.';
import { ObjectId } from 'bson';
import { once } from 'events';
import type { Stats } from 'fs';
import { promises as fs } from 'fs';
import path from 'path';
import os from 'os';
@@ -27,6 +28,7 @@ describe('MongoLogManager', function () {
});
afterEach(async function () {
await fs.rmdir(directory, { recursive: true });
sinon.restore();
});

it('allows creating and writing to log files', async function () {
@@ -86,6 +88,19 @@ describe('MongoLogManager', function () {
}
});

const getFilesState = async (paths: string[]) => {
return (
await Promise.all(
paths.map((path) =>
fs.stat(path).then(
() => 1,
() => 0
)
)
)
).join('');
};

it('cleans up least recent log files when requested', async function () {
const manager = new MongoLogManager({
directory,
@@ -106,21 +121,90 @@ describe('MongoLogManager', function () {
paths.unshift(filename);
}

const getFiles = async () => {
return (
await Promise.all(
paths.map((path) =>
fs.stat(path).then(
() => 1,
() => 0
)
)
)
).join('');
};
expect(await getFiles()).to.equal('1111111111');
expect(await getFilesState(paths)).to.equal('1111111111');
await manager.cleanupOldLogFiles();
expect(await getFilesState(paths)).to.equal('0000011111');
});

it('if fs.stat fails, it errors and is not considered towards the logs limit', async function () {
const manager = new MongoLogManager({
directory,
retentionDays,
retentionGB: 3,
onwarn,
onerror,
});

const offset = Math.floor(Date.now() / 1000);

const faultyFile = path.join(
directory,
ObjectId.createFromTime(offset - 10).toHexString() + '_log'
);
await fs.writeFile(faultyFile, '');

const faultyFileError = new Error('test error');

const validFiles: string[] = [];
// Create 5 valid files.
for (let i = 5; i >= 0; i--) {
const filename = path.join(
directory,
ObjectId.createFromTime(offset - i).toHexString() + '_log'
);
await fs.writeFile(filename, '');
validFiles.push(filename);
}

expect(onerror).not.called;

const fsStatStub = sinon.stub(fs, 'stat');

fsStatStub.resolves({
size: 1024 * 1024 * 1024,
} as Stats);
fsStatStub.withArgs(faultyFile).rejects(faultyFileError);

await manager.cleanupOldLogFiles();

expect(onerror).calledOnceWithExactly(faultyFileError, faultyFile);

// fs.stat is stubbed so getFilesState will not be accurate.
const leftoverFiles = (await fs.readdir(directory))
.sort()
.map((file) => path.join(directory, file));

expect(leftoverFiles).to.have.lengthOf(4);
expect(leftoverFiles).deep.equals([faultyFile, ...validFiles.slice(3)]);
});

it('cleans up least recent log files when requested with a storage limit', async function () {
const manager = new MongoLogManager({
directory,
retentionDays,
maxLogFileCount: 1000,
// 6 KB
retentionGB: 6 / 1024 / 1024,
onwarn,
onerror,
});

const paths: string[] = [];
const offset = Math.floor(Date.now() / 1000);

// Create 10 files of 1 KB each.
for (let i = 0; i < 10; i++) {
const filename = path.join(
directory,
ObjectId.createFromTime(offset - i).toHexString() + '_log'
);
await fs.writeFile(filename, '0'.repeat(1024));
paths.unshift(filename);
}

expect(await getFilesState(paths)).to.equal('1111111111');
await manager.cleanupOldLogFiles();
expect(await getFiles()).to.equal('0000011111');
expect(await getFilesState(paths)).to.equal('0000111111');
});

it('cleaning up old log files is a no-op by default', async function () {
89 changes: 63 additions & 26 deletions packages/mongodb-log-writer/src/mongo-log-manager.ts
@@ -3,7 +3,6 @@ import { ObjectId } from 'bson';
import { once } from 'events';
import { createWriteStream, promises as fs } from 'fs';
import { createGzip, constants as zlibConstants } from 'zlib';
import { Heap } from 'heap-js';
import { MongoLogWriter } from './mongo-log-writer';
import { Writable } from 'stream';

@@ -17,9 +16,11 @@ interface MongoLogOptions {
retentionDays: number;
/** The maximal number of log files which are kept. */
maxLogFileCount?: number;
/** A handler for warnings related to a specific filesystem path. */
onerror: (err: Error, path: string) => unknown | Promise<void>;
/** The maximal GB of log files which are kept. */
Collaborator suggested change:
-  /** The maximal GB of log files which are kept. */
+  /** The maximal size of log files which are kept. */
retentionGB?: number;
/** A handler for errors related to a specific filesystem path. */
onerror: (err: Error, path: string) => unknown | Promise<void>;
/** A handler for warnings related to a specific filesystem path. */
onwarn: (err: Error, path: string) => unknown | Promise<void>;
}

Expand All @@ -38,9 +39,37 @@ export class MongoLogManager {
/** Clean up log files older than `retentionDays`. */
async cleanupOldLogFiles(maxDurationMs = 5_000): Promise<void> {
const dir = this._options.directory;
let dirHandle;
const sortedLogFiles: {
fullPath: string;
id: string;
size?: number;
}[] = [];
let usedStorageSize = this._options.retentionGB ? 0 : -Infinity;

try {
dirHandle = await fs.opendir(dir);
const files = await fs.readdir(dir, { withFileTypes: true });
for (const file of files) {
Collaborator: Why not .opendir(), which properly streams, if we're iterating it in a loop anyway?

Collaborator (author): oh yeah, I originally had a reduce-based logic (which is also how the sort got lost...) which is why I switched to read, will switch back
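For reference, a minimal sketch of what an opendir()-based scan could look like, assuming Node's fs.promises.opendir; the helper name scanLogDir is illustrative and not part of this PR:

import { promises as fs } from 'fs';
import path from 'path';

// Hypothetical helper: stream directory entries one at a time via fs.opendir()
// instead of buffering the full listing with fs.readdir().
async function scanLogDir(dir: string): Promise<string[]> {
  const matches: string[] = [];
  const dirHandle = await fs.opendir(dir);
  // The Dir handle is async-iterable and yields one Dirent per iteration.
  for await (const dirent of dirHandle) {
    if (!dirent.isFile()) continue;
    if (/^[a-f0-9]{24}_log(\.gz)?$/i.test(dirent.name)) {
      matches.push(path.join(dir, dirent.name));
    }
  }
  return matches;
}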

const { id } =
/^(?<id>[a-f0-9]{24})_log(\.gz)?$/i.exec(file.name)?.groups ?? {};

if (!file.isFile() || !id) {
continue;
}

const fullPath = path.join(dir, file.name);
let size: number | undefined;
if (this._options.retentionGB) {
try {
size = (await fs.stat(fullPath)).size;
usedStorageSize += size;
} catch (err) {
this._options.onerror(err as Error, fullPath);
continue;
}
}

sortedLogFiles.push({ fullPath, id, size });
}
Collaborator: This being a loop means we should be checking maxDurationMs now here as well.

It's also a bit unfortunate that if this part takes longer than the maximum duration, we now don't end up deleting files even if we have already identified that they should be deleted regardless of that (e.g. through the expiration time setting or the max file count setting).
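A rough sketch of how that time cap could also bound the stat/scan phase; the function and parameter names are illustrative, and it assumes a start timestamp taken before the scan begins, mirroring the break condition already used in the deletion loop below:

import type { Dirent } from 'fs';
import { promises as fs } from 'fs';
import path from 'path';

// Sketch only: stat files while respecting an overall time budget so that the
// scan phase cannot run past maxDurationMs on slow (e.g. network) filesystems.
async function statWithinBudget(
  dir: string,
  files: Dirent[],
  startTimestamp: number,
  maxDurationMs: number
): Promise<{ fullPath: string; size: number }[]> {
  const result: { fullPath: string; size: number }[] = [];
  for (const file of files) {
    // Stop scanning once the budget is exhausted; callers can still delete
    // whatever has already been identified as deletable.
    if (Date.now() - startTimestamp > maxDurationMs) break;
    const fullPath = path.join(dir, file.name);
    try {
      result.push({ fullPath, size: (await fs.stat(fullPath)).size });
    } catch {
      // Skip files that disappear or cannot be stat'ed during the scan.
    }
  }
  return result;
}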

} catch {
return;
}
@@ -49,48 +78,56 @@
// Delete files older than N days
const deletionCutoffTimestamp =
deletionStartTimestamp - this._options.retentionDays * 86400 * 1000;
// Store the known set of least recent files in a heap in order to be able to
// delete all but the most recent N files.
const leastRecentFileHeap = new Heap<{
fileTimestamp: number;
fullPath: string;
}>((a, b) => a.fileTimestamp - b.fileTimestamp);

for await (const dirent of dirHandle) {
const storageSizeLimit = this._options.retentionGB
? this._options.retentionGB * 1024 * 1024 * 1024
: Infinity;

for await (const { id, fullPath } of [...sortedLogFiles]) {
Collaborator: I'm a bit confused – Where is sortedLogFiles being sorted?
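One way the entries could be ordered oldest-first is by the timestamp encoded in the 24-hex-character ObjectId prefix of each file name; a hedged sketch, with the comparator name chosen for illustration rather than taken from this PR:

import { ObjectId } from 'bson';

interface LogFileEntry {
  fullPath: string;
  id: string;
  size?: number;
}

// Sketch: sort log file entries by the creation time embedded in their
// ObjectId-style file name prefix, oldest first.
function sortByIdTimestamp(files: LogFileEntry[]): LogFileEntry[] {
  return [...files].sort(
    (a, b) =>
      new ObjectId(a.id).getTimestamp().getTime() -
      new ObjectId(b.id).getTimestamp().getTime()
  );
}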

// Cap the overall time spent inside this function. Consider situations like
// a large number of machines using a shared network-mounted $HOME directory
// where lots and lots of log files end up and filesystem operations happen
// with network latency.
if (Date.now() - deletionStartTimestamp > maxDurationMs) break;

if (!dirent.isFile()) continue;
const { id } =
/^(?<id>[a-f0-9]{24})_log(\.gz)?$/i.exec(dirent.name)?.groups ?? {};
if (!id) continue;
const fileTimestamp = +new ObjectId(id).getTimestamp();
const fullPath = path.join(dir, dirent.name);
let toDelete: string | undefined;
let toDelete:
| {
fullPath: string;
/** If the file wasn't deleted right away and there is a
* retention size limit, its size should be accounted */
fileSize?: number;
}
| undefined;

// If the file is older than expected, delete it. If the file is recent,
// add it to the list of seen files, and if that list is too large, remove
// the least recent file we've seen so far.
if (fileTimestamp < deletionCutoffTimestamp) {
toDelete = fullPath;
} else if (this._options.maxLogFileCount) {
leastRecentFileHeap.push({ fullPath, fileTimestamp });
if (leastRecentFileHeap.size() > this._options.maxLogFileCount) {
toDelete = leastRecentFileHeap.pop()?.fullPath;
toDelete = {
fullPath,
};
} else if (this._options.retentionGB || this._options.maxLogFileCount) {
const reachedMaxStorageSize = usedStorageSize > storageSizeLimit;
const reachedMaxFileCount =
this._options.maxLogFileCount &&
sortedLogFiles.length > this._options.maxLogFileCount;

if (reachedMaxStorageSize || reachedMaxFileCount) {
toDelete = sortedLogFiles.shift();
}
}

if (!toDelete) continue;
try {
await fs.unlink(toDelete);
await fs.unlink(toDelete.fullPath);
if (toDelete.fileSize) {
usedStorageSize -= toDelete.fileSize;
}
// eslint-disable-next-line @typescript-eslint/no-explicit-any
} catch (err: any) {
if (err?.code !== 'ENOENT') {
// eslint-disable-next-line @typescript-eslint/no-unsafe-argument
this._options.onerror(err, fullPath);
this._options.onerror(err as Error, fullPath);
}
}
}