-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathcleanup.ts
More file actions
178 lines (155 loc) · 4.99 KB
/
cleanup.ts
File metadata and controls
178 lines (155 loc) · 4.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import { assert, expect, vi } from 'vitest';
import {
ClientTransport,
Connection,
ServerTransport,
Transport,
} from '../../transport';
import { Server } from '../../router';
import { AnyServiceSchemaMap, MaybeDisposable } from '../../router/services';
import { numberOfConnections, testingSessionOptions } from '..';
import { Value } from '@sinclair/typebox/value';
import { ControlMessageAckSchema } from '../../transport/message';
// Polling settings handed to `vi.waitFor` by the `waitFor` helper in this file.
const waitUntilOptions = {
  // re-run the callback every 5ms
  interval: 5,
  // give up after 500ms; leaves room for a possible connection backoff
  timeout: 500,
};
/**
 * Advances vitest's timers by one heartbeat interval, as configured in
 * `testingSessionOptions.heartbeatIntervalMs`.
 */
export async function advanceFakeTimersByHeartbeat() {
  const { heartbeatIntervalMs } = testingSessionOptions;
  await vi.advanceTimersByTimeAsync(heartbeatIntervalMs);
}
/**
 * Advances vitest's timers one heartbeat interval at a time, for
 * `testingSessionOptions.heartbeatsUntilDead + 1` intervals in total.
 *
 * NOTE(review): presumably the extra interval guarantees the "dead" threshold
 * has been crossed — confirm against the session implementation.
 */
export async function advanceFakeTimersByDisconnectGrace() {
  let elapsedBeats = 0;
  while (elapsedBeats < testingSessionOptions.heartbeatsUntilDead + 1) {
    await advanceFakeTimersByHeartbeat();
    elapsedBeats += 1;
  }
}
/**
 * Advances vitest's timers by `testingSessionOptions.sessionDisconnectGraceMs`.
 */
export async function advanceFakeTimersBySessionGrace() {
  const { sessionDisconnectGraceMs } = testingSessionOptions;
  await vi.advanceTimersByTimeAsync(sessionDisconnectGraceMs);
}
/**
 * Advances vitest's timers by a fixed 500ms.
 *
 * NOTE(review): the name suggests 500ms is meant to cover a connection
 * backoff period — confirm against the transport's backoff settings.
 */
export async function advanceFakeTimersByConnectionBackoff() {
  const backoffMs = 500;
  await vi.advanceTimersByTimeAsync(backoffMs);
}
/**
 * Post-test check for a single transport.
 *
 * Advances the fake timers by the session grace period, then waits (polling
 * through `waitFor`) until the transport has no sessions and, after that,
 * until it reports zero connections.
 *
 * @param t - the transport to inspect
 */
export async function ensureTransportIsClean(t: Transport<Connection>) {
  await advanceFakeTimersBySessionGrace();

  const assertNoSessions = () =>
    expect(
      t.sessions,
      `[post-test cleanup] transport ${t.clientId} should not have open sessions after the test`,
    ).toStrictEqual(new Map());

  const assertNoConnections = () =>
    expect(
      numberOfConnections(t),
      `[post-test cleanup] transport ${t.clientId} should not have open connections after the test`,
    ).toBe(0);

  // sessions are checked first; connections only once sessions are gone
  await waitFor(assertNoSessions);
  await waitFor(assertNoConnections);
}
/**
 * Thin wrapper around `vi.waitFor` that always applies this file's shared
 * `waitUntilOptions`.
 *
 * @param cb - callback forwarded to `vi.waitFor`
 * @returns whatever `vi.waitFor` returns for `cb`
 */
export function waitFor<T>(cb: () => T | Promise<T>) {
  const pendingResult = vi.waitFor(cb, waitUntilOptions);
  return pendingResult;
}
/**
 * Waits until no session of the transport has anything left in its send
 * buffer, ignoring messages whose payload matches `ControlMessageAckSchema`
 * (treated here as heartbeats).
 *
 * @param t - the transport whose session send buffers are inspected
 */
export async function ensureTransportBuffersAreEventuallyEmpty(
  t: Transport<Connection>,
) {
  // Builds a map of client -> buffered non-heartbeat messages, leaving out
  // every session that has none.
  const collectPendingMessages = () => {
    const pending = new Map<unknown, unknown>();
    for (const [client, sess] of t.sessions) {
      const nonHeartbeats = sess.sendBuffer.filter((encodedMsg) => {
        const decoded = sess.codec.fromBuffer(encodedMsg.data);
        // a buffered message that fails to decode is a test failure
        assert(decoded.ok);
        const isHeartbeat = Value.Check(
          ControlMessageAckSchema,
          decoded.value.payload,
        );
        return !isHeartbeat;
      });

      if (nonHeartbeats.length > 0) {
        pending.set(client, nonHeartbeats);
      }
    }

    return pending;
  };

  // wait for the send buffers to be flushed
  await waitFor(() =>
    expect(
      collectPendingMessages(),
      `[post-test cleanup] transport ${t.clientId} should not have any messages waiting to send after the test`,
    ).toStrictEqual(new Map()),
  );
}
/**
 * Waits (polling through `waitFor`) until the server's `streams` map is empty.
 *
 * @param s - the server to inspect
 */
