@@ -18,7 +18,7 @@ import {
1818 ILogger ,
1919 IMQService ,
2020 JsonObject ,
21- RedisCache ,
21+ RedisCache , signature ,
2222} from '@imqueue/rpc' ;
2323import { TagCache } from '@imqueue/tag-cache' ;
2424import { PgPubSub } from '@imqueue/pg-pubsub' ;
@@ -245,6 +245,7 @@ export function PgCache(options: PgCacheOptions): ClassDecorator {
245245 constructor : T ,
246246 ) : T & PgCacheable => {
247247 const init = constructor . prototype . start ;
248+ const pgCacheChannels = constructor . prototype . pgCacheChannels ;
248249
249250 class CachedService {
250251 private taggedCache : TagCache ;
@@ -254,6 +255,10 @@ export function PgCache(options: PgCacheOptions): ClassDecorator {
254255 } as any ) ;
255256
256257 public async start ( ...args : any [ ] ) : Promise < void > {
258+ this . pubSub = new PgPubSub ( {
259+ connectionString : options . postgres ,
260+ } ) ;
261+
257262 if ( init && typeof init === 'function' ) {
258263 await init . apply ( this , args ) ;
259264 }
@@ -278,7 +283,10 @@ export function PgCache(options: PgCacheOptions): ClassDecorator {
278283
279284 this . taggedCache = new TagCache ( cache ) ;
280285
281- const channels = Object . keys ( this . pgCacheChannels || { } ) ;
286+ const pgChannels = this . pgCacheChannels
287+ || pgCacheChannels
288+ || { } ;
289+ const channels = Object . keys ( pgChannels ) ;
282290
283291 if ( ! ( channels && channels . length ) ) {
284292 return ;
@@ -298,11 +306,11 @@ export function PgCache(options: PgCacheOptions): ClassDecorator {
298306 channel , payload ,
299307 ) ;
300308
301- const methods = this . pgCacheChannels [ channel ] || [ ] ;
309+ const methods = pgChannels [ channel ] || [ ] ;
302310 const data = payload as unknown as ChannelPayload ;
303311
304312 for ( const [ method , filter ] of methods ) {
305- const useTag = ` ${ className } : ${ method } ` ;
313+ const useTag = signature ( className , method , [ ] ) ;
306314
307315 if ( needInvalidate ( data , filter ) ) {
308316 invalidate ( this , useTag ) ;
0 commit comments