Skip to content

Commit e9b5b50

Browse files
author
Artem
committed
#RI-2931 BE implementation for create new consumer group(s)
#RI-2933 BE implementation for consumer groups list
1 parent a1f33c9 commit e9b5b50

File tree

6 files changed

+346
-1
lines changed

6 files changed

+346
-1
lines changed

redisinsight/api/src/constants/redis-error-codes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export enum RedisErrorCodes {
1010
ConnectionReset = 'ECONNRESET',
1111
Timeout = 'ETIMEDOUT',
1212
CommandSyntaxError = 'syntax error',
13+
BusyGroup = 'BUSYGROUP',
1314
UnknownCommand = 'unknown command',
1415
}
1516

redisinsight/api/src/modules/browser/browser.module.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import { SharedModule } from 'src/modules/shared/shared.module';
44
import { RedisConnectionMiddleware } from 'src/middleware/redis-connection.middleware';
55
import { StreamController } from 'src/modules/browser/controllers/stream/stream.controller';
66
import { StreamService } from 'src/modules/browser/services/stream/stream.service';
7+
import { ConsumerGroupController } from 'src/modules/browser/controllers/stream/consumer-group.controller';
8+
import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service';
79
import { HashController } from './controllers/hash/hash.controller';
810
import { KeysController } from './controllers/keys/keys.controller';
911
import { KeysBusinessService } from './services/keys-business/keys-business.service';
@@ -32,6 +34,7 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows
3234
RejsonRlController,
3335
HashController,
3436
StreamController,
37+
ConsumerGroupController,
3538
],
3639
providers: [
3740
KeysBusinessService,
@@ -42,6 +45,7 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows
4245
RejsonRlBusinessService,
4346
HashBusinessService,
4447
StreamService,
48+
ConsumerGroupService,
4549
BrowserToolService,
4650
BrowserToolClusterService,
4751
],

redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,9 @@ export enum BrowserToolStreamCommands {
8080
XRevRange = 'xrevrange',
8181
XAdd = 'xadd',
8282
XDel = 'xdel',
83+
XInfoGroups = 'xinfo groups',
84+
XPending = 'xpending',
85+
XGroupCreate = 'xgroup create',
8386
}
8487

