Skip to content

Commit d179a97

Browse files
committed
write tests
1 parent 5f3f1a5 commit d179a97

File tree

11 files changed

+234
-63
lines changed

11 files changed

+234
-63
lines changed

__tests__/allocation.test.ts

Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
import { beforeEach, describe, test, expect, vi, assert } from 'vitest';
2+
import { TestSetupHelpers, transports } from '../testUtil/fixtures/transports';
3+
import { BinaryCodec, Codec } from '../codec';
4+
import {
5+
advanceFakeTimersByHeartbeat,
6+
createPostTestCleanups,
7+
} from '../testUtil/fixtures/cleanup';
8+
import { createServer } from '../router/server';
9+
import { createClient } from '../router/client';
10+
import { TestServiceSchema } from '../testUtil/fixtures/services';
11+
import { waitFor } from '../testUtil/fixtures/cleanup';
12+
import { numberOfConnections, closeAllConnections } from '../testUtil';
13+
import { cleanupTransports } from '../testUtil/fixtures/cleanup';
14+
import { testFinishesCleanly } from '../testUtil/fixtures/cleanup';
15+
import { ProtocolError } from '../transport/events';
16+
17+
let isOom = false;
18+
// simulate RangeError: Array buffer allocation failed
19+
const OomableCodec: Codec = {
20+
toBuffer(obj) {
21+
if (isOom) {
22+
throw new RangeError('failed allocation');
23+
}
24+
25+
return BinaryCodec.toBuffer(obj);
26+
},
27+
fromBuffer: (buff: Uint8Array) => {
28+
return BinaryCodec.fromBuffer(buff);
29+
},
30+
};
31+
32+
describe.each(transports)(
33+
'failed allocation test ($name transport)',
34+
async (transport) => {
35+
const clientOpts = { codec: OomableCodec };
36+
const serverOpts = { codec: BinaryCodec };
37+
38+
const { addPostTestCleanup, postTestCleanup } = createPostTestCleanups();
39+
let getClientTransport: TestSetupHelpers['getClientTransport'];
40+
let getServerTransport: TestSetupHelpers['getServerTransport'];
41+
beforeEach(async () => {
42+
// only allow client to oom, server has sane oom handling already
43+
const setup = await transport.setup({
44+
client: clientOpts,
45+
server: serverOpts,
46+
});
47+
getClientTransport = setup.getClientTransport;
48+
getServerTransport = setup.getServerTransport;
49+
isOom = false;
50+
51+
return async () => {
52+
await postTestCleanup();
53+
await setup.cleanup();
54+
};
55+
});
56+
57+
test('oom during heartbeat kills the session, client starts new session', async () => {
58+
// setup
59+
const clientTransport = getClientTransport('client');
60+
const serverTransport = getServerTransport();
61+
const services = { test: TestServiceSchema };
62+
const server = createServer(serverTransport, services);
63+
const client = createClient<typeof services>(
64+
clientTransport,
65+
serverTransport.clientId,
66+
);
67+
68+
const errMock = vi.fn();
69+
clientTransport.addEventListener('protocolError', errMock);
70+
addPostTestCleanup(async () => {
71+
clientTransport.removeEventListener('protocolError', errMock);
72+
await cleanupTransports([clientTransport, serverTransport]);
73+
});
74+
75+
// establish initial connection
76+
const result = await client.test.add.rpc({ n: 1 });
77+
expect(result).toStrictEqual({ ok: true, payload: { result: 1 } });
78+
79+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
80+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
81+
const oldClientSession = serverTransport.sessions.get('client');
82+
const oldServerSession = clientTransport.sessions.get('SERVER');
83+
assert(oldClientSession);
84+
assert(oldServerSession);
85+
86+
// simulate some OOM during heartbeat
87+
for (let i = 0; i < 5; i++) {
88+
isOom = i % 2 === 0;
89+
await advanceFakeTimersByHeartbeat();
90+
}
91+
92+
// verify session on client is dead
93+
await waitFor(() => expect(clientTransport.sessions.size).toBe(0));
94+
95+
// verify we got MessageSendFailure errors
96+
await waitFor(() => {
97+
expect(errMock).toHaveBeenCalledWith(
98+
expect.objectContaining({
99+
type: ProtocolError.MessageSendFailure,
100+
}),
101+
);
102+
});
103+
104+
// client should be able to reconnect and make new calls
105+
isOom = false;
106+
const result2 = await client.test.add.rpc({ n: 2 });
107+
expect(result2).toStrictEqual({ ok: true, payload: { result: 3 } });
108+
109+
// verify new session IDs are different from old ones
110+
const newClientSession = serverTransport.sessions.get('client');
111+
const newServerSession = clientTransport.sessions.get('SERVER');
112+
assert(newClientSession);
113+
assert(newServerSession);
114+
expect(newClientSession.id).not.toBe(oldClientSession.id);
115+
expect(newServerSession.id).not.toBe(oldServerSession.id);
116+
117+
await testFinishesCleanly({
118+
clientTransports: [clientTransport],
119+
serverTransport,
120+
server,
121+
});
122+
});
123+
124+
test('oom during handshake kills the session, client starts new session', async () => {
125+
// setup
126+
const clientTransport = getClientTransport('client');
127+
const serverTransport = getServerTransport();
128+
const services = { test: TestServiceSchema };
129+
const server = createServer(serverTransport, services);
130+
const client = createClient<typeof services>(
131+
clientTransport,
132+
serverTransport.clientId,
133+
);
134+
const errMock = vi.fn();
135+
clientTransport.addEventListener('protocolError', errMock);
136+
addPostTestCleanup(async () => {
137+
clientTransport.removeEventListener('protocolError', errMock);
138+
await cleanupTransports([clientTransport, serverTransport]);
139+
});
140+
141+
// establish initial connection
142+
await client.test.add.rpc({ n: 1 });
143+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(1));
144+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(1));
145+
146+
// close connection to force reconnection
147+
closeAllConnections(clientTransport);
148+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0));
149+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0));
150+
151+
// simulate OOM during handshake
152+
isOom = true;
153+
clientTransport.connect('SERVER');
154+
await waitFor(() => expect(numberOfConnections(serverTransport)).toBe(0));
155+
await waitFor(() => expect(numberOfConnections(clientTransport)).toBe(0));
156+
157+
await waitFor(() => {
158+
expect(errMock).toHaveBeenCalledWith(
159+
expect.objectContaining({
160+
type: ProtocolError.MessageSendFailure,
161+
}),
162+
);
163+
});
164+
165+
// client should be able to reconnect and make new calls
166+
isOom = false;
167+
const result = await client.test.add.rpc({ n: 2 });
168+
expect(result).toStrictEqual({ ok: true, payload: { result: 3 } });
169+
170+
await testFinishesCleanly({
171+
clientTransports: [clientTransport],
172+
serverTransport,
173+
server,
174+
});
175+
});
176+
},
177+
);

