Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
317 changes: 287 additions & 30 deletions package-lock.json

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
"test-watch": "NODE_ENV=test NODE_LOG_LEVEL=info mocha -r dotenv/config dotenv_config_path=.env.test --timeout 10000 --require co-mocha -w -b --ignore './server/repositories/**/*.spec.js' './server/setup.js' './server/**/*.spec.js' './__tests__/seed.spec.js' './__tests__/supertest.js'",
"test-watch-debug": "NODE_ENV=test NODE_LOG_LEVEL=debug DOTENV_CONFIG_PATH=.env.test mocha -r dotenv/config --timeout 10000 --require co-mocha -w -b --ignore './server/repositories/**/*.spec.js' './server/setup.js' './server/**/*.spec.js' './__tests__/seed.spec.js' './__tests__/**/*.spec.js'",
"test-e2e": "NODE_TLS_REJECT_UNAUTHORIZED='0' mocha --config ./__tests__/e2e/.mocharc.js",
"test-e2e-locally": "RUN_E2E_LOCALLY=true DB_SSL=false NODE_ENV=test NODE_LOG_LEVEL=debug NODE_TLS_REJECT_UNAUTHORIZED='0' mocha --config ./__tests__/e2e/.mocharc.js ./server/setup.js",
"test-e2e-locally": "RUN_E2E_LOCALLY=true DB_SSL=false NODE_LOG_LEVEL=debug NODE_TLS_REJECT_UNAUTHORIZED='0' mocha --config ./__tests__/e2e/.mocharc.js ./server/setup.js",
"prettier-fix": "prettier ./ --write",
"db-migrate-ci": "cd database; db-migrate up",
"start-db": "docker-compose up"
Expand All @@ -34,6 +34,7 @@
"license": "GPL-3.0-or-later",
"dependencies": {
"@sentry/node": "^5.1.0",
"amqplib": "^0.8.0",
"bcryptjs": "^2.4.3",
"body-parser": "^1.18.2",
"chai-uuid": "^1.0.6",
Expand All @@ -51,6 +52,7 @@
"morgan": "^1.9.1",
"nodemon": "^2.0.4",
"pg": "^8.5.1",
"rascal": "^13.0.3",
"uuid": "^8.2.0"
},
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion server/models/Wallet.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class Wallet{
this._id = idOrJSON.id;
this._JSON = idOrJSON;
}else{
throw new HttpError(500);
throw new HttpError(500, "Wrong constructor arg for wallet");
}
const WalletService = require("../services/WalletService");
this.walletRepository = new WalletRepository(session);
Expand Down
12 changes: 12 additions & 0 deletions server/repositories/TransferRepository.js
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@ class TransferRepository extends BaseRepository{
state: Transfer.STATE.pending,
});
}

async getTokenAndCaptureIds(id){
return await this._session.getDB().raw(
`
SELECT token_id, capture_id FROM "transaction" tr
LEFT JOIN "token" t
ON tr.token_id = t.id
WHERE transfer_id = ?
`,
[id],
);
}
}

module.exports = TransferRepository;
20 changes: 20 additions & 0 deletions server/repositories/TransferRepository.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const knex = require("../database/knex");
const tracker = mockKnex.getTracker();
const Session = require("../models/Session");
const uuid = require('uuid');
const sinon = require('sinon');

describe("TransferRepository", () => {
let transferRepository;
Expand Down Expand Up @@ -74,5 +75,24 @@ describe("TransferRepository", () => {
expect(result).lengthOf(1);
});

it("getTokensById", async () => {
const data = [{
capture_id: "c",
token_id: "t",
}];
tracker.uninstall();
tracker.install();
tracker.on('query', function sendResult(query, step) {
[
function firstQuery() {
expect(query.sql).match(/capture_id/);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is match here a regex kind? I feel either we should do a robust match by verifying the exact query if possible.
Even if we did this test seems to be verifying just the method signature and knex but nothing significant in our code. I would prefer to skip the unit test for this Repository class if you ask me. It would be beneficial and reduce our developer time spent on maintaining it.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, maybe you are right, I don't realize that I added some tests maybe look not so useful, yes, it might be unnessarary, I will think it over, and another reason lead to this result is that I wrote the test first before I wrote the implementation, so when I link this model to the real system, I don't need to worry too much about that it's some problematic model, it makes integrating the code easier.

query.response(data);
},
][step - 1]();
});
const result = await transferRepository.getTokenAndCaptureIds(1);
sinon.assert.match(result, data);
});

});

