Skip to content

Commit e733aa7

Browse files
sirtimidclaude
andcommitted
chore(remotes): refactor transport.ts into smaller modules
Extract logical components from transport.ts (~1030 lines) into: - peer-registry.ts: Manages per-peer state (channels, queues, hints) - channel-reader.ts: Handles channel reading and message dispatch - reconnection-orchestrator.ts: Manages reconnection loop logic transport.ts now acts as a thin coordinator (~590 lines). Co-Authored-By: Claude Opus 4.5 <[email protected]>
1 parent fda02f4 commit e733aa7

File tree

4 files changed

+811
-544
lines changed

4 files changed

+811
-544
lines changed
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
import { AbortError } from '@metamask/kernel-errors';
2+
import type { Logger } from '@metamask/logger';
3+
import { toString as bufToString } from 'uint8arrays';
4+
5+
import type { PeerRegistry } from './peer-registry.ts';
6+
import type { Channel, RemoteMessageHandler } from '../types.ts';
7+
8+
/** SCTP user-initiated abort code (RFC 4960) */
9+
const SCTP_USER_INITIATED_ABORT = 12;
10+
11+
type ChannelReaderDeps = {
12+
peerRegistry: PeerRegistry;
13+
remoteMessageHandler: RemoteMessageHandler;
14+
signal: AbortSignal;
15+
logger: Logger;
16+
onConnectionLoss: (peerId: string, channel?: Channel) => void;
17+
onMessageReceived: (peerId: string) => void;
18+
outputError: (peerId: string, task: string, problem: unknown) => void;
19+
};
20+
21+
/**
22+
* Creates a channel reader that processes incoming messages from peer channels.
23+
*
24+
* @param deps - Dependencies for the channel reader.
25+
* @returns Object with methods for reading channels.
26+
*/
27+
export function makeChannelReader(deps: ChannelReaderDeps): {
28+
readChannel: (channel: Channel) => Promise<void>;
29+
} {
30+
const {
31+
peerRegistry,
32+
remoteMessageHandler,
33+
signal,
34+
logger,
35+
onConnectionLoss,
36+
onMessageReceived,
37+
outputError,
38+
} = deps;
39+
40+
/**
41+
* Receive a message from a peer.
42+
*
43+
* @param from - The peer ID that the message is from.
44+
* @param message - The message to receive.
45+
*/
46+
async function receiveMessage(from: string, message: string): Promise<void> {
47+
logger.log(`${from}:: recv ${message}`);
48+
await remoteMessageHandler(from, message);
49+
}
50+
51+
/**
52+
* Start reading (and processing) messages arriving on a channel.
53+
*
54+
* @param channel - The channel to read from.
55+
*/
56+
async function readChannel(channel: Channel): Promise<void> {
57+
try {
58+
for (;;) {
59+
if (signal.aborted) {
60+
logger.log(`reader abort: ${channel.peerId}`);
61+
throw new AbortError();
62+
}
63+
let readBuf;
64+
try {
65+
readBuf = await channel.msgStream.read();
66+
} catch (problem) {
67+
const isCurrentChannel =
68+
peerRegistry.getChannel(channel.peerId) === channel;
69+
// Detect graceful disconnect
70+
const rtcProblem = problem as {
71+
errorDetail?: string;
72+
sctpCauseCode?: number;
73+
};
74+
if (
75+
rtcProblem?.errorDetail === 'sctp-failure' &&
76+
rtcProblem?.sctpCauseCode === SCTP_USER_INITIATED_ABORT
77+
) {
78+
if (isCurrentChannel) {
79+
logger.log(
80+
`${channel.peerId}:: remote intentionally disconnected`,
81+
);
82+
// Mark as intentionally closed and don't trigger reconnection
83+
peerRegistry.markIntentionallyClosed(channel.peerId);
84+
} else {
85+
logger.log(
86+
`${channel.peerId}:: stale channel intentionally disconnected`,
87+
);
88+
}
89+
} else if (isCurrentChannel) {
90+
outputError(
91+
channel.peerId,
92+
`reading message from ${channel.peerId}`,
93+
problem,
94+
);
95+
// Only trigger reconnection for non-intentional disconnects
96+
onConnectionLoss(channel.peerId, channel);
97+
} else {
98+
logger.log(`${channel.peerId}:: ignoring error from stale channel`);
99+
}
100+
logger.log(`closed channel to ${channel.peerId}`);
101+
throw problem;
102+
}
103+
if (readBuf) {
104+
onMessageReceived(channel.peerId);
105+
peerRegistry.updateLastConnectionTime(channel.peerId);
106+
await receiveMessage(channel.peerId, bufToString(readBuf.subarray()));
107+
} else {
108+
// Stream ended (returned undefined), exit the read loop
109+
logger.log(`${channel.peerId}:: stream ended`);
110+
break;
111+
}
112+
}
113+
} finally {
114+
// Always remove the channel when readChannel exits to prevent stale channels
115+
// This ensures that subsequent sends will establish a new connection
116+
if (peerRegistry.getChannel(channel.peerId) === channel) {
117+
peerRegistry.removeChannel(channel.peerId);
118+
}
119+
}
120+
}
121+
122+
return {
123+
readChannel,
124+
};
125+
}
Lines changed: 237 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,237 @@
1+
import { MessageQueue } from './message-queue.ts';
2+
import type { Channel } from '../types.ts';
3+
4+
/**
5+
* Manages per-peer state including channels, message queues, location hints,
6+
* and connection tracking.
7+
*/
8+
export class PeerRegistry {
9+
/** Currently active channels, by peer ID */
10+
readonly #channels = new Map<string, Channel>();
11+
12+
/** Per-peer message queues for when connections are unavailable */
13+
readonly #messageQueues = new Map<string, MessageQueue>();
14+
15+
/** Peers that have been intentionally closed (don't auto-reconnect) */
16+
readonly #intentionallyClosed = new Set<string>();
17+
18+
/** Last connection/activity time per peer for stale cleanup */
19+
readonly #lastConnectionTime = new Map<string, number>();
20+
21+
/** Location hints (multiaddrs) per peer */
22+
readonly #locationHints = new Map<string, string[]>();
23+
24+
/** Maximum messages to queue per peer */
25+
readonly #maxQueue: number;
26+
27+
/**
28+
* Create a new PeerRegistry.
29+
*
30+
* @param maxQueue - Maximum number of messages to queue per peer.
31+
*/
32+
constructor(maxQueue: number) {
33+
this.#maxQueue = maxQueue;
34+
}
35+
36+
/**
37+
* Get the channel for a peer.
38+
*
39+
* @param peerId - The peer ID.
40+
* @returns The channel, or undefined if not connected.
41+
*/
42+
getChannel(peerId: string): Channel | undefined {
43+
return this.#channels.get(peerId);
44+
}
45+
46+
/**
47+
* Check if a peer has an active channel.
48+
*
49+
* @param peerId - The peer ID.
50+
* @returns True if the peer has a channel.
51+
*/
52+
hasChannel(peerId: string): boolean {
53+
return this.#channels.has(peerId);
54+
}
55+
56+
/**
57+
* Set the channel for a peer.
58+
*
59+
* @param peerId - The peer ID.
60+
* @param channel - The channel to set.
61+
* @returns The previous channel if one existed.
62+
*/
63+
setChannel(peerId: string, channel: Channel): Channel | undefined {
64+
const previous = this.#channels.get(peerId);
65+
this.#channels.set(peerId, channel);
66+
this.#lastConnectionTime.set(peerId, Date.now());
67+
return previous;
68+
}
69+
70+
/**
71+
* Remove the channel for a peer.
72+
*
73+
* @param peerId - The peer ID.
74+
* @returns True if a channel was removed.
75+
*/
76+
removeChannel(peerId: string): boolean {
77+
return this.#channels.delete(peerId);
78+
}
79+
80+
/**
81+
* Get the number of active channels.
82+
*
83+
* @returns The number of active channels.
84+
*/
85+
get channelCount(): number {
86+
return this.#channels.size;
87+
}
88+
89+
/**
90+
* Get or create a message queue for a peer.
91+
*
92+
* @param peerId - The peer ID.
93+
* @returns The message queue.
94+
*/
95+
getMessageQueue(peerId: string): MessageQueue {
96+
let queue = this.#messageQueues.get(peerId);
97+
if (!queue) {
98+
queue = new MessageQueue(this.#maxQueue);
99+
this.#messageQueues.set(peerId, queue);
100+
if (!this.#lastConnectionTime.has(peerId)) {
101+
this.#lastConnectionTime.set(peerId, Date.now());
102+
}
103+
}
104+
return queue;
105+
}
106+
107+
/**
108+
* Check if a peer is marked as intentionally closed.
109+
*
110+
* @param peerId - The peer ID.
111+
* @returns True if intentionally closed.
112+
*/
113+
isIntentionallyClosed(peerId: string): boolean {
114+
return this.#intentionallyClosed.has(peerId);
115+
}
116+
117+
/**
118+
* Mark a peer as intentionally closed.
119+
*
120+
* @param peerId - The peer ID.
121+
*/
122+
markIntentionallyClosed(peerId: string): void {
123+
this.#intentionallyClosed.add(peerId);
124+
}
125+
126+
/**
127+
* Clear the intentionally closed flag for a peer.
128+
*
129+
* @param peerId - The peer ID.
130+
*/
131+
clearIntentionallyClosed(peerId: string): void {
132+
this.#intentionallyClosed.delete(peerId);
133+
}
134+
135+
/**
136+
* Update the last connection time for a peer.
137+
*
138+
* @param peerId - The peer ID.
139+
*/
140+
updateLastConnectionTime(peerId: string): void {
141+
this.#lastConnectionTime.set(peerId, Date.now());
142+
}
143+
144+
/**
145+
* Get location hints for a peer.
146+
*
147+
* @param peerId - The peer ID.
148+
* @returns The location hints, or an empty array.
149+
*/
150+
getLocationHints(peerId: string): string[] {
151+
return this.#locationHints.get(peerId) ?? [];
152+
}
153+
154+
/**
155+
* Register location hints for a peer, merging with existing hints.
156+
*
157+
* @param peerId - The peer ID.
158+
* @param hints - The hints to add.
159+
*/
160+
registerLocationHints(peerId: string, hints: string[]): void {
161+
const oldHints = this.#locationHints.get(peerId);
162+
if (oldHints) {
163+
const newHints = new Set(oldHints);
164+
for (const hint of hints) {
165+
newHints.add(hint);
166+
}
167+
this.#locationHints.set(peerId, Array.from(newHints));
168+
} else {
169+
this.#locationHints.set(peerId, Array.from(hints));
170+
}
171+
}
172+
173+
/**
174+
* Find stale peers that should be cleaned up.
175+
*
176+
* @param stalePeerTimeoutMs - Time in ms before a peer is considered stale.
177+
* @param isReconnecting - Function to check if a peer is reconnecting.
178+
* @returns Array of stale peer IDs.
179+
*/
180+
findStalePeers(
181+
stalePeerTimeoutMs: number,
182+
isReconnecting: (peerId: string) => boolean,
183+
): string[] {
184+
const now = Date.now();
185+
const stalePeers: string[] = [];
186+
187+
for (const [peerId, lastTime] of this.#lastConnectionTime.entries()) {
188+
const timeSinceLastActivity = now - lastTime;
189+
const hasActiveChannel = this.#channels.has(peerId);
190+
const reconnecting = isReconnecting(peerId);
191+
192+
if (
193+
!hasActiveChannel &&
194+
!reconnecting &&
195+
timeSinceLastActivity > stalePeerTimeoutMs
196+
) {
197+
stalePeers.push(peerId);
198+
}
199+
}
200+
201+
return stalePeers;
202+
}
203+
204+
/**
205+
* Get the last connection time for a peer.
206+
*
207+
* @param peerId - The peer ID.
208+
* @returns The last connection time, or undefined.
209+
*/
210+
getLastConnectionTime(peerId: string): number | undefined {
211+
return this.#lastConnectionTime.get(peerId);
212+
}
213+
214+
/**
215+
* Remove all state for a peer.
216+
*
217+
* @param peerId - The peer ID.
218+
*/
219+
removePeer(peerId: string): void {
220+
this.#channels.delete(peerId);
221+
this.#messageQueues.delete(peerId);
222+
this.#intentionallyClosed.delete(peerId);
223+
this.#lastConnectionTime.delete(peerId);
224+
this.#locationHints.delete(peerId);
225+
}
226+
227+
/**
228+
* Clear all state.
229+
*/
230+
clear(): void {
231+
this.#channels.clear();
232+
this.#messageQueues.clear();
233+
this.#intentionallyClosed.clear();
234+
this.#lastConnectionTime.clear();
235+
this.#locationHints.clear();
236+
}
237+
}

0 commit comments

Comments
 (0)