Skip to content

Commit f3ff98c

Browse files
authored
feat(shell-api): add ability to drill into stream processor attributes STREAMS-1136 (#2578)
1 parent ed0f87c commit f3ff98c

File tree

3 files changed

+151
-15
lines changed

3 files changed

+151
-15
lines changed

packages/shell-api/src/stream-processor.ts

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,18 +12,49 @@ import {
1212
import type { Streams } from './streams';
1313
import type { MQLPipeline } from './mql-types';
1414

15+
export type StreamProcessorData = Document & { name: string };
16+
1517
@shellApiClassDefault
1618
export class StreamProcessor extends ShellApiWithMongoClass {
17-
constructor(public _streams: Streams, public name: string) {
19+
private _streams: Streams;
20+
21+
public name: string;
22+
23+
constructor(_streams: Streams, data: StreamProcessorData) {
1824
super();
25+
26+
this._streams = _streams;
27+
this.name = data.name;
28+
29+
// We may overwrite the name property but that should be fine
30+
for (const [key, value] of Object.entries(data)) {
31+
Object.defineProperty(this, key, {
32+
value,
33+
configurable: true,
34+
enumerable: true,
35+
writable: true,
36+
});
37+
}
1938
}
2039

2140
get _mongo(): Mongo {
2241
return this._streams._mongo;
2342
}
2443

2544
[asPrintable]() {
26-
return `Atlas Stream Processor: ${this.name}`;
45+
const result: Document = {};
46+
const descriptors = Object.getOwnPropertyDescriptors(this);
47+
for (const [key, value] of Object.entries(descriptors)) {
48+
if (key.startsWith('_')) {
49+
return;
50+
}
51+
52+
if (value.value && value.enumerable) {
53+
result[key] = value.value;
54+
}
55+
}
56+
57+
return result;
2758
}
2859

2960
@returnsPromise
@@ -142,7 +173,7 @@ export class StreamProcessor extends ShellApiWithMongoClass {
142173
try {
143174
await Promise.race([
144175
this._instanceState.shellApi.sleep(1000), // wait 1 second
145-
interruptable.promise, // unless interruppted
176+
interruptable.promise, // unless interrupted
146177
]);
147178
} finally {
148179
interruptable.destroy();

packages/shell-api/src/streams.spec.ts

Lines changed: 101 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { Database } from './database';
66
import { Streams } from './streams';
77
import { InterruptFlag, MongoshInterruptedError } from './interruptor';
88
import type { MongoshInvalidInputError } from '@mongosh/errors';
9+
import { asPrintable } from './enums';
910

1011
describe('Streams', function () {
1112
let mongo: Mongo;
@@ -60,7 +61,7 @@ describe('Streams', function () {
6061

6162
const pipeline = [{ $match: { foo: 'bar' } }];
6263
const result = await streams.createStreamProcessor('spm', pipeline);
63-
expect(result).to.eql(streams.getProcessor('spm'));
64+
expect(result).to.eql(streams.getProcessor({ name: 'spm', pipeline }));
6465

6566
const cmd = { createStreamProcessor: 'spm', pipeline };
6667
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
@@ -172,7 +173,7 @@ describe('Streams', function () {
172173
.resolves({ ok: 1 });
173174
const pipeline = [{ $match: { foo: 'bar' } }];
174175
const processor = await streams.createStreamProcessor(name, pipeline);
175-
expect(processor).to.eql(streams.getProcessor(name));
176+
expect(processor).to.eql(streams.getProcessor({ name, pipeline }));
176177
const cmd = { createStreamProcessor: name, pipeline };
177178
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
178179
return { runCmdStub, processor };
@@ -313,6 +314,104 @@ describe('Streams', function () {
313314
});
314315
});
315316

317+
describe('listStreamProcessors', function () {
318+
it('passes filter parameter correctly', async function () {
319+
const runCmdStub = sinon
320+
.stub(mongo._serviceProvider, 'runCommand')
321+
.resolves({ ok: 1, streamProcessors: [] });
322+
323+
const complexFilter = {
324+
name: { $regex: '^test' },
325+
state: { $in: ['STARTED', 'STOPPED'] },
326+
'options.dlq.connectionName': 'atlas-sql',
327+
};
328+
329+
await streams.listStreamProcessors(complexFilter);
330+
331+
const cmd = { listStreamProcessors: 1, filter: complexFilter };
332+
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
333+
});
334+
335+
it('returns error when command fails', async function () {
336+
const error = { ok: 0, errmsg: 'Command failed' };
337+
sinon.stub(mongo._serviceProvider, 'runCommand').resolves(error);
338+
339+
const filter = { name: 'test' };
340+
const result = await streams.listStreamProcessors(filter);
341+
expect(result).to.eql(error);
342+
});
343+
344+
it('returns empty array when no processors exist', async function () {
345+
const runCmdStub = sinon
346+
.stub(mongo._serviceProvider, 'runCommand')
347+
.resolves({ ok: 1, streamProcessors: [] });
348+
349+
const filter = {};
350+
const result = await streams.listStreamProcessors(filter);
351+
352+
expect(Array.isArray(result)).to.be.true;
353+
expect(result.length).to.equal(0);
354+
355+
const cmd = { listStreamProcessors: 1, filter };
356+
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;
357+
});
358+
359+
it('ensures complete stream processor objects are defined in response', async function () {
360+
const completeProcessor = {
361+
id: '6916541aa9733d72cff41f27',
362+
name: 'complete-processor',
363+
pipeline: [
364+
{ $match: { status: 'active' } },
365+
{ $project: { _id: 1, name: 1, timestamp: 1 } },
366+
],
367+
state: 'STARTED',
368+
tier: 'SP2',
369+
errorMsg: '',
370+
lastModified: new Date('2023-01-01T00:00:00Z'),
371+
lastStateChange: new Date('2023-01-01T00:00:00Z'),
372+
};
373+
374+
sinon
375+
.stub(mongo._serviceProvider, 'runCommand')
376+
.resolves({ ok: 1, streamProcessors: [completeProcessor] });
377+
378+
const result = await streams.listStreamProcessors({});
379+
380+
// Verify the raw processor data is preserved in asPrintable
381+
const rawProcessors = result[asPrintable]();
382+
expect(rawProcessors).to.have.length(1);
383+
384+
// deep comparison to ensure all fields are present an equal
385+
expect(rawProcessors[0]).to.eql(completeProcessor);
386+
});
387+
388+
it('ensures you can drill down into individual processor attributes', async function () {
389+
const completeProcessor = {
390+
id: '6916541aa9733d72cff41f27',
391+
name: 'complete-processor',
392+
pipeline: [
393+
{ $match: { status: 'active' } },
394+
{ $project: { _id: 1, name: 1, timestamp: 1 } },
395+
],
396+
state: 'STARTED',
397+
tier: 'SP2',
398+
errorMsg: '',
399+
lastModified: new Date('2023-01-01T00:00:00Z'),
400+
lastStateChange: new Date('2023-01-01T00:00:00Z'),
401+
};
402+
403+
sinon
404+
.stub(mongo._serviceProvider, 'runCommand')
405+
.resolves({ ok: 1, streamProcessors: [completeProcessor] });
406+
407+
const result = await streams.listStreamProcessors({});
408+
409+
// verify users can access properties on individual processors
410+
expect(result[0].name).to.equal(completeProcessor.name);
411+
expect(result[0].pipeline).to.eql(completeProcessor.pipeline);
412+
});
413+
});
414+
316415
describe('listWorkspaceDefaults', function () {
317416
it('returns tier and maxTierSize', async function () {
318417
const runCmdStub = sinon

packages/shell-api/src/streams.ts

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
shellApiClassDefault,
77
ShellApiWithMongoClass,
88
} from './decorators';
9+
import type { StreamProcessorData } from './stream-processor';
910
import StreamProcessor from './stream-processor';
1011
import { ADMIN_DB, asPrintable, shellApiType } from './enums';
1112
import type { Database, DatabaseWithSchema } from './database';
@@ -34,7 +35,7 @@ export class Streams<
3435
return v;
3536
}
3637
if (typeof prop === 'string' && !prop.startsWith('_')) {
37-
return target.getProcessor(prop);
38+
return target.getProcessor({ name: prop });
3839
}
3940
},
4041
});
@@ -55,8 +56,8 @@ export class Streams<
5556
return 'Atlas Stream Processing';
5657
}
5758

58-
getProcessor(name: string): StreamProcessor {
59-
return new StreamProcessor(this, name);
59+
getProcessor(data: StreamProcessorData): StreamProcessor {
60+
return new StreamProcessor(this, data);
6061
}
6162

6263
@returnsPromise
@@ -82,7 +83,7 @@ export class Streams<
8283
limit: number;
8384
cursorId: number;
8485
};
85-
const sp = this.getProcessor(name);
86+
const sp = this.getProcessor({ name });
8687

8788
async function dropSp() {
8889
try {
@@ -129,8 +130,7 @@ export class Streams<
129130
if (result.ok !== 1) {
130131
return result;
131132
}
132-
133-
return this.getProcessor(name);
133+
return this.getProcessor({ name, pipeline, ...options });
134134
}
135135

136136
@returnsPromise
@@ -143,9 +143,15 @@ export class Streams<
143143
return result;
144144
}
145145
const rawProcessors = result.streamProcessors;
146-
const sps = rawProcessors.map((sp: StreamProcessor) =>
147-
this.getProcessor(sp.name)
148-
);
146+
const sps = rawProcessors
147+
.map((sp: Document) => {
148+
if (sp.name) {
149+
return this.getProcessor(sp as StreamProcessorData);
150+
}
151+
152+
return undefined;
153+
})
154+
.filter((sp: Document | undefined) => !!sp);
149155

150156
return Object.defineProperties(sps, {
151157
[asPrintable]: { value: () => rawProcessors },
@@ -174,7 +180,7 @@ export class Streams<
174180
})) as WorkspaceDefaults;
175181
}
176182

177-
async _runStreamCommand(cmd: Document, options: Document = {}) {
183+
_runStreamCommand(cmd: Document, options: Document = {}): Promise<Document> {
178184
return this._mongo._serviceProvider.runCommand(ADMIN_DB, cmd, options); // run cmd
179185
}
180186
}

0 commit comments

Comments
 (0)