Skip to content

Commit 00d30b4

Browse files
authored
Add configuration for flow concurrency limit [sc-32677] (#388)
1 parent a1966ef commit 00d30b4

File tree

4 files changed

+65
-2
lines changed

4 files changed

+65
-2
lines changed

packages/spectral/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@prismatic-io/spectral",
3-
"version": "10.12.2",
3+
"version": "10.12.3",
44
"description": "Utility library for building Prismatic connectors and code-native integrations",
55
"keywords": ["prismatic"],
66
"main": "dist/index.js",

packages/spectral/src/cni-testing.test.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,58 @@ describe("test convert flow", () => {
260260
});
261261
});
262262

263+
describe("test convert flow with queueConfig", () => {
264+
const baseTestFlowInput = {
265+
name: "Test Flow",
266+
stableKey: "my-test-flow",
267+
description: "This is a test flow with queue config",
268+
onExecution: async () => {
269+
return { data: 123 };
270+
},
271+
};
272+
273+
it("passes through valid concurrencyLimit values", () => {
274+
const testFlow = flow({
275+
...baseTestFlowInput,
276+
queueConfig: {
277+
concurrencyLimit: 5,
278+
},
279+
});
280+
281+
const result = convertFlow(testFlow, {}, "test-reference-key");
282+
expect(result.queueConfig).toMatchObject({
283+
usesFifoQueue: false,
284+
concurrencyLimit: 5,
285+
});
286+
});
287+
288+
it("throws error for concurrencyLimit below minimum (1)", () => {
289+
const testFlow = flow({
290+
...baseTestFlowInput,
291+
queueConfig: {
292+
concurrencyLimit: 1,
293+
},
294+
});
295+
296+
expect(() => convertFlow(testFlow, {}, "test-reference-key")).toThrow(
297+
"Test Flow has an invalid concurrencyLimit of 1. concurrencyLimit must be between 2 and 10.",
298+
);
299+
});
300+
301+
it("throws error for concurrencyLimit above maximum (11)", () => {
302+
const testFlow = flow({
303+
...baseTestFlowInput,
304+
queueConfig: {
305+
concurrencyLimit: 11,
306+
},
307+
});
308+
309+
expect(() => convertFlow(testFlow, {}, "test-reference-key")).toThrow(
310+
"Test Flow has an invalid concurrencyLimit of 11. concurrencyLimit must be between 2 and 10.",
311+
);
312+
});
313+
});
314+
263315
describe("test convert CNI component", () => {
264316
const myIntegrationDefinition: IntegrationDefinition = {
265317
name: "My integration",

packages/spectral/src/serverTypes/convertIntegration.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,15 @@ export const convertFlow = (
624624
);
625625
}
626626

627+
if (
628+
queueConfig.concurrencyLimit !== undefined &&
629+
(queueConfig.concurrencyLimit < 2 || queueConfig.concurrencyLimit > 10)
630+
) {
631+
throw new Error(
632+
`${flow.name} has an invalid concurrencyLimit of ${queueConfig.concurrencyLimit}. concurrencyLimit must be between 2 and 10.`,
633+
);
634+
}
635+
627636
result.queueConfig = {
628637
usesFifoQueue: false, // Should be false by default, even if undefined
629638
...queueConfig,

packages/spectral/src/types/IntegrationDefinition.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,16 @@ export type RetryConfig = {
183183
uniqueRequestIdField?: string;
184184
};
185185

186-
/** Defines attributes of a retry configuration used by a flow of an integration. */
186+
/** Defines attributes of a queue configuration used by a flow of an integration. */
187187
export type QueueConfig = {
188188
/** Determines whether the flow should be executed using FIFO ordering. Not valid for synchonous or scheduled flows. */
189189
usesFifoQueue?: boolean;
190190
/** Reference to the field in the flow's trigger return payload; used to determine whether to queue the execution. */
191191
dedupeIdField?: string;
192192
/** Determines whether the flow should be setup for singleton executions. Only valid for scheduled/polling trigger-based flows. */
193193
singletonExecutions?: boolean;
194+
/** The maximum number of concurrent executions for this flow. Must be between 2 and 10. */
195+
concurrencyLimit?: number;
194196
};
195197

196198
/** Defines attributes of a step error configuration used to determine how to handle errors during flow step execution. */

0 commit comments

Comments
 (0)