Skip to content

Commit 48a9abe

Browse files
committed
Fix notify service for core lib v1.2
1 parent 8764776 commit 48a9abe

File tree

9 files changed

+49
-67
lines changed

9 files changed

+49
-67
lines changed

src/notify/amqp/ampq.service.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import { Inject, Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
2-
import { etag, Notify, Process } from '@letsflow/core/process';
2+
import { createMessage, etag, Notify, Process } from '@letsflow/core/process';
33
import { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager';
44
import { ConfigService } from '@/common/config/config.service';
5-
import { createMessage } from '../utils/message';
65
import { NotifyProvider } from '../notify-provider.interface';
76

87
const DEFAULT_APPID = 'letsflow';
@@ -88,7 +87,7 @@ export class AmqpService implements NotifyProvider, OnModuleDestroy {
8887
const { url: _, exchange, routingKey, responseTimeout, reply, ...options } = settings;
8988

9089
const channel = await this.getChannel(args.service);
91-
const message = args.message ?? createMessage(process, args.trigger);
90+
const message = args.message ?? createMessage(process, args.service);
9291

9392
options.appId ??= DEFAULT_APPID;
9493
options.messageId = etag(process);

src/notify/amqp/amqp.service.spec.ts

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
22
import { AmqpService } from './ampq.service';
33
import { ConfigService } from '@/common/config/config.service';
44
import { AmqpConnectionManager, ChannelWrapper } from 'amqp-connection-manager';
5-
import { Notify, Process } from '@letsflow/core/process';
5+
import { Process } from '@letsflow/core/process';
66

77
describe('AmqpService', () => {
88
let service: AmqpService;
@@ -14,7 +14,8 @@ describe('AmqpService', () => {
1414
const process = {
1515
id: '00000000-0000-0000-0001-000000000001',
1616
current: {
17-
actions: [{ key: 'next' }],
17+
actions: [{ key: 'next', actor: ['service:pushService', 'service:replyService'] }],
18+
instructions: { 'service:pushService': 'Go to next' } as Record<string, string>,
1819
},
1920
events: [{ hash: '1234' }],
2021
} as Process;
@@ -119,7 +120,7 @@ describe('AmqpService', () => {
119120
},
120121
});
121122

122-
const args = { service: 'pushService', trigger: 'next' } as Notify;
123+
const args = { service: 'pushService', after: 0 };
123124

124125
await service.notify(process, args);
125126

@@ -128,7 +129,8 @@ describe('AmqpService', () => {
128129
'test-key',
129130
JSON.stringify({
130131
process: '00000000-0000-0000-0001-000000000001',
131-
action: { key: 'next' },
132+
actions: [{ key: 'next', actor: ['service:pushService', 'service:replyService'] }],
133+
instructions: 'Go to next',
132134
etag: '1234',
133135
}),
134136
{
@@ -161,7 +163,7 @@ describe('AmqpService', () => {
161163
});
162164
});
163165

164-
const args = { service: 'replyService', trigger: 'next' } as Notify;
166+
const args = { service: 'replyService', after: 0 };
165167

166168
const result = await service.notify(process, args);
167169

@@ -172,7 +174,7 @@ describe('AmqpService', () => {
172174
'',
173175
JSON.stringify({
174176
process: '00000000-0000-0000-0001-000000000001',
175-
action: { key: 'next' },
177+
actions: [{ key: 'next', actor: ['service:pushService', 'service:replyService'] }],
176178
etag: '1234',
177179
}),
178180
{
@@ -188,7 +190,7 @@ describe('AmqpService', () => {
188190
});
189191

190192
it('should throw an error if the response timeout is exceeded', async () => {
191-
const args = { service: 'replyService', trigger: 'next' } as Notify;
193+
const args = { service: 'replyService', after: 0 };
192194

193195
await expect(service.notify(process, args)).rejects.toThrow('Response timeout exceeded');
194196
});

src/notify/utils/message.ts

Lines changed: 0 additions & 24 deletions
This file was deleted.

src/notify/webhook/webhook.service.spec.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,8 @@ describe('WebhookService', () => {
1111
const process = {
1212
id: '00000000-0000-0000-0001-000000000001',
1313
current: {
14-
actions: [{ key: 'next' }],
14+
actions: [{ key: 'next', actor: ['service:test'] }],
15+
instructions: { 'service:test': 'Go to next' } as Record<string, string>,
1516
},
1617
events: [{ hash: '1234' }],
1718
} as Process;
@@ -26,7 +27,7 @@ describe('WebhookService', () => {
2627
provide: ConfigService,
2728
useValue: {
2829
get: jest.fn(() => ({
29-
testService: {
30+
test: {
3031
url: 'https://example.com/webhook',
3132
timeout: 10000,
3233
},
@@ -59,18 +60,18 @@ describe('WebhookService', () => {
5960

6061
it('should throw an error if the url is missing in settings', async () => {
6162
jest.spyOn(configService, 'get').mockReturnValueOnce({
62-
testService: {},
63+
test: {},
6364
});
6465

65-
await expect(service.notify(process, { service: 'testService' } as Notify)).rejects.toThrow(
66-
"Service 'testService' is missing url setting",
66+
await expect(service.notify(process, { service: 'test' } as Notify)).rejects.toThrow(
67+
"Service 'test' is missing url setting",
6768
);
6869
});
6970

7071
it('should send a message with correct headers and body', async () => {
7172
fetchMock.mockResolvedValue({ ok: true, status: 202 });
7273

73-
const args = { service: 'testService', trigger: 'next' } as Notify;
74+
const args = { service: 'test', after: 0 };
7475
const result = await service.notify(process, args);
7576

7677
expect(fetchMock).toHaveBeenCalledWith('https://example.com/webhook', {
@@ -79,7 +80,8 @@ describe('WebhookService', () => {
7980
headers: { 'Content-Type': 'application/json' },
8081
body: JSON.stringify({
8182
process: '00000000-0000-0000-0001-000000000001',
82-
action: { key: 'next' },
83+
actions: [{ key: 'next', actor: ['service:test'] }],
84+
instructions: 'Go to next',
8385
etag: '1234',
8486
}),
8587
signal: expect.any(AbortSignal),
@@ -96,7 +98,7 @@ describe('WebhookService', () => {
9698
text: jest.fn().mockResolvedValue('Server error'),
9799
});
98100

99-
const args = { service: 'testService', trigger: 'next' } as Notify;
101+
const args = { service: 'test', after: 0 };
100102

101103
await expect(service.notify(process, args)).rejects.toThrow(
102104
'Webhook failed with status 500 Internal Server Error: Server error',
@@ -111,7 +113,7 @@ describe('WebhookService', () => {
111113
text: jest.fn().mockResolvedValue('Success'),
112114
});
113115

114-
const args = { service: 'testService', trigger: 'next' } as Notify;
116+
const args = { service: 'test', after: 0 };
115117
const result = await service.notify(process, args);
116118

117119
expect(result).toBe('Success');
@@ -125,7 +127,7 @@ describe('WebhookService', () => {
125127
json: jest.fn().mockResolvedValue({ success: true }),
126128
});
127129

128-
const args = { service: 'testService', trigger: 'next' } as Notify;
130+
const args = { service: 'test', after: 0 };
129131
const result = await service.notify(process, args);
130132

131133
expect(result).toEqual({ success: true });

src/notify/webhook/webhook.service.ts

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import { Inject, Injectable } from '@nestjs/common';
22
import { ConfigService } from '@/common/config/config.service';
3-
import { Notify, Process } from '@letsflow/core/process';
4-
import { createMessage } from '@/notify/utils/message';
3+
import { createMessage, Notify, Process } from '@letsflow/core/process';
54
import { NotifyProvider } from '@/notify/notify-provider.interface';
65

76
interface WebhookSettings extends RequestInit {
@@ -28,7 +27,7 @@ export class WebhookService implements NotifyProvider {
2827
throw new Error(`Service '${args.service}' is missing url setting`);
2928
}
3029

31-
const message = args.message ?? createMessage(process, args.trigger);
30+
const message = args.message ?? createMessage(process, args.service);
3231
settings.headers ??= {};
3332
settings.headers['Content-Type'] ??= typeof message === 'string' ? 'text/plain' : 'application/json';
3433

src/notify/zeromq/zeromq.service.spec.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { Test, TestingModule } from '@nestjs/testing';
22
import { ZeromqService } from './zeromq.service';
33
import { ConfigService } from '@/common/config/config.service';
44
import { Push } from 'zeromq';
5-
import { Notify, Process } from '@letsflow/core/process';
5+
import { Process } from '@letsflow/core/process';
66

77
describe('ZeromqService', () => {
88
let service: ZeromqService;
@@ -12,7 +12,8 @@ describe('ZeromqService', () => {
1212
const process = {
1313
id: '00000000-0000-0000-0001-000000000001',
1414
current: {
15-
actions: [{ key: 'next' }],
15+
actions: [{ key: 'next', actor: ['service:test'] }],
16+
instructions: { 'service:test': 'Go to next' } as Record<string, string>,
1617
},
1718
events: [{ hash: '1234' }],
1819
} as Process;
@@ -27,7 +28,7 @@ describe('ZeromqService', () => {
2728
provide: ConfigService,
2829
useValue: {
2930
get: jest.fn(() => ({
30-
testService: { type: 'push' },
31+
test: { type: 'push' },
3132
})),
3233
},
3334
},
@@ -49,7 +50,7 @@ describe('ZeromqService', () => {
4950
describe('onModuleDestroy', () => {
5051
it('should close all sockets', () => {
5152
const mockSocket = { close: jest.fn() };
52-
service['sockets'].set('testService', mockSocket as any);
53+
service['sockets'].set('test', mockSocket as any);
5354

5455
service.onModuleDestroy();
5556

@@ -60,9 +61,9 @@ describe('ZeromqService', () => {
6061
describe('getSocket', () => {
6162
it('should return an existing socket if it exists', () => {
6263
const mockSocket = {};
63-
service['sockets'].set('testService', mockSocket as any);
64+
service['sockets'].set('test', mockSocket as any);
6465

65-
const result = service.getSocket('testService');
66+
const result = service.getSocket('test');
6667

6768
expect(result).toBe(mockSocket);
6869
});
@@ -71,7 +72,7 @@ describe('ZeromqService', () => {
7172
const mockPushSocket = new Push();
7273
createSocket.mockReturnValue(mockPushSocket);
7374

74-
const result = service.getSocket('testService');
75+
const result = service.getSocket('test');
7576

7677
expect(createSocket).toHaveBeenCalledWith({ type: 'push' });
7778
expect(result).toBe(mockPushSocket);
@@ -89,14 +90,15 @@ describe('ZeromqService', () => {
8990
const mockPushSocket = { send: jest.fn() };
9091
createSocket.mockReturnValue(mockPushSocket);
9192

92-
const args = { service: 'testService', trigger: 'next' } as Notify;
93+
const args = { service: 'test', after: 0 };
9394

9495
const result = await service.notify(process, args);
9596

9697
expect(mockPushSocket.send).toHaveBeenCalledWith(
9798
JSON.stringify({
9899
process: '00000000-0000-0000-0001-000000000001',
99-
action: { key: 'next' },
100+
actions: [{ key: 'next', actor: ['service:test'] }],
101+
instructions: 'Go to next',
100102
etag: '1234',
101103
}),
102104
);
@@ -108,7 +110,7 @@ describe('ZeromqService', () => {
108110
const mockPushSocket = { send: jest.fn().mockRejectedValue({ message: 'Test error' }) };
109111
createSocket.mockReturnValue(mockPushSocket);
110112

111-
const args = { service: 'testService', trigger: 'next' } as Notify;
113+
const args = { service: 'test', after: 0 };
112114

113115
expect(service.notify(process, args)).rejects.toEqual({ message: 'Test error' });
114116
});
@@ -120,14 +122,15 @@ describe('ZeromqService', () => {
120122
};
121123
createSocket.mockReturnValue(mockReplySocket);
122124

123-
const args = { service: 'testService', trigger: 'next' } as Notify;
125+
const args = { service: 'test', after: 0 };
124126

125127
const result = await service.notify(process, args);
126128

127129
expect(mockReplySocket.send).toHaveBeenCalledWith(
128130
JSON.stringify({
129131
process: '00000000-0000-0000-0001-000000000001',
130-
action: { key: 'next' },
132+
actions: [{ key: 'next', actor: ['service:test'] }],
133+
instructions: 'Go to next',
131134
etag: '1234',
132135
}),
133136
);
@@ -144,7 +147,7 @@ describe('ZeromqService', () => {
144147

145148
createSocket.mockReturnValue(mockReplySocket);
146149

147-
const args = { service: 'testService', trigger: 'next' } as Notify;
150+
const args = { service: 'test', after: 0 };
148151

149152
expect(service.notify(process, args)).rejects.toEqual({ message: 'Test error' });
150153
});

src/notify/zeromq/zeromq.service.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
import { Inject, Injectable, OnModuleDestroy } from '@nestjs/common';
2-
import { Notify, Process } from '@letsflow/core/process';
2+
import { createMessage, Notify, Process } from '@letsflow/core/process';
33
import { Push, Reply, SocketOptions } from 'zeromq';
44
import { ConfigService } from '@/common/config/config.service';
5-
import { createMessage } from '../utils/message';
65
import { NotifyProvider } from '../notify-provider.interface';
76

87
export type ZeromqOptions =
@@ -42,12 +41,12 @@ export class ZeromqService implements NotifyProvider, OnModuleDestroy {
4241

4342
async notify(process: Process, args: Notify): Promise<any> {
4443
const socket = this.getSocket(args.service);
45-
const message = args.message ?? createMessage(process, args.trigger);
44+
const message = args.message ?? createMessage(process, args.service);
4645

4746
await socket.send(typeof message === 'string' ? message : JSON.stringify(message));
4847

48+
// Use `in` and not `instanceof` for compatibility with Jest mocks
4949
if ('receive' in socket) {
50-
// Don't use instanceof for compatibility with Jest mocks
5150
const [response] = await socket.receive();
5251
return response;
5352
}

src/process/process.service.spec.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,6 @@ describe('ProcessService', () => {
236236
description: '',
237237
title: 'complete',
238238
key: 'complete',
239-
if: true,
240239
response: {},
241240
},
242241
]);
@@ -424,7 +423,7 @@ describe('ProcessService', () => {
424423
});
425424

426425
it('should step through a process', async () => {
427-
const save = jest.spyOn(service, 'save');
426+
const save = jest.spyOn(service, 'save').mockResolvedValue(undefined);
428427
const process = instantiate(scenario);
429428

430429
const updatedProcess = await service.step(process, 'complete', { key: 'actor' });

src/process/process.service.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,10 @@ export class ProcessService {
211211
const updatedProcess = step(process, action, actor, response);
212212
await this.save(updatedProcess);
213213

214-
if (process.current.timestamp.getTime() !== updatedProcess.current.timestamp.getTime()) {
214+
if (
215+
process.current.key !== updatedProcess.current.key ||
216+
process.current.timestamp.getTime() !== updatedProcess.current.timestamp.getTime()
217+
) {
215218
this.eventEmitter.emit('process.stepped', updatedProcess);
216219
}
217220

0 commit comments

Comments
 (0)