Skip to content

Commit f74918b

Browse files
feat(shell-api): add options in stream processor start, stop, and drop MONGOSH-1920
1 parent dce0b68 commit f74918b

File tree

2 files changed

+62
-6
lines changed

2 files changed

+62
-6
lines changed

packages/shell-api/src/stream-processor.ts

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,30 @@ export default class StreamProcessor extends ShellApiWithMongoClass {
2626
}
2727

2828
@returnsPromise
29-
async start() {
29+
async start(options: Document = {}) {
3030
return await this._streams._runStreamCommand({
3131
startStreamProcessor: this.name,
32+
...options,
3233
});
3334
}
3435

3536
@returnsPromise
36-
async stop() {
37+
async stop(options: Document = {}) {
3738
return await this._streams._runStreamCommand({
3839
stopStreamProcessor: this.name,
40+
...options,
3941
});
4042
}
4143

4244
@returnsPromise
43-
async drop() {
44-
return this._drop();
45+
async drop(options: Document = {}) {
46+
return this._drop(options);
4547
}
4648

47-
async _drop() {
49+
async _drop(options: Document = {}) {
4850
return await this._streams._runStreamCommand({
4951
dropStreamProcessor: this.name,
52+
...options,
5053
});
5154
}
5255

packages/shell-api/src/streams.spec.ts

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import { Streams } from './streams';
77
import { InterruptFlag, MongoshInterruptedError } from './interruptor';
88
import type { MongoshInvalidInputError } from '@mongosh/errors';
99

10-
describe('Streams', function () {
10+
describe.only('Streams', function () {
1111
let mongo: Mongo;
1212
let streams: Streams;
1313
const identity = (a: unknown) => a;
@@ -164,6 +164,59 @@ describe('Streams', function () {
164164
});
165165
});
166166

167+
// Validate supplying options in start,stop, and drop commands.
168+
describe('options', function () {
169+
it('supplies options in start, stop, and drop', async function () {
170+
// Create the stream processor.
171+
const runCmdStub = sinon
172+
.stub(mongo._serviceProvider, 'runCommand')
173+
.resolves({ ok: 1 });
174+
const name = 'optionsTest';
175+
const pipeline = [{ $match: { foo: 'bar' } }];
176+
const processor = await streams.createStreamProcessor(name, pipeline);
177+
expect(processor).to.eql(streams.getProcessor(name));
178+
const cmd = { createStreamProcessor: name, pipeline };
179+
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
180+
181+
// Start the stream processor with an extra option.
182+
await processor.start({ resumeFromCheckpoint: false });
183+
expect(
184+
runCmdStub.calledWithExactly(
185+
'admin',
186+
{ startStreamProcessor: name, resumeFromCheckpoint: false },
187+
{}
188+
)
189+
).to.be.true;
190+
191+
// Stop the stream processor with an extra option.
192+
await processor.stop({ force: true });
193+
expect(
194+
runCmdStub.calledWithExactly(
195+
'admin',
196+
{ stopStreamProcessor: name, force: true },
197+
{}
198+
)
199+
).to.be.true;
200+
201+
// Drop the stream processor with a few extra options.
202+
const opts = {
203+
force: true,
204+
ttl: { unit: 'day', size: 30 },
205+
};
206+
await processor.drop(opts);
207+
expect(
208+
runCmdStub.calledWithExactly(
209+
'admin',
210+
{
211+
dropStreamProcessor: name,
212+
...opts,
213+
},
214+
{}
215+
)
216+
).to.be.true;
217+
});
218+
});
219+
167220
describe('modify', function () {
168221
it('throws with invalid parameters', async function () {
169222
// Create the stream processor.

0 commit comments

Comments
 (0)