Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 14 additions & 3 deletions lib/runner/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,21 @@ var _ = require('lodash'),
* @param {Number} [position=0] -
* @param {Number} [iteration=0] -
* @param {String} [ref] -
* @param {Number} [partitionIndex=0] -
* @param {Number} [partitionCycles=0] -
* @constructor
*/
Cursor = function RunCursor (length, cycles, position, iteration, ref) { // eslint-disable-line func-name-matching
Cursor = function RunCursor (length, cycles, position, iteration, // eslint-disable-line func-name-matching
ref, partitionIndex, partitionCycles) {
this.length = Cursor.validate(length, 0);
this.position = Cursor.validate(position, 0, this.length);

this.cycles = Cursor.validate(cycles, 1, 1);
this.iteration = Cursor.validate(iteration, 0, this.cycles);

this.partitionIndex = Cursor.validate(partitionIndex, 0, this.cycles);
this.partitionCycles = Cursor.validate(partitionCycles, 0, this.cycles);

this.ref = ref || uuid.v4();
};

Expand Down Expand Up @@ -202,7 +208,9 @@ _.assign(Cursor.prototype, {
var base = {
ref: this.ref,
length: this.length,
cycles: this.cycles
cycles: this.cycles,
partitionCycles: this.partitionCycles,
partitionIndex: this.partitionIndex
},
position,
iteration;
Expand Down Expand Up @@ -265,6 +273,8 @@ _.assign(Cursor.prototype, {
iteration: this.iteration,
length: this.length,
cycles: this.cycles,
partitionIndex: this.partitionIndex,
partitionCycles: this.partitionCycles,
empty: this.empty(),
eof: this.eof(),
bof: this.bof(),
Expand Down Expand Up @@ -349,7 +359,8 @@ _.assign(Cursor, {
if (!_.isObject(obj)) { return new Cursor(bounds && bounds.length, bounds && bounds.cycles); }

// load Cursor values from object
return new Cursor((bounds || obj).length, (bounds || obj).cycles, obj.position, obj.iteration, obj.ref);
return new Cursor((bounds || obj).length, (bounds || obj).cycles, obj.position,
obj.iteration, obj.ref, obj.partitionIndex, obj.partitionCycles);
},

/**
Expand Down
17 changes: 11 additions & 6 deletions lib/runner/extensions/control.command.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,17 @@ module.exports = {
abort (userback, payload, next) {
// clear instruction pool and as such there will be nothing next to execute
this.pool.clear();
this.triggers.abort(null, this.state.cursor.current());

// execute the userback sent as part of the command and do so in a try block to ensure it does not hamper
// the process tick
backpack.ensure(userback, this) && userback();

this.partitionManager.clearPools();
if (!this.aborted) {
this.aborted = true;
// Always trigger abort event here to ensure it's called even if host has been disposed
this.triggers.abort(null, this.state.cursor.current());

// execute the userback sent as part of the command and
// do so in a try block to ensure it does not hamper the process tick
backpack.ensure(userback, this) && userback();
}
this.partitionManager.triggerStopAction();
next(null);
}
}
Expand Down
8 changes: 8 additions & 0 deletions lib/runner/extensions/event.command.js
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,14 @@ module.exports = {
result && result._variables &&
(this.state._variables = new sdk.VariableScope(result._variables));

// persist the pm.variables for the next request in the current iteration
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's happening here?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since each partition needs to track it's own variables, we are updating the variables for that partition only. So we do not affect other partitions.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can be a helper function in partition manager, something like: this.partitionManager.persistVariables(result._variables).

const partitionIndex = payload.coords.partitionIndex;

result && result._variables &&
this.areIterationsParallelized &&
(this.partitionManager.partitions[partitionIndex]
.variables._variables = new sdk.VariableScope(result._variables));

// persist the mutated request in payload context,
// @note this will be used for the next prerequest script or
// upcoming commands(request, httprequest).
Expand Down
185 changes: 185 additions & 0 deletions lib/runner/extensions/parallel.command.js
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see we've moved some code to utils, but there's still a lot of duplicate code from the waterfall command. Why can't it queue the waterfall or find a better way to avoid this duplication?

Copy link
Collaborator

@KhudaDad414 KhudaDad414 Sep 11, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have created a separate function to handle the SNR logic, which reduces duplicate code.

While we could make changes to the ⁠waterfall command, we prefer to avoid modifying it or introducing additional conditional statements. Doing so could negatively affect its performance and make the overall logic harder to understand. For this reason, we have chosen to implement a completely new command for parallel runs, even if it means some code duplication.

Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
var _ = require('lodash'),
{ prepareVaultVariableScope, prepareVariablesScope,
extractSNR, prepareLookupHash, getIterationData
} = require('../util');

/**
* Adds options
* disableSNR:Boolean
*
* @type {Object}
*/
module.exports = {
init: function (done) {
if (!this.areIterationsParallelized) {
return done();
}
var state = this.state;

// ensure that the environment, globals and collectionVariables are in VariableScope instance format
prepareVariablesScope(state);
// prepare the vault variable scope
prepareVaultVariableScope(state.vaultSecrets);

// create the partition manager and partition the iterations
this.partitionManager.createPartitions();
const { partitions } = this.partitionManager;

// queue a parallel command for each of our partitions
partitions.forEach((partition) => {
this.queue('parallel', {
coords: partition.cursor.current(),
static: true,
start: true
});
});

return done();
},

triggers: ['beforeIteration', 'iteration'],

process: {
/**
* This processor queues partitions in parallel.
*
* @param {Object} payload -
* @param {Object} payload.coords -
* @param {Boolean} [payload.static=false] -
* @param {Function} next -
*/
parallel (payload, next) {
var partitionIndex = payload.coords.partitionIndex,
partition = this.partitionManager.partitions[partitionIndex],
coords = payload.static ? payload.coords : partition.cursor.whatnext(payload.coords),
item = this.state.items[coords.position],
delay;


if (coords.empty) {
return next();
}

if (payload.stopRunNow) {
this.triggers.iteration(null, coords);

return next();
}

// if it is a beginning of a run, we need to raise events for iteration start
if (payload.start) {
this.triggers.beforeIteration(null, coords);
}

// since we will never reach coords.eof for some partitions because each cursor
// contains cycles for the entire run, we are breaking off early here.
// this has been done to keep the contract of a cursor intact.
// cycles is defined as "number of iterations in the run"
if (coords.iteration === partition.startIndex + coords.partitionCycles) {
this.triggers.iteration(null, payload.coords);

return next();
}

if (coords.cr) {
delay = _.get(this.options, 'delay.iteration', 0);

this.triggers.iteration(null, payload.coords);
this.triggers.beforeIteration(null, coords);
}


if (coords.eof) {
this.triggers.iteration(null, coords);

return next();
}

this.queueDelay(function () {
this.queue('item', {
item: item,
coords: coords,
data: getIterationData(this.state.data, coords.iteration + partition.startIndex),
environment: partition.variables.environment,
globals: partition.variables.globals,
vaultSecrets: this.state.vaultSecrets,
collectionVariables: partition.variables.collectionVariables,
_variables: partition.variables._variables
}, function (executionError, executions) {
var snr = {},
nextCoords,
seekingToStart,
stopRunNow,

stopOnFailure = this.options.stopOnFailure;

if (!executionError) {
// extract set next request
snr = extractSNR(executions.prerequest);
snr = extractSNR(executions.test, snr);
}

if (!this.options.disableSNR && snr.defined) {
// prepare the snr lookup hash if it is not already provided
// @todo - figure out a way to reset this post run complete
!this.snrHash && (this.snrHash = prepareLookupHash(this.state.items));

// if it is null, we do not proceed further and move on
// see if a request is found in the hash and then reset the coords position to the lookup
// value.
(snr.value !== null) && (snr.position = // eslint-disable-next-line no-nested-ternary
this.snrHash[_.has(this.snrHash.ids, snr.value) ? 'ids' :
(_.has(this.snrHash.names, snr.value) ? 'names' : 'obj')][snr.value]);

snr.valid = _.isNumber(snr.position);
}

nextCoords = _.clone(coords);

if (snr.valid) {
// if the position was detected, we set the position to the one previous to the desired location
// this ensures that the next call to .whatnext() will return the desired position.
nextCoords.position = snr.position - 1;
}
else {
// if snr was requested, but not valid, we stop this iteration.
// stopping an iteration is equivalent to seeking the last position of the current
// iteration, so that the next call to .whatnext() will automatically move to the next
// iteration.
(snr.defined || executionError) && (nextCoords.position = nextCoords.length - 1);

// If we need to stop on a run, we set the stop flag to true.
(stopOnFailure && executionError) && (stopRunNow = true);
}

// @todo - do this in unhacky way
if (nextCoords.position === -1) {
nextCoords.position = 0;
seekingToStart = true;
}


partition.cursor.seek(nextCoords.position, nextCoords.iteration, function (err, chngd, coords) {
// this condition should never arise, so better throw error when this happens
if (err) {
throw err;
}

this.queue('parallel', {
coords: {
...coords,
partitionIndex
},
static: seekingToStart,
stopRunNow: stopRunNow
});
}, this);
});
}.bind(this), {
time: delay,
source: 'iteration',
cursor: coords
}, next);
}
}
};
91 changes: 12 additions & 79 deletions lib/runner/extensions/waterfall.command.js
Original file line number Diff line number Diff line change
@@ -1,65 +1,7 @@
var _ = require('lodash'),
Cursor = require('../cursor'),
VariableScope = require('postman-collection').VariableScope,
{ prepareVaultVariableScope } = require('../util'),

prepareLookupHash,
extractSNR,
getIterationData;

/**
* Returns a hash of IDs and Names of items in an array
*
* @param {Array} items -
* @returns {Object}
*/
prepareLookupHash = function (items) {
var hash = {
ids: {},
names: {},
obj: {}
};

_.forEach(items, function (item, index) {
if (item) {
item.id && (hash.ids[item.id] = index);
item.name && (hash.names[item.name] = index);
}
});

return hash;
};

extractSNR = function (executions, previous) {
var snr = previous || {};

_.isArray(executions) && executions.forEach(function (execution) {
_.has(_.get(execution, 'result.return'), 'nextRequest') && (
(snr.defined = true),
(snr.value = execution.result.return.nextRequest)
);
});

return snr;
};

/**
* Returns the data for the given iteration
*
* @function getIterationData
* @param {Array} data - The data array containing all iterations' data
* @param {Number} iteration - The iteration to get data for
* @return {Any} - The data for the iteration
*/
getIterationData = function (data, iteration) {
// if iteration has a corresponding data element use that
if (iteration < data.length) {
return data[iteration];
}

// otherwise use the last data element
return data[data.length - 1];
};
{ prepareVaultVariableScope, prepareVariablesScope, prepareLookupHash,
extractSNR, getIterationData } = require('../util');

/**
* Adds options
Expand All @@ -71,19 +13,8 @@ module.exports = {
init: function (done) {
var state = this.state;

// ensure that the environment, globals and collectionVariables are in VariableScope instance format
state.environment = VariableScope.isVariableScope(state.environment) ? state.environment :
new VariableScope(state.environment);
state.globals = VariableScope.isVariableScope(state.globals) ? state.globals :
new VariableScope(state.globals);
state.vaultSecrets = VariableScope.isVariableScope(state.vaultSecrets) ? state.vaultSecrets :
new VariableScope(state.vaultSecrets);
state.collectionVariables = VariableScope.isVariableScope(state.collectionVariables) ?
state.collectionVariables : new VariableScope(state.collectionVariables);
state._variables = VariableScope.isVariableScope(state.localVariables) ?
state.localVariables : new VariableScope(state.localVariables);

// prepare the vault variable scope
// prepare the vault variable scope and other variables
prepareVariablesScope(state);
prepareVaultVariableScope(state.vaultSecrets);

// ensure that the items and iteration data set is in place
Expand All @@ -100,16 +31,18 @@ module.exports = {
this.waterfall = state.cursor; // copy the location object to instance for quick access

// queue the iteration command on start
this.queue('waterfall', {
coords: this.waterfall.current(),
static: true,
start: true
});
if (!this.areIterationsParallelized) {
this.queue('waterfall', {
coords: this.waterfall.current(),
static: true,
start: true
});
}

// clear the variable that is supposed to store item name and id lookup hash for easy setNextRequest
this.snrHash = null; // we populate it in the first SNR call

done();
return done();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why return callback?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed.

},

triggers: ['beforeIteration', 'iteration'],
Expand Down
Loading
Loading