Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions db/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,12 @@
},
"dependencies": {
"@filecoin-station/spark-evaluate": "^1.2.0",
"pg": "^8.13.3",
"pg": "^8.14.0",
"postgrator": "^8.0.0"
},
"standard": {
"env": [
"mocha"
]
}
}
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
}
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @Goddhi

44 changes: 29 additions & 15 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion stats/bin/migrate.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ import {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please undo this change to keep the diff clean

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alright, noted

const pgPools = await getPgPools()
await migrateStatsDB(pgPools.stats)
await migrateEvaluateDB(pgPools.evaluate)
await migrateEvaluateDB(pgPools.evaluate)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
await migrateEvaluateDB(pgPools.evaluate)
await migrateEvaluateDB(pgPools.evaluate)

11 changes: 7 additions & 4 deletions stats/bin/spark-stats.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
import '../lib/instrument.js'
import { createApp } from '../lib/app.js'
import { getPgPools } from '@filecoin-station/spark-stats-db'

const {
PORT = '8080',
HOST = '127.0.0.1',
SPARK_API_BASE_URL = 'https://api.filspark.com/',
REQUEST_LOGGING = 'true'
REQUEST_LOGGING = 'true',
DATABASE_URL,
EVALUATE_DB_URL
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add default values for both env variables:

Suggested change
DATABASE_URL,
EVALUATE_DB_URL
DATABASE_URL = 'postgres://localhost:5432/spark_stats',
EVALUATE_DB_URL = 'postgres://localhost:5432/spark_evaluate'

} = process.env

const pgPools = await getPgPools()

const app = await createApp({
SPARK_API_BASE_URL,
pgPools,
DATABASE_URL,
EVALUATE_DB_URL,
logger: {
level: ['1', 'true'].includes(REQUEST_LOGGING) ? 'info' : 'error'
}
})

console.log('Starting the http server on host %j port %s', HOST, PORT)
const baseUrl = app.listen({ port: Number(PORT), host: HOST })
console.log(baseUrl)

30 changes: 22 additions & 8 deletions stats/lib/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,28 +2,41 @@ import * as Sentry from '@sentry/node'
import Fastify from 'fastify'
import cors from '@fastify/cors'
import urlData from '@fastify/url-data'
import fastifyPostgres from '@fastify/postgres'

import { addRoutes } from './routes.js'
import { addPlatformRoutes } from './platform-routes.js'

/** @typedef {import('@filecoin-station/spark-stats-db').PgPools} PgPools */
/** @typedef {import('./typings.js').DateRangeFilter} DateRangeFilter */

/**
* @param {object} args
* @param {string} args.SPARK_API_BASE_URL
* @param {import('@filecoin-station/spark-stats-db').PgPools} args.pgPools
* @param {Fastify.FastifyLoggerOptions} args.logger
* @returns
* @param {string} args.DATABASE_URL - Connection string for stats database
* @param {string} args.EVALUATE_DB_URL - Connection string for evaluate database
* @param {import('fastify').FastifyLoggerOptions} args.logger
* @returns {Promise<import('fastify').FastifyInstance>}
*/
export const createApp = ({

export const createApp = async ({
SPARK_API_BASE_URL,
pgPools,
DATABASE_URL,
EVALUATE_DB_URL,
logger
}) => {
const app = Fastify({ logger })
Sentry.setupFastifyErrorHandler(app)

await app.register(fastifyPostgres, {
connectionString: DATABASE_URL,
name: 'stats'
})

await app.register(fastifyPostgres, {
connectionString: EVALUATE_DB_URL,
name: 'evaluate',
})

app.register(cors, {
origin: [
'http://localhost:3000',
Expand All @@ -33,11 +46,12 @@ export const createApp = ({
]
})
app.register(urlData)
addRoutes(app, pgPools, SPARK_API_BASE_URL)
addPlatformRoutes(app, pgPools)
addRoutes(app, SPARK_API_BASE_URL)
addPlatformRoutes(app)
app.get('/', (request, reply) => {
reply.send('OK')
})

return app
}

37 changes: 27 additions & 10 deletions stats/lib/platform-routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,39 +14,56 @@ import { filterPreHandlerHook, filterOnSendHook } from './request-helpers.js'

/** @typedef {import('./typings.js').RequestWithFilter} RequestWithFilter */

export const addPlatformRoutes = (app, pgPools) => {
/**
* Create an adapter to convert Fastify pg to the expected pgPools format
* @param {any} pg Fastify pg object
* @returns {object} pgPools compatible object
*/

function adaptPgPools(pg) {
return {
stats: pg.stats,
evaluate: pg.evaluate,
end: async () => {} // Empty implementation
};
}


export const addPlatformRoutes = (app) => {
app.register(async app => {
app.addHook('preHandler', filterPreHandlerHook)
app.addHook('onSend', filterOnSendHook)

app.get('/stations/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchDailyStationCount(pgPools.evaluate, request.filter))
reply.send(await fetchDailyStationCount(request.server.pg.evaluate, request.filter))
})
app.get('/stations/monthly', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchMonthlyStationCount(pgPools.evaluate, request.filter))
reply.send(await fetchMonthlyStationCount(request.server.pg.evaluate, request.filter))
})
app.get('/stations/desktop/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchDailyDesktopUsers(pgPools.stats, request.filter))
reply.send(await fetchDailyDesktopUsers(request.server.pg.stats, request.filter))
})
app.get('/measurements/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchDailyStationMeasurementCounts(pgPools.evaluate, request.filter))
reply.send(await fetchDailyStationMeasurementCounts(request.server.pg.evaluate, request.filter))
})
app.get('/participants/top-measurements', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchParticipantsWithTopMeasurements(pgPools.evaluate, request.filter))
reply.send(await fetchParticipantsWithTopMeasurements(request.server.pg.evaluate, request.filter))
})
app.get('/participants/top-earning', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchTopEarningParticipants(pgPools.stats, request.filter))
})
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchTopEarningParticipants(pgPools.stats, request.filter)) })
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchTopEarningParticipants(pgPools.stats, request.filter)) })
reply.send(await fetchTopEarningParticipants(request.server.pg.stats, request.filter)) })