8588
export enum BrowserToolTSCommands {
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
import {
2+
Body,
3+
Controller,
4+
Param,
5+
Post,
6+
UsePipes,
7+
ValidationPipe,
8+
} from '@nestjs/common';
9+
import { ApiTags } from '@nestjs/swagger';
10+
import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator';
11+
import {
12+
ConsumerGroupDto, CreateConsumerGroupsDto,
13+
GetStreamEntriesResponse,
14+
} from 'src/modules/browser/dto/stream.dto';
15+
import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service';
16+
import { KeyDto } from 'src/modules/browser/dto';
17+
18+
@ApiTags('Streams')
19+
@Controller('streams/consumer-groups')
20+
@UsePipes(new ValidationPipe({ transform: true }))
21+
export class ConsumerGroupController {
22+
constructor(private service: ConsumerGroupService) {}
23+
24+
@Post('/get')
25+
@ApiRedisInstanceOperation({
26+
description: 'Get stream entries',
27+
statusCode: 200,
28+
responses: [
29+
{
30+
status: 200,
31+
description: 'Returns ordered stream entries in defined range.',
32+
type: GetStreamEntriesResponse,
33+
},
34+
],
35+
})
36+
async getGroups(
37+
@Param('dbInstance') instanceId: string,
38+
@Body() dto: KeyDto,
39+
): Promise<ConsumerGroupDto[]> {
40+
return this.service.getGroups({ instanceId }, dto);
41+
}
42+
43+
@Post('')
44+
@ApiRedisInstanceOperation({
45+
description: 'Create stream consumer group',
46+
statusCode: 201,
47+
})
48+
async createGroups(
49+
@Param('dbInstance') instanceId: string,
50+
@Body() dto: CreateConsumerGroupsDto,
51+
): Promise<void> {
52+
return this.service.createGroups({ instanceId }, dto);
53+
}
54+
}

redisinsight/api/src/modules/browser/dto/stream.dto.ts

Lines changed: 79 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
import { ApiProperty, ApiPropertyOptional, IntersectionType } from '@nestjs/swagger';
1+
import {
2+
ApiProperty, ApiPropertyOptional, IntersectionType
3+
} from '@nestjs/swagger';
24
import {
35
ArrayNotEmpty,
46
IsArray,
@@ -183,3 +185,79 @@ export class CreateStreamDto extends IntersectionType(
183185
AddStreamEntriesDto,
184186
KeyWithExpireDto,
185187
) {}
188+
189+
export class ConsumerGroupDto {
190+
@ApiProperty({
191+
type: String,
192+
description: 'Consumer group name',
193+
example: 'group',
194+
})
195+
name: string;
196+
197+
@ApiProperty({
198+
type: Number,
199+
description: 'Number of consumers',
200+
example: 2,
201+
})
202+
consumers: number = 0;
203+
204+
@ApiProperty({
205+
type: Number,
206+
description: 'Number of pending messages',
207+
example: 2,
208+
})
209+
pending: number = 0;
210+
211+
@ApiProperty({
212+
type: String,
213+
description: 'Smallest Id of the message that is pending in the group',
214+
example: '1657892649-0',
215+
})
216+
smallestPendingId: string;
217+
218+
@ApiProperty({
219+
type: String,
220+
description: 'Greatest Id of the message that is pending in the group',
221+
example: '1657892680-0',
222+
})
223+
greatestPendingId: string;
224+
225+
@ApiProperty({
226+
type: String,
227+
description: 'Id of last delivered message',
228+
example: '1657892648-0',
229+
})
230+
lastDeliveredId: string;
231+
}
232+
233+
export class CreateConsumerGroupDto {
234+
@ApiProperty({
235+
type: String,
236+
description: 'Consumer group name',
237+
example: 'group',
238+
})
239+
@IsNotEmpty()
240+
@IsString()
241+
name: string;
242+
243+
@ApiProperty({
244+
type: String,
245+
description: 'Id of last delivered message',
246+
example: '1657892648-0',
247+
})
248+
@IsNotEmpty()
249+
@IsString()
250+
lastDeliveredId: string;
251+
}
252+
253+
export class CreateConsumerGroupsDto extends KeyDto {
254+
@ApiProperty({
255+
type: () => CreateConsumerGroupDto,
256+
isArray: true,
257+
description: 'List of consumer groups to create',
258+
})
259+
@ValidateNested()
260+
@IsArray()
261+
@Type(() => CreateConsumerGroupDto)
262+
consumerGroups: CreateConsumerGroupDto[];
263+
}
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import {
2+
BadRequestException, ConflictException, Injectable, Logger, NotFoundException,
3+
} from '@nestjs/common';
4+
import { IFindRedisClientInstanceByOptions } from 'src/modules/core/services/redis/redis.service';
5+
import { RedisErrorCodes } from 'src/constants';
6+
import { catchAclError, catchTransactionError, convertStringsArrayToObject } from 'src/utils';
7+
import {
8+
BrowserToolCommands,
9+
BrowserToolKeysCommands, BrowserToolStreamCommands,
10+
} from 'src/modules/browser/constants/browser-tool-commands';
11+
import { BrowserToolService } from 'src/modules/browser/services/browser-tool/browser-tool.service';
12+
import { KeyDto } from 'src/modules/browser/dto';
13+
import ERROR_MESSAGES from 'src/constants/error-messages';
14+
import { ConsumerGroupDto, CreateConsumerGroupsDto } from 'src/modules/browser/dto/stream.dto';
15+
16+
@Injectable()
17+
export class ConsumerGroupService {
18+
private logger = new Logger('ConsumerGroupService');
19+
20+
constructor(private browserTool: BrowserToolService) {}
21+
22+
/**
23+
* Get consumer groups list for particular stream
24+
* In addition fetch pending messages info for each group
25+
* !May be slow on huge streams as 'XPENDING' command tagged with as @slow
26+
* @param clientOptions
27+
* @param dto
28+
*/
29+
async getGroups(
30+
clientOptions: IFindRedisClientInstanceByOptions,
31+
dto: KeyDto,
32+
): Promise<ConsumerGroupDto[]> {
33+
try {
34+
this.logger.log('Getting consumer groups list.');
35+
36+
const exists = await this.browserTool.execCommand(
37+
clientOptions,
38+
BrowserToolKeysCommands.Exists,
39+
[dto.keyName],
40+
);
41+
42+
if (!exists) {
43+
return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST));
44+
}
45+
46+
const groups = ConsumerGroupService.formatReplyToDto(await this.browserTool.execCommand(
47+
clientOptions,
48+
BrowserToolStreamCommands.XInfoGroups,
49+
[dto.keyName],
50+
));
51+
52+
return await Promise.all(groups.map((group) => this.getGroupInfo(
53+
clientOptions,
54+
dto,
55+
group,
56+
)));
57+
} catch (error) {
58+
if (error instanceof NotFoundException) {
59+
throw error;
60+
}
61+
62+
if (error?.message.includes(RedisErrorCodes.WrongType)) {
63+
throw new BadRequestException(error.message);
64+
}
65+
66+
throw catchAclError(error);
67+
}
68+
}
69+
70+
/**
71+
* Get consumer group pending info using 'XPENDING' command
72+
* @param clientOptions
73+
* @param dto
74+
* @param group
75+
*/
76+
async getGroupInfo(
77+
clientOptions: IFindRedisClientInstanceByOptions,
78+
dto: KeyDto,
79+
group: ConsumerGroupDto,
80+
): Promise<ConsumerGroupDto> {
81+
const info = await this.browserTool.execCommand(
82+
clientOptions,
83+
BrowserToolStreamCommands.XPending,
84+
[dto.keyName, group.name],
85+
);
86+
87+
return {
88+
...group,
89+
smallestPendingId: info?.[1] || null,
90+
greatestPendingId: info?.[2] || null,
91+
};
92+
}
93+
94+
/**
95+
* Create consumer group(s)
96+
* @param clientOptions
97+
* @param dto
98+
*/
99+
async createGroups(
100+
clientOptions: IFindRedisClientInstanceByOptions,
101+
dto: CreateConsumerGroupsDto,
102+
): Promise<void> {
103+
try {
104+
this.logger.log('Creating consumer groups.');
105+
const { keyName, consumerGroups } = dto;
106+
107+
const exists = await this.browserTool.execCommand(
108+
clientOptions,
109+
BrowserToolKeysCommands.Exists,
110+
[keyName],
111+
);
112+
113+
if (!exists) {
114+
return Promise.reject(new NotFoundException(ERROR_MESSAGES.KEY_NOT_EXIST));
115+
}
116+
117+
const toolCommands: Array<[
118+
toolCommand: BrowserToolCommands,
119+
...args: Array<string | number>,
120+
]> = consumerGroups.map((group) => (
121+
[
122+
BrowserToolStreamCommands.XGroupCreate,
123+
keyName,
124+
group.name,
125+
group.lastDeliveredId,
126+
]
127+
));
128+
129+
const [
130+
transactionError,
131+
transactionResults,
132+
] = await this.browserTool.execMulti(clientOptions, toolCommands);
133+
catchTransactionError(transactionError, transactionResults);
134+
135+
this.logger.log('Stream consumer group(s) created.');
136+
137+
return undefined;
138+
} catch (error) {
139+
if (error instanceof NotFoundException) {
140+
throw error;
141+
}
142+
143+
if (error?.message.includes(RedisErrorCodes.WrongType)) {
144+
throw new BadRequestException(error.message);
145+
}
146+
147+
if (error?.message.includes(RedisErrorCodes.BusyGroup)) {
148+
throw new ConflictException(error.message);
149+
}
150+
151+
throw catchAclError(error);
152+
}
153+
}
154+
155+
/**
156+
* Converts RESP response from Redis
157+
* [
158+
* ['name', 'g1', 'consumers', 0, 'pending', 0, 'last-delivered-id', '1653034260278-0'],
159+
* ['name', 'g2', 'consumers', 0, 'pending', 0, 'last-delivered-id', '1653034260278-0'],
160+
* ...
161+
* ]
162+
*
163+
* to DTO
164+
*
165+
* [
166+
* {
167+
* name: 'g1',
168+
* consumers: 0,
169+
* pending: 0,
170+
* lastDeliveredId: '1653034260278-0'
171+
* },
172+
* {
173+
* name: 'g2',
174+
* consumers: 0,
175+
* pending: 0,
176+
* lastDeliveredId: '1653034260278-0'
177+
* },
178+
* ...
179+
* ]
180+
* @param reply
181+
*/
182+
static formatReplyToDto(reply: Array<Array<string | number>>): ConsumerGroupDto[] {
183+
return reply.map(ConsumerGroupService.formatArrayToDto);
184+
}
185+
186+
/**
187+
* Format single reply entry to DTO
188+
* @param entry
189+
*/
190+
static formatArrayToDto(entry: Array<string | number>): ConsumerGroupDto {
191+
if (!entry?.length) {
192+
return null;
193+
}
194+
const entryObj = convertStringsArrayToObject(entry as string[]);
195+
196+
return {
197+
name: entryObj['name'],
198+
consumers: entryObj['consumers'],
199+
pending: entryObj['pending'],
200+
lastDeliveredId: entryObj['last-delivered-id'],
201+
smallestPendingId: null,
202+
greatestPendingId: null,
203+
};
204+
}
205+
}

0 commit comments

Comments
 (0)