Skip to content

Commit 6b65eb1

Browse files
committed
wip: expanding event handling
- Removed `EventBus` handling adding node information to the nodeGraph. [ci skip]
1 parent b7b85ac commit 6b65eb1

File tree

8 files changed

+143
-277
lines changed

8 files changed

+143
-277
lines changed

package-lock.json

Lines changed: 7 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@
7474
"@matrixai/timer": "^1.1.1",
7575
"@matrixai/workers": "^1.3.7",
7676
"@matrixai/quic": "^0.0.21",
77-
"@matrixai/events": "^1.0.0",
77+
"@matrixai/events": "^2.0.2",
7878
"@peculiar/asn1-pkcs8": "^2.3.0",
7979
"@peculiar/asn1-schema": "^2.3.0",
8080
"@peculiar/asn1-x509": "^2.3.0",

src/PolykeyAgent.ts

Lines changed: 0 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -97,10 +97,8 @@ class PolykeyAgent {
9797
*/
9898
public static readonly eventSymbols = {
9999
[CertManager.name]: Symbol(CertManager.name),
100-
[QUICServer.name]: Symbol(QUICServer.name),
101100
} as {
102101
readonly CertManager: unique symbol;
103-
readonly QUICServer: unique symbol;
104102
};
105103

106104
/**
@@ -714,26 +712,6 @@ class PolykeyAgent {
714712
this.logger.info(`${KeyRing.name} change propagated`);
715713
},
716714
);
717-
this.events.on(
718-
PolykeyAgent.eventSymbols.QUICServer,
719-
async (data: ConnectionData) => {
720-
if (this.keyRing.getNodeId().equals(data.remoteNodeId)) return;
721-
const address = networkUtils.buildAddress(
722-
data.remoteHost,
723-
data.remotePort,
724-
);
725-
const nodeIdEncoded = nodesUtils.encodeNodeId(data.remoteNodeId);
726-
this.logger.info(
727-
`Connection adding ${nodeIdEncoded}:${address} to ${NodeGraph.name}`,
728-
);
729-
// Reverse connection was established and authenticated,
730-
// add it to the node graph
731-
await this.nodeManager.setNode(data.remoteNodeId, {
732-
host: data.remoteHost,
733-
port: data.remotePort,
734-
});
735-
},
736-
);
737715
const _networkConfig = {
738716
...config.defaults.networkConfig,
739717
...utils.filterEmptyObject(networkConfig),

src/nodes/NodeConnection.ts

Lines changed: 77 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,15 @@ import type { NodeId } from './types';
44
import type { Host, Hostname, Port, TLSConfig } from '../network/types';
55
import type { Certificate, CertificatePEM } from '../keys/types';
66
import type { ClientManifest } from '../rpc/types';
7-
import type {
8-
QUICSocket,
9-
ClientCrypto,
10-
QUICConnection,
11-
events as quicEvents,
12-
} from '@matrixai/quic';
7+
import type { QUICSocket, ClientCrypto, QUICConnection } from '@matrixai/quic';
138
import type { ContextTimedInput } from '@matrixai/contexts/dist/types';
149
import type { X509Certificate } from '@peculiar/x509';
1510
import Logger from '@matrixai/logger';
1611
import { CreateDestroy } from '@matrixai/async-init/dist/CreateDestroy';
1712
import { timedCancellable, context } from '@matrixai/contexts/dist/decorators';
1813
import { Evented } from '@matrixai/events';
19-
import { QUICClient } from '@matrixai/quic';
14+
import { QUICClient, events as quicEvents } from '@matrixai/quic';
15+
import { QUICClientEvent } from '@matrixai/quic/dist/events';
2016
import * as nodesErrors from './errors';
2117
import * as nodesEvents from './events';
2218
import RPCClient from '../rpc/RPCClient';
@@ -31,6 +27,7 @@ import { never } from '../utils';
3127
*/
3228
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- False positive for M
3329
interface NodeConnection<M extends ClientManifest> extends CreateDestroy {}
30+
// eslint-disable-next-line @typescript-eslint/no-unused-vars -- False positive for M
3431
interface NodeConnection<M extends ClientManifest> extends Evented {}
3532
@CreateDestroy()
3633
@Evented()
@@ -196,33 +193,55 @@ class NodeConnection<M extends ClientManifest> {
196193
}:${quicConnection.remotePort}]`,
197194
),
198195
});
199-
// Setting up stream handling
200-
const handleConnectionStream = (
201-
streamEvent: quicEvents.QUICConnectionStreamEvent,
202-
) => {
203-
const stream = streamEvent.detail;
204-
nodeConnection.dispatchEvent(
205-
new nodesEvents.EventNodeStream({ detail: stream }),
206-
);
207-
};
208-
// TODO: All quicConnection event handling should be done under a unified handler.
196+
// TODO: remove this later based on testing
209197
quicConnection.removeEventListener('connectionStream', throwFunction);
210-
quicConnection.addEventListener('connectionStream', handleConnectionStream);
198+
// QUICClient events are...
199+
// - QUICClientEvent,
200+
// - QUICClientDestroyEvent,
201+
// - QUICClientErrorEvent,
202+
// QUICConnection events are...
203+
// - QUICConnectionEvent,
204+
// - QUICConnectionStreamEvent,
205+
// - QUICConnectionStopEvent,
206+
// - QUICConnectionErrorEvent,
207+
// QUICStream events are...
208+
// - QUICStreamEvent,
209+
// - QUICStreamDestroyEvent,
210+
const handleNodeConnectionEvents = (e) => {
211+
if (e instanceof quicEvents.QUICClientDestroyEvent) {
212+
// Trigger the nodeConnection destroying
213+
void nodeConnection.destroy({ force: false });
214+
} else if (e instanceof quicEvents.QUICConnectionStreamEvent) {
215+
const quicStream = e.detail;
216+
// Setting up stream handling
217+
quicStream.addEventListener(
218+
quicEvents.QUICStreamDestroyEvent.name,
219+
(e) => {
220+
nodeConnection.dispatchEvent(e.clone());
221+
},
222+
{ once: true },
223+
);
224+
nodeConnection.dispatchEvent(
225+
new nodesEvents.EventNodeStream({ detail: quicStream }),
226+
);
227+
}
228+
nodeConnection.dispatchEvent(e.clone());
229+
};
230+
// Setting up QUICConnection events
231+
quicConnection.addEventListener(handleNodeConnectionEvents);
211232
quicConnection.addEventListener(
212-
'connectionStop',
233+
quicEvents.QUICConnectionStopEvent.name,
213234
() => {
214-
quicConnection.removeEventListener(
215-
'connectionStream',
216-
handleConnectionStream,
217-
);
235+
quicConnection.removeEventListener(handleNodeConnectionEvents);
218236
},
219237
{ once: true },
220238
);
239+
// Setting up QUICClient events
240+
quicClient.addEventListener(handleNodeConnectionEvents);
221241
quicClient.addEventListener(
222-
'clientDestroy',
242+
quicEvents.QUICClientDestroyEvent.name,
223243
() => {
224-
// Trigger the nodeConnection destroying
225-
void nodeConnection.destroy({ force: false });
244+
quicClient.removeEventListener(handleNodeConnectionEvents);
226245
},
227246
{ once: true },
228247
);
@@ -269,31 +288,41 @@ class NodeConnection<M extends ClientManifest> {
269288
rpcClient,
270289
logger,
271290
});
272-
// Setting up stream handling
273-
// TODO: All quicConnection event handling should be done under a unified handler.
274-
const handleConnectionStream = (
275-
streamEvent: quicEvents.QUICConnectionStreamEvent,
276-
) => {
277-
nodeConnection.dispatchEvent(
278-
new nodesEvents.EventNodeStream({ detail: streamEvent.detail }),
279-
);
291+
// QUICClient events are...
292+
// - QUICClientEvent,
293+
// - QUICClientDestroyEvent,
294+
// - QUICClientErrorEvent,
295+
// QUICConnection events are...
296+
// - QUICConnectionEvent,
297+
// - QUICConnectionStreamEvent,
298+
// - QUICConnectionStopEvent,
299+
// - QUICConnectionErrorEvent,
300+
const handleNodeConnectionEvents = (e) => {
301+
if (e instanceof quicEvents.QUICClientDestroyEvent) {
302+
// Trigger the nodeConnection destroying
303+
void nodeConnection.destroy({ force: false });
304+
} else if (e instanceof quicEvents.QUICConnectionStreamEvent) {
305+
const quicStream = e.detail;
306+
// Setting up stream handling
307+
quicStream.addEventListener(
308+
quicEvents.QUICStreamDestroyEvent.name,
309+
(e) => {
310+
nodeConnection.dispatchEvent(e.clone());
311+
},
312+
{ once: true },
313+
);
314+
nodeConnection.dispatchEvent(
315+
new nodesEvents.EventNodeStream({ detail: quicStream }),
316+
);
317+
}
318+
nodeConnection.dispatchEvent(e.clone());
280319
};
281-
quicConnection.addEventListener('connectionStream', handleConnectionStream);
320+
// Setting up QUICConnection events
321+
quicConnection.addEventListener(handleNodeConnectionEvents);
282322
quicConnection.addEventListener(
283-
'connectionStop',
323+
quicEvents.QUICConnectionStopEvent.name,
284324
() => {
285-
quicConnection.removeEventListener(
286-
'connectionStream',
287-
handleConnectionStream,
288-
);
289-
},
290-
{ once: true },
291-
);
292-
quicConnection.addEventListener(
293-
'connectionStop',
294-
async () => {
295-
// Trigger the nodeConnection destroying
296-
await nodeConnection.destroy({ force: false });
325+
quicConnection.removeEventListener(handleNodeConnectionEvents);
297326
},
298327
{ once: true },
299328
);

0 commit comments

Comments
 (0)