@@ -24,6 +24,15 @@ describe('PgPubSub', () => {
2424 let pgClient : Client ;
2525 let pubSub : PgPubSub ;
2626
27+ const listenFunc = ( pubSubCopy : PgPubSub ) => {
28+ pubSubCopy . listen ( 'TestChannel' ) . then ( ( ) => {
29+ pgClient . emit ( 'notification' , {
30+ channel : 'TestChannel' ,
31+ payload : 'true' ,
32+ } ) ;
33+ } ) ;
34+ }
35+
2736 beforeEach ( ( ) => {
2837 pgClient = new Client ( ) ;
2938 pubSub = new PgPubSub ( { pgClient } ) ;
@@ -192,12 +201,7 @@ describe('PgPubSub', () => {
192201 it ( 'should handle messages from db with acquired lock' , done => {
193202 pubSub . options . singleListener = true ;
194203
195- pubSub . listen ( 'TestChannel' ) . then ( ( ) => {
196- pgClient . emit ( 'notification' , {
197- channel : 'TestChannel' ,
198- payload : 'true' ,
199- } ) ;
200- } ) ;
204+ listenFunc ( pubSub ) ;
201205
202206 pubSub . on ( 'message' , ( chanel , message ) => {
203207 expect ( chanel ) . equals ( 'TestChannel' ) ;
@@ -230,7 +234,7 @@ describe('PgPubSub', () => {
230234 const channel = `__${ PgIpLock . name } __:TestChannel` ;
231235
232236 await pubSub . listen ( 'TestChannel' ) ;
233- await pgClient . emit ( 'notification' , {
237+ pgClient . emit ( 'notification' , {
234238 channel,
235239 payload : 'true' ,
236240 } ) ;
@@ -240,6 +244,34 @@ describe('PgPubSub', () => {
240244 ) ) . to . be . false ;
241245 expect ( spyChannel . called ) . to . be . false ;
242246 } ) ;
247+ it ( 'should handle messages from db with acquired execution '
248+ + 'lock' , done => {
249+ pubSub = new PgPubSub ( {
250+ pgClient, executionLock : true , singleListener : true ,
251+ } ) ;
252+
253+ listenFunc ( pubSub ) ;
254+
255+ pubSub . on ( 'message' , ( chanel , message ) => {
256+ expect ( chanel ) . equals ( 'TestChannel' ) ;
257+ expect ( message ) . equals ( true ) ;
258+ done ( ) ;
259+ } ) ;
260+ } ) ;
261+ it ( 'should handle messages from db with acquired execution '
262+ + 'lock and multiple listeners' , done => {
263+ pubSub = new PgPubSub ( {
264+ pgClient, executionLock : true , singleListener : false ,
265+ } ) ;
266+
267+ listenFunc ( pubSub ) ;
268+
269+ pubSub . on ( 'message' , ( chanel , message ) => {
270+ expect ( chanel ) . equals ( 'TestChannel' ) ;
271+ expect ( message ) . equals ( true ) ;
272+ done ( ) ;
273+ } ) ;
274+ } ) ;
243275 } ) ;
244276 describe ( 'unlisten()' , ( ) => {
245277 it ( 'should call SQL UNLISTEN "channel" command' , async ( ) => {
@@ -466,6 +498,42 @@ describe('PgPubSub', () => {
466498
467499 expect ( counter ) . equals ( 1 ) ;
468500 } ) ;
501+ it ( 'should filter messages if set and "filtered" option is set and'
502+ + ' execution lock is set' , async ( ) => {
503+ const pubSubCopy = new PgPubSub ( {
504+ singleListener : false ,
505+ filtered : false ,
506+ executionLock : true ,
507+ pgClient,
508+ } ) ;
509+ ( pubSubCopy as any ) . processId = 7777 ;
510+
511+ await pubSubCopy . listen ( 'Test' ) ;
512+ let counter = 0 ;
513+
514+ pubSubCopy . channels . on ( 'Test' , ( ) => ++ counter ) ;
515+ pgClient . emit ( 'notification' , {
516+ processId : 7777 ,
517+ channel : 'Test' ,
518+ payload : 'true' ,
519+ } ) ;
520+
521+ await new Promise ( res => setTimeout ( res ) ) ;
522+
523+ expect ( counter ) . equals ( 1 ) ;
524+
525+ pubSubCopy . options . filtered = true ;
526+ pgClient . emit ( 'notification' , {
527+ processId : 7777 ,
528+ channel : 'Test' ,
529+ payload : 'true' ,
530+ } ) ;
531+
532+ await new Promise ( res => setTimeout ( res ) ) ;
533+
534+ expect ( counter ) . equals ( 1 ) ;
535+ await pubSub . destroy ( ) ;
536+ } ) ;
469537 } ) ;
470538 describe ( 'destroy()' , ( ) => {
471539 it ( 'should properly handle destruction' , async ( ) => {
0 commit comments