@@ -33,9 +33,30 @@ const JUNCTION_TABLE_MAP = {
33
33
@EventSubscriber ( )
34
34
export class PostgresSubscriber implements EntitySubscriberInterface {
35
35
private adapter : Web3Adapter ;
36
+ private pendingChanges : Map < string , number > = new Map ( ) ;
36
37
37
38
constructor ( ) {
38
39
this . adapter = adapter ;
40
+
41
+ // Clean up old pending changes every 5 minutes to prevent memory leaks
42
+ setInterval ( ( ) => {
43
+ this . cleanupOldPendingChanges ( ) ;
44
+ } , 5 * 60 * 1000 ) ;
45
+ }
46
+
47
+ /**
48
+ * Clean up old pending changes to prevent memory leaks
49
+ */
50
+ private cleanupOldPendingChanges ( ) : void {
51
+ const now = Date . now ( ) ;
52
+ const maxAge = 10 * 60 * 1000 ; // 10 minutes
53
+
54
+ for ( const [ key , timestamp ] of this . pendingChanges . entries ( ) ) {
55
+ if ( now - timestamp > maxAge ) {
56
+ this . pendingChanges . delete ( key ) ;
57
+ console . log ( `Cleaned up old pending change: ${ key } ` ) ;
58
+ }
59
+ }
39
60
}
40
61
41
62
/**
@@ -164,28 +185,49 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
164
185
const data = this . entityToPlain ( entity ) ;
165
186
if ( ! data . id ) return ;
166
187
188
+ // Create a unique key for this entity change to prevent duplicates
189
+ const changeKey = `${ tableName } :${ entity . id } ` ;
190
+
191
+ // Check if we already have a pending change for this entity
192
+ if ( this . pendingChanges . has ( changeKey ) ) {
193
+ console . log ( `Change already pending for ${ changeKey } , skipping duplicate` ) ;
194
+ return ;
195
+ }
196
+
197
+ // Mark this change as pending with timestamp
198
+ this . pendingChanges . set ( changeKey , Date . now ( ) ) ;
199
+
167
200
try {
168
201
setTimeout ( async ( ) => {
169
- let globalId = await this . adapter . mappingDb . getGlobalId (
170
- entity . id
171
- ) ;
172
- globalId = globalId ?? "" ;
173
-
174
- if ( this . adapter . lockedIds . includes ( globalId ) )
175
- return console . log ( "locked skipping " , globalId ) ;
176
-
177
- console . log (
178
- "sending packet for global Id" ,
179
- globalId ,
180
- entity . id
181
- ) ;
182
- const envelope = await this . adapter . handleChange ( {
183
- data,
184
- tableName : tableName . toLowerCase ( ) ,
185
- } ) ;
202
+ try {
203
+ let globalId = await this . adapter . mappingDb . getGlobalId (
204
+ entity . id
205
+ ) ;
206
+ globalId = globalId ?? "" ;
207
+
208
+ if ( this . adapter . lockedIds . includes ( globalId ) ) {
209
+ console . log ( "locked skipping " , globalId ) ;
210
+ return ;
211
+ }
212
+
213
+ console . log (
214
+ "sending packet for global Id" ,
215
+ globalId ,
216
+ entity . id
217
+ ) ;
218
+ const envelope = await this . adapter . handleChange ( {
219
+ data,
220
+ tableName : tableName . toLowerCase ( ) ,
221
+ } ) ;
222
+ } finally {
223
+ // Always remove the pending change flag
224
+ this . pendingChanges . delete ( changeKey ) ;
225
+ }
186
226
} , 3_000 ) ;
187
227
} catch ( error ) {
188
228
console . error ( `Error processing change for ${ tableName } :` , error ) ;
229
+ // Remove the pending change flag on error
230
+ this . pendingChanges . delete ( changeKey ) ;
189
231
}
190
232
}
191
233
@@ -217,34 +259,52 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
217
259
return ;
218
260
}
219
261
220
- let globalId = await this . adapter . mappingDb . getGlobalId ( entity . id ) ;
221
- globalId = globalId ?? "" ;
262
+ // Create a unique key for this junction table change to prevent duplicates
263
+ const changeKey = `junction:${ junctionInfo . entity } :${ parentId } ` ;
264
+
265
+ // Check if we already have a pending change for this parent entity
266
+ if ( this . pendingChanges . has ( changeKey ) ) {
267
+ console . log ( `Junction change already pending for ${ changeKey } , skipping duplicate` ) ;
268
+ return ;
269
+ }
270
+
271
+ // Mark this change as pending with timestamp
272
+ this . pendingChanges . set ( changeKey , Date . now ( ) ) ;
222
273
223
- // Use immediate locking instead of setTimeout to prevent race conditions
274
+ // Use setTimeout to prevent race conditions
224
275
try {
225
276
setTimeout ( async ( ) => {
226
- let globalId = await this . adapter . mappingDb . getGlobalId (
227
- entity . id
228
- ) ;
229
- globalId = globalId ?? "" ;
230
-
231
- if ( this . adapter . lockedIds . includes ( globalId ) )
232
- return console . log ( "locked skipping " , globalId ) ;
233
-
234
- console . log (
235
- "sending packet for global Id" ,
236
- globalId ,
237
- entity . id
238
- ) ;
239
-
240
- const tableName = `${ junctionInfo . entity . toLowerCase ( ) } s` ;
241
- await this . adapter . handleChange ( {
242
- data : this . entityToPlain ( parentEntity ) ,
243
- tableName,
244
- } ) ;
277
+ try {
278
+ let globalId = await this . adapter . mappingDb . getGlobalId (
279
+ entity . id
280
+ ) ;
281
+ globalId = globalId ?? "" ;
282
+
283
+ if ( this . adapter . lockedIds . includes ( globalId ) ) {
284
+ console . log ( "locked skipping " , globalId ) ;
285
+ return ;
286
+ }
287
+
288
+ console . log (
289
+ "sending packet for global Id" ,
290
+ globalId ,
291
+ entity . id
292
+ ) ;
293
+
294
+ const tableName = `${ junctionInfo . entity . toLowerCase ( ) } s` ;
295
+ await this . adapter . handleChange ( {
296
+ data : this . entityToPlain ( parentEntity ) ,
297
+ tableName,
298
+ } ) ;
299
+ } finally {
300
+ // Always remove the pending change flag
301
+ this . pendingChanges . delete ( changeKey ) ;
302
+ }
245
303
} , 3_000 ) ;
246
304
} catch ( error ) {
247
305
console . error ( error ) ;
306
+ // Remove the pending change flag on error
307
+ this . pendingChanges . delete ( changeKey ) ;
248
308
}
249
309
} catch ( error ) {
250
310
console . error ( "Error handling junction table change:" , error ) ;
0 commit comments