Skip to content

Commit 39b2fa7

Browse files
committed
ts: discover multiple coords fix
1 parent 8c52ee0 commit 39b2fa7

File tree

2 files changed

+68
-32
lines changed

2 files changed

+68
-32
lines changed

website/backend/src/chainTracker.ts

Lines changed: 40 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -29,11 +29,11 @@ interface TimestampedError {
2929
function discoverExistingCoordinators(stateDirectory: string): string[] {
3030
try {
3131
const files = readdirSync(stateDirectory)
32-
const coordinatorFiles = files.filter(file =>
33-
file.startsWith('coordinator-db-') && file.endsWith('.json')
32+
const coordinatorFiles = files.filter(
33+
(file) => file.startsWith('coordinator-db-') && file.endsWith('.json')
3434
)
35-
36-
return coordinatorFiles.map(file => {
35+
36+
return coordinatorFiles.map((file) => {
3737
// Extract program ID from filename: coordinator-db-{programId}.json
3838
const programId = file.slice('coordinator-db-'.length, -'.json'.length)
3939
return programId
@@ -166,11 +166,14 @@ export function startIndexingMultipleCoordinators(
166166
miningPool: ServiceConfig
167167
): {
168168
cancel: () => void
169-
coordinators: Map<string, {
170-
stopped: Promise<void>
171-
dataStore: CoordinatorDataStore
172-
errors: TimestampedError[]
173-
}>
169+
coordinators: Map<
170+
string,
171+
{
172+
stopped: Promise<void>
173+
dataStore: CoordinatorDataStore
174+
errors: TimestampedError[]
175+
}
176+
>
174177
miningPool: {
175178
stopped: Promise<void>
176179
dataStore: MiningPoolDataStore
@@ -185,45 +188,58 @@ export function startIndexingMultipleCoordinators(
185188
console.log('Discovered existing coordinators:', existingProgramIds)
186189

187190
// Start indexing the primary coordinator (current one from env)
188-
const primaryResult = startIndexingChainToDataStores(primaryCoordinator, miningPool)
189-
190-
const coordinators = new Map<string, {
191-
stopped: Promise<void>
192-
dataStore: CoordinatorDataStore
193-
errors: TimestampedError[]
194-
}>()
191+
const primaryResult = startIndexingChainToDataStores(
192+
primaryCoordinator,
193+
miningPool
194+
)
195+
196+
const coordinators = new Map<
197+
string,
198+
{
199+
stopped: Promise<void>
200+
dataStore: CoordinatorDataStore
201+
errors: TimestampedError[]
202+
}
203+
>()
195204

196205
// Add primary coordinator to map
197-
const primaryProgramId = primaryCoordinator.addressOverride || coordinatorIdl.address
206+
const primaryProgramId =
207+
primaryCoordinator.addressOverride || coordinatorIdl.address
198208
coordinators.set(primaryProgramId, primaryResult.coordinator)
199209

200210
// Create data stores for existing coordinators (read-only mode)
201211
for (const programId of existingProgramIds) {
202212
if (programId !== primaryProgramId) {
203213
console.log(`Loading existing coordinator database: ${programId}`)
204-
214+
205215
try {
206216
const publicKey = new PublicKey(programId)
207-
const dataStore = new FlatFileCoordinatorDataStore(stateDirectory, publicKey)
208-
217+
const dataStore = new FlatFileCoordinatorDataStore(
218+
stateDirectory,
219+
publicKey
220+
)
221+
209222
// Create a resolved promise since we're only reading existing data
210223
const stopped = Promise.resolve()
211224
const errors: TimestampedError[] = []
212-
225+
213226
coordinators.set(programId, {
214227
stopped,
215228
dataStore,
216-
errors
229+
errors,
217230
})
218231
} catch (error) {
219-
console.error(`Failed to load coordinator database for ${programId}:`, error)
232+
console.error(
233+
`Failed to load coordinator database for ${programId}:`,
234+
error
235+
)
220236
}
221237
}
222238
}
223239

224240
return {
225241
coordinators,
226242
miningPool: primaryResult.miningPool,
227-
cancel: primaryResult.cancel
243+
cancel: primaryResult.cancel,
228244
}
229245
}

website/backend/src/index.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,9 @@ async function main() {
130130
console.log('got shutdown signal, shutting down!')
131131
cancel()
132132
await fastify.close()
133-
const allCoordinatorPromises = Array.from(coordinators.values()).map(c => c.stopped)
133+
const allCoordinatorPromises = Array.from(coordinators.values()).map(
134+
(c) => c.stopped
135+
)
134136
await Promise.all([...allCoordinatorPromises, miningPool.stopped])
135137
process.exit(0)
136138
}
@@ -231,9 +233,13 @@ async function main() {
231233
const coordinatorSummary = coordinator.dataStore.getRunSummaries()
232234
allRuns = allRuns.concat(coordinatorSummary.runs)
233235
totalTokens += coordinatorSummary.totalTokens
234-
totalTokensPerSecondActive += coordinatorSummary.totalTokensPerSecondActive
236+
totalTokensPerSecondActive +=
237+
coordinatorSummary.totalTokensPerSecondActive
235238
} catch (error) {
236-
console.error(`Failed to get run summaries from coordinator ${programId}:`, error)
239+
console.error(
240+
`Failed to get run summaries from coordinator ${programId}:`,
241+
error
242+
)
237243
}
238244
}
239245

@@ -268,7 +274,7 @@ async function main() {
268274
// Search for the run across all coordinators
269275
let matchingRun: any = null
270276
let totalRuns = 0
271-
277+
272278
if (runId) {
273279
for (const [programId, coordinator] of coordinators) {
274280
try {
@@ -279,7 +285,10 @@ async function main() {
279285
}
280286
totalRuns += coordinator.dataStore.getNumRuns()
281287
} catch (error) {
282-
console.error(`Failed to search for run in coordinator ${programId}:`, error)
288+
console.error(
289+
`Failed to search for run in coordinator ${programId}:`,
290+
error
291+
)
283292
}
284293
}
285294
}
@@ -317,10 +326,16 @@ async function main() {
317326
res.send(stream)
318327

319328
function sendRunData(runData: RunData) {
329+
// Calculate total runs across all coordinators
330+
let totalRuns = 0
331+
for (const [_, coordinator] of coordinators) {
332+
totalRuns += coordinator.dataStore.getNumRuns()
333+
}
334+
320335
const data: ApiGetRun = {
321336
run: runData,
322337
error: coordinatorCrashed,
323-
isOnlyRun: coordinator.dataStore.getNumRuns() === 1,
338+
isOnlyRun: totalRuns === 1,
324339
}
325340
stream.write(JSON.stringify(data, replacer) + '\n')
326341
}
@@ -349,7 +364,12 @@ async function main() {
349364
errors: coordinator.errors,
350365
trackedRuns: coordinator.dataStore
351366
.getRunSummaries()
352-
.runs.map((r) => ({ id: r.id, index: r.index, status: r.status, programId: r.programId })),
367+
.runs.map((r) => ({
368+
id: r.id,
369+
index: r.index,
370+
status: r.status,
371+
programId: r.programId,
372+
})),
353373
chain: {
354374
chainSlotHeight: await coordinatorRpc.getSlot('confirmed'),
355375
indexedSlot:
@@ -363,7 +383,7 @@ async function main() {
363383
status: `error: ${error}`,
364384
errors: [],
365385
trackedRuns: [],
366-
chain: { programId }
386+
chain: { programId },
367387
}
368388
}
369389
}

0 commit comments

Comments
 (0)