export async function ensureServerIsClean(
  s: Server<MaybeDisposable, object, AnyServiceSchemaMap>,
) {
  const assertNoOpenStreams = () =>
    expect(
      s.streams,
      `[post-test cleanup] server should not have any open streams after the test`,
    ).toStrictEqual(new Map());

  return waitFor(assertNoOpenStreams);
}
/**
 * Closes every transport in the list that is not already closed, logging an
 * end-of-test marker on each one just before closing it.
 *
 * @param transports - transports to close, visited in array order
 */
export async function cleanupTransports<ConnType extends Connection>(
  transports: Array<Transport<ConnType>>,
) {
  for (const transport of transports) {
    // already-closed transports are left untouched
    if (transport.getStatus() === 'closed') {
      continue;
    }

    transport.log?.info('*** end of test cleanup ***', {
      clientId: transport.clientId,
    });
    transport.close();
  }
}
/**
 * End-of-test invariant check and teardown. Every field is optional; missing
 * pieces are simply skipped.
 *
 * Sequence:
 *  1. log an invariant-check marker on every transport (server first),
 *  2. advance one heartbeat, then wait for each client transport's send
 *     buffers to drain,
 *  3. advance one more heartbeat, then wait for the server transport's send
 *     buffers to drain,
 *  4. wait for the server to have no open streams,
 *  5. close all transports,
 *  6. verify each transport is clean after closing.
 */
export async function testFinishesCleanly({
  clientTransports,
  serverTransport,
  server,
}: Partial<{
  clientTransports: Array<ClientTransport<Connection>>;
  // MetadataSchema and ParsedMetadata are not used in this test,
  // so we can safely use any here
  // eslint-disable-next-line @typescript-eslint/no-explicit-any
  serverTransport: ServerTransport<Connection, any, any>;
  server: Server<MaybeDisposable, object, AnyServiceSchemaMap>;
}>) {
  const clients = clientTransports ?? [];

  // --- pre-close invariants ---
  // the server transport (when present) goes first in the combined list
  const allTransports = [
    ...(serverTransport ? [serverTransport] : []),
    ...clients,
  ];

  allTransports.forEach((transport) => {
    transport.log?.info('*** end of test invariant checks ***', {
      clientId: transport.clientId,
    });
  });

  // let one round of heartbeats propagate, then confirm clients sent everything
  await advanceFakeTimersByHeartbeat();
  for (const client of clients) {
    await ensureTransportBuffersAreEventuallyEmpty(client);
  }

  // one more round of heartbeats, then confirm the server side has drained too
  await advanceFakeTimersByHeartbeat();
  if (serverTransport) {
    await ensureTransportBuffersAreEventuallyEmpty(serverTransport);
  }

  if (server) {
    await ensureServerIsClean(server);
  }

  // --- close everything ---
  await cleanupTransports(allTransports);

  // --- post-close invariants ---
  for (const transport of allTransports) {
    await ensureTransportIsClean(transport);
  }
}
/**
 * Creates an independent stack of asynchronous cleanup callbacks.
 *
 * @returns an object with:
 *  - `addPostTestCleanup(fn)`: pushes `fn` onto the stack.
 *  - `postTestCleanup()`: pops and awaits callbacks one at a time, most
 *    recently added first, until the stack is empty. Every registered
 *    callback is run even if an earlier one rejects; once the stack is
 *    drained, the first failure encountered is rethrown.
 */
export const createPostTestCleanups = () => {
  const cleanupFns: Array<() => Promise<void>> = [];

  return {
    addPostTestCleanup: (fn: () => Promise<void>) => {
      cleanupFns.push(fn);
    },
    postTestCleanup: async () => {
      // A separate flag is needed because the thrown value itself may be
      // `undefined`.
      let sawFailure = false;
      let firstFailure: unknown;

      while (cleanupFns.length > 0) {
        const fn = cleanupFns.pop();
        try {
          await fn?.();
        } catch (err) {
          // Keep going: a failing cleanup must not prevent the remaining
          // ones from running, otherwise their resources would leak.
          if (!sawFailure) {
            sawFailure = true;
            firstFailure = err;
          }
        }
      }

      if (sawFailure) {
        throw firstFailure;
      }
    },
  };
};