Skip to content

Commit fb5ceca

Browse files
committed
add opts to executeLine; increase waitForPrompt timeouts in tests
1 parent d2ffa06 commit fb5ceca

File tree

2 files changed

+68
-35
lines changed

2 files changed

+68
-35
lines changed

packages/e2e-tests/test/e2e-streams.spec.ts

Lines changed: 63 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ describe('e2e Streams', function () {
6666
],
6767
removeSigintListeners: true,
6868
});
69-
await shell.waitForPromptOrExit({ timeout: 60_000 });
69+
await shell.waitForPromptOrExit({ timeout: 45_000 });
7070

7171
processorName = `spi${new bson.ObjectId().toHexString()}`;
7272
client = await MongoClient.connect(
@@ -99,7 +99,8 @@ describe('e2e Streams', function () {
9999
const createResult = await shell.executeLine(
100100
`sp.createStreamProcessor("${processorName}", ${JSON.stringify(
101101
aggPipeline
102-
)})`
102+
)})`,
103+
{ timeout: 45_000 }
103104
);
104105
expect(createResult).to.include(
105106
`Atlas Stream Processor: ${processorName}`
@@ -111,7 +112,9 @@ describe('e2e Streams', function () {
111112
await db.dropDatabase();
112113
await client.close();
113114

114-
const result = await shell.executeLine(`sp.${processorName}.drop()`);
115+
const result = await shell.executeLine(`sp.${processorName}.drop()`, {
116+
timeout: 45_000,
117+
});
115118
expect(result).to.include(`{ ok: 1 }`);
116119
} catch (err: any) {
117120
console.error(
@@ -122,7 +125,9 @@ describe('e2e Streams', function () {
122125
});
123126

124127
it('can list stream processors', async function () {
125-
const listResult = await shell.executeLine(`sp.listStreamProcessors()`);
128+
const listResult = await shell.executeLine(`sp.listStreamProcessors()`, {
129+
timeout: 45_000,
130+
});
126131
// make sure the processor created in the beforeEach is present
127132
expect(listResult).to.include(`name: '${processorName}'`);
128133
});
@@ -133,24 +138,27 @@ describe('e2e Streams', function () {
133138
expect(initialDocsCount).to.eq(0);
134139

135140
const startResult = await shell.executeLine(
136-
`sp.${processorName}.start()`
141+
`sp.${processorName}.start()`,
142+
{ timeout: 45_000 }
137143
);
138144
expect(startResult).to.include('{ ok: 1 }');
139145

140-
// sleep for a bit to let the processor do stuff
141-
await sleep(500);
146+
let updatedDocCount = 0;
147+
await eventually(async () => {
148+
updatedDocCount = await collection.countDocuments();
149+
expect(updatedDocCount).to.be.greaterThan(0);
150+
});
142151

143-
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
152+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`, {
153+
timeout: 45_000,
154+
});
144155
expect(stopResult).to.include('{ ok: 1 }');
145156

146-
const updatedDocCount = await collection.countDocuments();
147-
expect(updatedDocCount).to.be.greaterThan(0);
148-
149-
// sleep again to make sure the processor isn't doing any more inserts
150-
await sleep(500);
151-
152-
const countAfterStopping = await collection.countDocuments();
153-
expect(countAfterStopping).to.eq(updatedDocCount);
157+
const statsResult = await shell.executeLine(
158+
`sp.${processorName}.stats()`,
159+
{ timeout: 45_000 }
160+
);
161+
expect(statsResult).to.include(`state: 'STOPPED'`);
154162
});
155163

