Skip to content

Commit d589975

Browse files
author
arthosofteq
authored
Merge pull request #688 from RedisInsight/feature/RI-2834-streams_consumer_groups
#RI-2931 BE implementation for create new consumer group(s)
2 parents 1103ae2 + f1af098 commit d589975

File tree

8 files changed

+1241
-2
lines changed

8 files changed

+1241
-2
lines changed

redisinsight/api/src/constants/redis-error-codes.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ export enum RedisErrorCodes {
1010
ConnectionReset = 'ECONNRESET',
1111
Timeout = 'ETIMEDOUT',
1212
CommandSyntaxError = 'syntax error',
13+
BusyGroup = 'BUSYGROUP',
1314
UnknownCommand = 'unknown command',
1415
}
1516

redisinsight/api/src/modules/browser/browser.module.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ import { SharedModule } from 'src/modules/shared/shared.module';
44
import { RedisConnectionMiddleware } from 'src/middleware/redis-connection.middleware';
55
import { StreamController } from 'src/modules/browser/controllers/stream/stream.controller';
66
import { StreamService } from 'src/modules/browser/services/stream/stream.service';
7+
import { ConsumerGroupController } from 'src/modules/browser/controllers/stream/consumer-group.controller';
8+
import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service';
9+
import { ConsumerController } from 'src/modules/browser/controllers/stream/consumer.controller';
10+
import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service';
711
import { HashController } from './controllers/hash/hash.controller';
812
import { KeysController } from './controllers/keys/keys.controller';
913
import { KeysBusinessService } from './services/keys-business/keys-business.service';
@@ -32,6 +36,8 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows
3236
RejsonRlController,
3337
HashController,
3438
StreamController,
39+
ConsumerGroupController,
40+
ConsumerController,
3541
],
3642
providers: [
3743
KeysBusinessService,
@@ -42,6 +48,8 @@ import { BrowserToolClusterService } from './services/browser-tool-cluster/brows
4248
RejsonRlBusinessService,
4349
HashBusinessService,
4450
StreamService,
51+
ConsumerGroupService,
52+
ConsumerService,
4553
BrowserToolService,
4654
BrowserToolClusterService,
4755
],
@@ -58,6 +66,9 @@ export class BrowserModule implements NestModule {
5866
RouterModule.resolvePath(SetController),
5967
RouterModule.resolvePath(ZSetController),
6068
RouterModule.resolvePath(RejsonRlController),
69+
RouterModule.resolvePath(StreamController),
70+
RouterModule.resolvePath(ConsumerGroupController),
71+
RouterModule.resolvePath(ConsumerController),
6172
);
6273
}
6374
}

redisinsight/api/src/modules/browser/constants/browser-tool-commands.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,15 @@ export enum BrowserToolStreamCommands {
8080
XRevRange = 'xrevrange',
8181
XAdd = 'xadd',
8282
XDel = 'xdel',
83+
XInfoGroups = 'xinfo groups',
84+
XInfoConsumers = 'xinfo consumers',
85+
XPending = 'xpending',
86+
XAck = 'xack',
87+
XClaim = 'xclaim',
88+
XGroupCreate = 'xgroup create',
89+
XGroupSetId = 'xgroup setid',
90+
XGroupDestroy = 'xgroup destroy',
91+
XGroupDelConsumer = 'xgroup delconsumer',
8392
}
8493

