Skip to content

Commit f5e321d

Browse files
Merge pull request #68 from hyperledger/direct-round-trips
Direct round trips
2 parents 0e7fb94 + aa8f412 commit f5e321d

File tree

1 file changed

+74
-43
lines changed

1 file changed

+74
-43
lines changed

src/routers/api.ts

Lines changed: 74 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616

1717
import { Request, Router } from 'express';
1818
import { promises as fs } from 'fs';
19+
import { createReadStream } from 'fs';
1920
import https from 'https';
2021
import path from 'path';
2122
import { v4 as uuidV4 } from 'uuid';
2223
import * as blobsHandler from '../handlers/blobs';
2324
import * as eventsHandler from '../handlers/events';
25+
import { queueEvent } from '../handlers/events';
2426
import * as messagesHandler from '../handlers/messages';
2527
import { ca, cert, certBundle, key, peerID } from '../lib/cert';
2628
import { config, persistDestinations, persistPeers } from '../lib/config';
27-
import { IStatus } from '../lib/interfaces';
29+
import { IBlobReceivedEvent, IFile, IMessageDeliveredEvent, IMessageReceivedEvent, IStatus } from '../lib/interfaces';
2830
import RequestError from '../lib/request-error';
2931
import * as utils from '../lib/utils';
3032

