@@ -2,9 +2,9 @@ import { randomUUID } from "crypto"
22import { coerce , lt } from "semver"
33import { inspect } from "util"
44import { Compression , CompressionType , GzipCompression , NoneCompression } from "./compression"
5- import { Connection , ConnectionInfo , errorMessageOf } from "./connection"
6- import { ConnectionPool } from "./connection_pool"
7- import { Consumer , ConsumerFunc , StreamConsumer } from "./consumer"
5+ import { Connection , ConnectionInfo , ConnectionParams , errorMessageOf } from "./connection"
6+ import { ConnectionPool , ConnectionPurpose } from "./connection_pool"
7+ import { Consumer , ConsumerFunc , StreamConsumer , computeExtendedConsumerId } from "./consumer"
88import { STREAM_ALREADY_EXISTS_ERROR_CODE } from "./error_codes"
99import { Logger , NullLogger } from "./logger"
1010import { FilterFunc , Message , Publisher , StreamPublisher } from "./publisher"
@@ -56,10 +56,8 @@ type PublisherMappedValue = {
5656}
5757export class Client {
5858 public readonly id : string = randomUUID ( )
59- private publisherId = 0
60- private consumerId = 0
61- private consumers = new Map < number , ConsumerMappedValue > ( )
62- private publishers = new Map < number , PublisherMappedValue > ( )
59+ private consumers = new Map < string , ConsumerMappedValue > ( )
60+ private publishers = new Map < string , PublisherMappedValue > ( )
6361 private compressions = new Map < CompressionType , Compression > ( )
6462 private connection : Connection
6563
@@ -134,8 +132,8 @@ export class Client {
134132 }
135133
136134 public async declarePublisher ( params : DeclarePublisherParams , filter ?: FilterFunc ) : Promise < Publisher > {
137- const publisherId = this . incPublisherId ( )
138- const connection = await this . getConnection ( params . stream , true , params . connectionClosedListener )
135+ const connection = await this . getConnection ( params . stream , "publisher" , params . connectionClosedListener )
136+ const publisherId = connection . getNextPublisherId ( )
139137 await this . declarePublisherOnConnection ( params , publisherId , connection , filter )
140138 const streamPublisherParams = {
141139 connection : connection ,
@@ -151,32 +149,35 @@ export class Client {
151149 connection . on ( "metadata_update" , async ( metadata ) => {
152150 if ( metadata . metadataInfo . stream === publisher . streamName ) {
153151 await publisher . close ( false )
154- this . publishers . delete ( publisherId )
152+ this . publishers . delete ( publisher . extendedId )
155153 }
156154 } )
157- this . publishers . set ( publisherId , { publisher, connection, params, filter } )
155+ this . publishers . set ( publisher . extendedId , { publisher, connection, params, filter } )
158156 this . logger . info (
159157 `New publisher created with stream name ${ params . stream } , publisher id ${ publisherId } and publisher reference ${ params . publisherRef } `
160158 )
161159 return publisher
162160 }
163161
164- public async deletePublisher ( publisherId : number ) {
165- const publisherConnection = this . publishers . get ( publisherId ) ?. connection ?? this . connection
166- const res = await publisherConnection . sendAndWait < DeletePublisherResponse > ( new DeletePublisherRequest ( publisherId ) )
162+ public async deletePublisher ( extendedPublisherId : string ) {
163+ const { publisher, connection } = this . publishers . get ( extendedPublisherId ) ?? {
164+ publisher : undefined ,
165+ connection : this . connection ,
166+ }
167+ const publisherId = extractPublisherId ( extendedPublisherId )
168+ const res = await connection . sendAndWait < DeletePublisherResponse > ( new DeletePublisherRequest ( publisherId ) )
167169 if ( ! res . ok ) {
168170 throw new Error ( `Delete Publisher command returned error with code ${ res . code } - ${ errorMessageOf ( res . code ) } ` )
169171 }
170- await this . publishers . get ( publisherId ) ?. publisher . close ( true )
171- this . publishers . delete ( publisherId )
172+ await publisher ? .close ( true )
173+ this . publishers . delete ( extendedPublisherId )
172174 this . logger . info ( `deleted publisher with publishing id ${ publisherId } ` )
173175 return res . ok
174176 }
175177
176178 public async declareConsumer ( params : DeclareConsumerParams , handle : ConsumerFunc ) : Promise < Consumer > {
177- const consumerId = this . incConsumerId ( )
178-
179- const connection = await this . getConnection ( params . stream , false , params . connectionClosedListener )
179+ const connection = await this . getConnection ( params . stream , "consumer" , params . connectionClosedListener )
180+ const consumerId = connection . getNextConsumerId ( )
180181
181182 if ( params . filter && ! connection . isFilteringEnabled ) {
182183 throw new Error ( `Broker does not support message filtering.` )
@@ -192,31 +193,35 @@ export class Client {
192193 if ( params . connectionClosedListener ) {
193194 params . connectionClosedListener ( false )
194195 }
195- await this . closeConsumer ( consumerId )
196+ await this . closeConsumer ( consumer . extendedId )
196197 }
197198 } )
198- this . consumers . set ( consumerId , { connection, consumer, params } )
199+ this . consumers . set ( consumer . extendedId , { connection, consumer, params } )
199200 await this . declareConsumerOnConnection ( params , consumerId , connection )
200201 this . logger . info (
201202 `New consumer created with stream name ${ params . stream } , consumer id ${ consumerId } and offset ${ params . offset . type } `
202203 )
203204 return consumer
204205 }
205206
206- public async closeConsumer ( consumerId : number ) {
207- const { consumer, connection } = this . consumers . get ( consumerId ) ?? { consumer : undefined , connection : undefined }
207+ public async closeConsumer ( extendedConsumerId : string ) {
208+ const { consumer, connection } = this . consumers . get ( extendedConsumerId ) ?? {
209+ consumer : undefined ,
210+ connection : undefined ,
211+ }
212+ const consumerId = extractConsumerId ( extendedConsumerId )
208213
209214 if ( ! consumer ) {
210215 this . logger . error ( "Consumer does not exist" )
211- throw new Error ( `Consumer with id: ${ consumerId } does not exist` )
216+ throw new Error ( `Consumer with id: ${ extendedConsumerId } does not exist` )
212217 }
213218 const res = await connection . sendAndWait < UnsubscribeResponse > ( new UnsubscribeRequest ( consumerId ) )
214219 await consumer . close ( true )
215- this . consumers . delete ( consumerId )
220+ this . consumers . delete ( extendedConsumerId )
216221 if ( ! res . ok ) {
217222 throw new Error ( `Unsubscribe command returned error with code ${ res . code } - ${ errorMessageOf ( res . code ) } ` )
218223 }
219- this . logger . info ( `Closed consumer with id: ${ consumerId } ` )
224+ this . logger . info ( `Closed consumer with id: ${ extendedConsumerId } ` )
220225 return res . ok
221226 }
222227
@@ -248,12 +253,12 @@ export class Client {
248253
249254 private async closeAllConsumers ( manuallyClose : boolean ) {
250255 await Promise . all ( [ ...this . consumers . values ( ) ] . map ( ( { consumer } ) => consumer . close ( manuallyClose ) ) )
251- this . consumers = new Map < number , ConsumerMappedValue > ( )
256+ this . consumers = new Map < string , ConsumerMappedValue > ( )
252257 }
253258
254259 private async closeAllPublishers ( manuallyClose : boolean ) {
255260 await Promise . all ( [ ...this . publishers . values ( ) ] . map ( ( c ) => c . publisher . close ( manuallyClose ) ) )
256- this . publishers = new Map < number , PublisherMappedValue > ( )
261+ this . publishers = new Map < string , PublisherMappedValue > ( )
257262 }
258263
259264 public consumerCounts ( ) {
@@ -366,9 +371,9 @@ export class Client {
366371 }
367372
368373 public async restart ( ) {
369- this . logger . info ( `Restarting client connection ${ this . connection . id } ` )
374+ this . logger . info ( `Restarting client connection ${ this . connection . connectionId } ` )
370375 const uniqueConnectionIds = new Set < string > ( )
371- uniqueConnectionIds . add ( this . connection . id )
376+ uniqueConnectionIds . add ( this . connection . connectionId )
372377
373378 await new Promise ( async ( res ) => {
374379 setTimeout ( ( ) => {
@@ -378,21 +383,21 @@ export class Client {
378383 await this . connection . restart ( )
379384
380385 for ( const { consumer, connection, params } of this . consumers . values ( ) ) {
381- if ( ! uniqueConnectionIds . has ( connection . id ) ) {
382- this . logger . info ( `Restarting consumer connection ${ connection . id } ` )
386+ if ( ! uniqueConnectionIds . has ( connection . connectionId ) ) {
387+ this . logger . info ( `Restarting consumer connection ${ connection . connectionId } ` )
383388 await connection . restart ( )
384389 }
385- uniqueConnectionIds . add ( connection . id )
390+ uniqueConnectionIds . add ( connection . connectionId )
386391 const consumerParams = { ...params , offset : consumer . localOffset }
387392 await this . declareConsumerOnConnection ( consumerParams , consumer . consumerId , connection )
388393 }
389394
390395 for ( const { publisher, connection, params, filter } of this . publishers . values ( ) ) {
391- if ( ! uniqueConnectionIds . has ( connection . id ) ) {
392- this . logger . info ( `Restarting publisher connection ${ connection . id } ` )
396+ if ( ! uniqueConnectionIds . has ( connection . connectionId ) ) {
397+ this . logger . info ( `Restarting publisher connection ${ connection . connectionId } ` )
393398 await connection . restart ( )
394399 }
395- uniqueConnectionIds . add ( connection . id )
400+ uniqueConnectionIds . add ( connection . connectionId )
396401 await this . declarePublisherOnConnection ( params , publisher . publisherId , connection , filter )
397402 }
398403 }
@@ -466,7 +471,7 @@ export class Client {
466471 )
467472
468473 if ( ! res . ok ) {
469- this . consumers . delete ( consumerId )
474+ this . consumers . delete ( computeExtendedConsumerId ( consumerId , connection . connectionId ) )
470475 throw new Error ( `Declare Consumer command returned error with code ${ res . code } - ${ errorMessageOf ( res . code ) } ` )
471476 }
472477 }
@@ -475,21 +480,11 @@ export class Client {
475480 return connection . send ( new CreditRequest ( { ...params } ) )
476481 }
477482
478- private incPublisherId ( ) {
479- const publisherId = this . publisherId
480- this . publisherId ++
481- return publisherId
482- }
483-
484- private incConsumerId ( ) {
485- const consumerId = this . consumerId
486- this . consumerId ++
487- return consumerId
488- }
489-
490- private getDeliverV1Callback ( ) {
483+ private getDeliverV1Callback ( connectionId : string ) {
491484 return async ( response : DeliverResponse ) => {
492- const { consumer, connection } = this . consumers . get ( response . subscriptionId ) ?? {
485+ const { consumer, connection } = this . consumers . get (
486+ computeExtendedConsumerId ( response . subscriptionId , connectionId )
487+ ) ?? {
493488 consumer : undefined ,
494489 connection : undefined ,
495490 }
@@ -504,9 +499,11 @@ export class Client {
504499 }
505500 }
506501
507- private getDeliverV2Callback ( ) {
502+ private getDeliverV2Callback ( connectionId : string ) {
508503 return async ( response : DeliverResponseV2 ) => {
509- const { consumer, connection } = this . consumers . get ( response . subscriptionId ) ?? {
504+ const { consumer, connection } = this . consumers . get (
505+ computeExtendedConsumerId ( response . subscriptionId , connectionId )
506+ ) ?? {
510507 consumer : undefined ,
511508 connection : undefined ,
512509 }
@@ -525,9 +522,11 @@ export class Client {
525522 }
526523 }
527524
528- private getConsumerUpdateCallback ( ) {
525+ private getConsumerUpdateCallback ( connectionId : string ) {
529526 return async ( response : ConsumerUpdateQuery ) => {
530- const { consumer, connection } = this . consumers . get ( response . subscriptionId ) ?? {
527+ const { consumer, connection } = this . consumers . get (
528+ computeExtendedConsumerId ( response . subscriptionId , connectionId )
529+ ) ?? {
531530 consumer : undefined ,
532531 connection : undefined ,
533532 }
@@ -549,26 +548,26 @@ export class Client {
549548
550549 private async getConnection (
551550 streamName : string ,
552- leader : boolean ,
551+ purpose : ConnectionPurpose ,
553552 connectionClosedListener ?: ConnectionClosedListener
554553 ) : Promise < Connection > {
555554 const [ metadata ] = await this . queryMetadata ( { streams : [ streamName ] } )
556- const chosenNode = chooseNode ( metadata , leader )
555+ const chosenNode = chooseNode ( metadata , purpose === "publisher" )
557556 if ( ! chosenNode ) {
558557 throw new Error ( `Stream was not found on any node` )
559558 }
560- const cachedConnection = ConnectionPool . getUsableCachedConnection ( leader , streamName , chosenNode . host )
559+ const cachedConnection = ConnectionPool . getUsableCachedConnection ( purpose , streamName , chosenNode . host )
561560 if ( cachedConnection ) return cachedConnection
562561
563562 const newConnection = await this . getConnectionOnChosenNode (
564- leader ,
563+ purpose ,
565564 streamName ,
566565 chosenNode ,
567566 metadata ,
568567 connectionClosedListener
569568 )
570569
571- ConnectionPool . cacheConnection ( leader , streamName , newConnection . hostname , newConnection )
570+ ConnectionPool . cacheConnection ( purpose , streamName , newConnection . hostname , newConnection )
572571 return newConnection
573572 }
574573
@@ -593,25 +592,32 @@ export class Client {
593592 leader : boolean ,
594593 streamName : string ,
595594 connectionClosedListener ?: ConnectionClosedListener
596- ) {
595+ ) : ConnectionParams {
596+ const connectionId = randomUUID ( )
597597 const connectionListeners = {
598598 ...this . params . listeners ,
599599 connection_closed : connectionClosedListener ,
600- deliverV1 : this . getDeliverV1Callback ( ) ,
601- deliverV2 : this . getDeliverV2Callback ( ) ,
602- consumer_update_query : this . getConsumerUpdateCallback ( ) ,
600+ deliverV1 : this . getDeliverV1Callback ( connectionId ) ,
601+ deliverV2 : this . getDeliverV2Callback ( connectionId ) ,
602+ consumer_update_query : this . getConsumerUpdateCallback ( connectionId ) ,
603+ }
604+ return {
605+ ...this . params ,
606+ listeners : connectionListeners ,
607+ leader : leader ,
608+ streamName : streamName ,
609+ connectionId,
603610 }
604- return { ...this . params , listeners : connectionListeners , leader : leader , streamName : streamName }
605611 }
606612
607613 private async getConnectionOnChosenNode (
608- leader : boolean ,
614+ purpose : ConnectionPurpose ,
609615 streamName : string ,
610616 chosenNode : { host : string ; port : number } ,
611617 metadata : StreamMetadata ,
612618 connectionClosedListener ?: ConnectionClosedListener
613619 ) : Promise < Connection > {
614- const connectionParams = this . buildConnectionParams ( leader , streamName , connectionClosedListener )
620+ const connectionParams = this . buildConnectionParams ( purpose === "publisher" , streamName , connectionClosedListener )
615621 if ( this . params . addressResolver && this . params . addressResolver . enabled ) {
616622 const maxAttempts = computeMaxAttempts ( metadata )
617623 const resolver = this . params . addressResolver
@@ -756,3 +762,11 @@ const chooseNode = (metadata: { leader?: Broker; replicas?: Broker[] }, leader:
756762const computeMaxAttempts = ( metadata : { leader ?: Broker ; replicas ?: Broker [ ] } ) : number => {
757763 return Math . pow ( 2 + ( metadata . leader ? 1 : 0 ) + ( metadata . replicas ?. length ?? 0 ) , 2 )
758764}
765+
766+ const extractConsumerId = ( extendedConsumerId : string ) => {
767+ return parseInt ( extendedConsumerId . split ( "@" ) . shift ( ) ?? "0" )
768+ }
769+
770+ const extractPublisherId = ( extendedPublisherId : string ) => {
771+ return parseInt ( extendedPublisherId . split ( "@" ) . shift ( ) ?? "0" )
772+ }
0 commit comments