Skip to content

Commit 9bb8118

Browse files
Block waiting for dispatch so that HTTP calls push back to sender on queue full
Signed-off-by: Peter Broadhurst <[email protected]>
1 parent 8bebc77 commit 9bb8118

File tree

4 files changed

+10
-19
lines changed

4 files changed

+10
-19
lines changed

src/app.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,15 @@ import path from 'path';
2121
import swaggerUi from 'swagger-ui-express';
2222
import WebSocket from 'ws';
2323
import YAML from 'yamljs';
24-
import { eventEmitter as blobsEventEmitter } from './handlers/blobs';
2524
import * as eventsHandler from './handlers/events';
26-
import { eventEmitter as messagesEventEmitter } from './handlers/messages';
2725
import { genTLSContext, init as initCert, loadPeerCAs } from './lib/cert';
2826
import { config, init as initConfig } from './lib/config';
2927
import { IAckEvent } from './lib/interfaces';
3028
import { Logger } from './lib/logger';
3129
import RequestError, { errorHandler } from './lib/request-error';
3230
import * as utils from './lib/utils';
3331
import { router as apiRouter, setRefreshCACerts } from './routers/api';
34-
import { eventEmitter as p2pEventEmitter, router as p2pRouter } from './routers/p2p';
32+
import { router as p2pRouter } from './routers/p2p';
3533
import { init as initEvents } from './handlers/events';
3634

3735
const log = new Logger("app.ts");
@@ -71,10 +69,6 @@ export const start = async () => {
7169
}
7270
});
7371

74-
p2pEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
75-
blobsEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
76-
messagesEventEmitter.addListener('event', event => eventsHandler.queueEvent(event));
77-
7872
eventsHandler.getEmitter().addListener('event', event => {
7973
log.info(`Event emitted ${event.type}/${event.id}`)
8074
if (delegatedWebSocket !== undefined) {

src/handlers/blobs.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
// limitations under the License.
1616

1717
import crypto from 'crypto';
18-
import EventEmitter from 'events';
1918
import FormData from 'form-data';
2019
import { createReadStream, createWriteStream, promises as fs } from 'fs';
2120
import https from 'https';
@@ -27,12 +26,12 @@ import { BlobTask, IBlobDeliveredEvent, IBlobFailedEvent, IFile, IMetadata } fro
2726
import { Logger } from '../lib/logger';
2827
import RequestError from '../lib/request-error';
2928
import * as utils from '../lib/utils';
29+
import { queueEvent } from './events';
3030

3131
const log = new Logger("handlers/blobs.ts")
3232

3333
let blobQueue: BlobTask[] = [];
3434
let sending = false;
35-
export const eventEmitter = new EventEmitter();
3635

3736
export const retreiveBlob = async (filePath: string) => {
3837
const resolvedFilePath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.BLOBS_SUBDIRECTORY, filePath);
@@ -100,7 +99,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
10099
timeout: utils.constants.REST_API_CALL_BLOB_REQUEST_TIMEOUT,
101100
httpsAgent
102101
});
103-
eventEmitter.emit('event', {
102+
await queueEvent({
104103
id: uuidV4(),
105104
type: 'blob-delivered',
106105
path: blobPath,
@@ -109,7 +108,7 @@ export const deliverBlob = async ({ blobPath, recipient, recipientURL, requestId
109108
} as IBlobDeliveredEvent);
110109
log.trace(`Blob delivered`);
111110
} catch (err: any) {
112-
eventEmitter.emit('event', {
111+
await queueEvent({
113112
id: uuidV4(),
114113
type: 'blob-failed',
115114
path: blobPath,

src/handlers/messages.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,20 +14,19 @@
1414
// See the License for the specific language governing permissions and
1515
// limitations under the License.
1616

17-
import EventEmitter from 'events';
1817
import FormData from 'form-data';
1918
import https from 'https';
2019
import { v4 as uuidV4 } from 'uuid';
2120
import { ca, cert, key } from '../lib/cert';
2221
import { IMessageDeliveredEvent, IMessageFailedEvent, MessageTask } from '../lib/interfaces';
2322
import { Logger } from '../lib/logger';
2423
import * as utils from '../lib/utils';
24+
import { queueEvent } from './events';
2525

2626
const log = new Logger('handlers/messages.ts')
2727

2828
let messageQueue: MessageTask[] = [];
2929
let sending = false;
30-
export const eventEmitter = new EventEmitter();
3130

3231
export const sendMessage = async (message: string, recipient: string, recipientURL: string, requestId: string | undefined) => {
3332
if (sending) {
@@ -55,7 +54,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
5554
headers: formData.getHeaders(),
5655
httpsAgent
5756
});
58-
eventEmitter.emit('event', {
57+
await queueEvent({
5958
id: uuidV4(),
6059
type: 'message-delivered',
6160
message,
@@ -64,7 +63,7 @@ export const deliverMessage = async ({ message, recipient, recipientURL, request
6463
} as IMessageDeliveredEvent);
6564
log.trace(`Message delivered`);
6665
} catch(err: any) {
67-
eventEmitter.emit('event', {
66+
await queueEvent({
6867
id: uuidV4(),
6968
type: 'message-failed',
7069
message,

src/routers/p2p.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,11 @@ import { Router, Request } from 'express';
1818
import * as utils from '../lib/utils';
1919
import * as blobsHandler from '../handlers/blobs';
2020
import path from 'path';
21-
import { EventEmitter } from 'events';
2221
import { IBlobReceivedEvent, IMessageReceivedEvent } from '../lib/interfaces';
2322
import { v4 as uuidV4 } from 'uuid';
23+
import { queueEvent } from '../handlers/events';
2424

2525
export const router = Router();
26-
export const eventEmitter = new EventEmitter();
2726

2827
router.head('/ping', (_req, res) => {
2928
res.sendStatus(204);
@@ -33,7 +32,7 @@ router.post('/messages', async (req: Request, res, next) => {
3332
try {
3433
const sender = utils.extractPeerSenderFromRequest(req);
3534
const message = await utils.extractMessageFromMultipartForm(req);
36-
eventEmitter.emit('event', {
35+
await queueEvent({
3736
id: uuidV4(),
3837
type: 'message-received',
3938
sender,
@@ -52,7 +51,7 @@ router.put('/blobs/*', async (req: Request, res, next) => {
5251
const blobPath = path.join(utils.constants.RECEIVED_BLOBS_SUBDIRECTORY, sender, req.params[0]);
5352
const metadata = await blobsHandler.storeBlob(file, blobPath);
5453
res.sendStatus(204);
55-
eventEmitter.emit('event', {
54+
await queueEvent({
5655
id: uuidV4(),
5756
type: 'blob-received',
5857
sender,

0 commit comments

Comments
 (0)