Skip to content

Commit 448c64f

Browse files
author
Artem
committed
#RI-2945 BE Ack Pending entries
1 parent 5f4f603 commit 448c64f

File tree

4 files changed

+101
-11
lines changed

4 files changed

+101
-11
lines changed

redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ export enum BrowserToolStreamCommands {
8383
XInfoGroups = 'xinfo groups',
8484
XInfoConsumers = 'xinfo consumers',
8585
XPending = 'xpending',
86+
XAck = 'xack',
8687
XGroupCreate = 'xgroup create',
8788
XGroupSetId = 'xgroup setid',
8889
XGroupDestroy = 'xgroup destroy',

redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import {
99
import { ApiTags } from '@nestjs/swagger';
1010
import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator';
1111
import {
12+
AckPendingEntriesDto, AckPendingEntriesResponse,
1213
ConsumerDto,
1314
ConsumerGroupDto,
1415
GetConsumersDto, GetPendingEntriesDto, PendingEntryDto,
@@ -52,10 +53,29 @@ export class ConsumerController {
5253
},
5354
],
5455
})
55-
async getPendingMessages(
56+
async getPendingEntries(
5657
@Param('dbInstance') instanceId: string,
5758
@Body() dto: GetPendingEntriesDto,
5859
): Promise<PendingEntryDto[]> {
59-
return this.service.getPendingMessages({ instanceId }, dto);
60+
return this.service.getPendingEntries({ instanceId }, dto);
61+
}
62+
63+
@Post('/pending-messages/ack')
64+
@ApiRedisInstanceOperation({
65+
description: 'Get pending messages list',
66+
statusCode: 200,
67+
responses: [
68+
{
69+
status: 200,
70+
type: PendingEntryDto,
71+
isArray: true,
72+
},
73+
],
74+
})
75+
async ackPendingEntriers(
76+
@Param('dbInstance') instanceId: string,
77+
@Body() dto: AckPendingEntriesDto,
78+
): Promise<AckPendingEntriesResponse> {
79+
return this.service.ackPendingEntries({ instanceId }, dto);
6080
}
6181
}

redisinsight/api/src/modules/browser/dto/stream.dto.ts

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,7 @@ export class CreateStreamDto extends IntersectionType(
189189
export class ConsumerGroupDto {
190190
@ApiProperty({
191191
type: String,
192-
description: 'Consumer group name',
192+
description: 'Consumer Group name',
193193
example: 'group',
194194
})
195195
name: string;
@@ -294,7 +294,7 @@ export class ConsumerDto {
294294
@ApiProperty({
295295
type: String,
296296
description: 'The consumer\'s name',
297-
example: 'consumer-1',
297+
example: 'consumer-2',
298298
})
299299
name: string;
300300

@@ -396,3 +396,25 @@ export class GetPendingEntriesDto extends IntersectionType(
396396
@Min(1)
397397
count?: number = 500;
398398
}
399+
400+
export class AckPendingEntriesDto extends GetConsumersDto {
401+
@ApiProperty({
402+
description: 'Entries IDs',
403+
type: String,
404+
isArray: true,
405+
example: ['1650985323741-0', '1650985323770-0'],
406+
})
407+
@IsDefined()
408+
@IsArray()
409+
@ArrayNotEmpty()
410+
@Type(() => String)
411+
entries: string[];
412+
}
413+
414+
export class AckPendingEntriesResponse {
415+
@ApiProperty({
416+
description: 'Number of affected entries',
417+
type: Number,
418+
})
419+
affected: number;
420+
}

redisinsight/api/src/modules/browser/services/stream/consumer.service.ts

Lines changed: 54 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import {
1010
import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service';
1111
import ERROR_MESSAGES from 'src/constants/error-messages';
1212
import {
13+
AckPendingEntriesDto, AckPendingEntriesResponse,
1314
ConsumerDto,
1415
GetConsumersDto, GetPendingEntriesDto, PendingEntryDto,
1516
} from 'src/modules/browser/dto/stream.dto';
@@ -61,16 +62,16 @@ export class ConsumerService {
6162
}
6263

6364
/**
64-
* Get list of pending messages info for particular consumer
65+
* Get list of pending entries info for particular consumer
6566
* @param clientOptions
6667
* @param dto
6768
*/
68-
async getPendingMessages(
69+
async getPendingEntries(
6970
clientOptions: IFindRedisClientInstanceByOptions,
7071
dto: GetPendingEntriesDto,
7172
): Promise<PendingEntryDto[]> {
7273
try {
73-
this.logger.log('Getting pending messages list.');
74+
this.logger.log('Getting pending entries list.');
7475

7576
const exists = await this.browserTool.execCommand(
7677
clientOptions,
@@ -82,7 +83,7 @@ export class ConsumerService {
8283
return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST));
8384
}
8485

85-
return ConsumerService.formatReplyToPendingMessagesDto(await this.browserTool.execCommand(
86+
return ConsumerService.formatReplyToPendingEntriesDto(await this.browserTool.execCommand(
8687
clientOptions,
8788
BrowserToolStreamCommands.XPending,
8889
[dto.keyName, dto.groupName, dto.start, dto.end, dto.count, dto.consumerName],
@@ -100,6 +101,52 @@ export class ConsumerService {
100101
}
101102
}
102103

104+
/**
105+
* Get list of pending entries info for particular consumer
106+
* @param clientOptions
107+
* @param dto
108+
*/
109+
async ackPendingEntries(
110+
clientOptions: IFindRedisClientInstanceByOptions,
111+
dto: AckPendingEntriesDto,
112+
): Promise<AckPendingEntriesResponse> {
113+
try {
114+
this.logger.log('Acknowledging pending entries.');
115+
116+
const exists = await this.browserTool.execCommand(
117+
clientOptions,
118+
BrowserToolKeysCommands.Exists,
119+
[dto.keyName],
120+
);
121+
122+
if (!exists) {
123+
return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST));
124+
}
125+
126+
const affected = await this.browserTool.execCommand(
127+
clientOptions,
128+
BrowserToolStreamCommands.XAck,
129+
[dto.keyName, dto.groupName, ...dto.entries],
130+
);
131+
132+
this.logger.log('Successfully acknowledged pending entries.');
133+
134+
return {
135+
affected,
136+
};
137+
} catch (error) {
138+
if (error instanceof NotFoundException) {
139+
throw error;
140+
}
141+
142+
if (error?.message.includes(RedisErrorCodes.WrongType)) {
143+
throw new BadRequestException(error.message);
144+
}
145+
146+
throw catchAclError(error);
147+
}
148+
}
149+
103150
/**
104151
* Converts RESP response from Redis
105152
* [
@@ -167,15 +214,15 @@ export class ConsumerService {
167214
* ]
168215
* @param reply
169216
*/
170-
static formatReplyToPendingMessagesDto(reply: Array<Array<string | number>>): PendingEntryDto[] {
171-
return reply.map(ConsumerService.formatArrayToPendingMessageDto);
217+
static formatReplyToPendingEntriesDto(reply: Array<Array<string | number>>): PendingEntryDto[] {
218+
return reply.map(ConsumerService.formatArrayToPendingEntryDto);
172219
}
173220

174221
/**
175222
* Format single reply entry to DTO
176223
* @param entry
177224
*/
178-
static formatArrayToPendingMessageDto(entry: Array<string | number>): PendingEntryDto {
225+
static formatArrayToPendingEntryDto(entry: Array<string | number>): PendingEntryDto {
179226
if (!entry?.length) {
180227
return null;
181228
}

0 commit comments

Comments
 (0)