Skip to content

Commit df7b82d

Browse files
Update event queue to support readahead to FireFly core
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent adeab1f commit df7b82d

File tree

10 files changed

+786
-557
lines changed

10 files changed

+786
-557
lines changed

README.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,9 +137,9 @@ This will make it possible for the organizations to establish MTLS communication
137137
|message-delivered| Emitted to the sender when a message has been delivered | recipient, message, requestId (optional)
138138
|message-failed | Emitted to the sender when a message could not be delivered| recipient, message, requestId (optional)
139139

140-
- After receiving a websocket message, a commit must be sent in order to receive the next one:
140+
- After receiving a websocket message, an ack must be sent ("commit" is a synonym for "ack"):
141141
```
142-
{ "action": "commit" }
142+
{ "action": "ack", "id": "<ID_FROM_EVENT>" }
143143
```
144144
- Messages arrive in the same order they were sent
145145
- Up to 1,000 messages will be queued

package-lock.json

Lines changed: 438 additions & 498 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -20,40 +20,40 @@
2020
"author": "",
2121
"license": "Apache-2.0",
2222
"dependencies": {
23-
"ajv": "^8.8.2",
24-
"axios": "^0.24.0",
25-
"busboy": "^0.3.1",
26-
"express": "^4.17.2",
23+
"ajv": "^8.11.0",
24+
"axios": "^0.26.1",
25+
"busboy": "^1.5.0",
26+
"express": "^4.17.3",
2727
"form-data": "^4.0.0",
28-
"jsrsasign": "^10.5.1",
28+
"jsrsasign": "^10.5.14",
2929
"swagger-ui-express": "^4.3.0",
3030
"uuid": "^8.3.2",
31-
"ws": "^8.4.0",
31+
"ws": "^8.5.0",
3232
"yamljs": "^0.3.0"
3333
},
3434
"devDependencies": {
3535
"@types/bunyan": "^1.8.8",
36-
"@types/busboy": "^0.3.1",
36+
"@types/busboy": "^1.5.0",
3737
"@types/chai": "^4.3.0",
3838
"@types/express": "^4.17.13",
39-
"@types/jsrsasign": "^9.0.3",
40-
"@types/mocha": "^9.0.0",
41-
"@types/node": "^17.0.8",
39+
"@types/jsrsasign": "^10.2.1",
40+
"@types/mocha": "^9.1.0",
41+
"@types/node": "^17.0.23",
4242
"@types/swagger-ui-express": "^4.1.3",
43-
"@types/uuid": "^8.3.3",
44-
"@types/ws": "^8.2.2",
43+
"@types/uuid": "^8.3.4",
44+
"@types/ws": "^8.5.3",
4545
"@types/yamljs": "^0.2.31",
46-
"chai": "^4.3.4",
47-
"mocha": "^9.1.3",
46+
"chai": "^4.3.6",
47+
"mocha": "^9.2.2",
4848
"moment": "^2.29.1",
4949
"nyc": "^15.1.0",
5050
"rimraf": "^3.0.2",
51-
"sinon": "^12.0.1",
51+
"sinon": "^13.0.1",
5252
"sinon-chai": "^3.7.0",
5353
"source-map-support": "^0.5.21",
54-
"ts-node": "^10.4.0",
54+
"ts-node": "^10.7.0",
5555
"ts-sinon": "^2.0.2",
56-
"typescript": "^4.5.4"
56+
"typescript": "^4.6.3"
5757
},
5858
"nyc": {
5959
"extension": [

src/app.ts

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ import * as eventsHandler from './handlers/events';
2626
import { eventEmitter as messagesEventEmitter } from './handlers/messages';
2727
import { genTLSContext, init as initCert, loadPeerCAs } from './lib/cert';
2828
import { config, init as initConfig } from './lib/config';
29+
import { IAckEvent } from './lib/interfaces';
2930
import { Logger } from './lib/logger';
3031
import RequestError, { errorHandler } from './lib/request-error';
3132
import * as utils from './lib/utils';
3233
import { router as apiRouter, setRefreshCACerts } from './routers/api';
3334
import { eventEmitter as p2pEventEmitter, router as p2pRouter } from './routers/p2p';
35+
import { init as initEvents } from './handlers/events';
3436

3537
const log = new Logger("app.ts");
3638

@@ -51,6 +53,7 @@ setRefreshCACerts(refreshCACerts)
5153
export const start = async () => {
5254
await initConfig();
5355
await initCert();
56+
await initEvents(config);
5457

5558
const apiApp = express();
5659
apiServer = http.createServer(apiApp);
@@ -72,7 +75,7 @@ export const start = async () => {
7275
blobsEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
7376
messagesEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
7477

75-
eventsHandler.eventEmitter.addListener('event', event => {
78+
eventsHandler.getEmitter().addListener('event', event => {
7679
log.info(`Event emitted ${event.type}/${event.id}`)
7780
if (delegatedWebSocket !== undefined) {
7881
delegatedWebSocket.send(JSON.stringify(event));
@@ -82,21 +85,16 @@ export const start = async () => {
8285
const assignWebSocketDelegate = (webSocket: WebSocket) => {
8386
log.info('New WebSocket delegate assigned');
8487
delegatedWebSocket = webSocket;
85-
const event = eventsHandler.getCurrentEvent();
8688
webSocket.on('message', async message => {
8789
try {
8890
const messageContent = JSON.parse(message.toLocaleString());
89-
if (messageContent.action === 'commit') {
90-
log.info(`Event comitted ${event?`${event.type}/${event.id}`:`[no event in flight]`}`)
91-
eventsHandler.handleCommit();
91+
if (messageContent.action === 'ack' || messageContent.action == 'commit') {
92+
eventsHandler.handleAck(messageContent as IAckEvent);
9293
}
9394
} catch (err) {
9495
log.error(`Failed to process websocket message ${err}`);
9596
}
9697
});
97-
if (event !== undefined) {
98-
webSocket.send(JSON.stringify(event));
99-
}
10098
webSocket.on('close', () => {
10199
log.info('WebSocket delegate disconnected');
102100
const nextDelegatedWebSocket = wss.clients.values().next().value;
@@ -106,6 +104,8 @@ export const start = async () => {
106104
delegatedWebSocket = undefined;
107105
}
108106
});
107+
// Anything that's in-flight needs to be sent again
108+
eventsHandler.reDispatchInFlight();
109109
};
110110

111111
wss.on('connection', (webSocket: WebSocket) => {

src/handlers/events.ts

Lines changed: 102 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,39 +15,120 @@
1515
// limitations under the License.
1616

1717
import EventEmitter from "events";
18-
import { OutboundEvent } from "../lib/interfaces";
18+
import { IAckEvent, IConfig, OutboundEvent } from "../lib/interfaces";
1919
import { Logger } from "../lib/logger";
2020
import * as utils from '../lib/utils';
2121

2222
const log = new Logger("handlers/events.ts")
2323

24-
let eventQueue: OutboundEvent[] = [];
25-
export const eventEmitter = new EventEmitter();
24+
let maxInflight = utils.constants.DEFAULT_MAX_INFLIGHT;
25+
let maxEventQueueSize = utils.constants.MAX_EVENT_QUEUE_SIZE;
26+
let eventQueue: OutboundEvent[];
27+
let inFlight: OutboundEvent[];
2628

27-
export const queueEvent = (socketEvent: OutboundEvent) => {
28-
if(eventQueue.length < utils.constants.MAX_EVENT_QUEUE_SIZE) {
29-
eventQueue.push(socketEvent);
30-
if(eventQueue.length === 1) {
31-
eventEmitter.emit('event', eventQueue[0]);
29+
let eventEmitter: EventEmitter;
30+
let unblockPromise: Promise<void> | undefined;
31+
let unblock: (() => void) | undefined;
32+
33+
export const init = async (config: IConfig) => {
34+
eventEmitter = new EventEmitter();
35+
eventQueue = [];
36+
inFlight = [];
37+
unblockPromise = undefined;
38+
unblock = undefined;
39+
if (config.events?.maxInflight !== undefined) {
40+
maxInflight = config.events.maxInflight;
41+
}
42+
if (config.events?.queueSize !== undefined) {
43+
maxEventQueueSize = config.events.queueSize;
44+
}
45+
}
46+
47+
const dispatchNext = () => {
48+
if (inFlight.length < maxInflight) {
49+
const event = eventQueue.shift();
50+
if (event) {
51+
inFlight.push(event)
52+
log.debug(`${event.id}: dispatched`);
53+
eventEmitter.emit('event', event);
3254
}
33-
} else {
34-
log.warn('Max queue size reached');
3555
}
36-
};
3756

38-
export const handleCommit = () => {
39-
eventQueue.shift();
40-
if(eventQueue.length > 0) {
41-
eventEmitter.emit('event', eventQueue[0]);
57+
if (eventQueue.length < maxEventQueueSize && unblock) {
58+
unblock();
59+
unblockPromise = undefined;
60+
unblock = undefined;
61+
log.info(`Event queue unblocked (length=${eventQueue.length})`);
4262
}
4363
}
4464

45-
export const getCurrentEvent = () => {
46-
if(eventQueue.length > 0) {
47-
return eventQueue[0];
65+
export const queueEvent = async (socketEvent: OutboundEvent) => {
66+
67+
let currentUnblockPromise = unblockPromise;
68+
if (currentUnblockPromise) {
69+
let blockedTime = Date.now();
70+
log.warn(`${socketEvent.id}: delaying receive due to full event queue (length=${eventQueue.length})`);
71+
await currentUnblockPromise;
72+
log.info(`${socketEvent.id}: unblocked receive after ${Date.now()-blockedTime}ms`);
4873
}
49-
};
5074

51-
export const getQueueSize = () => {
52-
return eventQueue.length;
75+
eventQueue.push(socketEvent);
76+
if (eventQueue.length >= maxEventQueueSize && !unblockPromise) {
77+
log.warn(`Event queue became full (length=${eventQueue.length})`);
78+
unblockPromise = new Promise(resolve => {
79+
unblock = resolve;
80+
})
81+
}
82+
83+
dispatchNext();
5384
};
85+
86+
export const reDispatchInFlight = () => {
87+
for (const event of inFlight) {
88+
eventEmitter.emit('event', event)
89+
}
90+
}
91+
92+
export const handleAck = (ack: IAckEvent) => {
93+
94+
// Check we have something in-flight
95+
if (inFlight.length <= 0) {
96+
log.error(`Ack for ${ack.id} while no events in-flight`);
97+
return
98+
}
99+
100+
// If no ID supplied (back-level API) we
101+
if (ack.id === undefined) {
102+
log.warn(`FireFly core is back-level and did not supply an event ID`);
103+
ack.id = inFlight[0].id;
104+
}
105+
106+
// Remove from our in-flight map
107+
let event;
108+
for (let i = 0; i < inFlight.length; i++) {
109+
const candidate = inFlight[i]
110+
if (ack.id === candidate.id) {
111+
event = candidate;
112+
inFlight.splice(i, 1);
113+
break;
114+
}
115+
}
116+
if (!event) {
117+
log.warn(`Ack received for ${ack.id} which is not in-flight`);
118+
return
119+
}
120+
log.debug(`${ack.id}: acknowledged`);
121+
122+
dispatchNext();
123+
}
124+
125+
export const getEmitter = () => {
126+
return eventEmitter;
127+
}
128+
129+
export const getStats = () => {
130+
return {
131+
messageQueueSize: eventQueue.length,
132+
inFlightCount: inFlight.length,
133+
}
134+
}

src/lib/config.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,8 +51,8 @@ const loadConfig = async () => {
5151
throw err;
5252
}
5353
}
54+
config = data as IConfig;
5455
if(validateConfig(data)) {
55-
config = data as IConfig;
5656
for(const peer of config.peers) {
5757
if(peer.endpoint.endsWith('/')) {
5858
peer.endpoint = peer.endpoint.slice(-0, -1);

src/lib/interfaces.ts

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ export interface IConfig {
1919
hostname: string
2020
port: number
2121
}
22+
events?: {
23+
maxInflight?: number
24+
queueSize?: number;
25+
}
2226
p2p: {
2327
hostname: string
2428
port: number
@@ -93,16 +97,17 @@ export interface IBlobFailedEvent {
9397

9498
export type InboundEvent =
9599
IMessageEvent |
96-
ICommitEvent
100+
IAckEvent
97101

98102
export interface IMessageEvent {
99103
type: 'message'
100104
recipient: string
101105
message: string
102106
}
103107

104-
export interface ICommitEvent {
105-
type: 'commit'
108+
export interface IAckEvent {
109+
type: 'commit' | 'ack'
110+
id?: string
106111
}
107112

108113
export type MessageTask = {
@@ -121,6 +126,7 @@ export type BlobTask = {
121126

122127
export interface IStatus {
123128
messageQueueSize: number
129+
inFlightCount: number
124130
peers: {
125131
id: string
126132
endpoint: string

src/lib/utils.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// limitations under the License.
1616

1717
import axios, { AxiosRequestConfig } from 'axios';
18-
import Busboy, { BusboyHeaders } from 'busboy';
18+
import newBusboy from 'busboy';
1919
import { Request } from 'express';
2020
import { promises as fs } from 'fs';
2121
import { X509 } from 'jsrsasign';
@@ -45,7 +45,8 @@ export const constants = {
4545
HASH_HEADER_NAME: 'dx-hash',
4646
SIZE_HEADER_NAME: 'dx-size',
4747
LAST_UPDATE_HEADER_NAME: 'dx-last-update',
48-
DEFAULT_JSON_PARSER_LIMIT: '1mb'
48+
DEFAULT_JSON_PARSER_LIMIT: '1mb',
49+
DEFAULT_MAX_INFLIGHT: 100
4950
};
5051
const log = new Logger('utils.ts');
5152
axios.defaults.timeout = constants.REST_API_CALL_REQUEST_TIMEOUT;
@@ -72,12 +73,12 @@ export const fileExists = async (filePath: string): Promise<boolean> => {
7273
export const extractFileFromMultipartForm = (req: Request): Promise<IFile> => {
7374
return new Promise(async (resolve, reject) => {
7475
let fileFound = false;
75-
req.pipe(new Busboy({ headers: req.headers as BusboyHeaders })
76-
.on('file', (fieldname, readableStream, fileName) => {
76+
req.pipe(newBusboy({ headers: req.headers })
77+
.on('file', (fieldname, readableStream, file) => {
7778
fileFound = true;
7879
resolve({
7980
key: fieldname,
80-
name: fileName,
81+
name: file.filename,
8182
readableStream
8283
});
8384
})).on('finish', () => {
@@ -91,7 +92,7 @@ export const extractFileFromMultipartForm = (req: Request): Promise<IFile> => {
9192
export const extractMessageFromMultipartForm = (req: Request): Promise<string> => {
9293
return new Promise(async (resolve, reject) => {
9394
let fieldFound = false;
94-
req.pipe(new Busboy({ headers: req.headers as BusboyHeaders })
95+
req.pipe(newBusboy({ headers: req.headers })
9596
.on('field', (fieldname, value) => {
9697
if(fieldname === 'message') {
9798
fieldFound = true;

src/routers/api.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ router.get('/id', async (_req, res, next) => {
5151
router.get('/status', async (_req, res, next) => {
5252
try {
5353
let status: IStatus = {
54-
messageQueueSize: eventsHandler.getQueueSize(),
54+
...eventsHandler.getStats(),
5555
peers: []
5656
};
5757
let promises = [];

0 commit comments

Comments
 (0)