Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
296 changes: 295 additions & 1 deletion apps/api/src/app/events/e2e/cancel-event.e2e.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { pollForJobStatusChange } from './utils/poll-for-job-status-change.util'

const axiosInstance = axios.create();

describe('Cancel event - /v1/events/trigger/:transactionId (DELETE) #novu-v2', () => {
describe('Cancel event - /v1/events/trigger (DELETE) #novu-v2', () => {
let session: UserSession;
let template: NotificationTemplateEntity;
let subscriber: SubscriberEntity;
Expand All @@ -26,6 +26,17 @@ describe('Cancel event - /v1/events/trigger/:transactionId (DELETE) #novu-v2', (
});
}

async function cancelEventByQuery(query: Record<string, string | string[]>) {
await axiosInstance.delete(`${session.serverUrl}/v1/events/trigger`, {
headers: {
authorization: `ApiKey ${session.apiKey}`,
},
params: query,
});
}



beforeEach(async () => {
session = new UserSession();
await session.initialize();
Expand Down Expand Up @@ -82,6 +93,289 @@ describe('Cancel event - /v1/events/trigger/:transactionId (DELETE) #novu-v2', (
expect(cancelledDigestJobs?.length).to.eql(1);
});

it('should cancel active digests by query filters without a transactionId', async () => {
template = await session.createTemplate({
steps: [
{
type: StepTypeEnum.DIGEST,
content: '',
metadata: {
unit: DigestUnitEnum.SECONDS,
amount: 30,
digestKey: 'groupId',
type: DigestTypeEnum.REGULAR,
},
},
{
type: StepTypeEnum.IN_APP,
content: 'Hello world {{step.events.length}}' as string,
},
],
});

await novuClient.trigger({
workflowId: template.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
groupId: 'group-1',
customVar: 'trigger_1_data',
},
});

await novuClient.trigger({
workflowId: template.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
groupId: 'group-1',
customVar: 'trigger_2_data',
},
});

await session.waitForWorkflowQueueCompletion();
await session.waitForSubscriberQueueCompletion();

await cancelEventByQuery({
subscriberId: subscriber.subscriberId,
workflowId: template.triggers[0].identifier,
digestKey: 'groupId',
digestValue: 'group-1',
});

const cancelledDigestJobs = await pollForJobStatusChange({
jobRepository,
query: {
_environmentId: session.environment._id,
_templateId: template._id,
status: JobStatusEnum.CANCELED,
type: StepTypeEnum.DIGEST,
},
findMultiple: true,
});

expect(cancelledDigestJobs?.length).to.eql(2);
});

it('should cancel active delay steps by query filters using stepName and stepType', async () => {
const secondSubscriber = await subscriberService.createSubscriber();
template = await session.createTemplate({
steps: [
{
name: 'Wait for cancellation',
type: StepTypeEnum.DELAY,
content: '',
metadata: {
type: DelayTypeEnum.NONE,
},
},
{
type: StepTypeEnum.IN_APP,
content: 'Hello world {{customVar}}' as string,
},
],
});

await novuClient.trigger({
workflowId: template.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
customVar: 'trigger_1_data',
},
});

await novuClient.trigger({
workflowId: template.triggers[0].identifier,
to: [secondSubscriber.subscriberId],
payload: {
customVar: 'trigger_2_data',
},
});

await session.waitForWorkflowQueueCompletion();
await session.waitForSubscriberQueueCompletion();

await cancelEventByQuery({
workflowId: template.triggers[0].identifier,
stepName: 'Wait for cancellation',
stepType: StepTypeEnum.DELAY,
});

const cancelledDelayJobs = await pollForJobStatusChange({
jobRepository,
query: {
_environmentId: session.environment._id,
_templateId: template._id,
status: JobStatusEnum.CANCELED,
type: StepTypeEnum.DELAY,
},
findMultiple: true,
});

expect(cancelledDelayJobs?.length).to.eql(2);
});

it('should cancel multiple delayed digests using an array of transactionIds', async () => {
template = await session.createTemplate({
steps: [
{
type: StepTypeEnum.DIGEST,
content: '',
metadata: {
unit: DigestUnitEnum.SECONDS,
amount: 30,
digestKey: 'groupId',
type: DigestTypeEnum.REGULAR,
},
},
{
type: StepTypeEnum.IN_APP,
content: 'Hello world {{step.events.length}}' as string,
},
],
});

const { result: firstResult } = await novuClient.trigger({
workflowId: template.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
groupId: 'bulk-group-1',
},
});

const { result: secondResult } = await novuClient.trigger({
workflowId: template.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
groupId: 'bulk-group-2',
},
});

await session.waitForWorkflowQueueCompletion();
await session.waitForSubscriberQueueCompletion();

await cancelEventByQuery({
transactionId: [firstResult.transactionId as string, secondResult.transactionId as string],
});

const cancelledDigestJobs = await pollForJobStatusChange({
jobRepository,
query: {
_environmentId: session.environment._id,
_templateId: template._id,
status: JobStatusEnum.CANCELED,
type: StepTypeEnum.DIGEST,
},
findMultiple: true,
});

expect(cancelledDigestJobs?.length).to.eql(2);
});

