Skip to content

Commit c3425ec

Browse files
author
Artem
committed
#RI-2947 BE Claim Pending entries
1 parent 448c64f commit c3425ec

File tree

4 files changed

+195
-10
lines changed

4 files changed

+195
-10
lines changed

redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ export enum BrowserToolStreamCommands {
8484
XInfoConsumers = 'xinfo consumers',
8585
XPending = 'xpending',
8686
XAck = 'xack',
87+
XClaim = 'xclaim',
8788
XGroupCreate = 'xgroup create',
8889
XGroupSetId = 'xgroup setid',
8990
XGroupDestroy = 'xgroup destroy',

redisinsight/api/src/modules/browser/controllers/stream/consumer.controller.ts

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
import { ApiTags } from '@nestjs/swagger';
1010
import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator';
1111
import {
12-
AckPendingEntriesDto, AckPendingEntriesResponse,
12+
AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto,
1313
ConsumerDto,
1414
ConsumerGroupDto,
1515
GetConsumersDto, GetPendingEntriesDto, PendingEntryDto,
@@ -24,7 +24,7 @@ export class ConsumerController {
2424

2525
@Post('/get')
2626
@ApiRedisInstanceOperation({
27-
description: 'Get group consumers',
27+
description: 'Get consumers list in the group',
2828
statusCode: 200,
2929
responses: [
3030
{
@@ -43,7 +43,7 @@ export class ConsumerController {
4343

4444
@Post('/pending-messages/get')
4545
@ApiRedisInstanceOperation({
46-
description: 'Get pending messages list',
46+
description: 'Get pending entries list',
4747
statusCode: 200,
4848
responses: [
4949
{
@@ -62,20 +62,37 @@ export class ConsumerController {
6262

6363
@Post('/pending-messages/ack')
6464
@ApiRedisInstanceOperation({
65-
description: 'Get pending messages list',
65+
description: 'Ack pending entries',
6666
statusCode: 200,
6767
responses: [
6868
{
6969
status: 200,
70-
type: PendingEntryDto,
71-
isArray: true,
70+
type: AckPendingEntriesResponse,
7271
},
7372
],
7473
})
75-
async ackPendingEntriers(
74+
async ackPendingEntries(
7675
@Param('dbInstance') instanceId: string,
7776
@Body() dto: AckPendingEntriesDto,
7877
): Promise<AckPendingEntriesResponse> {
7978
return this.service.ackPendingEntries({ instanceId }, dto);
8079
}
80+
81+
@Post('/pending-messages/claim')
82+
@ApiRedisInstanceOperation({
83+
description: 'Claim pending entries',
84+
statusCode: 200,
85+
responses: [
86+
{
87+
status: 200,
88+
type: ClaimPendingEntriesResponse,
89+
},
90+
],
91+
})
92+
async claimPendingEntries(
93+
@Param('dbInstance') instanceId: string,
94+
@Body() dto: ClaimPendingEntryDto,
95+
): Promise<ClaimPendingEntriesResponse> {
96+
return this.service.claimPendingEntries({ instanceId }, dto);
97+
}
8198
}

redisinsight/api/src/modules/browser/dto/stream.dto.ts

Lines changed: 103 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import {
55
ArrayNotEmpty,
66
IsArray,
77
IsDefined,
8-
IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateNested, isString,
8+
IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateNested, isString, IsOptional, NotEquals, ValidateIf, IsBoolean,
99
} from 'class-validator';
1010
import { KeyDto, KeyWithExpireDto } from 'src/modules/browser/dto/keys.dto';
1111
import { SortOrder } from 'src/constants';
@@ -418,3 +418,105 @@ export class AckPendingEntriesResponse {
418418
})
419419
affected: number;
420420
}
421+
422+
export class ClaimPendingEntryDto extends KeyDto {
423+
@ApiProperty({
424+
type: String,
425+
description: 'Consumer group name',
426+
example: 'group-1',
427+
})
428+
@IsNotEmpty()
429+
@IsString()
430+
groupName: string;
431+
432+
@ApiProperty({
433+
type: String,
434+
description: 'Consumer name',
435+
example: 'consumer-1',
436+
})
437+
@IsNotEmpty()
438+
@IsString()
439+
consumerName: string;
440+
441+
@ApiProperty({
442+
description: 'Claim only if its idle time is greater the minimum idle time ',
443+
type: Number,
444+
minimum: 0,
445+
default: 0,
446+
})
447+
@IsInt()
448+
@Min(0)
449+
minIdleTime: number = 0;
450+
451+
@ApiProperty({
452+
description: 'Entries IDs',
453+
type: String,
454+
isArray: true,
455+
example: ['1650985323741-0', '1650985323770-0'],
456+
})
457+
@IsDefined()
458+
@IsArray()
459+
@ArrayNotEmpty()
460+
@Type(() => String)
461+
entries: string[];
462+
463+
@ApiPropertyOptional({
464+
description: 'Set the idle time (last time it was delivered) of the message',
465+
type: Number,
466+
minimum: 0,
467+
})
468+
@NotEquals(null)
469+
@ValidateIf((object, value) => value !== undefined)
470+
@IsInt()
471+
@Min(0)
472+
idle?: number;
473+
474+
@ApiPropertyOptional({
475+
description: 'This is the same as IDLE but instead of a relative amount of milliseconds, '
476+
+ 'it sets the idle time to a specific Unix time (in milliseconds)',
477+
type: Number,
478+
})
479+
@NotEquals(null)
480+
@ValidateIf((object, value) => value !== undefined)
481+
@IsInt()
482+
time?: number;
483+
484+
@ApiPropertyOptional({
485+
description: 'Set the retry counter to the specified value. '
486+
+ 'This counter is incremented every time a message is delivered again. '
487+
+ 'Normally XCLAIM does not alter this counter, which is just served to clients when the XPENDING command '
488+
+ 'is called: this way clients can detect anomalies, like messages that are never processed '
489+
+ 'for some reason after a big number of delivery attempts',
490+
type: Number,
491+
minimum: 0,
492+
})
493+
@NotEquals(null)
494+
@ValidateIf((object, value) => value !== undefined)
495+
@IsInt()
496+
@Min(0)
497+
retryCount?: number;
498+
499+
@ApiPropertyOptional({
500+
description: 'Creates the pending message entry in the PEL even if certain specified IDs are not already '
501+
+ 'in the PEL assigned to a different client',
502+
type: Boolean,
503+
})
504+
@NotEquals(null)
505+
@ValidateIf((object, value) => value !== undefined)
506+
@IsBoolean()
507+
force?: boolean;
508+
}
509+
510+
export class ClaimPendingEntriesResponse {
511+
@ApiProperty({
512+
description: 'Entries IDs were affected by claim command',
513+
type: String,
514+
isArray: true,
515+
example: ['1650985323741-0', '1650985323770-0'],
516+
})
517+
@IsDefined()
518+
@IsArray()
519+
@ArrayNotEmpty()
520+
@Type(() => String)
521+
affected: string[];
522+
}

redisinsight/api/src/modules/browser/services/stream/consumer.service.ts

Lines changed: 67 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import {
1010
import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service';
1111
import ERROR_MESSAGES from 'src/constants/error-messages';
1212
import {
13-
AckPendingEntriesDto, AckPendingEntriesResponse,
13+
AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto,
1414
ConsumerDto,
1515
GetConsumersDto, GetPendingEntriesDto, PendingEntryDto,
1616
} from 'src/modules/browser/dto/stream.dto';
@@ -102,7 +102,7 @@ export class ConsumerService {
102102
}
103103

104104
/**
105-
* Get list of pending entries info for particular consumer
105+
* Acknowledge pending entries
106106
* @param clientOptions
107107
* @param dto
108108
*/
@@ -147,6 +147,71 @@ export class ConsumerService {
147147
}
148148
}
149149

150+
/**
151+
* Claim pending entries with additional parameters
152+
* @param clientOptions
153+
* @param dto
154+
*/
155+
async claimPendingEntries(
156+
clientOptions: IFindRedisClientInstanceByOptions,
157+
dto: ClaimPendingEntryDto,
158+
): Promise<ClaimPendingEntriesResponse> {
159+
try {
160+
this.logger.log('Claiming pending entries.');
161+
162+
const exists = await this.browserTool.execCommand(
163+
clientOptions,
164+
BrowserToolKeysCommands.Exists,
165+
[dto.keyName],
166+
);
167+
168+
if (!exists) {
169+
return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST));
170+
}
171+
172+
const args = [dto.keyName, dto.groupName, dto.consumerName, dto.minIdleTime, ...dto.entries];
173+
174+
if (dto.idle !== undefined) {
175+
args.push('idle', dto.idle);
176+
} else if (dto.time !== undefined) {
177+
args.push('time', dto.time);
178+
}
179+
180+
if (dto.retryCount !== undefined) {
181+
args.push('retrycount', dto.retryCount);
182+
}
183+
184+
if (dto.force) {
185+
args.push('force');
186+
}
187+
188+
// Return just an array of IDs of messages successfully claimed, without returning the actual message.
189+
args.push('justid');
190+
191+
const affected = await this.browserTool.execCommand(
192+
clientOptions,
193+
BrowserToolStreamCommands.XClaim,
194+
args,
195+
);
196+
197+
this.logger.log('Successfully claimed pending entries.');
198+
199+
return {
200+
affected,
201+
};
202+
} catch (error) {
203+
if (error instanceof NotFoundException) {
204+
throw error;
205+
}
206+
207+
if (error?.message.includes(RedisErrorCodes.WrongType)) {
208+
throw new BadRequestException(error.message);
209+
}
210+
211+
throw catchAclError(error);
212+
}
213+
}
214+
150215
/**
151216
* Converts RESP response from Redis
152217
* [

0 commit comments

Comments
 (0)