Skip to content

Commit 9ac0a7e

Browse files
authored
Merge pull request #493 from qtomlinson/qt/delay_cleanup
Delay cleanup of cached fetch result and add StorageBackedInMemoryQueue
2 parents 422ad80 + aa27606 commit 9ac0a7e

24 files changed

+838
-98
lines changed

.eslintrc.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"env": {
33
"browser": false,
4-
"es6": true,
4+
"es2021": true,
55
"node": true,
66
"mocha": true
77
},

config/cdConfig.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ const cd_file = {
1515
}
1616
const crawlerStoreProvider = config.get('CRAWLER_STORE_PROVIDER') || 'cd(file)'
1717
const maxRequeueAttemptCount = config.get('CRAWLER_MAX_REQUEUE_ATTEMPTS') || 5
18+
const fetchedCacheTtlSeconds = config.get('CRAWLER_FETCHED_CACHE_TTL_SECONDS') || 60 * 60 * 8 //8 hours
1819

1920
module.exports = {
2021
provider: 'memory', // change this to redis if/when we want distributed config
@@ -30,7 +31,7 @@ module.exports = {
3031
fetch: {
3132
dispatcher: 'cdDispatch',
3233
cdDispatch: {
33-
fetched: { defaultTtlSeconds: 60 * 60 * 8 }
34+
fetched: { defaultTtlSeconds: fetchedCacheTtlSeconds }
3435
},
3536
cocoapods: { githubToken },
3637
cratesio: {},
@@ -123,6 +124,7 @@ module.exports = {
123124
connectionString: cd_azblob.connection,
124125
queueName: config.get('CRAWLER_QUEUE_PREFIX') || 'cdcrawlerdev',
125126
visibilityTimeout: 8 * 60 * 60, // 8 hours
127+
visibilityTimeout_remainLocal: fetchedCacheTtlSeconds,
126128
maxDequeueCount: 5,
127129
attenuation: {
128130
ttl: 3000

ghcrawler/crawlerFactory.js

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ class CrawlerFactory {
5959
} = {}
6060
) {
6161
logger.info('creating crawler')
62-
queues = queues || CrawlerFactory.createScopedQueueSets(options.queue)
62+
queues = queues || CrawlerFactory.createQueues(options.queue)
6363
store = store || CrawlerFactory.createStore(options.store)
6464
deadletters = deadletters || CrawlerFactory.createDeadLetterStore(options.deadletter)
6565
locker = locker || CrawlerFactory.createLocker(options.lock)
@@ -218,10 +218,9 @@ class CrawlerFactory {
218218
return new QueueSet([immediate, soon, normal, later], options)
219219
}
220220

221-
static createScopedQueueSets(queueOptions) {
222-
const globalQueues = CrawlerFactory.createQueues(queueOptions)
223-
const memoryOpts = { provider: 'memory', memory: queueOptions[queueOptions.provider] }
224-
const localQueues = CrawlerFactory.createQueues(memoryOpts)
221+
static createScopedQueueSets({ globalManager, localManager }, queueOptions) {
222+
const globalQueues = CrawlerFactory.createQueueSet(globalManager, queueOptions)
223+
const localQueues = CrawlerFactory.createQueueSet(localManager, queueOptions)
225224
return new ScopedQueueSets(globalQueues, localQueues)
226225
}
227226
}

ghcrawler/providers/queuing/inmemorycrawlqueue.js

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,10 @@ class InMemoryCrawlQueue {
2626
return
2727
}
2828

29+
async unsubscribe() {
30+
return
31+
}
32+
2933
async pop() {
3034
const result = this.queue.shift()
3135
if (!result) {

ghcrawler/providers/queuing/memoryFactory.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ module.exports = options => {
1111
return new AttenuatedQueue(new InMemoryCrawlQueue(name, options), options)
1212
}
1313
}
14-
return CrawlerFactory.createQueueSet(manager, options)
14+
return CrawlerFactory.createScopedQueueSets({ globalManager: manager, localManager: manager }, options)
1515
}

ghcrawler/providers/queuing/scopedQueueSets.js

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
// (c) Copyright 2022, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
22
// SPDX-License-Identifier: MIT
3+
const debug = require('debug')('crawler:scopedQueueSets')
4+
debug.log = console.info.bind(console)
35

46
class ScopedQueueSets {
57
constructor(globalQueues, localQueues) {
@@ -83,9 +85,10 @@ class ScopedQueueSets {
8385
for (let count = info.count; count > 0; count--) {
8486
localRequests.push(
8587
localQueue.pop()
86-
.then(request => request && localQueue.done(request)
87-
.then(() => this.push(request, localQueue.getName(), 'global'))))
88+
.then(request => request && localQueue.done(request).then(() => request.createRequeuable()))
89+
.then(request => request && this.push(request, localQueue.getName(), 'global')))
8890
}
91+
debug(`publishing ${localRequests.length} to ${localQueue.getName()}`)
8992
return Promise.all(localRequests)
9093
}
9194

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// (c) Copyright 2022, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const AttenuatedQueue = require('./attenuatedQueue')
5+
const StorageBackedQueue = require('./storageBackedQueue')
6+
const InMemoryCrawlQueue = require('./inmemorycrawlqueue')
7+
8+
class StorageBackedInMemoryQueueManager {
9+
constructor(storageQueueManager) {
10+
this._storageQueueManager = storageQueueManager
11+
}
12+
createQueueChain(name, options) {
13+
const storageQueue = this._storageQueueManager.createQueue(name, options)
14+
const inMemoryQueue = new InMemoryCrawlQueue(name, options)
15+
const queue = StorageBackedQueue.create(inMemoryQueue, storageQueue, options)
16+
return new AttenuatedQueue(queue, options)
17+
}
18+
}
19+
20+
module.exports = StorageBackedInMemoryQueueManager
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
// (c) Copyright 2022, SAP SE and ClearlyDefined contributors. Licensed under the MIT license.
2+
// SPDX-License-Identifier: MIT
3+
4+
const NestedQueue = require('./nestedQueue')
5+
6+
const VISIBILITY_TIMEOUT_TO_REMAIN_ON_LOCAL_QUEUE = 8 * 60 * 60 // 8 hours
7+
const VISIBILITY_TIMEOUT_FOR_PROCESSING = 1 * 60 * 60 // 1 hours, similar to storage queue pop visibility timeout
8+
9+
class StorageBackedQueue extends NestedQueue {
10+
11+
constructor(queue, storageQueue, options) {
12+
super(queue)
13+
this.options = options
14+
this.logger = options.logger
15+
this._sharedStorageQueue = storageQueue
16+
}
17+
18+
async push(requests) {
19+
requests = Array.isArray(requests) ? requests : [requests]
20+
await this._pushToStorage(requests)
21+
await super.push(requests)
22+
}
23+
24+
async _pushToStorage(requests) {
25+
const visibilityTimeout = this.options.visibilityTimeout_remainLocal
26+
const storageReceipts = await this._sharedStorageQueue.push(requests, { visibilityTimeout })
27+
requests.map((request, index) => Object.assign(request, storageReceipts[index]))
28+
}
29+
30+
async pop() {
31+
const request = await super.pop()
32+
if (!request) return
33+
const success = await this._hideInStorage(request)
34+
if (success) return request
35+
return await this.pop()
36+
}
37+
38+
async _hideInStorage(request) {
39+
try {
40+
const receipt = await this._sharedStorageQueue.updateVisibilityTimeout(request, this.options.visibilityTimeout)
41+
Object.assign(request, receipt)
42+
return true
43+
} catch (error) {
44+
if (!this._sharedStorageQueue.isMessageNotFound(error)) throw error
45+
// Message not found for the popReceipt and messageId stored in the request. This means
46+
// that the message popReceipt (and possibly messageId) in the request is stale. This can
47+
// happen when the message visibility timeout expired and thus was visible and later
48+
// updated/processed by others. Because the request is picked up by others, just log and
49+
// continue to the next.
50+
this._log('Failed to update stale message', request)
51+
return false
52+
}
53+
}
54+
55+
async done(request) {
56+
await Promise.all([
57+
super.done(request),
58+
this._doneInStorage(request)])
59+
}
60+
61+
async _doneInStorage(request) {
62+
try {
63+
await this._sharedStorageQueue.done(request)
64+
} catch (error) {
65+
if (!this._sharedStorageQueue.isMessageNotFound(error)) throw error
66+
// Message not found for the popReceipt and messageId stored in the request. This means
67+
// that the message popReceipt (and possibly messageId) in the request is stale. This can
68+
// happen when the message visibility timeout expired and thus was visible and later
69+
// updated by others. This is ok because the deletion of the request can be left to the
70+
// caller with the updated popReceipt. Log and continue.
71+
this._log('Failed to remove stale message', request)
72+
}
73+
}
74+
75+
async subscribe() {
76+
await Promise.all([
77+
super.subscribe(),
78+
this._sharedStorageQueue.subscribe()])
79+
}
80+
81+
async unsubscribe() {
82+
const results = await Promise.allSettled([
83+
super.unsubscribe(),
84+
this._sharedStorageQueue.unsubscribe()])
85+
this._throwIfError(results, 'Failed to unsubscribe')
86+
}
87+
88+
async flush() {
89+
const deleteRequests = []
90+
const info = await this.getInfo()
91+
for (let count = info.count; count > 0; count--) {
92+
const deleteOne = super.pop().then(request => this.done(request))
93+
deleteRequests.push(deleteOne)
94+
}
95+
const results = await Promise.allSettled(deleteRequests)
96+
this._throwIfError(results, 'Failed to flush')
97+
}
98+
99+
_throwIfError(results, message) {
100+
const errors = results.filter(result => result.status === 'rejected')
101+
.map(rejected => new Error(rejected.reason))
102+
if (errors.length) throw new AggregateError(errors, message)
103+
}
104+
105+
_log(actionMessage, request) {
106+
this.logger.verbose(`${actionMessage} ${request.type} ${request.url}`)
107+
}
108+
109+
static create(queue, storageQueue, options = {}) {
110+
const defaultOptions = {
111+
visibilityTimeout_remainLocal: VISIBILITY_TIMEOUT_TO_REMAIN_ON_LOCAL_QUEUE,
112+
visibilityTimeout: VISIBILITY_TIMEOUT_FOR_PROCESSING
113+
}
114+
const optionsWithDefaults = { ...defaultOptions, ...options }
115+
return new StorageBackedQueue(queue, storageQueue, optionsWithDefaults)
116+
}
117+
}
118+
119+
module.exports = StorageBackedQueue

ghcrawler/providers/queuing/storageQueue.js

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// SPDX-License-Identifier: MIT
33

44
const qlimit = require('qlimit')
5+
const { cloneDeep } = require('lodash')
56

67
class StorageQueue {
78
constructor(client, name, queueName, formatter, options) {
@@ -29,26 +30,32 @@ class StorageQueue {
2930
return
3031
}
3132

32-
async push(requests) {
33+
async push(requests, option) {
3334
requests = Array.isArray(requests) ? requests : [requests]
3435
return Promise.all(
3536
requests.map(
3637
qlimit(this.options.parallelPush || 1)(request => {
3738
const body = JSON.stringify(request)
3839
return new Promise((resolve, reject) => {
39-
this.client.createMessage(this.queueName, body, error => {
40+
this.client.createMessage(this.queueName, body, option, (error, queueMessageResult) => {
4041
if (error) {
4142
return reject(error)
4243
}
4344
this._log('Queued', request)
44-
resolve()
45+
resolve(this._buildMessageReceipt(queueMessageResult, request))
4546
})
4647
})
4748
})
4849
)
4950
)
5051
}
5152

53+
_buildMessageReceipt(queueMessageResult, requestBody) {
54+
const _message = { ...queueMessageResult, body: cloneDeep(requestBody) }
55+
return { _message }
56+
}
57+
58+
5259
async pop() {
5360
const msgOptions = { numOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
5461
return new Promise((resolve, reject) => {
@@ -101,14 +108,18 @@ class StorageQueue {
101108
if (!request || !request._message) {
102109
return
103110
}
111+
await this.updateVisibilityTimeout(request)
112+
}
113+
114+
updateVisibilityTimeout(request, visibilityTimeout = 0) {
104115
return new Promise((resolve, reject) => {
105116
// visibilityTimeout is updated to 0 to unlock/unlease the message
106-
this.client.updateMessage(this.queueName, request._message.messageId, request._message.popReceipt, 0, error => {
117+
this.client.updateMessage(this.queueName, request._message.messageId, request._message.popReceipt, visibilityTimeout, (error, result) => {
107118
if (error) {
108119
return reject(error)
109120
}
110121
this._log('NAKed', request._message.body)
111-
resolve()
122+
resolve(this._buildMessageReceipt(result, request._message.body))
112123
})
113124
})
114125
}
@@ -144,6 +155,10 @@ class StorageQueue {
144155
_log(actionMessage, message) {
145156
this.logger.verbose(`${actionMessage} ${message.type} ${message.url}`)
146157
}
158+
159+
isMessageNotFound(error) {
160+
return error?.code === 'MessageNotFound'
161+
}
147162
}
148163

149164
module.exports = StorageQueue

ghcrawler/providers/queuing/storageQueueFactory.js

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@
33

44
const StorageQueueManager = require('./storageQueueManager')
55
const CrawlerFactory = require('../../crawlerFactory')
6+
const StorageBackedInMemoryQueueManager = require('./storageBackedInMemoryQueueManager')
67

78
module.exports = options => {
89
const { connectionString } = options
9-
const manager = new StorageQueueManager(connectionString, options)
10-
return CrawlerFactory.createQueueSet(manager, options)
10+
const storageQueueManager = new StorageQueueManager(connectionString, options)
11+
const localManager = new StorageBackedInMemoryQueueManager(storageQueueManager)
12+
return CrawlerFactory.createScopedQueueSets({ globalManager: storageQueueManager, localManager}, options)
1113
}

0 commit comments

Comments
 (0)