Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 14 additions & 19 deletions src/app.js
Original file line number Diff line number Diff line change
@@ -1,25 +1,20 @@
import { call, ifElse, juxt, pipe } from 'ramda'
import initManager from './manager'
import initWorker from './worker'
import setupProcessHandlers from './processHandlers'

// initApp :: (Configuration, Logger, Queue) => _
export default (configuration, logger, queue) => call(pipe(
juxt([
ifElse(
() => configuration.manager.enabled,
() => initManager(configuration, logger, queue),
() => () => {}, // Empty graceful shutdown function
),
ifElse(
() => configuration.worker.enabled,
() => initWorker(configuration, logger, queue),
() => () => {}, // Empty graceful shutdown function
),
]),
([ shutdownManager, shutdownWorker ]) => async () => {
export default (configuration, logger, queue) => {
const shutdownManager = configuration.manager.enabled
? initManager(configuration, logger, queue)
: async () => {}

const shutdownWorker = configuration.worker.enabled
? initWorker(configuration, logger, queue)
: async () => {}

const shutdownFunction = async () => {
await shutdownWorker()
await shutdownManager()
},
shutdownFunction => setupProcessHandlers(logger, shutdownFunction),
))
}

return setupProcessHandlers(logger, shutdownFunction)
}
11 changes: 0 additions & 11 deletions src/configuration.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,14 @@ const DEFAULT_WORKER_RENDERER_CHROME_OPTIONS = [
'--use-gl=disabled',
]

// isDefined :: Mixed -> Boolean
const isDefined = both(complement(isNil), complement(isEmpty))

// isLogConfigurationValid :: Configuration -> Boolean
const isLogConfigurationValid = compose(includes(__, validLogLevels), path(['log', 'level']))

// isQueueConfigurationValid :: Configuration -> Boolean
const isQueueConfigurationValid = compose(isDefined, path(['queue', 'redis_dsn']))

// isManagerConfigurationValid :: Configuration -> Boolean
const isManagerConfigurationValid = T

