Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@
on:click={handleVersionTap}
disabled={isRetrying}
>
Version v0.2.1.1
Version v0.2.1.2
</button>

{#if retryMessage}
Expand Down
138 changes: 99 additions & 39 deletions platforms/pictique-api/src/web3adapter/watchers/subscriber.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,30 @@ const JUNCTION_TABLE_MAP = {
@EventSubscriber()
export class PostgresSubscriber implements EntitySubscriberInterface {
private adapter: Web3Adapter;
private pendingChanges: Map<string, number> = new Map();

constructor() {
this.adapter = adapter;

// Clean up old pending changes every 5 minutes to prevent memory leaks
setInterval(() => {
this.cleanupOldPendingChanges();
}, 5 * 60 * 1000);
}

/**
* Clean up old pending changes to prevent memory leaks
*/
private cleanupOldPendingChanges(): void {
const now = Date.now();
const maxAge = 10 * 60 * 1000; // 10 minutes

for (const [key, timestamp] of this.pendingChanges.entries()) {
if (now - timestamp > maxAge) {
this.pendingChanges.delete(key);
console.log(`Cleaned up old pending change: ${key}`);
}
}
}

/**
Expand Down Expand Up @@ -164,28 +185,49 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
const data = this.entityToPlain(entity);
if (!data.id) return;

// Create a unique key for this entity change to prevent duplicates
const changeKey = `${tableName}:${entity.id}`;

// Check if we already have a pending change for this entity
if (this.pendingChanges.has(changeKey)) {
console.log(`Change already pending for ${changeKey}, skipping duplicate`);
return;
}

// Mark this change as pending with timestamp
this.pendingChanges.set(changeKey, Date.now());

try {
setTimeout(async () => {
let globalId = await this.adapter.mappingDb.getGlobalId(
entity.id
);
globalId = globalId ?? "";

if (this.adapter.lockedIds.includes(globalId))
return console.log("locked skipping ", globalId);

console.log(
"sending packet for global Id",
globalId,
entity.id
);
const envelope = await this.adapter.handleChange({
data,
tableName: tableName.toLowerCase(),
});
try {
let globalId = await this.adapter.mappingDb.getGlobalId(
entity.id
);
globalId = globalId ?? "";

if (this.adapter.lockedIds.includes(globalId)) {
console.log("locked skipping ", globalId);
return;
}

console.log(
"sending packet for global Id",
globalId,
entity.id
);
const envelope = await this.adapter.handleChange({
data,
tableName: tableName.toLowerCase(),
});
} finally {
// Always remove the pending change flag
this.pendingChanges.delete(changeKey);
}
}, 3_000);
} catch (error) {
console.error(`Error processing change for ${tableName}:`, error);
// Remove the pending change flag on error
this.pendingChanges.delete(changeKey);
}
}

Expand Down Expand Up @@ -217,34 +259,52 @@ export class PostgresSubscriber implements EntitySubscriberInterface {
return;
}

let globalId = await this.adapter.mappingDb.getGlobalId(entity.id);
globalId = globalId ?? "";
// Create a unique key for this junction table change to prevent duplicates
const changeKey = `junction:${junctionInfo.entity}:${parentId}`;

// Check if we already have a pending change for this parent entity
if (this.pendingChanges.has(changeKey)) {
console.log(`Junction change already pending for ${changeKey}, skipping duplicate`);
return;
}

// Mark this change as pending with timestamp
this.pendingChanges.set(changeKey, Date.now());

// Use immediate locking instead of setTimeout to prevent race conditions
// Use setTimeout to prevent race conditions
try {
setTimeout(async () => {
let globalId = await this.adapter.mappingDb.getGlobalId(
entity.id
);
globalId = globalId ?? "";

if (this.adapter.lockedIds.includes(globalId))
return console.log("locked skipping ", globalId);

console.log(
"sending packet for global Id",
globalId,
entity.id
);

const tableName = `${junctionInfo.entity.toLowerCase()}s`;
await this.adapter.handleChange({
data: this.entityToPlain(parentEntity),
tableName,
});
try {
let globalId = await this.adapter.mappingDb.getGlobalId(
entity.id
);
globalId = globalId ?? "";

if (this.adapter.lockedIds.includes(globalId)) {
console.log("locked skipping ", globalId);
return;
}

console.log(
"sending packet for global Id",
globalId,
entity.id
);

const tableName = `${junctionInfo.entity.toLowerCase()}s`;
await this.adapter.handleChange({
data: this.entityToPlain(parentEntity),
tableName,
});
} finally {
// Always remove the pending change flag
this.pendingChanges.delete(changeKey);
}
}, 3_000);
} catch (error) {
console.error(error);
// Remove the pending change flag on error
this.pendingChanges.delete(changeKey);
}
} catch (error) {
console.error("Error handling junction table change:", error);
Expand Down
Loading