@@ -6,6 +6,7 @@ import { SubscriptionProvider } from 'src/modules/pub-sub/providers/subscription
6
6
import { IFindRedisClientInstanceByOptions , RedisService } from 'src/modules/core/services/redis/redis.service' ;
7
7
import { PublishResponse } from 'src/modules/pub-sub/dto/publish.response' ;
8
8
import { PublishDto } from 'src/modules/pub-sub/dto/publish.dto' ;
9
+ import { PubSubAnalyticsService } from 'src/modules/pub-sub/pub-sub.analytics.service' ;
9
10
import { InstancesBusinessService } from 'src/modules/shared/services/instances-business/instances-business.service' ;
10
11
import { catchAclError } from 'src/utils' ;
11
12
@@ -18,6 +19,7 @@ export class PubSubService {
18
19
private readonly subscriptionProvider : SubscriptionProvider ,
19
20
private redisService : RedisService ,
20
21
private instancesBusinessService : InstancesBusinessService ,
22
+ private analyticsService : PubSubAnalyticsService ,
21
23
) { }
22
24
23
25
/**
@@ -33,6 +35,7 @@ export class PubSubService {
33
35
await Promise . all ( dto . subscriptions . map ( ( subDto ) => session . subscribe (
34
36
this . subscriptionProvider . createSubscription ( userClient , subDto ) ,
35
37
) ) ) ;
38
+ this . analyticsService . sendChannelSubscribeEvent ( userClient . getDatabaseId ( ) ) ;
36
39
} catch ( e ) {
37
40
this . logger . error ( 'Unable to create subscriptions' , e ) ;
38
41
@@ -57,6 +60,7 @@ export class PubSubService {
57
60
await Promise . all ( dto . subscriptions . map ( ( subDto ) => session . unsubscribe (
58
61
this . subscriptionProvider . createSubscription ( userClient , subDto ) ,
59
62
) ) ) ;
63
+ this . analyticsService . sendChannelUnsubscribeEvent ( userClient . getDatabaseId ( ) ) ;
60
64
} catch ( e ) {
61
65
this . logger . error ( 'Unable to unsubscribe' , e ) ;
62
66
@@ -81,9 +85,12 @@ export class PubSubService {
81
85
this . logger . log ( 'Publishing message.' ) ;
82
86
83
87
const client = await this . getClient ( clientOptions ) ;
88
+ const affected = await client . publish ( dto . channel , dto . message ) ;
89
+
90
+ this . analyticsService . sendMessagePublishedEvent ( clientOptions . instanceId , affected ) ;
84
91
85
92
return {
86
- affected : await client . publish ( dto . channel , dto . message ) ,
93
+ affected,
87
94
} ;
88
95
} catch ( e ) {
89
96
this . logger . error ( 'Unable to publish a message' , e ) ;
0 commit comments