Skip to content

Commit e5bf9be

Browse files
authored
Merge pull request #43 from afdezayl/master
FIX: transmitPublish not allowing interception
2 parents d8a7a40 + 80687ae commit e5bf9be

File tree

2 files changed

+85
-0
lines changed

2 files changed

+85
-0
lines changed

serversocket.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -760,6 +760,7 @@ AGServerSocket.prototype._processInboundPacket = async function (packet, message
760760
}
761761

762762
if (isPublish) {
763+
packet.data.data = newData;
763764
await this._processInboundPublishPacket(packet);
764765
}
765766

test/integration.js

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3265,6 +3265,90 @@ describe('Integration tests', function () {
32653265
clientB.disconnect();
32663266
clientC.disconnect();
32673267
});
3268+
it('Should allow to change message in middleware when client invokePublish', async function() {
3269+
const clientMessage = 'world';
3270+
const middlewareMessage = 'intercepted';
3271+
const middlewareFunction = async function (middlewareStream) {
3272+
for await (let action of middlewareStream) {
3273+
if (action.type === AGAction.PUBLISH_IN) {
3274+
action.allow({data: middlewareMessage});
3275+
} else {
3276+
action.allow();
3277+
}
3278+
}
3279+
};
3280+
3281+
server.setMiddleware(server.MIDDLEWARE_INBOUND, middlewareFunction);
3282+
3283+
const client = socketClusterClient.create({
3284+
hostname: clientOptions.hostname,
3285+
port: PORT_NUMBER
3286+
});
3287+
3288+
let helloChannel = client.subscribe('hello');
3289+
await helloChannel.listener('subscribe').once();
3290+
3291+
let receivedMessages = [];
3292+
(async () => {
3293+
for await (let data of helloChannel) {
3294+
receivedMessages.push(data);
3295+
}
3296+
})();
3297+
3298+
let error;
3299+
try {
3300+
await client.invokePublish('hello', clientMessage);
3301+
} catch (err) {
3302+
error = err;
3303+
}
3304+
3305+
await wait(100);
3306+
3307+
assert.notEqual(clientMessage, middlewareMessage);
3308+
assert.equal(receivedMessages[0], middlewareMessage);
3309+
});
3310+
it('Should allow to change message in middleware when client transmitPublish', async function() {
3311+
const clientMessage = 'world';
3312+
const middlewareMessage = 'intercepted';
3313+
const middlewareFunction = async function (middlewareStream) {
3314+
for await (let action of middlewareStream) {
3315+
if (action.type === AGAction.PUBLISH_IN) {
3316+
action.allow({data: middlewareMessage});
3317+
} else {
3318+
action.allow();
3319+
}
3320+
}
3321+
};
3322+
3323+
server.setMiddleware(server.MIDDLEWARE_INBOUND, middlewareFunction);
3324+
3325+
const client = socketClusterClient.create({
3326+
hostname: clientOptions.hostname,
3327+
port: PORT_NUMBER
3328+
});
3329+
3330+
let helloChannel = client.subscribe('hello');
3331+
await helloChannel.listener('subscribe').once();
3332+
3333+
let receivedMessages = [];
3334+
(async () => {
3335+
for await (let data of helloChannel) {
3336+
receivedMessages.push(data);
3337+
}
3338+
})();
3339+
3340+
let error;
3341+
try {
3342+
await client.transmitPublish('hello', clientMessage);
3343+
} catch (err) {
3344+
error = err;
3345+
}
3346+
3347+
await wait(100);
3348+
3349+
assert.notEqual(clientMessage, middlewareMessage);
3350+
assert.equal(receivedMessages[0], middlewareMessage);
3351+
})
32683352
});
32693353

32703354
describe('SUBSCRIBE action', function () {

0 commit comments

Comments
 (0)