Skip to content

Commit 8c52ee0

Browse files
committed
ts: discover multiple coord dbs
1 parent 332d08f commit 8c52ee0

File tree

2 files changed

+189
-50
lines changed

2 files changed

+189
-50
lines changed

website/backend/src/chainTracker.ts

Lines changed: 88 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { Program } from '@coral-xyz/anchor'
2-
import { Connection } from '@solana/web3.js'
2+
import { Connection, PublicKey } from '@solana/web3.js'
33
import {
44
coordinatorIdl,
55
miningPoolIdl,
@@ -8,10 +8,11 @@ import {
88
} from 'shared'
99
import { CoordinatorDataStore, MiningPoolDataStore } from './dataStore.js'
1010
import { startWatchCoordinatorChainLoop } from './coordinatorChainLoop.js'
11-
import { mkdirSync } from 'fs'
11+
import { mkdirSync, readdirSync } from 'fs'
1212
import { FlatFileCoordinatorDataStore } from './dataStores/flatFileCoordinator.js'
1313
import { FlatFileMiningPoolDataStore } from './dataStores/flatFileMiningPool.js'
1414
import { startWatchMiningPoolChainLoop } from './miningPoolChainLoop.js'
15+
import path from 'path'
1516

1617
interface ServiceConfig {
1718
connection: Connection
@@ -25,6 +26,24 @@ interface TimestampedError {
2526
error: unknown
2627
}
2728

29+
function discoverExistingCoordinators(stateDirectory: string): string[] {
30+
try {
31+
const files = readdirSync(stateDirectory)
32+
const coordinatorFiles = files.filter(file =>
33+
file.startsWith('coordinator-db-') && file.endsWith('.json')
34+
)
35+
36+
return coordinatorFiles.map(file => {
37+
// Extract program ID from filename: coordinator-db-{programId}.json
38+
const programId = file.slice('coordinator-db-'.length, -'.json'.length)
39+
return programId
40+
})
41+
} catch (error) {
42+
console.warn('Failed to discover existing coordinators:', error)
43+
return []
44+
}
45+
}
46+
2847
export function startIndexingChainToDataStores(
2948
coordinator: ServiceConfig,
3049
miningPool: ServiceConfig
@@ -141,3 +160,70 @@ export function startIndexingChainToDataStores(
141160
},
142161
}
143162
}
163+
164+
export function startIndexingMultipleCoordinators(
165+
primaryCoordinator: ServiceConfig,
166+
miningPool: ServiceConfig
167+
): {
168+
cancel: () => void
169+
coordinators: Map<string, {
170+
stopped: Promise<void>
171+
dataStore: CoordinatorDataStore
172+
errors: TimestampedError[]
173+
}>
174+
miningPool: {
175+
stopped: Promise<void>
176+
dataStore: MiningPoolDataStore
177+
errors: TimestampedError[]
178+
}
179+
} {
180+
const stateDirectory = process.env.STATE_DIRECTORY ?? process.cwd()
181+
mkdirSync(stateDirectory, { recursive: true })
182+
183+
// Discover existing coordinators from database files
184+
const existingProgramIds = discoverExistingCoordinators(stateDirectory)
185+
console.log('Discovered existing coordinators:', existingProgramIds)
186+
187+
// Start indexing the primary coordinator (current one from env)
188+
const primaryResult = startIndexingChainToDataStores(primaryCoordinator, miningPool)
189+
190+
const coordinators = new Map<string, {
191+
stopped: Promise<void>
192+
dataStore: CoordinatorDataStore
193+
errors: TimestampedError[]
194+
}>()
195+
196+
// Add primary coordinator to map
197+
const primaryProgramId = primaryCoordinator.addressOverride || coordinatorIdl.address
198+
coordinators.set(primaryProgramId, primaryResult.coordinator)
199+
200+
// Create data stores for existing coordinators (read-only mode)
201+
for (const programId of existingProgramIds) {
202+
if (programId !== primaryProgramId) {
203+
console.log(`Loading existing coordinator database: ${programId}`)
204+
205+
try {
206+
const publicKey = new PublicKey(programId)
207+
const dataStore = new FlatFileCoordinatorDataStore(stateDirectory, publicKey)
208+
209+
// Create a resolved promise since we're only reading existing data
210+
const stopped = Promise.resolve()
211+
const errors: TimestampedError[] = []
212+
213+
coordinators.set(programId, {
214+
stopped,
215+
dataStore,
216+
errors
217+
})
218+
} catch (error) {
219+
console.error(`Failed to load coordinator database for ${programId}:`, error)
220+
}
221+
}
222+
}
223+
224+
return {
225+
coordinators,
226+
miningPool: primaryResult.miningPool,
227+
cancel: primaryResult.cancel
228+
}
229+
}

website/backend/src/index.ts

Lines changed: 101 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { startIndexingChainToDataStores } from './chainTracker.js'
1+
import { startIndexingMultipleCoordinators } from './chainTracker.js'
22

33
import Fastify, { FastifyRequest } from 'fastify'
44
import cors from '@fastify/cors'
@@ -62,8 +62,8 @@ async function main() {
6262
fetch: makeRateLimitedFetch(),
6363
})
6464

