Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 63 additions & 21 deletions postgresql.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,38 @@ module.exports = function (RED) {
}
}

function changed(cnfg, prevCnfg) {
if (cnfg === undefined) {
return undefined;
}
else if (prevCnfg === undefined) {
return true;
}
else {
// Perform actual comparison
let result = false;
const cnfgKeys = Object.keys(cnfg).sort();
const prevCnfgKeys = Object.keys(prevCnfg).sort();
// Check for different numbers of keys
if (cnfgKeys.length !== prevCnfgKeys.length) {
result = true;
}
else {
// Same nambers of keys - check for at least one changed key
if (!(cnfgKeys.every((key, index) => key === prevCnfgKeys[index]))) {
result = true
}
else {
// Same keys - check for at least one changed value
if (!(cnfgKeys.every((key) => cnfg[key] === prevCnfg[key]))) {
result = true;
}
}
}
return result;
}
}

function PostgreSQLConfigNode(n) {
const node = this;
RED.nodes.createNode(node, n);
Expand All @@ -75,22 +107,6 @@ module.exports = function (RED) {
node.passwordFieldType = n.passwordFieldType;
node.connectionTimeout = n.connectionTimeout;
node.connectionTimeoutFieldType = n.connectionTimeoutFieldType;

this.pgPool = new Pool({
user: getField(node, n.userFieldType, n.user),
password: getField(node, n.passwordFieldType, n.password),
host: getField(node, n.hostFieldType, n.host),
port: getField(node, n.portFieldType, n.port),
database: getField(node, n.databaseFieldType, n.database),
ssl: getField(node, n.sslFieldType, n.ssl),
application_name: getField(node, n.applicationNameType, n.applicationName),
max: getField(node, n.maxFieldType, n.max),
idleTimeoutMillis: getField(node, n.idleFieldType, n.idle),
connectionTimeoutMillis: getField(node, n.connectionTimeoutFieldType, n.connectionTimeout),
});
this.pgPool.on('error', (err, _) => {
node.error(err.message);
});
}

RED.nodes.registerType('postgreSQLConfig', PostgreSQLConfigNode);
Expand All @@ -102,11 +118,8 @@ module.exports = function (RED) {
node.query = config.query;
node.split = config.split;
node.rowsPerMsg = config.rowsPerMsg;
node.config = RED.nodes.getNode(config.postgreSQLConfig) || {
pgPool: {
totalCount: 0,
},
};
node.config = RED.nodes.getNode(config.postgreSQLConfig) || {};
node.config.pgPool = { totalCount: 0, end: null};

// Declare the ability of this node to provide ticks upstream for back-pressure
node.tickProvider = true;
Expand Down Expand Up @@ -155,6 +168,35 @@ module.exports = function (RED) {
updateStatus(0, false);

node.on('input', async (msg, send, done) => {

// Get current db access configuration data
let dbAccessCfgData = {};
dbAccessCfgData.user = getField(node, node.config.userFieldType, node.config.user);
dbAccessCfgData.password = getField(node, node.config.passwordFieldType, node.config.password);
dbAccessCfgData.host = getField(node, node.config.hostFieldType, node.config.host);
dbAccessCfgData.port = getField(node, node.config.portFieldType, node.config.port);
dbAccessCfgData.database = getField(node, node.config.databaseFieldType, node.config.database);
dbAccessCfgData.ssl = getField(node, node.config.sslFieldType, node.config.ssl);
dbAccessCfgData.application_name = getField(node, node.config.applicationNameType, node.config.applicationName);
dbAccessCfgData.max = getField(node, node.config.maxFieldType, node.config.max);
dbAccessCfgData.idleTimeoutMillis = getField(node, node.config.idleFieldType, node.config.idle);
dbAccessCfgData.connectionTimeoutMillis = getField(node, node.config.connectionTimeoutFieldType, node.config.connectionTimeout);

// Get previous db access configuration data
const nodeContext = node.context();
const previousDbAccessCfgData = nodeContext.get('previousDbAccessCfgData')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code introduces quite some overhead and therefore some performances penalties. This is on the "hot path", and will be called many many times. So we need to be careful and see how the negative consequences can be mitigated

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just realized that the fact that this will run at all messages reception could become a real problem depending on the scenario considered. Quite simplistic, my initial idea, sorry!
In my specific scenario, the frequency of transactions in the database is relatively low, so I am unlikely to face performance issues, here. However, of course, we want a solution that is scalable.

Possibilities that occur to me:

  • Create flags in the endpoint configuration, to mark which parameters should be monitored for changes;
  • Leave the monitoring task out of the implementation, creating just a way to force the pool to restart (a message containing a specific flag?).
  • Check whether there have been changes to parameters/credentials only when errors occur.

Answering your question: in my specific scenario, only the database access password, which is obtained from an external system, will be changed periodically, as part of a security policy.

Copy link
Author

@joaoroscoe joaoroscoe Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is another potential issue: considering that we could have several nodes acessing the database, what would happen if a node restarts the pool, and another one is caught right in the middle of trying to use it ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those are all good ideas. Exposing a possibility to reload the pool, for instance through a specific message, could be a simple option. Please check my question further down, so I can better understand the use case.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding concurrency, that will need to be tested when we have some a clearer scenario

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have just pushed a new commit, including:

  • Pool is reset if msg includes a property with key "reconnect" and any truthy value;
  • Pool is reset if a change is detected in any connection parameter that is flagged to be monitored.

Inputs (checkboxes) were added to the configuration node editor, to flag the connection parameters to be monitored for changes. Those are read at initialization, and pushed into a "watch list" array. Only parameters on that list will be checked at message reception, greatly minimizing overhead in the "hot path".

Additionally, I made changes in node-config-input-ssl input html definition, so that the Typed Input widget will show with the same width as the other ones, to make space for 'global' type input.

I haven't touched the documentation, so far.

Your tougths ?


if (changed(dbAccessCfgData, previousDbAccessCfgData))
{
// Reset connections pool
if (node.config.pgPool.end !== null) {
node.config.pgPool.end();
}
node.config.pgPool = new Pool(dbAccessCfgData);
// Update previous db access configuration datain context
nodeContext.set('previousDbAccessCfgData',dbAccessCfgData)
}

// 'send' and 'done' require Node-RED 1.0+
send = send || function () { node.send.apply(node, arguments); };

Expand Down