File tree Expand file tree Collapse file tree 2 files changed +17
-6
lines changed
packages/service-utils/src/node Expand file tree Collapse file tree 2 files changed +17
-6
lines changed Original file line number Diff line number Diff line change 1+ ---
2+ " @thirdweb-dev/service-utils " : patch
3+ ---
4+
5+ [ service-utils] Add connect promise so it only connects once
Original file line number Diff line number Diff line change @@ -31,7 +31,8 @@ export interface KafkaProducerSendOptions {
3131 */
3232export class KafkaProducer {
3333 private producer : KafkaJS . Producer ;
34- private isConnected = false ;
34+ // Use a promise to ensure `connect()` is called at most once.
35+ private connectPromise ?: Promise < void > ;
3536
3637 constructor ( options : {
3738 /**
@@ -68,10 +69,17 @@ export class KafkaProducer {
6869
6970 /**
7071 * Connects the producer. Can be called explicitly at the start of your service, or will be called automatically when sending messages.
72+ *
73+ * A cached promise is used so this function is safe to call more than once and concurrently.
7174 */
7275 async connect ( ) {
73- await this . producer . connect ( ) ;
74- this . isConnected = true ;
76+ if ( ! this . connectPromise ) {
77+ this . connectPromise = this . producer . connect ( ) . catch ( ( err ) => {
78+ this . connectPromise = undefined ;
79+ throw err ;
80+ } ) ;
81+ }
82+ await this . connectPromise ;
7583 }
7684
7785 /**
@@ -88,9 +96,7 @@ export class KafkaProducer {
8896 topic : string ,
8997 messages : Record < string , unknown > [ ] ,
9098 ) : Promise < void > {
91- if ( ! this . isConnected ) {
92- await this . connect ( ) ;
93- }
99+ await this . connect ( ) ;
94100
95101 await this . producer . send ( {
96102 topic,
You can’t perform that action at this time.
0 commit comments