Skip to content

Commit 9810719

Browse files
author
Artem
committed
#RI-2943 BE for pending messages list
1 parent 6df022a commit 9810719

File tree

3 files changed

+175
-4
lines changed

3 files changed

+175
-4
lines changed

redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-ope
1111
import {
1212
ConsumerDto,
1313
ConsumerGroupDto,
14-
GetConsumersDto,
14+
GetConsumersDto, GetPendingMessagesDto, PendingMessageDto,
1515
} from 'src/modules/browser/dto/stream.dto';
1616
import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service';
1717

@@ -23,12 +23,11 @@ export class ConsumerController {
2323

2424
@Post('/get')
2525
@ApiRedisInstanceOperation({
26-
description: 'Get stream entries',
26+
description: 'Get group consumers',
2727
statusCode: 200,
2828
responses: [
2929
{
3030
status: 200,
31-
description: 'Returns stream consumer groups.',
3231
type: ConsumerGroupDto,
3332
isArray: true,
3433
},
@@ -40,4 +39,23 @@ export class ConsumerController {
4039
): Promise<ConsumerDto[]> {
4140
return this.service.getConsumers({ instanceId }, dto);
4241
}
42+
43+
@Post('/pending-messages/get')
44+
@ApiRedisInstanceOperation({
45+
description: 'Get pending messages list',
46+
statusCode: 200,
47+
responses: [
48+
{
49+
status: 200,
50+
type: PendingMessageDto,
51+
isArray: true,
52+
},
53+
],
54+
})
55+
async getPendingMessages(
56+
@Param('dbInstance') instanceId: string,
57+
@Body() dto: GetPendingMessagesDto,
58+
): Promise<PendingMessageDto[]> {
59+
return this.service.getPendingMessages({ instanceId }, dto);
60+
}
4361
}

redisinsight/api/src/modules/browser/dto/stream.dto.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,75 @@ export class GetConsumersDto extends KeyDto {
324324
@IsString()
325325
groupName: string;
326326
}
327+
328+
export class PendingMessageDto {
329+
@ApiProperty({
330+
type: String,
331+
description: 'Entry ID',
332+
example: '*',
333+
})
334+
id: string;
335+
336+
@ApiProperty({
337+
type: String,
338+
description: 'Consumer name',
339+
example: 'consumer-1',
340+
})
341+
consumerName: string;
342+
343+
@ApiProperty({
344+
type: Number,
345+
description: 'The number of milliseconds that elapsed since the last time '
346+
+ 'this message was delivered to this consumer',
347+
example: 22442,
348+
})
349+
idle: number = 0;
350+
351+
@ApiProperty({
352+
type: Number,
353+
description: 'The number of times this message was delivered',
354+
example: 2,
355+
})
356+
delivered: number = 0;
357+
}
358+
359+
export class GetPendingMessagesDto extends IntersectionType(
360+
KeyDto,
361+
GetConsumersDto,
362+
) {
363+
@ApiProperty({
364+
type: String,
365+
description: 'Consumer name',
366+
example: 'consumer-1',
367+
})
368+
@IsNotEmpty()
369+
@IsString()
370+
consumerName: string;
371+
372+
@ApiPropertyOptional({
373+
description: 'Specifying the start id',
374+
type: String,
375+
default: '-',
376+
})
377+
@IsString()
378+
start?: string = '-';
379+
380+
@ApiPropertyOptional({
381+
description: 'Specifying the end id',
382+
type: String,
383+
default: '+',
384+
})
385+
@IsString()
386+
end?: string = '+';
387+
388+
@ApiPropertyOptional({
389+
description:
390+
'Specifying the number of pending messages to return.',
391+
type: Number,
392+
minimum: 1,
393+
default: 500,
394+
})
395+
@IsInt()
396+
@Min(1)
397+
count?: number = 500;
398+
}

redisinsight/api/src/modules/browser/services/stream/consumer.service.ts

Lines changed: 82 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import { BrowserToolService } from 'src/modules/browser/services/browser-tool/br
1111
import ERROR_MESSAGES from 'src/constants/error-messages';
1212
import {
1313
ConsumerDto,
14-
GetConsumersDto,
14+
GetConsumersDto, GetPendingMessagesDto, PendingMessageDto,
1515
} from 'src/modules/browser/dto/stream.dto';
1616

1717
@Injectable()
@@ -60,6 +60,46 @@ export class ConsumerService {
6060
}
6161
}
6262

63+
/**
64+
* Get list of pending messages info for particular consumer
65+
* @param clientOptions
66+
* @param dto
67+
*/
68+
async getPendingMessages(
69+
clientOptions: IFindRedisClientInstanceByOptions,
70+
dto: GetPendingMessagesDto,
71+
): Promise<PendingMessageDto[]> {
72+
try {
73+
this.logger.log('Getting pending messages list.');
74+
75+
const exists = await this.browserTool.execCommand(
76+
clientOptions,
77+
BrowserToolKeysCommands.Exists,
78+
[dto.keyName],
79+
);
80+
81+
if (!exists) {
82+
return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST));
83+
}
84+
85+
return ConsumerService.formatReplyToPendingMessagesDto(await this.browserTool.execCommand(
86+
clientOptions,
87+
BrowserToolStreamCommands.XPending,
88+
[dto.keyName, dto.groupName, dto.start, dto.end, dto.count, dto.consumerName],
89+
));
90+
} catch (error) {
91+
if (error instanceof NotFoundException) {
92+
throw error;
93+
}
94+
95+
if (error?.message.includes(RedisErrorCodes.WrongType)) {
96+
throw new BadRequestException(error.message);
97+
}
98+
99+
throw catchAclError(error);
100+
}
101+
}
102+
63103
/**
64104
* Converts RESP response from Redis
65105
* [
@@ -106,4 +146,45 @@ export class ConsumerService {
106146
idle: entryObj['idle'],
107147
};
108148
}
149+
150+
/**
151+
* Converts RESP response from Redis
152+
* [
153+
* ['1567352639-0', 'consumer-1', 258741, 2],
154+
* ...
155+
* ]
156+
*
157+
* to DTO
158+
*
159+
* [
160+
* {
161+
* id: '1567352639-0',
162+
* name: 'consumer-1',
163+
* idle: 258741,
164+
* delivered: 2,
165+
* },
166+
* ...
167+
* ]
168+
* @param reply
169+
*/
170+
static formatReplyToPendingMessagesDto(reply: Array<Array<string | number>>): PendingMessageDto[] {
171+
return reply.map(ConsumerService.formatArrayToPendingMessageDto);
172+
}
173+
174+
/**
175+
* Format single reply entry to DTO
176+
* @param entry
177+
*/
178+
static formatArrayToPendingMessageDto(entry: Array<string | number>): PendingMessageDto {
179+
if (!entry?.length) {
180+
return null;
181+
}
182+
183+
return {
184+
id: `${entry[0]}`,
185+
consumerName: `${entry[1]}`,
186+
idle: +entry[2],
187+
delivered: +entry[3],
188+
};
189+
}
109190
}

0 commit comments

Comments
 (0)