156164
it(`can modify an existing stream processor's pipeline`, async function () {
@@ -159,14 +167,17 @@ describe('e2e Streams', function () {
159167
const newField = 'newField';
160168

161169
const startResult = await shell.executeLine(
162-
`sp.${processorName}.start()`
170+
`sp.${processorName}.start()`,
171+
{ timeout: 45_000 }
163172
);
164173
expect(startResult).to.include('{ ok: 1 }');
165174

166175
// sleep for a bit to let the processor do stuff
167176
await sleep(500);
168177

169-
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`);
178+
const stopResult = await shell.executeLine(`sp.${processorName}.stop()`, {
179+
timeout: 45_000,
180+
});
170181
expect(stopResult).to.include('{ ok: 1 }');
171182

172183
const initialDocsWithNewField = await collection.countDocuments({
@@ -201,27 +212,29 @@ describe('e2e Streams', function () {
201212
const updatedAggPipeline = [sourceStage, addFieldStage, mergeStage];
202213

203214
const modifyResult = await shell.executeLine(
204-
`sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`
215+
`sp.${processorName}.modify(${JSON.stringify(updatedAggPipeline)})`,
216+
{ timeout: 45_000 }
205217
);
206218
expect(modifyResult).to.include('{ ok: 1 }');
207219

208220
const secondStartResult = await shell.executeLine(
209-
`sp.${processorName}.start()`
221+
`sp.${processorName}.start()`,
222+
{ timeout: 45_000 }
210223
);
211224
expect(secondStartResult).to.include('{ ok: 1 }');
212225

213-
// sleep again to let the processor work again with the updated pipeline
214-
await sleep(500);
215-
216-
const updatedDocsWithNewField = await collection.countDocuments({
217-
[newField]: { $exists: true },
226+
await eventually(async () => {
227+
const updatedDocsWithNewField = await collection.countDocuments({
228+
[newField]: { $exists: true },
229+
});
230+
expect(updatedDocsWithNewField).to.be.greaterThan(0);
218231
});
219-
expect(updatedDocsWithNewField).to.be.greaterThan(0);
220232
});
221233

222234
it('can view stats for a stream processor', async function () {
223235
const statsResult = await shell.executeLine(
224-
`sp.${processorName}.stats()`
236+
`sp.${processorName}.stats()`,
237+
{ timeout: 45_000 }
225238
);
226239
expect(statsResult).to.include(`name: '${processorName}'`);
227240
expect(statsResult).to.include(`state: 'CREATED'`);
@@ -247,10 +260,14 @@ describe('e2e Streams', function () {
247260
],
248261
removeSigintListeners: true,
249262
});
250-
await shell.waitForPromptOrExit({ timeout: 60_000 });
263+
await shell.waitForPromptOrExit({ timeout: 45_000 });
251264
});
252265

253266
it('should output streamed documents to the shell', async function () {
267+
if (process.platform === 'win32') {
268+
return this.skip(); // No SIGINT on Windows.
269+
}
270+
254271
// this processor is pre-defined on the cloud-dev test project
255272
// it reads from sample solar stream, appends a field with the processor name to each doc, and
256273
// inserts the docs into an Atlas collection
@@ -259,9 +276,16 @@ describe('e2e Streams', function () {
259276
shell.writeInputLine(`sp.${immortalProcessorName}.sample()`);
260277
// data from the sample solar stream isn't deterministic, so just assert that
261278
// the processorName field appears in the shell output after sampling
262-
await eventually(() => {
263-
shell.assertContainsOutput(`processorName: '${immortalProcessorName}'`);
264-
});
279+
await eventually(
280+
() => {
281+
shell.assertContainsOutput(
282+
`processorName: '${immortalProcessorName}'`
283+
);
284+
},
285+
{ timeout: 45_000 }
286+
);
287+
288+
shell.kill('SIGINT');
265289
});
266290
});
267291

@@ -282,12 +306,16 @@ describe('e2e Streams', function () {
282306
],
283307
removeSigintListeners: true,
284308
});
285-
await shell.waitForPromptOrExit({ timeout: 60_000 });
309+
await shell.waitForPromptOrExit({ timeout: 45_000 });
286310

287311
interactiveId = new bson.ObjectId().toHexString();
288312
});
289313

290314
it('should output streamed documents to the shell', async function () {
315+
if (process.platform === 'win32') {
316+
return this.skip(); // No SIGINT on Windows.
317+
}
318+
291319
// the pipeline for our interactive processor reads from sample solar stream, adds a
292320
// unique test id to each document, and inserts it into an Atlas collection
293321
const sourceStage = {
@@ -321,8 +349,10 @@ describe('e2e Streams', function () {
321349
() => {
322350
shell.assertContainsOutput(`interactiveId: '${interactiveId}'`);
323351
},
324-
{ timeout: 60_000 }
352+
{ timeout: 45_000 }
325353
);
354+
355+
shell.kill('SIGINT');
326356
});
327357
});
328358
});

packages/e2e-tests/test/test-shell.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,10 +292,13 @@ export class TestShell {
292292
this.writeInput(`${chars}\n`);
293293
}
294294

295-
async executeLine(line: string): Promise<string> {
295+
async executeLine(
296+
line: string,
297+
opts: { timeout?: number; promptPattern?: RegExp } = {}
298+
): Promise<string> {
296299
const previousOutputLength = this._output.length;
297300
this.writeInputLine(line);
298-
await this.waitForPrompt(previousOutputLength);
301+
await this.waitForPrompt(previousOutputLength, opts);
299302
return this._output.slice(previousOutputLength);
300303
}
301304

0 commit comments

Comments
 (0)