@@ -152,46 +154,38 @@ router.post('/messages', async (req, res, next) => {
152154
}
153155
let senderDestination: string | undefined = undefined;
154156
if (typeof req.body.sender === 'string') {
155-
const segments = req.body.sender.split('/');
156-
if (segments[0] !== peerID) {
157-
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400);
157+
let senderID: string;
158+
({ peerID: senderID, destination: senderDestination } = extractRecipientAndDestination(req.body.sender));
159+
if (senderID !== peerID) {
160+
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${senderDestination}`, 400);
158161
}
159-
if (segments.length > 1) {
160-
if (!config.destinations?.includes(segments[1])) {
161-
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${segments[1]}`, 400);
162-
}
163-
senderDestination = segments[1];
162+
if (senderDestination !== undefined && !config.destinations?.includes(senderDestination)) {
163+
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${senderDestination}`, 400);
164164
}
165165
}
166166
let recipientID: string;
167167
let recipientDestination: string | undefined = undefined;
168168
if (typeof req.body.recipient === 'string') {
169-
const segments = req.body.recipient.split('/');
170-
recipientID = segments[0];
171-
if (segments.length > 1) {
172-
recipientDestination = segments[1];
173-
}
169+
({ peerID: recipientID, destination: recipientDestination } = extractRecipientAndDestination(req.body.recipient));
174170
} else {
175171
throw new RequestError('Missing recipient', 400);
176172
}
177-
let recipientEndpoint: string;
173+
let requestId: string = req.body.requestId ?? uuidV4();
178174
if (recipientID === peerID) {
179-
recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`;
175+
if (recipientDestination !== undefined && !config.destinations?.includes(recipientDestination)) {
176+
throw new RequestError(`Unknown recipient destination expected=${config.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 400);
177+
}
178+
dispatchInternalMessage(req.body.sender, req.body.recipient, req.body.message, requestId);
180179
} else {
181180
let recipientPeer = config.peers.find(peer => peer.id === recipientID);
182181
if (recipientPeer === undefined) {
183182
throw new RequestError(`Unknown recipient ${recipientID}`, 400);
184183
}
185-
recipientEndpoint = recipientPeer.endpoint;
186184
if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) {
187185
throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|') ?? 'none'} recieved=${recipientDestination}`, 400);
188186
}
187+
messagesHandler.sendMessage(req.body.message, recipientID, recipientPeer.endpoint, requestId, senderDestination, recipientDestination);
189188
}
190-
let requestId = uuidV4();
191-
if (typeof req.body.requestId === 'string') {
192-
requestId = req.body.requestId;
193-
}
194-
messagesHandler.sendMessage(req.body.message, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination);
195189
res.send({ requestId });
196190
} catch (err) {
197191
next(err);
@@ -256,31 +250,25 @@ router.post('/transfers', async (req, res, next) => {
256250
await blobsHandler.retrieveMetadata(req.body.path);
257251
let senderDestination: string | undefined = undefined;
258252
if (typeof req.body.sender === 'string') {
259-
const segments = req.body.sender.split('/');
260-
if (segments[0] !== peerID) {
261-
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${segments[0]}`, 400);
253+
let senderID: string;
254+
({ peerID: senderID, destination: senderDestination } = extractRecipientAndDestination(req.body.sender));
255+
if (senderID !== peerID) {
256+
throw new RequestError(`Sender ID mismatch expected=${peerID} recieved=${senderID}`, 400);
262257
}
263-
if (segments.length > 1) {
264-
if (!config.destinations?.includes(segments[1])) {
265-
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|')} recieved=${segments[1]}`, 400);
266-
}
267-
senderDestination = segments[1];
258+
if (senderDestination !== undefined && !config.destinations?.includes(senderDestination)) {
259+
throw new RequestError(`Unknown sender destination expected=${config.destinations?.join('|')} recieved=${senderDestination}`, 400);
268260
}
269261
}
270262
let recipientID: string;
271263
let recipientDestination: string | undefined = undefined;
272264
if (typeof req.body.recipient === 'string') {
273-
const segments = req.body.recipient.split('/');
274-
recipientID = segments[0];
275-
if (segments.length > 1) {
276-
recipientDestination = segments[1];
277-
}
265+
({ peerID: recipientID, destination: recipientDestination } = extractRecipientAndDestination(req.body.recipient));
278266
} else {
279267
throw new RequestError('Missing recipient', 400);
280268
}
281-
let recipientEndpoint: string;
269+
let requestId: string = req.body.requestId ?? uuidV4();
282270
if (recipientID === peerID) {
283-
recipientEndpoint = config.p2p.endpoint ?? `https://${config.p2p.hostname}:${config.p2p.port}`;
271+
dispatchInternalBlob(req.body.sender, req.body.recipient, req.body.path);
284272
} else {
285273
let recipientPeer = config.peers.find(peer => peer.id === recipientID);
286274
if (recipientPeer === undefined) {
@@ -289,15 +277,58 @@ router.post('/transfers', async (req, res, next) => {
289277
if (recipientDestination !== undefined && !recipientPeer.destinations?.includes(recipientDestination)) {
290278
throw new RequestError(`Unknown recipient destination expected=${recipientPeer.destinations?.join('|')} recieved=${recipientDestination}`, 400);
291279
}
292-
recipientEndpoint = recipientPeer.endpoint;
293-
}
294-
let requestId = uuidV4();
295-
if (typeof req.body.requestId === 'string') {
296-
requestId = req.body.requestId;
280+
blobsHandler.sendBlob(req.body.path, recipientID, recipientPeer.endpoint, requestId, senderDestination, recipientDestination);
297281
}
298-
blobsHandler.sendBlob(req.body.path, recipientID, recipientEndpoint, requestId, senderDestination, recipientDestination);
299282
res.send({ requestId });
300283
} catch (err) {
301284
next(err);
302285
}
303286
});
287+
288+
const extractRecipientAndDestination = (value: string) => {
289+
const segments = value.split('/');
290+
return {
291+
peerID: segments[0],
292+
destination: segments.length > 1 ? segments[1] : undefined
293+
};
294+
};
295+
296+
const dispatchInternalMessage = async (sender: string, recipient: string, message: string, requestId: string) => {
297+
await queueEvent({
298+
id: uuidV4(),
299+
type: 'message-received',
300+
sender,
301+
recipient,
302+
message
303+
} as IMessageReceivedEvent);
304+
await queueEvent({
305+
id: uuidV4(),
306+
type: 'message-delivered',
307+
sender,
308+
recipient,
309+
message,
310+
requestId
311+
} as IMessageDeliveredEvent);
312+
};
313+
314+
const dispatchInternalBlob = async (sender: string, recipient: string, filePath: string) => {
315+
const originBlobPath = path.join(utils.constants.DATA_DIRECTORY, utils.constants.BLOBS_SUBDIRECTORY, filePath);
316+
const readableStream = createReadStream(originBlobPath);
317+
const file: IFile = {
318+
key: '',
319+
name: '',
320+
readableStream
321+
};
322+
const destinationBlobPath = path.join(utils.constants.RECEIVED_BLOBS_SUBDIRECTORY, sender, filePath);
323+
const metadata = await blobsHandler.storeBlob(file, destinationBlobPath);
324+
await queueEvent({
325+
id: uuidV4(),
326+
type: 'blob-received',
327+
sender,
328+
recipient,
329+
path: destinationBlobPath,
330+
hash: metadata.hash,
331+
size: metadata.size,
332+
lastUpdate: metadata.lastUpdate
333+
} as IBlobReceivedEvent);
334+
};

0 commit comments

Comments
 (0)