31 changes: 25 additions & 6 deletions server/routes/transferRouter.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ transferRouter.post(
const walletLogin = await walletService.getById(res.locals.wallet_id);
const walletSender = await walletService.getByIdOrName(req.body.sender_wallet);
const walletReceiver = await walletService.getByIdOrName(req.body.receiver_wallet);
const transferService = new TransferService(session);
// check if this transfer is a claim (claim == not transferrrable tokens)
const claim = req.body.claim;

Expand All @@ -76,7 +77,18 @@ transferRouter.post(
// TODO: get only transferrable tokens
result = await walletLogin.transferBundle(walletSender, walletReceiver, req.body.bundle.bundle_size, claim);
}
const transferService = new TransferService(session);

// send message
if (result.state === Transfer.STATE.completed) {
// just send message in production
if(process.env.NODE_ENV !== "test"){
await transferService.sendMessage(result.id);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One way we could make this a bit more robust is to raise the messages as a domain-event (a separate table that stores all the events raised) with a payload that holds the message and a status(that can be raised, sent).
The raised domain-event will be stored and should be in the same transaction along with the transfer logic.
After the first part is complete, we could proceed to dispatch the domain event payload (the message) to the queue and mark the domain-event as sent in a second db call that probably doesn't need to be in a transaction since it deals with just one table.

Copy link
Copy Markdown
Collaborator Author

@dadiorchen dadiorchen Jun 19, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have some 3rd party libraries to implement this pattern? Or we should build a module about this by ourselves? I think this might be a common demand in different our project?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are right, we need this in whichever service we emit/receive messages and store/process them. I started a separate project (to be treated as a submodule) https://github.com/Greenstand/domain-events. I am not sure if there is a NPM library that can meet our needs exactly. It might easier to use our own given we also need to store these events in a database.

}
}

await session.commitTransaction();

// response
result = await transferService.convertToResponse(result);
if (result.state === Transfer.STATE.completed) {
res.status(201).json(result);
Expand All @@ -88,7 +100,6 @@ transferRouter.post(
} else {
throw new Error(`Unexpected state ${result.state}`);
}
await session.commitTransaction();
}catch(e){
if(e instanceof HttpError && !e.shouldRollback()){
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -127,8 +138,12 @@ transferRouter.post(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
// just send message in production
if(process.env.NODE_ENV !== "test"){
await transferService.sendMessage(transferJson.id);
}
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -167,8 +182,8 @@ transferRouter.post(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -207,8 +222,8 @@ transferRouter.delete(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down Expand Up @@ -280,8 +295,12 @@ transferRouter.post(
const transferJson2 = await transferService.convertToResponse(
transferJson,
);
res.status(200).json(transferJson2);
// just send message in production
if(process.env.NODE_ENV !== "test"){
await transferService.sendMessage(transferJson.id);
}
await session.commitTransaction();
res.status(200).json(transferJson2);
} catch (e) {
if (e instanceof HttpError && !e.shouldRollback()) {
// if the error type is HttpError, means the exception has been handled
Expand Down
5 changes: 5 additions & 0 deletions server/routes/transferRouter.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ describe("transferRouter", () => {
id: tokenId,
state: Transfer.STATE.completed,
});
sinon.stub(TransferService.prototype, "sendMessage");
const res = await request(app)
.post('/')
.send({
Expand Down Expand Up @@ -188,6 +189,7 @@ describe("transferRouter", () => {
id: transferId,
state: Transfer.STATE.completed,
});
const sendMessage = sinon.stub(TransferService.prototype, "sendMessage");
const res = await request(app)
.post('/')
.send({
Expand All @@ -196,6 +198,9 @@ describe("transferRouter", () => {
receiver_wallet: wallet2Id,
});
expect(res).property('statusCode').eq(201);

// should not send message to queue because ENV = test
sinon.assert.notCalled(sendMessage);
});

// //TODO: test for case 1: with trust relationship, tokens specified
Expand Down
26 changes: 26 additions & 0 deletions server/services/MQConfig.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
module.exports = {
config: {
"vhosts": {
"test": {
"connection": {
"url": process.env.RABBIT_MQ_URL,
"socketOptions": {
"timeout": 1000
}
},
"exchanges": ["wallet-service-ex"],
"queues": ["token-transfer:events"],
"bindings": [
"wallet-service-ex[token.transfer] -> token-transfer:events"
],
"publications": {
"token-assigned": {
"exchange": "wallet-service-ex",
"routingKey": "token.transfer"
}
},
}
}
}
}

44 changes: 44 additions & 0 deletions server/services/MQService.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
const Broker = require('rascal').BrokerAsPromised;
const config = require("./MQConfig").config;
const HttpError = require("../utils/HttpError");
const log = require("loglevel");

class MQService{

constructor(session){
this._settsion = session;
}

sendMessage(payload){
log.warn("to send message");
return new Promise((resolve, reject) => {
Broker.create(config)
.then(broker => {
broker.publish("token-assigned", payload)
.then(publication => {
log.warn("publication is on");
publication
.on("success", () => {
log.warn("message sent!");
resolve(true);
})
.on("error", (err, messageId)=> {
const error = `Error with id ${messageId} ${err.message}`;
log.error(error);
reject(new HttpError(500, error));
});
})
.catch(err => {
log.error(err);
reject(new HttpError(500, `Error publishing message ${err}`));
})
})
.catch(err => {
log.error(err);
reject(new HttpError(500, `Error create broker ${err}`));
})
});
}
}

module.exports = MQService;
103 changes: 103 additions & 0 deletions server/services/MQService.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
const MQService = require("./MQService");
const Broker = require('rascal').BrokerAsPromised;
const sinon = require("sinon");
const {expect} = require("chai");
const jestExpect = require("expect");
const log = require("loglevel");

describe("MQService", () => {

afterEach(() => {
sinon.restore();
});

it("send message successfully", async () => {
const broker = {
publish: async () => {
console.log("publish");
return {
on(event, handler){
// mock the success event
if(event === "success"){
setImmediate(handler);
}
return this;
}
}
}
};
sinon.spy(broker, "publish");
sinon.stub(Broker, "create").resolves(broker);
const mqService = new MQService();

const payload = {a:1};
const result = await mqService.sendMessage(payload);
expect(result).eq(true);
sinon.assert.calledWith(broker.publish, "raw-capture-created", payload, "field-data.capture.creation");

});

it("send message with problem", async () => {
sinon.stub(Broker, "create").resolves({
publish: async () => {
console.log("publish");
return {
on(event, handler){
// mock the error event
if(event === "error"){
setImmediate(() => handler(new Error("Message sending wrong"), "No.1"));
}
return this;
}
}
}
});
const mqService = new MQService();

await jestExpect(async () => {
await mqService.sendMessage({a:1});
}).rejects.toThrow(/Message sending wrong/);

});

});

describe("Real operation, just for dev", () => {

it("Send and receive message", async function(){
try{

const mqService = new MQService();
const payload = {a:1};
const result = await mqService.sendMessage(payload);
log.warn("result:", result);


// await new Promise((resolve, reject) => {
// // check the message
// // Consume a message
// const config = require("./MQConfig").config;
// Broker.create(config)
// .then(broker => {
// log.info("connected to broker");
// broker.subscribeAll()
// .then(subscriptions => {
// subscriptions.forEach( subscription => {
// subscription.on('message', (message, content, ackOrNack) => {
// log.warn("message:", message, content);
// log.warn("message content received:", message.content && message.content.toString());
// ackOrNack();
// resolve();
// }).on('error', console.error);
// });
// });
// });
// const mqService = new MQService();
// const payload = {a:1};
// mqService.sendMessage(payload);
// });
}catch(e){
log.error("e:",e );
};
});
});
Loading