Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
82 changes: 78 additions & 4 deletions index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ interface Options {
logger?: Pick<Console, Severity>;
queryForwarding?: boolean;
fetch?: any;
maxConnectionTimeout?: number;
forward?: boolean;
}

const proxyAgent = new EnvHttpProxyAgent();
Expand All @@ -28,6 +30,8 @@ class SmeeClient {
#logger: Pick<Console, Severity>;
#events: EventSource | null = null;
#queryForwarding: boolean = true;
#maxConnectionTimeout: number | undefined;
#forward: boolean | undefined = undefined;

#onerror: (err: ErrorEvent) => void = (err) => {
if (this.#events?.readyState === EventSource.CLOSED) {
Expand All @@ -40,6 +44,10 @@ class SmeeClient {
#onopen: () => void = () => {};

#onmessage: (msg: MessageEvent) => Promise<void> = async (msg) => {
if (!this.#forward) {
return;
}

const data = JSON.parse(msg.data);

const target = url.parse(this.#target, true);
Expand Down Expand Up @@ -90,13 +98,17 @@ class SmeeClient {
target,
logger = console,
fetch = undiciFetch,
maxConnectionTimeout,
queryForwarding = true,
forward,
}: Options) {
this.#source = source;
this.#target = target;
this.#logger = logger!;
this.#fetch = fetch;
this.#queryForwarding = queryForwarding;
this.#maxConnectionTimeout = maxConnectionTimeout;
this.#forward = forward;

if (
!validator.isURL(this.#source, {
Expand Down Expand Up @@ -196,10 +208,14 @@ class SmeeClient {
// Reconnect immediately
(events as any).reconnectInterval = 0; // This isn't a valid property of EventSource

const connected = new Promise<void>((resolve, reject) => {
const establishConnection = new Promise<void>((resolve, reject) => {
events.addEventListener("open", () => {
this.#logger.info(`Connected to ${this.#source}`);
events.removeEventListener("error", reject);

if (this.#forward !== false) {
this.#startForwarding();
}
resolve();
});
events.addEventListener("error", reject);
Expand All @@ -221,20 +237,78 @@ class SmeeClient {
events.onerror = this.#events_onerror;
}

this.#logger.info(`Forwarding ${this.#source} to ${this.#target}`);

await connected;
if (this.#maxConnectionTimeout !== undefined) {
const timeoutConnection = new Promise<void>((_, reject) => {
setTimeout(async () => {
if (events.readyState === EventSource.OPEN) {
// If the connection is already open, we don't need to reject
return;
}

this.#logger.error(
`Connection to ${this.#source} timed out after ${this.#maxConnectionTimeout}ms`,
);
reject(
new Error(
`Connection to ${this.#source} timed out after ${this.#maxConnectionTimeout}ms`,
),
);
await this.stop();
}, this.#maxConnectionTimeout)?.unref();
});
await Promise.race([establishConnection, timeoutConnection]);
} else {
await establishConnection;
}

return events;
}

async stop() {
if (this.#events) {
this.#stopForwarding();
this.#events.close();
this.#events = null as any;
this.#forward = undefined;
this.#logger.info("Connection closed");
}
}

#startForwarding() {
if (this.#forward === true) {
return;
}
this.#forward = true;
this.#logger.info(`Forwarding ${this.#source} to ${this.#target}`);
}

startForwarding() {
if (this.#forward === true) {
this.#logger.info(
`Forwarding ${this.#source} to ${this.#target} is already enabled`,
);
return;
}
this.#startForwarding();
}

#stopForwarding() {
if (this.#forward !== true) {
return;
}
this.#forward = false;
this.#logger.info(`Stopped forwarding ${this.#source} to ${this.#target}`);
}

stopForwarding() {
if (this.#forward !== true) {
this.#logger.info(
`Forwarding ${this.#source} to ${this.#target} is already disabled`,
);
return;
}
this.#stopForwarding();
}
}

export {
Expand Down
4 changes: 2 additions & 2 deletions test/connection-error.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ describe("connection", () => {

expect(logger.infoCalls.length).toBe(2);
expect(logger.infoCalls[0][0]).toBe(
`Forwarding ${smeeServer.channelUrl} to ${webhookServer.url}`,
`Connected to ${smeeServer.channelUrl}`,
);
expect(logger.infoCalls[1][0]).toBe(
`Connected to ${smeeServer.channelUrl}`,
`Forwarding ${smeeServer.channelUrl} to ${webhookServer.url}`,
);

await smeeClient.stop();
Expand Down
58 changes: 58 additions & 0 deletions test/connection-timeout.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { describe, test, expect } from "vitest";

import { SmeeServer } from "./helpers/smee-server.ts";
import { VoidLogger } from "./helpers/void-logger.ts";
import { WebhookServer } from "./helpers/webhook-server.ts";

import SmeeClient from "../index.ts";

describe("connection timeout", () => {
test("should throw a connection timeout", async () => {
const webhookServer = new WebhookServer({
handler: async (req, res) => {
await new Promise((resolve) => setTimeout(resolve, 2000));
res.writeHead(200).end("OK");
},
});
await webhookServer.start();

const smeeServer = new SmeeServer();
await smeeServer.start();

const logger = new VoidLogger();

const smeeClient = new SmeeClient({
source: webhookServer.url,
target: webhookServer.url,
maxConnectionTimeout: 500,
logger,
});

expect(smeeClient.onopen).toBe(null);
expect(logger.infoCalls.length).toBe(0);

try {
await smeeClient.start();
throw new Error("Expected start to throw due to timeout");
} catch (error) {
expect(error).toBeInstanceOf(Error);
expect(error.message).toBe(
`Connection to ${webhookServer.url} timed out after 500ms`,
);
}

await new Promise((resolve) => setTimeout(resolve, 100));

expect(logger.errorCalls.length).toBe(1);
expect(logger.errorCalls[0][0]).toBe(
`Connection to ${webhookServer.url} timed out after 500ms`,
);

expect(logger.infoCalls.length).toBe(1);
expect(logger.infoCalls[0][0]).toBe(`Connection closed`);

await smeeClient.stop();
await smeeServer.stop();
await webhookServer.stop();
});
});
Loading