// isWorkerConfigurationValid :: Configuration -> Boolean
const isWorkerConfigurationValid = pipe(
path(['worker', 'renderer', 'redirections']),
reduce(
Expand All @@ -60,28 +55,23 @@ const isWorkerConfigurationValid = pipe(
),
)

// validate :: Configuration -> Boolean
const validate = allPass([
isLogConfigurationValid,
isQueueConfigurationValid,
isManagerConfigurationValid,
isWorkerConfigurationValid,
])

// stringToArray :: String -> String -> [String]
const stringToArray = separator => pipe(
split(separator),
map(trim),
filter(complement(anyPass([isNil, isEmpty]))),
)

// commaSeparatedStringToArray :: String -> [String]
const commaSeparatedStringToArray = stringToArray(',')

// pipeSeparatedStringToArray :: String -> [String]
const pipeSeparatedStringToArray = stringToArray('|')

// generate :: _ -> Configuration
const generate = () => ({
log: {
level: process.env.LOG_LEVEL ?? LEVEL_INFO,
Expand Down Expand Up @@ -125,7 +115,6 @@ const generate = () => ({
},
})

// createConfiguration :: _ -> Configuration
export default pipe(
generate,
unless(validate, () => { throw new Error('Invalid configuration.') }),
Expand Down
75 changes: 30 additions & 45 deletions src/logger.js
Original file line number Diff line number Diff line change
@@ -1,14 +1,3 @@
import { F, always, equals, findIndex, gte, ifElse, pickAll } from 'ramda'

/**
* @type Logger = {
* error :: String -> _,
* info :: String -> _,
* debug :: String -> _,
* warn :: String -> _,
* }
*/

export const LEVEL_ERROR = 'ERROR'
export const LEVEL_WARN = 'WARN'
export const LEVEL_INFO = 'INFO'
Expand All @@ -21,50 +10,46 @@ export const levels = [
LEVEL_DEBUG,
]

// resolveLogLevelIndex :: String -> Number
const resolveLogLevelIndex = level => findIndex(equals(level), levels)
const resolveLogLevelIndex = level => levels.indexOf(level)

// loggerHead :: (String, String) -> Boolean
const shouldPrintLog = (loggerLevel, logLevel) => gte(
resolveLogLevelIndex(loggerLevel),
resolveLogLevelIndex(logLevel),
const shouldPrintLog = (loggerLevel, logLevel) => (
resolveLogLevelIndex(loggerLevel) >= resolveLogLevelIndex(logLevel)
)

// loggerHead :: String-> String
const loggerHead = type => `[${(new Date()).toISOString()}] ${type.toUpperCase()}:`

// error :: (String, Output) -> Function
const error = (level, output) => ifElse(
level => shouldPrintLog(level, LEVEL_ERROR),
() => (...args) => output.error(loggerHead(LEVEL_ERROR), ...args),
always(F),
)(level)
const noop = () => false

const error = (level, output) => (
shouldPrintLog(level, LEVEL_ERROR)
? (...args) => output.error(loggerHead(LEVEL_ERROR), ...args)
: noop
)

// warn :: (String, Output) -> Function
const warn = (level, output) => ifElse(
level => shouldPrintLog(level, LEVEL_WARN),
() => (...args) => output.warn(loggerHead(LEVEL_WARN), ...args),
always(F),
)(level)
const warn = (level, output) => (
shouldPrintLog(level, LEVEL_WARN)
? (...args) => output.warn(loggerHead(LEVEL_WARN), ...args)
: noop
)

// info :: (String, Output) -> Function
const info = (level, output) => ifElse(
level => shouldPrintLog(level, LEVEL_INFO),
() => (...args) => output.info(loggerHead(LEVEL_INFO), ...args),
always(F),
)(level)
const info = (level, output) => (
shouldPrintLog(level, LEVEL_INFO)
? (...args) => output.info(loggerHead(LEVEL_INFO), ...args)
: noop
)

// debug :: (String, Output) -> Function
const debug = (level, output) => ifElse(
level => shouldPrintLog(level, LEVEL_DEBUG),
() => (...args) => output.log(loggerHead(LEVEL_DEBUG), ...args),
always(F),
)(level)
const debug = (level, output) => (
shouldPrintLog(level, LEVEL_DEBUG)
? (...args) => output.log(loggerHead(LEVEL_DEBUG), ...args)
: noop
)

// formatException :: Error -> String
export const formatException = e => JSON.stringify(pickAll(['code', 'message', 'stack'], e))
export const formatException = e => JSON.stringify({
code: e?.code,
message: e?.message,
stack: e?.stack,
})

// createLogger :: (String, Output) -> Logger
export default (level, output) => ({
error: error(level, output),
info: info(level, output),
Expand Down
21 changes: 11 additions & 10 deletions src/manager/http-server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,18 @@ import attachNotFoundMiddleware from './middlewares/notFound'
import attachNotSupportedMiddleware from './middlewares/notSupported'
import attachRenderMiddleware from './middlewares/render'
import express from 'express'
import { pipe } from 'ramda'

// createHttpServer => (Configuration, Logger, Queue, RequestRegistry) -> HttpServer
export default (configuration, logger, queue, requestRegistry) => pipe(
attachRenderMiddleware(configuration, logger, queue, requestRegistry),
attachNotFoundMiddleware,
attachNotSupportedMiddleware,
attachErrorMiddleware(logger),
app => app.listen(
export default (configuration, logger, queue, requestRegistry) => {
const app = express()

attachRenderMiddleware(configuration, logger, queue, requestRegistry)(app)
attachNotFoundMiddleware(app)
attachNotSupportedMiddleware(app)
attachErrorMiddleware(logger)(app)

return app.listen(
configuration.manager.http_server.port,
configuration.manager.http_server.host,
() => logger.debug('Manager http server started.'),
),
)(express())
)
}
1 change: 0 additions & 1 deletion src/manager/http-server/middlewares/error.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { formatException } from '../../../logger'

// errorMiddleware :: Logger -> Express.Application -> Void
export default logger => app =>
app.use((error, req, res, next) => {
logger.error(formatException(error))
Expand Down
1 change: 0 additions & 1 deletion src/manager/http-server/middlewares/notFound.js
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
// default :: Express.app -> Express.app
export default app => app.get(/.*/, (req, res) => res.status(404).send())
1 change: 0 additions & 1 deletion src/manager/http-server/middlewares/notSupported.js
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
// default :: Express.app -> Express.app
export default app => app.use((req, res) => res.status(405).send())
38 changes: 18 additions & 20 deletions src/manager/http-server/middlewares/render.js
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
import { call, complement, compose, ifElse, isNil, path, pipe } from 'ramda'
import { DEFAULT_JOB_OPTIONS } from '../../../queue'

// default :: (Configuration, Logger, Queue, RequestRegistry) -> Express.app -> Express.app
export default (configuration, logger, queue, requestRegistry) => app =>
app.get('/render', (req, res, next) => call(pipe(
() => logger.debug(`Render request for url "${req.query.url}" started.`),
ifElse(
() => compose(complement(isNil), path(['query', 'url']))(req),
pipe(
() => requestRegistry.add(req, res, next),
jobId => queue.add({
url: req.query.url,
queuedAt: Date.now(),
}, {
...DEFAULT_JOB_OPTIONS,
timeout: configuration.queue.job.timeout,
jobId,
}),
),
() => res.status(400).end('Missing url query parameter.'),
),
)))
app.get('/render', (req, res, next) => {
logger.debug(`Render request for url "${req.query.url}" started.`)

if (req.query?.url == null) {
return res.status(400).end('Missing url query parameter.')
}

const jobId = requestRegistry.add(req, res, next)

return queue.add({
url: req.query.url,
queuedAt: Date.now(),
}, {
...DEFAULT_JOB_OPTIONS,
timeout: configuration.queue.job.timeout,
jobId,
})
})
27 changes: 13 additions & 14 deletions src/manager/initManager.js
Original file line number Diff line number Diff line change
@@ -1,22 +1,21 @@
import { call, pipe, tap } from 'ramda'
import createRequestRegistry from './requestRegistry'
import initHttpServer from './http-server'
import queueJobCompletedHandler from './queue/jobCompletedHandler'
import queueJobFailedHandler from './queue/jobFailedHandler'

// initManager :: (Configuration, Logger, Queue) -> Function
export default (configuration, logger, queue) => call(pipe(
tap(() => logger.debug('Initializing manager.')),
() => createRequestRegistry(),
tap(requestRegistry => queue.on('global:completed', queueJobCompletedHandler(logger, queue, requestRegistry))),
tap(requestRegistry => queue.on('global:failed', queueJobFailedHandler(logger, queue, requestRegistry))),
requestRegistry => initHttpServer(configuration, logger, queue, requestRegistry),
tap(() => logger.debug('Manager initialized.')),
// Returns a function to be used to gracefully shutdown the manager
httpServer => async () => {
logger.debug('Gracefully shutting down manager')
export default (configuration, logger, queue) => {
logger.debug('Initializing manager.')
const requestRegistry = createRequestRegistry()

queue.on('global:completed', queueJobCompletedHandler(logger, queue, requestRegistry))
queue.on('global:failed', queueJobFailedHandler(logger, queue, requestRegistry))

const httpServer = initHttpServer(configuration, logger, queue, requestRegistry)
logger.debug('Manager initialized.')

return async () => {
logger.debug('Gracefully shutting down manager')
await queue.close()
await httpServer.close()
},
))
}
}
21 changes: 8 additions & 13 deletions src/manager/queue/jobCompletedHandler.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,12 @@
import { juxt, when } from 'ramda'
import { resolveJobDuration } from './utils'

// jobCompletedHandler :: (Logger, Queue, RequestRegistry) -> Function
export default (logger, queue, requestRegistry) => (jobId, result) => when(
jobId => requestRegistry.has(jobId),
juxt([
jobId => requestRegistry.complete(jobId, JSON.parse(result)),
async jobId => {
const job = await queue.getJob(jobId)
export default (logger, queue, requestRegistry) => async (jobId, result) => {
if (!requestRegistry.has(jobId)) return
requestRegistry.complete(jobId, JSON.parse(result))

logger.info(`${jobId} ${job.data.url} 200 ${resolveJobDuration(job)}`)
const job = await queue.getJob(jobId)
if (!job) return

job.remove()
},
]),
)(jobId)
logger.info(`${jobId} ${job.data.url} 200 ${resolveJobDuration(job)}`)
await job.remove()
}
36 changes: 14 additions & 22 deletions src/manager/queue/jobFailedHandler.js
Original file line number Diff line number Diff line change
@@ -1,27 +1,19 @@
import { T, always, cond, juxt, test, when } from 'ramda'
import { resolveJobDuration } from './utils'

// resolveStatusCodeFromError :: String -> Integer
const resolveStatusCodeFromError = cond([
[error => test(/timeout/i, error), always(504)],
[error => test(/timed out/i, error), always(504)],
[T, always(500)],
])
const resolveStatusCodeFromError = error => {
if (/timeout/i.test(error) || /timed out/i.test(error)) return 504
return 500
}

// jobFailedHandler :: (Logger, Queue, RequestRegistry) -> Function
export default (logger, queue, requestRegistry) => (jobId, error) => when(
jobId => requestRegistry.has(jobId),
juxt([
jobId => requestRegistry.fail(
jobId,
resolveStatusCodeFromError(error),
),
async jobId => {
const job = await queue.getJob(jobId)
export default (logger, queue, requestRegistry) => async (jobId, error) => {
if (!requestRegistry.has(jobId)) return

logger.error(`${jobId} ${job.data.url} ${resolveStatusCodeFromError(error)} ${resolveJobDuration(job)} ${error}`)
const statusCode = resolveStatusCodeFromError(error)
requestRegistry.fail(jobId, statusCode)

job.remove()
},
]),
)(jobId)
const job = await queue.getJob(jobId)
if (!job) return

logger.error(`${jobId} ${job.data.url} ${statusCode} ${resolveJobDuration(job)} ${error}`)
await job.remove()
}
1 change: 0 additions & 1 deletion src/manager/queue/utils.js
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
// resolveJobDuration :: Job -> Integer
export const resolveJobDuration = job => (Date.now() - job.data.queuedAt) / 1000
10 changes: 0 additions & 10 deletions src/manager/requestRegistry.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,5 @@
import { v4 as uuidv4 } from 'uuid'

/**
* @type RequestRegistry = {
* has :: String -> Boolean,
* add :: (Request, Response, Next) -> String,
* complete :: (String, String) -> _,
* fail :: (String, String) -> _,
* }
*/

// createRequestRegistry :: () -> RequestRegistry
export default () => ({
_requests: {},

Expand Down
Loading