Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 24 additions & 27 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,13 @@ const Queue = require('promise-queue')
const env = require('./lib/env')
const dbs = require('./lib/dbs')
const statsd = require('./lib/statsd')
const { connect, cron, start } = require('./lib/scheduler')
const enterpriseSetup = require('./lib/enterprise-setup')

require('./lib/rollbar')

// if (cluster.isMaster && env.NODE_ENV !== 'development') {
// for (let i = 0; i++ < env.WORKER_SIZE;) cluster.fork()
start() // start scheduler

// cluster.on('exit', (worker, code, signal) => {
// console.log('worker %d died (%s). restarting...', worker.process.pid, signal || code)
// cluster.fork()
// })
// } else {
;(async () => {
const amqp = require('amqplib')

Expand Down Expand Up @@ -62,11 +57,12 @@ require('./lib/rollbar')
const q = queues[queueId] = queues[queueId] || new Queue(1, Infinity)
return q.add(() => worker(job))
}
channel.consume(env.EVENTS_QUEUE_NAME, consume)
channel.consume(env.JOBS_QUEUE_NAME, consume)

// connect queues with consume function
connect(channel, consume)

if (env.NODE_ENV !== 'testing') {
setInterval(function collectAccountQueueStats () {
cron('collectAccountQueueStats', function collectAccountQueueStats () {
const queueKeys = Object.keys(queues)
statsd.gauge('queues.account-jobs', queueKeys.length)
queueKeys.map((queueId) => {
Expand Down Expand Up @@ -96,28 +92,29 @@ require('./lib/rollbar')
console.log(e)
}
}

setTimeout(scheduleReminders, 5000)
setInterval(scheduleReminders, 24 * 60 * 60 * 1000)
setInterval(scheduleMonorepoReleaseSupervisor, 5 * 60 * 1000)

const isBad = (data) => {
const values = Object.values(data)
const baddies = ['gatsby', 'material-ui', 'react-cosmos']
let bad = false
baddies.forEach((baddie) => {
values.forEach((value) => {
if (String(value).match(baddie)) {
bad = true
statsd.increment('jobs.baddie', { tag: baddie })
}
})
})
return bad
}
cron('reminders', scheduleReminders, 24 * 60 * 60 * 1000)
cron('monorepoReleaseSupervisor', scheduleMonorepoReleaseSupervisor, 5 * 60 * 1000)

async function consume (job) {
const data = JSON.parse(job.content.toString())

const isBad = (data) => {
const values = Object.values(data)
const baddies = ['gatsby', 'material-ui', 'react-cosmos']
let bad = false
baddies.forEach((baddie) => {
values.forEach((value) => {
if (String(value).match(baddie)) {
bad = true
statsd.increment('jobs.baddie', { tag: baddie })
}
})
})
return bad
}

if (isBad(data)) {
channel.ack(job)
return
Expand Down
3 changes: 2 additions & 1 deletion lib/get-infos.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const githubFromGit = require('github-url-from-git')
const getRelease = require('./get-release')
const getDiffCommits = require('./get-diff-commits')
const statsd = require('../lib/statsd')
const { cron } = require('../lib/scheduler')

// returns a url object if you pass in a GitHub repositoryURL,
// returns a string with an npm URL if you just pass in a dependency name
Expand Down Expand Up @@ -70,7 +71,7 @@ function resolver ({ dependency, version, diffBase }) {
const memoizedGetInfos = _.memoize(getInfos, resolver)

if (process.env.NODE_ENV !== 'testing') {
setInterval(() => {
cron('get-info:stats', () => {
statsd.gauge('get_infos_cached', memoizedGetInfos.cache.size)
}, 60000)
}
Expand Down
3 changes: 2 additions & 1 deletion lib/github-queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const Log = require('gk-log')
const statsd = require('./statsd')
const getToken = require('./get-token')
const Github = require('../lib/github')
const { cron } = require('../lib/scheduler')

const readQueues = {}
const writeQueues = {}
Expand Down Expand Up @@ -132,7 +133,7 @@ function read (installationId, gen) {
}

if (env.NODE_ENV !== 'testing') {
setInterval(
cron('github-queue:stats',
function collectGitHubQueueStats () {
const readQueueKeys = Object.keys(readQueues)
const writeQueueKeys = Object.keys(writeQueues)
Expand Down
12 changes: 12 additions & 0 deletions lib/scheduler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
const env = require('./env')

module.exports.connect = (channel, consume) => {
channel.consume(env.EVENTS_QUEUE_NAME, consume)
channel.consume(env.JOBS_QUEUE_NAME, consume)
}

module.exports.cron = (_name, job, interval) => {
setInterval(job, interval)
}

module.exports.start = () => {}