Skip to content

Commit a22d97f

Browse files
committed
Generalize StorageBackedQueue
StorageBackedQueue can wrap around a given queue to provide persistence in a shared storage queue. Allow parellel execution in subscribe, unsubscribe and done. Improve flush by avoiding update visibility timeout in storage.
1 parent d223acb commit a22d97f

File tree

4 files changed

+103
-108
lines changed

4 files changed

+103
-108
lines changed

.eslintrc.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
{
22
"env": {
33
"browser": false,
4-
"es6": true,
4+
"es2021": true,
55
"node": true,
66
"mocha": true
77
},

ghcrawler/providers/queuing/storageBackedInMemoryQueueManager.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// SPDX-License-Identifier: MIT
33

44
const AttenuatedQueue = require('./attenuatedQueue')
5-
const StorageBackedInMemoryQueue = require('./storageBackedInMemoryQueue')
5+
const StorageBackedQueue = require('./storageBackedQueue')
66
const InMemoryCrawlQueue = require('./inmemorycrawlqueue')
77

88
class StorageBackedInMemoryQueueManager {
@@ -12,7 +12,7 @@ class StorageBackedInMemoryQueueManager {
1212
createQueueChain(name, options) {
1313
const storageQueue = this._storageQueueManager.createQueue(name, options)
1414
const inMemoryQueue = new InMemoryCrawlQueue(name, options)
15-
const queue = StorageBackedInMemoryQueue.create(inMemoryQueue, storageQueue, options)
15+
const queue = StorageBackedQueue.create(inMemoryQueue, storageQueue, options)
1616
return new AttenuatedQueue(queue, options)
1717
}
1818
}

ghcrawler/providers/queuing/storageBackedInMemoryQueue.js renamed to ghcrawler/providers/queuing/storageBackedQueue.js

Lines changed: 31 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,10 @@ const NestedQueue = require('./nestedQueue')
66
const VISIBILITY_TIMEOUT_TO_REMAIN_ON_LOCAL_QUEUE = 8 * 60 * 60 // 8 hours
77
const VISIBILITY_TIMEOUT_FOR_PROCESSING = 1 * 60 * 60 // 1 hours, similar to storage queue pop visibility timeout
88

9-
class StorageBackedInMemoryQueue extends NestedQueue {
9+
class StorageBackedQueue extends NestedQueue {
1010

11-
constructor(memoryQueue, storageQueue, options) {
12-
super(memoryQueue)
11+
constructor(queue, storageQueue, options) {
12+
super(queue)
1313
this.options = options
1414
this.logger = options.logger
1515
this._sharedStorageQueue = storageQueue
@@ -45,63 +45,75 @@ class StorageBackedInMemoryQueue extends NestedQueue {
4545
// Message not found for the popReceipt and messageId stored in the request. This means
4646
// that the message popReceipt (and possibly messageId) in the request is stale. This can
4747
// happen when the message visibility timeout expired and thus was visible and later
48-
// updated/processed by others.Because the request is picked up by others, just log and
48+
// updated/processed by others. Because the request is picked up by others, just log and
4949
// continue to the next.
5050
this._log('Failed to update stale message', request)
5151
return false
5252
}
5353
}
5454

5555
async done(request) {
56+
await Promise.all([
57+
super.done(request),
58+
this._doneInStorage(request)])
59+
}
60+
61+
async _doneInStorage(request) {
5662
try {
57-
await super.done(request)
5863
await this._sharedStorageQueue.done(request)
5964
} catch (error) {
6065
if (!this._sharedStorageQueue.isMessageNotFound(error)) throw error
6166
// Message not found for the popReceipt and messageId stored in the request. This means
6267
// that the message popReceipt (and possibly messageId) in the request is stale. This can
6368
// happen when the message visibility timeout expired and thus was visible and later
64-
// updated by others.This is ok because the deletion of the request can be left to the
69+
// updated by others. This is ok because the deletion of the request can be left to the
6570
// caller with the updated popReceipt. Log and continue.
6671
this._log('Failed to remove stale message', request)
6772
}
6873
}
6974

7075
async subscribe() {
71-
await super.subscribe()
72-
await this._sharedStorageQueue.subscribe()
76+
await Promise.all([
77+
super.subscribe(),
78+
this._sharedStorageQueue.subscribe()])
7379
}
7480

7581
async unsubscribe() {
76-
await super.unsubscribe()
77-
await this._sharedStorageQueue.unsubscribe()
82+
const results = await Promise.allSettled([
83+
super.unsubscribe(),
84+
this._sharedStorageQueue.unsubscribe()])
85+
this._throwIfError(results, 'Failed to unsubscribe')
7886
}
7987

8088
async flush() {
8189
const deleteRequests = []
8290
const info = await this.getInfo()
8391
for (let count = info.count; count > 0; count--) {
84-
deleteRequests.push(this.pop().then(request => request && this.done(request)))
92+
const deleteOne = super.pop().then(request => this.done(request))
93+
deleteRequests.push(deleteOne)
8594
}
86-
return Promise.allSettled(deleteRequests)
87-
.then(results => {
88-
const found = results.find(result => result.status === 'rejected')
89-
if (found) throw new Error(found.reason)
90-
})
95+
const results = await Promise.allSettled(deleteRequests)
96+
this._throwIfError(results, 'Failed to flush')
97+
}
98+
99+
_throwIfError(results, message) {
100+
const errors = results.filter(result => result.status === 'rejected')
101+
.map(rejected => new Error(rejected.reason))
102+
if (errors.length) throw new AggregateError(errors, message)
91103
}
92104

93105
_log(actionMessage, request) {
94106
this.logger.verbose(`${actionMessage} ${request.type} ${request.url}`)
95107
}
96108

97-
static create(memoryQueue, storageQueue, options = {}) {
109+
static create(queue, storageQueue, options = {}) {
98110
const defaultOptions = {
99111
visibilityTimeout_remainLocal: VISIBILITY_TIMEOUT_TO_REMAIN_ON_LOCAL_QUEUE,
100112
visibilityTimeout: VISIBILITY_TIMEOUT_FOR_PROCESSING
101113
}
102114
const optionsWithDefaults = { ...defaultOptions, ...options }
103-
return new StorageBackedInMemoryQueue(memoryQueue, storageQueue, optionsWithDefaults)
115+
return new StorageBackedQueue(queue, storageQueue, optionsWithDefaults)
104116
}
105117
}
106118

107-
module.exports = StorageBackedInMemoryQueue
119+
module.exports = StorageBackedQueue

0 commit comments

Comments
 (0)