diff --git a/packages/testcontainers/src/wait-strategies/log-wait-strategy.test.ts b/packages/testcontainers/src/wait-strategies/log-wait-strategy.test.ts index 62e537f0c..57961c1ea 100644 --- a/packages/testcontainers/src/wait-strategies/log-wait-strategy.test.ts +++ b/packages/testcontainers/src/wait-strategies/log-wait-strategy.test.ts @@ -54,4 +54,31 @@ describe("LogWaitStrategy", { timeout: 180_000 }, () => { expect(await getRunningContainerNames()).not.toContain(containerName); }); + + it("should throw an error if the message is never received", async () => { + const containerName = `container-${new RandomUuid().nextUuid()}`; + + await expect( + new GenericContainer("cristianrgreco/testcontainer:1.1.14") + .withName(containerName) + .withCommand("/bin/sh", "-c", 'echo "Ready"') + .withWaitStrategy(Wait.forLogMessage("unexpected")) + .start() + ).rejects.toThrowError(`Log stream ended and message "unexpected" was not received`); + + expect(await getRunningContainerNames()).not.toContain(containerName); + }); + + it("does not matter if container does not send all content in a single line", async () => { + const container = await new GenericContainer("cristianrgreco/testcontainer:1.1.14") + .withCommand([ + "node", + "-e", + "process.stdout.write('Hello '); setTimeout(() => process.stdout.write('World\\n'), 2000)", + ]) + .withWaitStrategy(Wait.forLogMessage("Hello World")) + .start(); + + await container.stop(); + }); }); diff --git a/packages/testcontainers/src/wait-strategies/log-wait-strategy.ts b/packages/testcontainers/src/wait-strategies/log-wait-strategy.ts index 005ef113b..a54c3f928 100644 --- a/packages/testcontainers/src/wait-strategies/log-wait-strategy.ts +++ b/packages/testcontainers/src/wait-strategies/log-wait-strategy.ts @@ -1,5 +1,6 @@ import byline from "byline"; import Dockerode from "dockerode"; +import { setTimeout } from "timers/promises"; import { log } from "../common"; import { getContainerRuntimeClient } from "../container-runtime"; import { BoundPorts } from "../utils/bound-ports"; @@ -16,46 +17,37 @@ export class LogWaitStrategy extends AbstractWaitStrategy { } public async waitUntilReady(container: Dockerode.Container, boundPorts: BoundPorts, startTime?: Date): Promise { + await Promise.race([this.handleTimeout(container.id), this.handleLogs(container, startTime)]); + } + + async handleTimeout(containerId: string): Promise { + await setTimeout(this.startupTimeout); + this.throwError(containerId, `Log message "${this.message}" not received after ${this.startupTimeout}ms`); + } + + async handleLogs(container: Dockerode.Container, startTime?: Date): Promise { log.debug(`Waiting for log message "${this.message}"...`, { containerId: container.id }); const client = await getContainerRuntimeClient(); const stream = await client.container.logs(container, { since: startTime ? startTime.getTime() / 1000 : 0 }); - return new Promise((resolve, reject) => { - const timeout = setTimeout(() => { - const message = `Log message "${this.message}" not received after ${this.startupTimeout}ms`; - log.error(message, { containerId: container.id }); - reject(new Error(message)); - }, this.startupTimeout); - - const comparisonFn: (line: string) => boolean = (line: string) => { - if (this.message instanceof RegExp) { - return this.message.test(line); - } else { - return line.includes(this.message); - } - }; - - let count = 0; - const lineProcessor = (line: string) => { - if (comparisonFn(line)) { - if (++count === this.times) { - stream.destroy(); - clearTimeout(timeout); - log.debug(`Log wait strategy complete`, { containerId: container.id }); - resolve(); - } + + let matches = 0; + for await (const line of byline(stream)) { + if (this.matches(line)) { + if (++matches === this.times) { + return log.debug(`Log wait strategy complete`, { containerId: container.id }); } - }; - - byline(stream) - .on("data", lineProcessor) - .on("err", lineProcessor) - .on("end", () => { - stream.destroy(); - clearTimeout(timeout); - const message = `Log stream ended and message "${this.message}" was not received`; - log.error(message, { containerId: container.id }); - reject(new Error(message)); - }); - }); + } + } + + this.throwError(container.id, `Log stream ended and message "${this.message}" was not received`); + } + + matches(line: string): boolean { + return this.message instanceof RegExp ? this.message.test(line) : line.includes(this.message); + } + + throwError(containerId: string, message: string): void { + log.error(message, { containerId }); + throw new Error(message); } }