codec/adapter.ts

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,9 @@ export class CodecMessageAdapter {
1717
value: this.codec.toBuffer(msg),
1818
};
1919
} catch (e) {
20-
const error =
21-
e instanceof Error
22-
? e
23-
: new Error(`serialize error: ${coerceErrorString(e)}`);
24-
2520
return {
2621
ok: false,
27-
value: {
28-
code: 'serialize_error',
29-
error,
30-
},
22+
reason: coerceErrorString(e),
3123
};
3224
}
3325
}
@@ -38,10 +30,7 @@ export class CodecMessageAdapter {
3830
if (!Value.Check(OpaqueTransportMessageSchema, parsedMsg)) {
3931
return {
4032
ok: false,
41-
value: {
42-
code: 'deserialize_error',
43-
error: new Error('transport message schema mismatch'),
44-
},
33+
reason: 'transport message schema mismatch',
4534
};
4635
}
4736

@@ -50,17 +39,9 @@ export class CodecMessageAdapter {
5039
value: parsedMsg,
5140
};
5241
} catch (e) {
53-
const error =
54-
e instanceof Error
55-
? e
56-
: new Error(`deserialize error: ${coerceErrorString(e)}`);
57-
5842
return {
5943
ok: false,
60-
value: {
61-
code: 'deserialize_error',
62-
error,
63-
},
44+
reason: coerceErrorString(e),
6445
};
6546
}
6647
}

