11import { anyPass , equals , isNil , map , propSatisfies , uniqWith } from 'ramda'
2+ import { addAbortSignal } from 'stream'
23import { pipeline } from 'stream/promises'
34
45import { createEndOfStoredEventsNoticeMessage , createNoticeMessage , createOutgoingEventMessage } from '../utils/messages'
@@ -57,17 +58,16 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
5758
5859 const findEvents = this . eventRepository . findByFilters ( filters ) . stream ( )
5960
61+ const abortableFintEvents = addAbortSignal ( this . abortController . signal , findEvents )
62+
6063 try {
6164 await pipeline (
62- findEvents ,
65+ abortableFintEvents ,
6366 streamFilter ( propSatisfies ( isNil , 'deleted_at' ) ) ,
6467 streamMap ( toNostrEvent ) ,
6568 streamFilter ( isSubscribedToEvent ) ,
6669 streamEach ( sendEvent ) ,
6770 streamEnd ( sendEOSE ) ,
68- {
69- signal : this . abortController . signal ,
70- }
7171 )
7272 } catch ( error ) {
7373 if ( error instanceof Error && error . name === 'AbortError' ) {
@@ -92,14 +92,14 @@ export class SubscribeMessageHandler implements IMessageHandler, IAbortable {
9292 return `Duplicate subscription ${ subscriptionId } : Ignorning`
9393 }
9494
95- const maxSubscriptions = this . settings ( ) . limits . client . subscription . maxSubscriptions
95+ const maxSubscriptions = this . settings ( ) . limits ? .client ? .subscription ? .maxSubscriptions ?? 0
9696 if ( maxSubscriptions > 0
9797 && ! existingSubscription ?. length && subscriptions . size + 1 > maxSubscriptions
9898 ) {
9999 return `Too many subscriptions: Number of subscriptions must be less than or equal to ${ maxSubscriptions } `
100100 }
101101
102- const maxFilters = this . settings ( ) . limits . client . subscription . maxFilters
102+ const maxFilters = this . settings ( ) . limits ? .client ? .subscription ? .maxFilters ?? 0
103103 if ( maxFilters > 0 ) {
104104 if ( filters . length > maxFilters ) {
105105 return `Too many filters: Number of filters per susbscription must be less then or equal to ${ maxFilters } `
0 commit comments