Skip to content

Commit 499381d

Browse files
authored
chore(index): Few adjustments (medusajs#12557)
**What** - Adjust lock duration, it is in seconds and not in ms - Log on lock release - Renew lock separately from the other promises - Add more logs - Log when lock can't be acquired. It can be expected in case two processes try to sync the same entity and in that case it can be ignored. But at least it gives some information in case it happens for another reason - Log when the release lock failed, the lock will remain in the locking provider for 1 minute before being removed. But it wont prevent other entities to be synced
1 parent 22687d6 commit 499381d

File tree

5 files changed

+65
-35
lines changed

5 files changed

+65
-35
lines changed

.changeset/proud-carrots-punch.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@medusajs/index": patch
3+
---
4+
5+
chore(index): Few adjustments

packages/modules/index/integration-tests/__tests__/orchestrator.spec.ts

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { asValue } from "awilix"
2-
import { container } from "@medusajs/framework"
2+
import { container, logger } from "@medusajs/framework"
33
import type { IndexTypes } from "@medusajs/types"
44
import { Orchestrator } from "@utils"
55

@@ -12,7 +12,7 @@ function creatingFakeLockingModule() {
1212
}
1313
this.lockEntities.add(key)
1414
},
15-
release(key: string) {
15+
async release(key: string) {
1616
this.lockEntities.delete(key)
1717
},
1818
}
@@ -55,6 +55,7 @@ describe("Orchestrator", () => {
5555
entities.map((e) => e.entity),
5656
{
5757
lockDuration: 60 * 1000,
58+
logger: logger,
5859
}
5960
)
6061

@@ -101,6 +102,7 @@ describe("Orchestrator", () => {
101102
entities.map((e) => e.entity),
102103
{
103104
lockDuration: 60 * 1000,
105+
logger: logger,
104106
}
105107
)
106108

@@ -150,6 +152,7 @@ describe("Orchestrator", () => {
150152
entityNames,
151153
{
152154
lockDuration: 60 * 1000,
155+
logger: logger,
153156
}
154157
)
155158

@@ -162,23 +165,26 @@ describe("Orchestrator", () => {
162165
entityNames,
163166
{
164167
lockDuration: 60 * 1000,
168+
logger: logger,
165169
}
166170
)
167171

168172
await Promise.all([
169173
orchestrator.process(taskRunner),
170174
orchestrator1.process(taskRunner2),
171175
])
172-
expect(processedEntities).toEqual([
173-
{
174-
entity: "brand",
175-
owner: "instance-1",
176-
},
177-
{
178-
entity: "product",
179-
owner: "instance-2",
180-
},
181-
])
176+
expect(processedEntities).toEqual(
177+
expect.arrayContaining([
178+
{
179+
entity: "brand",
180+
owner: "instance-1",
181+
},
182+
{
183+
entity: "product",
184+
owner: "instance-2",
185+
},
186+
])
187+
)
182188
expect(lockingModule.lockEntities.size).toEqual(0)
183189
})
184190

@@ -221,6 +227,7 @@ describe("Orchestrator", () => {
221227
entities.map((e) => e.entity),
222228
{
223229
lockDuration: 60 * 1000,
230+
logger: logger,
224231
}
225232
)
226233

@@ -269,6 +276,7 @@ describe("Orchestrator", () => {
269276
entities.map((e) => e.entity),
270277
{
271278
lockDuration: 60 * 1000,
279+
logger: logger,
272280
}
273281
)
274282

packages/modules/index/src/services/data-synchronizer.ts

Lines changed: 11 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,13 @@ export class DataSynchronizer {
8585
fields: string
8686
fields_hash: string
8787
}[],
88-
lockDuration: number = 1000 * 60 * 5
88+
lockDuration: number = 60 // 1 minute
8989
) {
9090
this.#isReadyOrThrow()
9191
const entitiesToSync = entities.map((entity) => entity.entity)
9292
this.#orchestrator = new Orchestrator(this.#locking, entitiesToSync, {
9393
lockDuration,
94+
logger: this.#logger,
9495
})
9596
await this.#orchestrator.process(this.#taskRunner.bind(this))
9697
}
@@ -156,26 +157,23 @@ export class DataSynchronizer {
156157
ack: async (ack) => {
157158
const endTime = performance.now()
158159
const chunkElapsedTime = (endTime - chunkStartTime).toFixed(2)
159-
const promises: Promise<any>[] = []
160160

161161
if (ack.lastCursor) {
162162
this.#logger.debug(
163163
`[Index engine] syncing entity '${entity}' updating last cursor to ${ack.lastCursor} (+${chunkElapsedTime}ms)`
164164
)
165165

166-
promises.push(
167-
this.#indexSyncService.update({
168-
data: {
169-
last_key: ack.lastCursor,
170-
},
171-
selector: {
172-
entity,
173-
},
174-
})
175-
)
166+
await this.#indexSyncService.update({
167+
data: {
168+
last_key: ack.lastCursor,
169+
},
170+
selector: {
171+
entity,
172+
},
173+
})
176174