transport/client.ts

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,29 +297,37 @@ export abstract class ClientTransport<
297297
transportMessage: msg,
298298
});
299299

300-
this.deleteSession(connectedSession, { unhealthy: true });
301300
this.protocolError({
302301
type: ProtocolError.InvalidMessage,
303302
message: reason,
304303
});
304+
this.deleteSession(connectedSession, { unhealthy: true });
305305
},
306-
onMessageSendFailure: (msg, code) => {
307-
this.log?.error(`failed to send message: ${code}`, {
306+
onMessageSendFailure: (msg, reason) => {
307+
this.log?.error(`failed to send message: ${reason}`, {
308308
...connectedSession.loggingMetadata,
309309
transportMessage: msg,
310310
});
311311

312+
this.protocolError({
313+
type: ProtocolError.MessageSendFailure,
314+
message: reason,
315+
});
312316
this.deleteSession(connectedSession, { unhealthy: true });
313317
},
314318
});
315319

316320
const res = connectedSession.sendBufferedMessages();
317321
if (res && !res.ok) {
318-
this.log?.error(`failed to send buffered messages: ${res.value.code}`, {
322+
this.log?.error(`failed to send buffered messages: ${res.reason}`, {
319323
...connectedSession.loggingMetadata,
320324
transportMessage: msg,
321325
});
322326

327+
this.protocolError({
328+
type: ProtocolError.MessageSendFailure,
329+
message: res.reason,
330+
});
323331
this.deleteSession(connectedSession, { unhealthy: true });
324332

325333
return;
@@ -498,11 +506,15 @@ export abstract class ClientTransport<
498506

499507
const res = session.sendHandshake(requestMsg);
500508
if (!res.ok) {
501-
this.log?.error(`failed to send handshake request: ${res.value.code}`, {
509+
this.log?.error(`failed to send handshake request: ${res.reason}`, {
502510
...session.loggingMetadata,
503511
transportMessage: requestMsg,
504512
});
505513

514+
this.protocolError({
515+
type: ProtocolError.MessageSendFailure,
516+
message: res.reason,
517+
});
506518
this.deleteSession(session, { unhealthy: true });
507519
}
508520
}

transport/events.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export const ProtocolError = {
1010
HandshakeFailed: 'handshake_failed',
1111
MessageOrderingViolated: 'message_ordering_violated',
1212
InvalidMessage: 'invalid_message',
13+
MessageSendFailure: 'message_send_failure',
1314
} as const;
1415

1516
export type ProtocolErrorType =

transport/results.ts

Lines changed: 5 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,16 @@
11
import { OpaqueTransportMessage } from './message';
22

33
// internal use only, not to be used in public API
4-
type Result<T, Code extends string> =
4+
type Result<T> =
55
| {
66
ok: true;
77
value: T;
88
}
99
| {
1010
ok: false;
11-
value: {
12-
code: Code;
13-
error: Error;
14-
};
11+
reason: string;
1512
};
1613

17-
export type SendErrorCode = 'send_error';
18-
export type SerializeErrorCode = 'serialize_error';
19-
export type DeserializeErrorCode = 'deserialize_error';
20-
21-
export type SendResult = Result<string, SendErrorCode | SerializeErrorCode>;
22-
export type SerializeResult = Result<Uint8Array, SerializeErrorCode>;
23-
export type DeserializeResult = Result<
24-
OpaqueTransportMessage,
25-
DeserializeErrorCode
26-
>;
14+
export type SendResult = Result<string>;
15+
export type SerializeResult = Result<Uint8Array>;
16+
export type DeserializeResult = Result<OpaqueTransportMessage>;

0 commit comments

Comments
 (0)