8594
export enum BrowserToolTSCommands {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
import {
2+
Body,
3+
Controller, Delete,
4+
Param, Patch,
5+
Post,
6+
UsePipes,
7+
ValidationPipe,
8+
} from '@nestjs/common';
9+
import { ApiTags } from '@nestjs/swagger';
10+
import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator';
11+
import {
12+
ConsumerGroupDto,
13+
CreateConsumerGroupsDto,
14+
DeleteConsumerGroupsDto,
15+
DeleteConsumerGroupsResponse,
16+
UpdateConsumerGroupDto,
17+
} from 'src/modules/browser/dto/stream.dto';
18+
import { ConsumerGroupService } from 'src/modules/browser/services/stream/consumer-group.service';
19+
import { KeyDto } from 'src/modules/browser/dto';
20+
21+
@ApiTags('Streams')
22+
@Controller('streams/consumer-groups')
23+
@UsePipes(new ValidationPipe({ transform: true }))
24+
export class ConsumerGroupController {
25+
constructor(private service: ConsumerGroupService) {}
26+
27+
@Post('/get')
28+
@ApiRedisInstanceOperation({
29+
description: 'Get consumer groups list',
30+
statusCode: 200,
31+
responses: [
32+
{
33+
status: 200,
34+
description: 'Returns stream consumer groups.',
35+
type: ConsumerGroupDto,
36+
isArray: true,
37+
},
38+
],
39+
})
40+
async getGroups(
41+
@Param('dbInstance') instanceId: string,
42+
@Body() dto: KeyDto,
43+
): Promise<ConsumerGroupDto[]> {
44+
return this.service.getGroups({ instanceId }, dto);
45+
}
46+
47+
@Post('')
48+
@ApiRedisInstanceOperation({
49+
description: 'Create stream consumer group',
50+
statusCode: 201,
51+
})
52+
async createGroups(
53+
@Param('dbInstance') instanceId: string,
54+
@Body() dto: CreateConsumerGroupsDto,
55+
): Promise<void> {
56+
return this.service.createGroups({ instanceId }, dto);
57+
}
58+
59+
@Patch('')
60+
@ApiRedisInstanceOperation({
61+
description: 'Modify last delivered ID of the Consumer Group',
62+
statusCode: 200,
63+
})
64+
async updateGroup(
65+
@Param('dbInstance') instanceId: string,
66+
@Body() dto: UpdateConsumerGroupDto,
67+
): Promise<void> {
68+
return this.service.updateGroup({ instanceId }, dto);
69+
}
70+
71+
@Delete('')
72+
@ApiRedisInstanceOperation({
73+
description: 'Delete Consumer Group',
74+
statusCode: 200,
75+
responses: [
76+
{
77+
status: 200,
78+
description: 'Returns number of affected consumer groups.',
79+
type: DeleteConsumerGroupsResponse,
80+
},
81+
],
82+
})
83+
async deleteGroup(
84+
@Param('dbInstance') instanceId: string,
85+
@Body() dto: DeleteConsumerGroupsDto,
86+
): Promise<DeleteConsumerGroupsResponse> {
87+
return this.service.deleteGroup({ instanceId }, dto);
88+
}
89+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import {
2+
Body,
3+
Controller, Delete,
4+
Param,
5+
Post,
6+
UsePipes,
7+
ValidationPipe,
8+
} from '@nestjs/common';
9+
import { ApiTags } from '@nestjs/swagger';
10+
import { ApiRedisInstanceOperation } from 'src/decorators/api-redis-instance-operation.decorator';
11+
import {
12+
AckPendingEntriesDto, AckPendingEntriesResponse, ClaimPendingEntriesResponse, ClaimPendingEntryDto,
13+
ConsumerDto,
14+
ConsumerGroupDto, DeleteConsumersDto,
15+
GetConsumersDto, GetPendingEntriesDto, PendingEntryDto,
16+
} from 'src/modules/browser/dto/stream.dto';
17+
import { ConsumerService } from 'src/modules/browser/services/stream/consumer.service';
18+
19+
@ApiTags('Streams')
20+
@Controller('streams/consumer-groups/consumers')
21+
@UsePipes(new ValidationPipe({ transform: true }))
22+
export class ConsumerController {
23+
constructor(private service: ConsumerService) {}
24+
25+
@Post('/get')
26+
@ApiRedisInstanceOperation({
27+
description: 'Get consumers list in the group',
28+
statusCode: 200,
29+
responses: [
30+
{
31+
status: 200,
32+
type: ConsumerGroupDto,
33+
isArray: true,
34+
},
35+
],
36+
})
37+
async getConsumers(
38+
@Param('dbInstance') instanceId: string,
39+
@Body() dto: GetConsumersDto,
40+
): Promise<ConsumerDto[]> {
41+
return this.service.getConsumers({ instanceId }, dto);
42+
}
43+
44+
@Delete('')
45+
@ApiRedisInstanceOperation({
46+
description: 'Delete Consumer(s) from the Consumer Group',
47+
statusCode: 200,
48+
})
49+
async deleteConsumers(
50+
@Param('dbInstance') instanceId: string,
51+
@Body() dto: DeleteConsumersDto,
52+
): Promise<void> {
53+
return this.service.deleteConsumers({ instanceId }, dto);
54+
}
55+
56+
@Post('/pending-messages/get')
57+
@ApiRedisInstanceOperation({
58+
description: 'Get pending entries list',
59+
statusCode: 200,
60+
responses: [
61+
{
62+
status: 200,
63+
type: PendingEntryDto,
64+
isArray: true,
65+
},
66+
],
67+
})
68+
async getPendingEntries(
69+
@Param('dbInstance') instanceId: string,
70+
@Body() dto: GetPendingEntriesDto,
71+
): Promise<PendingEntryDto[]> {
72+
return this.service.getPendingEntries({ instanceId }, dto);
73+
}
74+
75+
@Post('/pending-messages/ack')
76+
@ApiRedisInstanceOperation({
77+
description: 'Ack pending entries',
78+
statusCode: 200,
79+
responses: [
80+
{
81+
status: 200,
82+
type: AckPendingEntriesResponse,
83+
},
84+
],
85+
})
86+
async ackPendingEntries(
87+
@Param('dbInstance') instanceId: string,
88+
@Body() dto: AckPendingEntriesDto,
89+
): Promise<AckPendingEntriesResponse> {
90+
return this.service.ackPendingEntries({ instanceId }, dto);
91+
}
92+
93+
@Post('/pending-messages/claim')
94+
@ApiRedisInstanceOperation({
95+
description: 'Claim pending entries',
96+
statusCode: 200,
97+
responses: [
98+
{
99+
status: 200,
100+
type: ClaimPendingEntriesResponse,
101+
},
102+
],
103+
})
104+
async claimPendingEntries(
105+
@Param('dbInstance') instanceId: string,
106+
@Body() dto: ClaimPendingEntryDto,
107+
): Promise<ClaimPendingEntriesResponse> {
108+
return this.service.claimPendingEntries({ instanceId }, dto);
109+
}
110+
}

0 commit comments

Comments
 (0)