Skip to content

Commit 91680a4

Browse files
committed
fix(mongodb-log-writer): fix flushing when messages are pending MONGOSH-2883
The current `.flush()` method only flushes the target output stream. This is broken behavior, because it can be run at a point in time at which other writes to the log writer itself have already been scheduled, but have not yet been processed (i.e. written to the target stream).
1 parent 04a5246 commit 91680a4

File tree

3 files changed

+30
-3
lines changed

3 files changed

+30
-3
lines changed

packages/mongodb-log-writer/src/mongo-log-manager.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ describe('MongoLogManager', function () {
2828
});
2929

3030
afterEach(async function () {
31-
await fs.rmdir(directory, { recursive: true });
31+
await fs.rm(directory, { recursive: true });
3232
sinon.restore();
3333
});
3434

packages/mongodb-log-writer/src/mongo-log-writer.spec.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,4 +177,25 @@ describe('MongoLogWriter', function () {
177177
new Set(['TypeError'])
178178
);
179179
});
180+
181+
it('flushes pending writes on the MongoLogWriter itself', async function() {
182+
const chunks: string[] = [];
183+
const target = new stream.Writable({
184+
write(chunk, encoding, callback) {
185+
chunks.push(chunk.toString());
186+
// Simulate a 'slow' consumer (i.e. one that does actual I/O,
187+
// as opposed to one that does not do asynchronous work outside
188+
// of microtask queues (promises, nextTick)
189+
setImmediate(callback);
190+
}
191+
});
192+
const w = new MongoLogWriter('id', null, target);
193+
for (let i = 0; i < 5; i++) {
194+
w.info('component', mongoLogId(0), 'ctx', 'msg', { i });
195+
}
196+
await w.flush();
197+
expect(chunks.map(c => c ? JSON.parse(c).attr.i : c)).to.deep.equal([
198+
0, 1, 2, 3, 4, ''
199+
]);
200+
});
180201
});

packages/mongodb-log-writer/src/mongo-log-writer.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ import { EJSON } from 'bson';
22
import { Writable } from 'stream';
33
import { inspect } from 'util';
44

5+
const kFlushDummy = Symbol('kFlushDummy');
6+
57
type PlainWritable = Pick<Writable, 'write' | 'end'>;
68

79
/**
@@ -112,10 +114,14 @@ export class MongoLogWriter extends Writable {
112114
}
113115

114116
_write(
115-
info: MongoLogEntry,
117+
info: MongoLogEntry | typeof kFlushDummy,
116118
encoding: unknown,
117119
callback: (err?: Error | null | undefined) => void
118120
): void {
121+
if (info === kFlushDummy) {
122+
this._target.write('', callback);
123+
return;
124+
}
119125
const validationError = validateLogEntry(info);
120126
if (validationError) {
121127
callback(validationError);
@@ -183,7 +189,7 @@ export class MongoLogWriter extends Writable {
183189

184190
/** Wait until all pending data has been written to the underlying stream. */
185191
async flush(): Promise<void> {
186-
await new Promise((resolve) => this._target.write('', resolve));
192+
await new Promise((resolve) => this.write(kFlushDummy, resolve));
187193
}
188194

189195
/**

0 commit comments

Comments
 (0)