diff --git a/packages/mongodb-log-writer/src/mongo-log-manager.spec.ts b/packages/mongodb-log-writer/src/mongo-log-manager.spec.ts index 39eea94a..2c36f8d6 100644 --- a/packages/mongodb-log-writer/src/mongo-log-manager.spec.ts +++ b/packages/mongodb-log-writer/src/mongo-log-manager.spec.ts @@ -28,7 +28,7 @@ describe('MongoLogManager', function () { }); afterEach(async function () { - await fs.rmdir(directory, { recursive: true }); + await fs.rm(directory, { recursive: true }); sinon.restore(); }); diff --git a/packages/mongodb-log-writer/src/mongo-log-writer.spec.ts b/packages/mongodb-log-writer/src/mongo-log-writer.spec.ts index 7f03f654..b32afaa6 100644 --- a/packages/mongodb-log-writer/src/mongo-log-writer.spec.ts +++ b/packages/mongodb-log-writer/src/mongo-log-writer.spec.ts @@ -177,4 +177,25 @@ describe('MongoLogWriter', function () { new Set(['TypeError']) ); }); + + it('flushes pending writes on the MongoLogWriter itself', async function() { + const chunks: string[] = []; + const target = new stream.Writable({ + write(chunk, encoding, callback) { + chunks.push(chunk.toString()); + // Simulate a 'slow' consumer (i.e. one that does actual I/O, + // as opposed to one that does not do asynchronous work outside + // of microtask queues (promises, nextTick) + setImmediate(callback); + } + }); + const w = new MongoLogWriter('id', null, target); + for (let i = 0; i < 5; i++) { + w.info('component', mongoLogId(0), 'ctx', 'msg', { i }); + } + await w.flush(); + expect(chunks.map(c => c ? JSON.parse(c).attr.i : c)).to.deep.equal([ + 0, 1, 2, 3, 4, '' + ]); + }); }); diff --git a/packages/mongodb-log-writer/src/mongo-log-writer.ts b/packages/mongodb-log-writer/src/mongo-log-writer.ts index befabacf..b66eddd9 100644 --- a/packages/mongodb-log-writer/src/mongo-log-writer.ts +++ b/packages/mongodb-log-writer/src/mongo-log-writer.ts @@ -2,6 +2,8 @@ import { EJSON } from 'bson'; import { Writable } from 'stream'; import { inspect } from 'util'; +const kFlushDummy = Symbol('kFlushDummy'); + type PlainWritable = Pick; /** @@ -112,10 +114,14 @@ export class MongoLogWriter extends Writable { } _write( - info: MongoLogEntry, + info: MongoLogEntry | typeof kFlushDummy, encoding: unknown, callback: (err?: Error | null | undefined) => void ): void { + if (info === kFlushDummy) { + this._target.write('', callback); + return; + } const validationError = validateLogEntry(info); if (validationError) { callback(validationError); @@ -183,7 +189,7 @@ export class MongoLogWriter extends Writable { /** Wait until all pending data has been written to the underlying stream. */ async flush(): Promise { - await new Promise((resolve) => this._target.write('', resolve)); + await new Promise((resolve) => this.write(kFlushDummy, resolve)); } /**