65-
const { coordinator, miningPool, cancel } =
66-
await startIndexingChainToDataStores(
65+
const { coordinators, miningPool, cancel } =
66+
await startIndexingMultipleCoordinators(
6767
{
6868
connection: coordinatorRpc,
6969
addressOverride: process.env.COORDINATOR_PROGRAM_ID,
@@ -83,28 +83,31 @@ async function main() {
8383
Set<(runData: RunData) => void>
8484
> = new Map()
8585

86-
coordinator.dataStore.eventEmitter.addListener('update', (key) => {
87-
const listeners = liveRunListeners.get(key)
88-
if (listeners) {
89-
const [programId, runId, index] = getRunFromKey(key)
90-
const runData = coordinator.dataStore.getRunDataById(runId, index)
91-
if (!runData) {
92-
console.warn(
93-
`Tried to emit updates for run ${runId} but it has no data!`
94-
)
95-
return
96-
}
97-
for (const listener of listeners) {
98-
try {
99-
listener(runData)
100-
} catch (err) {
101-
console.error(
102-
`Failed to send run data for run ${runId} to subscribed client...`
86+
// Set up event listeners for all coordinators
87+
for (const [programId, coordinator] of coordinators) {
88+
coordinator.dataStore.eventEmitter.addListener('update', (key) => {
89+
const listeners = liveRunListeners.get(key)
90+
if (listeners) {
91+
const [programId, runId, index] = getRunFromKey(key)
92+
const runData = coordinator.dataStore.getRunDataById(runId, index)
93+
if (!runData) {
94+
console.warn(
95+
`Tried to emit updates for run ${runId} from coordinator ${programId} but it has no data!`
10396
)
97+
return
98+
}
99+
for (const listener of listeners) {
100+
try {
101+
listener(runData)
102+
} catch (err) {
103+
console.error(
104+
`Failed to send run data for run ${runId} from coordinator ${programId} to subscribed client...`
105+
)
106+
}
104107
}
105108
}
106-
}
107-
})
109+
})
110+
}
108111

109112
const liveMiningPoolListeners: Set<() => void> = new Set()
110113
miningPool.dataStore.eventEmitter.addListener('update', () => {
@@ -127,15 +130,18 @@ async function main() {
127130
console.log('got shutdown signal, shutting down!')
128131
cancel()
129132
await fastify.close()
130-
await Promise.all([coordinator.stopped, miningPool.stopped])
133+
const allCoordinatorPromises = Array.from(coordinators.values()).map(c => c.stopped)
134+
await Promise.all([...allCoordinatorPromises, miningPool.stopped])
131135
process.exit(0)
132136
}
133137

134138
let coordinatorCrashed: Error | null = null
135-
coordinator.stopped.catch((err) => {
136-
console.error(`[${Date.now()}] coordinator broken: `, err)
137-
coordinatorCrashed = new Error(err)
138-
})
139+
for (const [programId, coordinator] of coordinators) {
140+
coordinator.stopped.catch((err) => {
141+
console.error(`[${Date.now()}] coordinator ${programId} broken: `, err)
142+
coordinatorCrashed = new Error(err)
143+
})
144+
}
139145

140146
let miningPoolCrashed: Error | null = null
141147
miningPool.stopped.catch((err) => {
@@ -215,8 +221,26 @@ async function main() {
215221
)
216222

217223
fastify.get('/runs', (_req, res) => {
224+
// Aggregate runs from all coordinators
225+
let allRuns: any[] = []
226+
let totalTokens = 0n
227+
let totalTokensPerSecondActive = 0n
228+
229+
for (const [programId, coordinator] of coordinators) {
230+
try {
231+
const coordinatorSummary = coordinator.dataStore.getRunSummaries()
232+
allRuns = allRuns.concat(coordinatorSummary.runs)
233+
totalTokens += coordinatorSummary.totalTokens
234+
totalTokensPerSecondActive += coordinatorSummary.totalTokensPerSecondActive
235+
} catch (error) {
236+
console.error(`Failed to get run summaries from coordinator ${programId}:`, error)
237+
}
238+
}
239+
218240
const runs: ApiGetRuns = {
219-
...coordinator.dataStore.getRunSummaries(),
241+
runs: allRuns,
242+
totalTokens,
243+
totalTokensPerSecondActive,
220244
error: coordinatorCrashed,
221245
}
222246

@@ -241,14 +265,29 @@ async function main() {
241265
throw new Error(`Invalid index ${indexStr}`)
242266
}
243267

244-
const matchingRun = runId
245-
? coordinator.dataStore.getRunDataById(runId, index)
246-
: null
268+
// Search for the run across all coordinators
269+
let matchingRun: any = null
270+
let totalRuns = 0
271+
272+
if (runId) {
273+
for (const [programId, coordinator] of coordinators) {
274+
try {
275+
const run = coordinator.dataStore.getRunDataById(runId, index)
276+
if (run) {
277+
matchingRun = run
278+
break // Found the run, stop searching
279+
}
280+
totalRuns += coordinator.dataStore.getNumRuns()
281+
} catch (error) {
282+
console.error(`Failed to search for run in coordinator ${programId}:`, error)
283+
}
284+
}
285+
}
247286

248287
const data: ApiGetRun = {
249288
run: matchingRun,
250289
error: coordinatorCrashed,
251-
isOnlyRun: coordinator.dataStore.getNumRuns() === 1,
290+
isOnlyRun: totalRuns === 1,
252291
}
253292

254293
// set header for streaming/non
@@ -263,7 +302,7 @@ async function main() {
263302
}
264303

265304
const key = runKey(
266-
coordinator.dataStore.programId.toString(),
305+
matchingRun.programId,
267306
matchingRun.info.id,
268307
matchingRun.info.index
269308
)
@@ -301,24 +340,38 @@ async function main() {
301340
)
302341

303342
fastify.get('/status', async (_, res) => {
343+
// Aggregate status from all coordinators
344+
const coordinatorStatuses: Record<string, any> = {}
345+
for (const [programId, coordinator] of coordinators) {
346+
try {
347+
coordinatorStatuses[programId] = {
348+
status: coordinatorCrashed ? coordinatorCrashed.toString() : 'ok',
349+
errors: coordinator.errors,
350+
trackedRuns: coordinator.dataStore
351+
.getRunSummaries()
352+
.runs.map((r) => ({ id: r.id, index: r.index, status: r.status, programId: r.programId })),
353+
chain: {
354+
chainSlotHeight: await coordinatorRpc.getSlot('confirmed'),
355+
indexedSlot:
356+
coordinator.dataStore.lastUpdate().highestSignature?.slot ?? 0,
357+
programId: programId,
358+
networkGenesis: await coordinatorRpc.getGenesisHash(),
359+
},
360+
}
361+
} catch (error) {
362+
coordinatorStatuses[programId] = {
363+
status: `error: ${error}`,
364+
errors: [],
365+
trackedRuns: [],
366+
chain: { programId }
367+
}
368+
}
369+
}
370+
304371
const data = {
305372
commit: process.env.GITCOMMIT ?? '???',
306373
initTime,
307-
coordinator: {
308-
status: coordinatorCrashed ? coordinatorCrashed.toString() : 'ok',
309-
errors: coordinator.errors,
310-
trackedRuns: coordinator.dataStore
311-
.getRunSummaries()
312-
.runs.map((r) => ({ id: r.id, index: r.index, status: r.status })),
313-
chain: {
314-
chainSlotHeight: await coordinatorRpc.getSlot('confirmed'),
315-
indexedSlot:
316-
coordinator.dataStore.lastUpdate().highestSignature?.slot ?? 0,
317-
programId:
318-
process.env.COORDINATOR_PROGRAM_ID ?? coordinatorIdl.address,
319-
networkGenesis: await coordinatorRpc.getGenesisHash(),
320-
},
321-
},
374+
coordinators: coordinatorStatuses,
322375
miningPool: {
323376
status: miningPoolCrashed ? miningPoolCrashed.toString() : 'ok',
324377
errors: miningPool.errors,

0 commit comments

Comments
 (0)