177175
if (!ack.done && !ack.err) {
178-
promises.push(this.#orchestrator.renewLock(entity))
176+
await this.#orchestrator.renewLock(entity)
179177
}
180178
}
181179

@@ -192,8 +190,6 @@ export class DataSynchronizer {
192190
)
193191
}
194192

195-
await promiseAll(promises)
196-
197193
chunkStartTime = performance.now()
198194
},
199195
})

packages/modules/index/src/utils/sync/configuration.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -139,7 +139,11 @@ export class Configuration {
139139
}
140140

141141
const changes = await this.#indexMetadataService.list({
142-
status: [IndexMetadataStatus.PENDING, IndexMetadataStatus.PROCESSING],
142+
status: [
143+
IndexMetadataStatus.PENDING,
144+
IndexMetadataStatus.PROCESSING,
145+
IndexMetadataStatus.ERROR,
146+
],
143147
})
144148

145149
this.#logger.info(

packages/modules/index/src/utils/sync/orchestrator.ts

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1-
import { ILockingModule } from "@medusajs/types"
1+
import { ILockingModule, Logger } from "@medusajs/types"
22

33
export class Orchestrator {
44
/**
55
* Reference to the locking module
66
*/
77
#lockingModule: ILockingModule
88

9+
/**
10+
* Reference to the logger
11+
*/
12+
#logger: Logger
13+
914
/**
1015
* Owner id when acquiring locks
1116
*/
@@ -74,11 +79,13 @@ export class Orchestrator {
7479
entities: string[],
7580
options: {
7681
lockDuration: number
82+
logger: Logger
7783
}
7884
) {
7985
this.#lockingModule = lockingModule
8086
this.#entities = entities
8187
this.#options = options
88+
this.#logger = options.logger
8289
}
8390

8491
/**
@@ -104,14 +111,14 @@ export class Orchestrator {
104111
}
105112

106113
/**
107-
* Processes the entity at a given index. If there are no entities
114+
* Processes the entity. If there are no entities
108115
* left, the orchestrator state will be set to completed.
109116
*
110117
* - Task runner is the implementation function to execute a task.
111118
* Orchestrator has no inbuilt execution logic and it relies on
112119
* the task runner for the same.
113120
*/
114-
async #processAtIndex(
121+
async #processEntity(
115122
taskRunner: (entity: string) => Promise<void>,
116123
entity: string
117124
) {
@@ -123,10 +130,20 @@ export class Orchestrator {
123130
this.#state = "error"
124131
throw error
125132
} finally {
126-
await this.#lockingModule.release(entity, {
127-
ownerId: this.#lockingOwner,
128-
})
133+
await this.#lockingModule
134+
.release(entity, {
135+
ownerId: this.#lockingOwner,
136+
})
137+
.catch(() => {
138+
this.#logger.error(
139+
`[Index engine] failed to release lock for entity '${entity}'`
140+
)
141+
})
129142
}
143+
} else {
144+
this.#logger.warn(
145+
`[Index engine] failed to acquire lock for entity '${entity}' on pid ${process.pid}. It means another process is already processing this entity or a lock is still present in your locking provider.`
146+
)
130147
}
131148
}
132149

@@ -152,7 +169,7 @@ export class Orchestrator {
152169
break
153170
}
154171

155-
await this.#processAtIndex(taskRunner, entity)
172+
await this.#processEntity(taskRunner, entity)
156173
}
157174

158175
this.#state = "completed"

0 commit comments

Comments
 (0)