Skip to content

Commit 52f2d08

Browse files
committed
chore: QueueMessage is just string
1 parent bdec925 commit 52f2d08

File tree

4 files changed

+47
-91
lines changed

4 files changed

+47
-91
lines changed

packages/ocap-kernel/src/remotes/MessageQueue.test.ts

Lines changed: 26 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { describe, it, expect, beforeEach } from 'vitest';
22

33
import { MessageQueue } from './MessageQueue.ts';
4-
import type { QueuedMessage } from './MessageQueue.ts';
54

65
describe('MessageQueue', () => {
76
let queue: MessageQueue;
@@ -34,12 +33,8 @@ describe('MessageQueue', () => {
3433
queue.enqueue('message2');
3534

3635
expect(queue).toHaveLength(2);
37-
expect(queue.messages[0]).toStrictEqual({
38-
message: 'message1',
39-
});
40-
expect(queue.messages[1]).toStrictEqual({
41-
message: 'message2',
42-
});
36+
expect(queue.messages[0]).toBe('message1');
37+
expect(queue.messages[1]).toBe('message2');
4338
});
4439

4540
it('drops oldest message when at capacity', () => {
@@ -55,9 +50,9 @@ describe('MessageQueue', () => {
5550
smallQueue.enqueue('msg4');
5651

5752
expect(smallQueue).toHaveLength(3);
58-
expect(smallQueue.messages[0]?.message).toBe('msg2');
59-
expect(smallQueue.messages[1]?.message).toBe('msg3');
60-
expect(smallQueue.messages[2]?.message).toBe('msg4');
53+
expect(smallQueue.messages[0]).toBe('msg2');
54+
expect(smallQueue.messages[1]).toBe('msg3');
55+
expect(smallQueue.messages[2]).toBe('msg4');
6156
});
6257

6358
it('maintains FIFO order when dropping messages', () => {
@@ -69,10 +64,7 @@ describe('MessageQueue', () => {
6964
smallQueue.enqueue('fourth');
7065

7166
// Should have dropped 'first' and 'second'
72-
expect(smallQueue.messages).toStrictEqual([
73-
{ message: 'third' },
74-
{ message: 'fourth' },
75-
]);
67+
expect(smallQueue.messages).toStrictEqual(['third', 'fourth']);
7668
});
7769
});
7870

@@ -83,11 +75,9 @@ describe('MessageQueue', () => {
8375

8476
const dequeued = queue.dequeue();
8577

86-
expect(dequeued).toStrictEqual({
87-
message: 'first',
88-
});
78+
expect(dequeued).toBe('first');
8979
expect(queue).toHaveLength(1);
90-
expect(queue.messages[0]?.message).toBe('second');
80+
expect(queue.messages[0]).toBe('second');
9181
});
9282

9383
it('returns undefined for empty queue', () => {
@@ -99,9 +89,9 @@ describe('MessageQueue', () => {
9989
queue.enqueue('2');
10090
queue.enqueue('3');
10191

102-
expect(queue.dequeue()?.message).toBe('1');
103-
expect(queue.dequeue()?.message).toBe('2');
104-
expect(queue.dequeue()?.message).toBe('3');
92+
expect(queue.dequeue()).toBe('1');
93+
expect(queue.dequeue()).toBe('2');
94+
expect(queue.dequeue()).toBe('3');
10595
expect(queue.dequeue()).toBeUndefined();
10696
});
10797
});
@@ -114,11 +104,7 @@ describe('MessageQueue', () => {
114104

115105
const allMessages = queue.dequeueAll();
116106

117-
expect(allMessages).toStrictEqual([
118-
{ message: 'msg1' },
119-
{ message: 'msg2' },
120-
{ message: 'msg3' },
121-
]);
107+
expect(allMessages).toStrictEqual(['msg1', 'msg2', 'msg3']);
122108
expect(queue).toHaveLength(0);
123109
expect(queue.messages).toStrictEqual([]);
124110
});
@@ -134,7 +120,7 @@ describe('MessageQueue', () => {
134120
queue.enqueue('msg');
135121

136122
const result = queue.dequeueAll();
137-
result.push({ message: 'extra' });
123+
result.push('extra');
138124

139125
// Queue should still be empty after dequeueAll
140126
expect(queue).toHaveLength(0);
@@ -151,8 +137,8 @@ describe('MessageQueue', () => {
151137
queue.dropOldest();
152138

153139
expect(queue).toHaveLength(2);
154-
expect(queue.messages[0]?.message).toBe('second');
155-
expect(queue.messages[1]?.message).toBe('third');
140+
expect(queue.messages[0]).toBe('second');
141+
expect(queue.messages[1]).toBe('third');
156142
});
157143

158144
it('handles empty queue gracefully', () => {
@@ -195,7 +181,7 @@ describe('MessageQueue', () => {
195181
queue.enqueue('after');
196182

197183
expect(queue).toHaveLength(1);
198-
expect(queue.messages[0]?.message).toBe('after');
184+
expect(queue.messages[0]).toBe('after');
199185
});
200186
});
201187

@@ -224,10 +210,7 @@ describe('MessageQueue', () => {
224210

225211
const { messages } = queue;
226212

227-
expect(messages).toStrictEqual([
228-
{ message: 'msg1' },
229-
{ message: 'msg2' },
230-
]);
213+
expect(messages).toStrictEqual(['msg1', 'msg2']);
231214

232215
// TypeScript enforces read-only at compile time
233216
// At runtime, verify the array reference is the internal one
@@ -246,7 +229,7 @@ describe('MessageQueue', () => {
246229
queue.dequeue();
247230
const messages3 = queue.messages;
248231
expect(messages3).toHaveLength(1);
249-
expect(messages3[0]?.message).toBe('second');
232+
expect(messages3[0]).toBe('second');
250233
});
251234
});
252235

@@ -255,11 +238,7 @@ describe('MessageQueue', () => {
255238
queue.enqueue('old1');
256239
queue.enqueue('old2');
257240

258-
const newMessages: QueuedMessage[] = [
259-
{ message: 'new1' },
260-
{ message: 'new2' },
261-
{ message: 'new3' },
262-
];
241+
const newMessages: string[] = ['new1', 'new2', 'new3'];
263242

264243
queue.replaceAll(newMessages);
265244

@@ -277,29 +256,23 @@ describe('MessageQueue', () => {
277256
});
278257

279258
it('is not affected by changes to input array', () => {
280-
const messages: QueuedMessage[] = [{ message: 'msg1' }];
259+
const messages: string[] = ['msg1'];
281260

282261
queue.replaceAll(messages);
283262

284263
// Modify the input array
285-
messages.push({ message: 'msg2' });
286-
messages[0] = { message: 'modified' };
264+
messages.push('msg2');
265+
messages[0] = 'modified';
287266

288267
// Queue should not be affected
289268
expect(queue).toHaveLength(1);
290-
expect(queue.messages[0]).toStrictEqual({
291-
message: 'msg1',
292-
});
269+
expect(queue.messages[0]).toBe('msg1');
293270
});
294271

295272
it('works when replacing with more messages than capacity', () => {
296273
const smallQueue = new MessageQueue(2);
297274

298-
const messages: QueuedMessage[] = [
299-
{ message: 'msg1' },
300-
{ message: 'msg2' },
301-
{ message: 'msg3' },
302-
];
275+
const messages: string[] = ['msg1', 'msg2', 'msg3'];
303276

304277
smallQueue.replaceAll(messages);
305278

@@ -316,15 +289,15 @@ describe('MessageQueue', () => {
316289
queue.enqueue('msg2');
317290

318291
const first = queue.dequeue();
319-
expect(first?.message).toBe('msg1');
292+
expect(first).toBe('msg1');
320293

321294
queue.enqueue('msg3');
322295
queue.enqueue('msg4');
323296

324297
expect(queue).toHaveLength(3);
325298

326299
queue.dropOldest();
327-
expect(queue.messages[0]?.message).toBe('msg3');
300+
expect(queue.messages[0]).toBe('msg3');
328301

329302
const all = queue.dequeueAll();
330303
expect(all).toHaveLength(2);

packages/ocap-kernel/src/remotes/MessageQueue.ts

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
1-
export type QueuedMessage = {
2-
message: string;
3-
};
4-
51
/**
62
* Message queue management for remote communications.
73
*/
84
export class MessageQueue {
9-
readonly #queue: QueuedMessage[] = [];
5+
readonly #queue: string[] = [];
106

117
readonly #maxCapacity: number;
128

@@ -29,15 +25,15 @@ export class MessageQueue {
2925
if (this.#queue.length >= this.#maxCapacity) {
3026
this.dropOldest();
3127
}
32-
this.#queue.push({ message });
28+
this.#queue.push(message);
3329
}
3430

3531
/**
3632
* Remove and return the first message in the queue.
3733
*
3834
* @returns The first message in the queue, or undefined if the queue is empty.
3935
*/
40-
dequeue(): QueuedMessage | undefined {
36+
dequeue(): string | undefined {
4137
return this.#queue.shift();
4238
}
4339

@@ -46,7 +42,7 @@ export class MessageQueue {
4642
*
4743
* @returns All messages in the queue.
4844
*/
49-
dequeueAll(): QueuedMessage[] {
45+
dequeueAll(): string[] {
5046
const messages = [...this.#queue];
5147
this.#queue.length = 0;
5248
return messages;
@@ -80,7 +76,7 @@ export class MessageQueue {
8076
*
8177
* @returns A read-only view of the messages.
8278
*/
83-
get messages(): readonly QueuedMessage[] {
79+
get messages(): readonly string[] {
8480
return this.#queue;
8581
}
8682

@@ -89,7 +85,7 @@ export class MessageQueue {
8985
*
9086
* @param messages - The new messages to replace the queue with.
9187
*/
92-
replaceAll(messages: QueuedMessage[]): void {
88+
replaceAll(messages: string[]): void {
9389
this.#queue.length = 0;
9490
this.#queue.push(...messages);
9591
}

packages/ocap-kernel/src/remotes/network.test.ts

Lines changed: 9 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@ import {
1010
beforeAll,
1111
} from 'vitest';
1212

13-
import type { QueuedMessage } from './MessageQueue.ts';
14-
1513
// Import the module we're testing - must be after mocks are set up
1614
let initNetwork: typeof import('./network.ts').initNetwork;
1715

@@ -23,7 +21,7 @@ const mockMessageQueue = {
2321
replaceAll: vi.fn(),
2422
clear: vi.fn(),
2523
length: 0,
26-
messages: [] as QueuedMessage[],
24+
messages: [] as string[],
2725
};
2826

2927
vi.mock('./MessageQueue.ts', () => {
@@ -613,10 +611,7 @@ describe('network.initNetwork', () => {
613611
.mockReturnValueOnce({ message: 'queued-2' })
614612
.mockReturnValue(undefined);
615613
mockMessageQueue.length = 2;
616-
mockMessageQueue.messages = [
617-
{ message: 'queued-1' },
618-
{ message: 'queued-2' },
619-
];
614+
mockMessageQueue.messages = ['queued-1', 'queued-2'];
620615

621616
// Setup for reconnection scenario
622617
const mockChannel = createMockChannel('peer-1');
@@ -670,11 +665,7 @@ describe('network.initNetwork', () => {
670665
.mockReturnValueOnce({ message: 'queued-3' })
671666
.mockReturnValue(undefined);
672667
mockMessageQueue.length = 3;
673-
mockMessageQueue.messages = [
674-
{ message: 'queued-1' },
675-
{ message: 'queued-2' },
676-
{ message: 'queued-3' },
677-
];
668+
mockMessageQueue.messages = ['queued-1', 'queued-2', 'queued-3'];
678669
const mockChannel = createMockChannel('peer-1');
679670
mockChannel.msgStream.write
680671
.mockRejectedValueOnce(
@@ -865,10 +856,7 @@ describe('network.initNetwork', () => {
865856

866857
// Set up queue with messages
867858
mockMessageQueue.length = 2;
868-
mockMessageQueue.messages = [
869-
{ message: 'queued-1' },
870-
{ message: 'queued-2' },
871-
];
859+
mockMessageQueue.messages = ['queued-1', 'queued-2'];
872860

873861
await closeConnection('peer-1');
874862

@@ -1320,7 +1308,7 @@ describe('network.initNetwork', () => {
13201308
.mockReturnValueOnce({ message: 'queued-msg' })
13211309
.mockReturnValue(undefined);
13221310
mockMessageQueue.length = 1;
1323-
mockMessageQueue.messages = [{ message: 'queued-msg' }];
1311+
mockMessageQueue.messages = ['queued-msg'];
13241312

13251313
const mockChannel = createMockChannel('peer-1');
13261314
mockChannel.msgStream.write.mockRejectedValue(
@@ -1372,7 +1360,7 @@ describe('network.initNetwork', () => {
13721360
.mockReturnValueOnce({ message: 'queued-msg' })
13731361
.mockReturnValue(undefined);
13741362
mockMessageQueue.length = 1;
1375-
mockMessageQueue.messages = [{ message: 'queued-msg' }];
1363+
mockMessageQueue.messages = ['queued-msg'];
13761364

13771365
const mockChannel = createMockChannel('peer-1');
13781366
mockChannel.msgStream.write.mockRejectedValue(
@@ -1440,8 +1428,8 @@ describe('network.initNetwork', () => {
14401428
mockConnectionFactory.dialIdempotent.mockResolvedValue(mockChannel);
14411429
// Set up queue with messages that will be flushed during reconnection
14421430
// Each reconnection attempt will try to flush these messages, and they will fail
1443-
const queuedMsg1 = { message: 'queued-1' };
1444-
const queuedMsg2 = { message: 'queued-2' };
1431+
const queuedMsg1 = 'queued-1';
1432+
const queuedMsg2 = 'queued-2';
14451433
// dequeue should return messages for each flush attempt (each reconnection)
14461434
mockMessageQueue.dequeue.mockImplementation(() => {
14471435
// Return messages in order, then undefined
@@ -1654,7 +1642,7 @@ describe('network.initNetwork', () => {
16541642
(abortableDelay as ReturnType<typeof vi.fn>).mockResolvedValue(undefined);
16551643

16561644
// Set up queue with messages
1657-
const queuedMsg = { message: 'queued-msg' };
1645+
const queuedMsg = 'queued-msg';
16581646
mockMessageQueue.dequeue
16591647
.mockReturnValueOnce(queuedMsg)
16601648
.mockReturnValue(undefined);

packages/ocap-kernel/src/remotes/network.ts

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import { toString as bufToString, fromString } from 'uint8arrays';
99

1010
import { ConnectionFactory } from './ConnectionFactory.ts';
1111
import { MessageQueue } from './MessageQueue.ts';
12-
import type { QueuedMessage } from './MessageQueue.ts';
1312
import { ReconnectionManager } from './ReconnectionManager.ts';
1413
import type {
1514
RemoteMessageHandler,
@@ -143,7 +142,7 @@ export async function initNetwork(
143142
* @param from - The peer ID that the message is from.
144143
* @param message - The message to receive.
145144
*/
146-
async function receiveMsg(from: string, message: string): Promise<void> {
145+
async function receiveMessage(from: string, message: string): Promise<void> {
147146
logger.log(`${from}:: recv ${message}`);
148147
await remoteMessageHandler(from, message);
149148
}
@@ -191,7 +190,7 @@ export async function initNetwork(
191190
}
192191
if (readBuf) {
193192
reconnectionManager.resetBackoff(channel.peerId); // successful inbound traffic
194-
await receiveMsg(channel.peerId, bufToString(readBuf.subarray()));
193+
await receiveMessage(channel.peerId, bufToString(readBuf.subarray()));
195194
} else {
196195
// Stream ended (returned undefined), exit the read loop
197196
logger.log(`${channel.peerId}:: stream ended`);
@@ -346,13 +345,13 @@ export async function initNetwork(
346345
logger.log(`${peerId}:: flushing ${queue.length} queued messages`);
347346

348347
// Process queued messages
349-
const failedMessages: QueuedMessage[] = [];
350-
let queuedMsg: QueuedMessage | undefined;
348+
const failedMessages: string[] = [];
349+
let queuedMsg: string | undefined;
351350

352351
while ((queuedMsg = queue.dequeue()) !== undefined) {
353352
try {
354-
logger.log(`${peerId}:: send (queued) ${queuedMsg.message}`);
355-
await writeWithTimeout(channel, fromString(queuedMsg.message), 10_000);
353+
logger.log(`${peerId}:: send (queued) ${queuedMsg}`);
354+
await writeWithTimeout(channel, fromString(queuedMsg), 10_000);
356355
} catch (problem) {
357356
outputError(peerId, `sending queued message`, problem);
358357
// Preserve the failed message and all remaining messages

0 commit comments

Comments
 (0)