it('should cancel multiple digest and delay workflows at once when stepType is omitted', async () => {
const secondSubscriber = await subscriberService.createSubscriber();

const digestTemplate = await session.createTemplate({
steps: [
{
name: 'Shared cancel gate',
type: StepTypeEnum.DIGEST,
content: '',
metadata: {
digestKey: 'groupId',
type: DigestTypeEnum.NONE,
},
},
{
type: StepTypeEnum.IN_APP,
content: 'Digest {{step.events.length}}' as string,
},
],
});

const delayTemplate = await session.createTemplate({
steps: [
{
name: 'Shared cancel gate',
type: StepTypeEnum.DELAY,
content: '',
metadata: {
type: DelayTypeEnum.NONE,
},
},
{
type: StepTypeEnum.IN_APP,
content: 'Delay {{customVar}}' as string,
},
],
});

await novuClient.trigger({
workflowId: digestTemplate.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
groupId: 'digest-a',
customVar: 'digest_1',
},
});

await novuClient.trigger({
workflowId: digestTemplate.triggers[0].identifier,
to: [secondSubscriber.subscriberId],
payload: {
groupId: 'digest-b',
customVar: 'digest_2',
},
});

await novuClient.trigger({
workflowId: delayTemplate.triggers[0].identifier,
to: [subscriber.subscriberId],
payload: {
customVar: 'delay_1',
},
});

await novuClient.trigger({
workflowId: delayTemplate.triggers[0].identifier,
to: [secondSubscriber.subscriberId],
payload: {
customVar: 'delay_2',
},
});

await session.waitForWorkflowQueueCompletion();
await session.waitForSubscriberQueueCompletion();

await cancelEventByQuery({
subscriberId: [subscriber.subscriberId, secondSubscriber.subscriberId],
stepName: 'Shared cancel gate',
});

const cancelledDigestJobs = await pollForJobStatusChange({
jobRepository,
query: {
_environmentId: session.environment._id,
_templateId: digestTemplate._id,
status: JobStatusEnum.CANCELED,
type: StepTypeEnum.DIGEST,
},
findMultiple: true,
});

const cancelledDelayJobs = await pollForJobStatusChange({
jobRepository,
query: {
_environmentId: session.environment._id,
_templateId: delayTemplate._id,
status: JobStatusEnum.CANCELED,
type: StepTypeEnum.DELAY,
},
findMultiple: true,
});

expect(cancelledDigestJobs?.length).to.eql(2);
expect(cancelledDelayJobs?.length).to.eql(2);
});

it('should cancel a delay step for all subscribers', async () => {
const secondSubscriber = await subscriberService.createSubscriber();
template = await session.createTemplate({
Expand Down
Loading
Loading