What do we need the adaptPgPools function for here?


app.get('/participants/accumulative/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
reply.send(await fetchAccumulativeDailyParticipantCount(pgPools.evaluate, request.filter))
reply.send(await fetchAccumulativeDailyParticipantCount(request.server.pg.evaluate, request.filter))
})
app.get('/transfers/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchDailyRewardTransfers(pgPools.stats, request.filter))
})
})

app.get('/participants/summary', async (request, reply) => {
reply.header('cache-control', `public, max-age=${24 * 3600 /* one day */}`)
reply.send(await fetchParticipantsSummary(pgPools.evaluate))
reply.send(await fetchParticipantsSummary(request.server.pg.evaluate))
})
}
34 changes: 33 additions & 1 deletion stats/lib/routes.js
Original file line number Diff line number Diff line change
Expand Up @@ -23,57 +23,89 @@ import {
/** @typedef {import('./typings.js').RequestWithFilterAndMinerId} RequestWithFilterAndMinerId */
/** @typedef {import('./typings.js').RequestWithFilterAndClientId} RequestWithFilterAndClientId */

export const addRoutes = (app, pgPools, SPARK_API_BASE_URL) => {
/**
* Create an adapter to convert Fastify pg to the expected pgPools format
* @param {any} pg Fastify pg object
* @returns {object} pgPools compatible object
*/

function adaptPgPools(pg) {
return {
stats: pg.stats,
evaluate: pg.evaluate,
end: async () => {}
};
}

export const addRoutes = (app, SPARK_API_BASE_URL) => {
app.register(async app => {
app.addHook('preHandler', filterPreHandlerHook)
app.addHook('onSend', filterOnSendHook)

app.get('/deals/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about instead changing the signature of fetchDaily... etc to accept request.server.pg? fetchDailyDealStats(request.server.pg, request.filter) etc

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alright, noted, i will do that now

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 on changing the signature and dropping the adaptPgPools function.

reply.send(await fetchDailyDealStats(pgPools, request.filter))
})


app.get('/deals/summary', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg)
reply.send(await fetchDealSummary(pgPools, request.filter))
})
app.get('/retrieval-success-rate', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchRetrievalSuccessRate(pgPools, request.filter))
})
app.get('/participants/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg)
reply.send(await fetchDailyParticipants(pgPools, request.filter))
})
app.get('/participants/monthly', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchMonthlyParticipants(pgPools, request.filter))
})
app.get('/participants/change-rates', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchParticipantChangeRates(pgPools, request.filter))
})
app.get('/participant/:address/scheduled-rewards', async (/** @type {RequestWithFilterAndAddress} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchParticipantScheduledRewards(pgPools, request.filter, request.params.address))
})
app.get('/participant/:address/reward-transfers', async (/** @type {RequestWithFilterAndAddress} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchParticipantRewardTransfers(pgPools, request.filter, request.params.address))
})
app.get('/miners/retrieval-success-rate/summary', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchMinersRSRSummary(pgPools, request.filter))
})
app.get('/miners/retrieval-timings/summary', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchMinersTimingsSummary(pgPools, request.filter))
})
app.get('/retrieval-result-codes/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchDailyRetrievalResultCodes(pgPools, request.filter))
})
app.get('/retrieval-timings/daily', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchDailyRetrievalTimings(pgPools, request.filter))
})
app.get('/miner/:minerId/retrieval-timings/summary', async (/** @type {RequestWithFilterAndMinerId} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchDailyMinerRetrievalTimings(pgPools, request.filter, request.params.minerId))
})
app.get('/miner/:minerId/retrieval-success-rate/summary', async (/** @type {RequestWithFilterAndMinerId} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchDailyMinerRSRSummary(pgPools, request.filter, request.params.minerId))
})
app.get('/clients/retrieval-success-rate/summary', async (/** @type {RequestWithFilter} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchClientsRSRSummary(pgPools, request.filter))
})
app.get('/client/:clientId/retrieval-success-rate/summary', async (/** @type {RequestWithFilterAndClientId} */ request, reply) => {
const pgPools = adaptPgPools(request.server.pg);
reply.send(await fetchDailyClientRSRSummary(pgPools, request.filter, request.params.clientId))
})
})
Expand Down
1 change: 1 addition & 0 deletions stats/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
"dependencies": {
"@fastify/cors": "^11.0.0",
"@fastify/url-data": "^6.0.3",
"@fastify/postgres": "^5.2.0",
"@filecoin-station/spark-stats-db": "^1.0.0",
"@sentry/node": "^9.5.0",
"@sentry/profiling-node": "^9.5.0",
Expand Down