Skip to content

Commit a8ad028

Browse files
committed
wip: fixing up events with changes
[ci skip]
1 parent 066e4dd commit a8ad028

File tree

5 files changed

+127
-44
lines changed

5 files changed

+127
-44
lines changed

package-lock.json

Lines changed: 4 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,8 @@
7373
"@matrixai/resources": "^1.1.5",
7474
"@matrixai/timer": "^1.1.1",
7575
"@matrixai/workers": "^1.3.7",
76-
"@matrixai/quic": "^0.0.21",
77-
"@matrixai/events": "^2.0.2",
76+
"@matrixai/quic": "^0.0.21",
77+
"@matrixai/events": "^3.0.0",
7878
"@peculiar/asn1-pkcs8": "^2.3.0",
7979
"@peculiar/asn1-schema": "^2.3.0",
8080
"@peculiar/asn1-x509": "^2.3.0",

src/nodes/NodeConnection.ts

Lines changed: 52 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import type { X509Certificate } from '@peculiar/x509';
1010
import Logger from '@matrixai/logger';
1111
import { CreateDestroy } from '@matrixai/async-init/dist/CreateDestroy';
1212
import { timedCancellable, context } from '@matrixai/contexts/dist/decorators';
13-
import { Evented } from '@matrixai/events';
13+
import { EventDefault, Evented } from '@matrixai/events';
1414
import { QUICClient, events as quicEvents } from '@matrixai/quic';
1515
import * as nodesErrors from './errors';
1616
import * as nodesEvents from './events';
@@ -207,10 +207,7 @@ class NodeConnection<M extends ClientManifest> {
207207
// - QUICStreamEvent,
208208
// - QUICStreamDestroyEvent,
209209
const handleNodeConnectionEvents = (e) => {
210-
if (e instanceof quicEvents.QUICClientDestroyEvent) {
211-
// Trigger the nodeConnection destroying
212-
void nodeConnection.destroy({ force: false });
213-
} else if (e instanceof quicEvents.QUICConnectionStreamEvent) {
210+
if (e instanceof quicEvents.QUICConnectionStreamEvent) {
214211
const quicStream = e.detail;
215212
// Setting up stream handling
216213
quicStream.addEventListener(
@@ -227,20 +224,42 @@ class NodeConnection<M extends ClientManifest> {
227224
nodeConnection.dispatchEvent(e.clone());
228225
};
229226
// Setting up QUICConnection events
230-
quicConnection.addEventListener(handleNodeConnectionEvents);
227+
quicConnection.addEventListener(
228+
EventDefault.name,
229+
handleNodeConnectionEvents,
230+
);
231231
quicConnection.addEventListener(
232232
quicEvents.QUICConnectionStopEvent.name,
233233
() => {
234-
quicConnection.removeEventListener(handleNodeConnectionEvents);
234+
quicConnection.removeEventListener(
235+
EventDefault.name,
236+
handleNodeConnectionEvents,
237+
);
235238
},
236239
{ once: true },
237240
);
238241
// Setting up QUICClient events
239-
quicClient.addEventListener(handleNodeConnectionEvents);
242+
quicClient.addEventListener(EventDefault.name, handleNodeConnectionEvents);
240243
quicClient.addEventListener(
241244
quicEvents.QUICClientDestroyEvent.name,
242-
() => {
243-
quicClient.removeEventListener(handleNodeConnectionEvents);
245+
async (e: quicEvents.QUICConnectionErrorEvent) => {
246+
quicClient.removeEventListener(
247+
EventDefault.name,
248+
handleNodeConnectionEvents,
249+
);
250+
try {
251+
// Trigger the nodeConnection destroying
252+
await nodeConnection.destroy({ force: false });
253+
nodeConnection.dispatchEvent(
254+
new nodesEvents.EventNodeConnectionError({ detail: e }),
255+
);
256+
} catch (err) {
257+
nodeConnection.dispatchEvent(
258+
new nodesEvents.EventNodeConnectionError({
259+
detail: new AggregateError([err, e.detail]),
260+
}),
261+
);
262+
}
244263
},
245264
{ once: true },
246265
);
@@ -297,10 +316,7 @@ class NodeConnection<M extends ClientManifest> {
297316
// - QUICConnectionStopEvent,
298317
// - QUICConnectionErrorEvent,
299318
const handleNodeConnectionEvents = (e) => {
300-
if (e instanceof quicEvents.QUICClientDestroyEvent) {
301-
// Trigger the nodeConnection destroying
302-
void nodeConnection.destroy({ force: false });
303-
} else if (e instanceof quicEvents.QUICConnectionStreamEvent) {
319+
if (e instanceof quicEvents.QUICConnectionStreamEvent) {
304320
const quicStream = e.detail;
305321
// Setting up stream handling
306322
quicStream.addEventListener(
@@ -317,11 +333,30 @@ class NodeConnection<M extends ClientManifest> {
317333
nodeConnection.dispatchEvent(e.clone());
318334
};
319335
// Setting up QUICConnection events
320-
quicConnection.addEventListener(handleNodeConnectionEvents);
336+
quicConnection.addEventListener(
337+
EventDefault.name,
338+
handleNodeConnectionEvents,
339+
);
321340
quicConnection.addEventListener(
322341
quicEvents.QUICConnectionStopEvent.name,
323-
() => {
324-
quicConnection.removeEventListener(handleNodeConnectionEvents);
342+
async (e: quicEvents.QUICConnectionErrorEvent) => {
343+
quicConnection.removeEventListener(
344+
EventDefault.name,
345+
handleNodeConnectionEvents,
346+
);
347+
try {
348+
// Trigger the nodeConnection destroying
349+
await nodeConnection.destroy({ force: false });
350+
nodeConnection.dispatchEvent(
351+
new nodesEvents.EventNodeConnectionError({ detail: e }),
352+
);
353+
} catch (err) {
354+
nodeConnection.dispatchEvent(
355+
new nodesEvents.EventNodeConnectionError({
356+
detail: new AggregateError([err, e.detail]),
357+
}),
358+
);
359+
}
325360
},
326361
{ once: true },
327362
);

src/nodes/NodeConnectionManager.ts

Lines changed: 56 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import { IdInternal } from '@matrixai/id';
2323
import { Lock, LockBox } from '@matrixai/async-locks';
2424
import { Timer } from '@matrixai/timer';
2525
import { timedCancellable, context } from '@matrixai/contexts/dist/decorators';
26-
import { Evented } from '@matrixai/events';
26+
import { EventDefault, Evented } from '@matrixai/events';
2727
import { QUICSocket, QUICServer, events as quicEvents } from '@matrixai/quic';
2828
import NodeConnection from './NodeConnection';
2929
import * as nodesUtils from './utils';
@@ -140,6 +140,12 @@ class NodeConnectionManager {
140140

141141
protected connectionLocks: LockBox<Lock> = new LockBox();
142142

143+
protected eventQUICServerConnectionHandler = (
144+
e: quicEvents.QUICServerConnectionEvent,
145+
) => {
146+
void this.handleConnectionReverse(e.detail);
147+
};
148+
143149
protected handleQUICSocketEvents = (e: quicEvents.QUICSocketEvent) => {
144150
// QUICSocket events are...
145151
// - QUICSocketEvent,
@@ -158,9 +164,6 @@ class NodeConnectionManager {
158164
// - QUICServerStartEvent,
159165
// - QUICServerStopEvent,
160166
// - QUICServerErrorEvent,
161-
if (e instanceof quicEvents.QUICServerConnectionEvent) {
162-
void this.handleConnectionReverse(e.detail);
163-
}
164167
this.dispatchEvent(e.clone());
165168
};
166169

@@ -311,17 +314,37 @@ class NodeConnectionManager {
311314
ipv6Only,
312315
});
313316

314-
this.quicSocket.addEventListener(this.handleQUICSocketEvents);
315-
this.quicServer.addEventListener(this.handleQUICServerEvents);
317+
this.quicSocket.addEventListener(
318+
EventDefault.name,
319+
this.handleQUICSocketEvents,
320+
);
321+
this.quicServer.addEventListener(
322+
quicEvents.QUICServerConnectionEvent.name,
323+
this.eventQUICServerConnectionHandler,
324+
);
325+
this.quicServer.addEventListener(
326+
EventDefault.name,
327+
this.handleQUICServerEvents,
328+
);
316329

317330
this.logger.info(`Started ${this.constructor.name}`);
318331
}
319332

320333
public async stop() {
321334
this.logger.info(`Stop ${this.constructor.name}`);
322335

323-
this.quicSocket.removeEventListener(this.handleQUICSocketEvents);
324-
this.quicServer.removeEventListener(this.handleQUICServerEvents);
336+
this.quicServer.removeEventListener(
337+
EventDefault.name,
338+
this.handleQUICServerEvents,
339+
);
340+
this.quicServer.removeEventListener(
341+
quicEvents.QUICServerConnectionEvent.name,
342+
this.eventQUICServerConnectionHandler,
343+
);
344+
this.quicSocket.removeEventListener(
345+
EventDefault.name,
346+
this.handleQUICSocketEvents,
347+
);
325348

326349
const destroyProms: Array<Promise<void>> = [];
327350
for (const [nodeId, connAndTimer] of this.connections) {
@@ -806,26 +829,39 @@ class NodeConnectionManager {
806829
const nodeIdString = nodeId.toString() as NodeIdString;
807830
// Check if exists in map, this should never happen but better safe than sorry.
808831
if (this.connections.has(nodeIdString)) utils.never();
809-
// TODO: set up event handling for all events here, need to make sure that the stream event and connection event is propagated.
810-
// The connection event should only contain connection metadata and not the connection itself otherwise we circumvent the locking.
811832
// Setting up events
812833
const nodeConnectionEventsHandler = (e) => {
813834
// Propagate all events upwards
814835
this.dispatchEvent(e.clone());
815836
};
816-
nodeConnection.addEventListener(nodeConnectionEventsHandler);
817837
nodeConnection.addEventListener(
818-
nodesEvents.EventNodeConnectionDestroy.name,
819-
() => {
838+
EventDefault.name,
839+
nodeConnectionEventsHandler,
840+
);
841+
nodeConnection.addEventListener(
842+
nodesEvents.EventNodeConnectionError.name,
843+
async (e: nodesEvents.EventNodeConnectionError) => {
820844
this.logger.debug('stream destroyed event');
821845
nodeConnection.removeEventListener(nodeConnectionEventsHandler);
822-
// To avoid deadlock only in the case where this is called
823-
// we want to check for destroying connection and read lock
824-
// If the connection is calling destroyCallback then it SHOULD exist in the connection map.
825-
if (!this.connections.has(nodeIdString)) return;
826-
// Already locked so already destroying
827-
if (this.connectionLocks.isLocked(nodeIdString)) return;
828-
void this.destroyConnection(nodeId);
846+
try {
847+
// To avoid deadlock only in the case where this is called
848+
// we want to check for destroying connection and read lock
849+
// If the connection is calling destroyCallback then it SHOULD exist in the connection map.
850+
// Already locked so already destroying
851+
if (this.connectionLocks.isLocked(nodeIdString)) return;
852+
await this.destroyConnection(nodeId);
853+
this.dispatchEvent(
854+
new nodesEvents.EventNodeConnectionManagerConnectionFailure({
855+
detail: e,
856+
}),
857+
);
858+
} catch (err) {
859+
this.dispatchEvent(
860+
new nodesEvents.EventNodeConnectionManagerConnectionFailure({
861+
detail: new AggregateError([err, e.detail]),
862+
}),
863+
);
864+
}
829865
},
830866
{ once: true },
831867
);

src/nodes/events.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,14 @@
1-
import type { QUICStream } from '@matrixai/quic';
1+
import type { QUICStream, events as quicEvents } from '@matrixai/quic';
22
import type { ConnectionData } from '../network/types';
33
import { AbstractEvent } from '@matrixai/events';
44

55
abstract class EventsNode<T> extends AbstractEvent<T> {}
66
abstract class EventsNodeConnection<T> extends EventsNode<T> {}
77

8+
class EventNodeConnectionError extends EventsNodeConnection<
9+
Error | quicEvents.QUICConnectionErrorEvent
10+
> {}
11+
812
class EventNodeConnectionDestroy extends EventsNodeConnection<null> {}
913

1014
class EventNodeStream extends EventsNode<QUICStream> {}
@@ -13,8 +17,16 @@ abstract class EventNodeConnectionManager<T> extends EventsNode<T> {}
1317

1418
class EventNodeConnectionManagerConnection extends EventNodeConnectionManager<ConnectionData> {}
1519

20+
class EventNodeConnectionManagerConnectionFailure extends EventNodeConnectionManager<
21+
Error | EventNodeConnectionError
22+
> {}
23+
1624
export {
25+
EventsNode,
26+
EventsNodeConnection,
27+
EventNodeConnectionError,
1728
EventNodeConnectionDestroy,
1829
EventNodeStream,
1930
EventNodeConnectionManagerConnection,
31+
EventNodeConnectionManagerConnectionFailure,
2032
};

0 commit comments

Comments
 (0)