Skip to content

Commit 6b7bee1

Browse files
authored
Merge pull request #629 from clearlydefined/roman/azure_sdk
Upgrade Azure Storage SDK to a modern version
2 parents d5fa7c6 + 19aa5cd commit 6b7bee1

File tree

9 files changed

+769
-339
lines changed

9 files changed

+769
-339
lines changed

config/cdConfig.js

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ const config = require('painless-config')
55

66
const cd_azblob = {
77
connection: config.get('CRAWLER_AZBLOB_CONNECTION_STRING'),
8-
container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME')
8+
container: config.get('CRAWLER_AZBLOB_CONTAINER_NAME'),
9+
account: config.get('CRAWLER_AZBLOB_ACCOUNT_NAME'),
10+
spnAuth: config.get('CRAWLER_AZBLOB_SPN_AUTH'),
11+
isSpnAuth: config.get('CRAWLER_AZBLOB_IS_SPN_AUTH') || false
912
}
1013

1114
const githubToken = config.get('CRAWLER_GITHUB_TOKEN')
@@ -111,7 +114,10 @@ module.exports = {
111114
},
112115
azqueue: {
113116
connectionString: cd_azblob.connection,
114-
queueName: config.get('CRAWLER_HARVESTS_QUEUE_NAME') || 'harvests'
117+
account: cd_azblob.account,
118+
queueName: config.get('CRAWLER_HARVESTS_QUEUE_NAME') || 'harvests',
119+
spnAuth: config.get('CRAWLER_HARVESTS_QUEUE_SPN_AUTH'),
120+
isSpnAuth: config.get('CRAWLER_HARVESTS_QUEUE_IS_SPN_AUTH') || false
115121
},
116122
'cd(azblob)': cd_azblob,
117123
'cd(file)': cd_file
@@ -135,7 +141,10 @@ module.exports = {
135141
maxDequeueCount: 5,
136142
attenuation: {
137143
ttl: 3000
138-
}
144+
},
145+
spnAuth: config.get('CRAWLER_QUEUE_AZURE_SPN_AUTH') || cd_azblob.spnAuth,
146+
account: config.get('CRAWLER_QUEUE_AZURE_ACCOUNT_NAME') || cd_azblob.account,
147+
isSpnAuth: config.get('CRAWLER_QUEUE_AZURE_IS_SPN_AUTH') || false
139148
},
140149
appVersion: config.get('APP_VERSION'),
141150
buildsha: config.get('BUILD_SHA')

ghcrawler/lib/crawler.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -638,7 +638,7 @@ class Crawler {
638638
metadata.errorMessage = request._error.message
639639
metadata.errorStack = request._error.stack
640640
}
641-
metadata.version = 1
641+
metadata.version = '1'
642642
metadata.meta = request.meta
643643
metadata.type = 'deadletter'
644644
metadata.url = request.url.replace('//', '//deadletter.')
Lines changed: 100 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,47 @@
11
// Copyright (c) Microsoft Corporation and others. Made available under the MIT license.
22
// SPDX-License-Identifier: MIT
33

4+
// eslint-disable-next-line no-unused-vars
5+
const { QueueServiceClient } = require('@azure/storage-queue')
46
const qlimit = require('qlimit')
57
const { cloneDeep } = require('lodash')
68

79
class StorageQueue {
10+
/**
11+
* @param {QueueServiceClient} client
12+
* @param {string} name
13+
* @param {string} queueName
14+
* @param {object} formatter
15+
* @param {object} options
16+
*/
817
constructor(client, name, queueName, formatter, options) {
9-
this.client = client
1018
this.name = name
1119
this.queueName = queueName
1220
this.messageFormatter = formatter
1321
this.options = options
1422
this.logger = options.logger
23+
this.queueClient = client.getQueueClient(this.queueName)
1524
}
1625

1726
async subscribe() {
18-
return new Promise((resolve, reject) => {
19-
this.client.createQueueIfNotExists(this.queueName, error => {
20-
if (error) {
21-
return reject(error)
22-
}
23-
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
24-
resolve()
25-
})
26-
})
27+
await this.queueClient.createIfNotExists()
28+
this.logger.info(`Subscribed to ${this.queueName} using Queue Storage`)
2729
}
2830

2931
async unsubscribe() {
30-
return
32+
// No specific unsubscribe logic for Azure Queue Storage
3133
}
3234

33-
async push(requests, option) {
35+
async push(requests) {
3436
requests = Array.isArray(requests) ? requests : [requests]
3537
return Promise.all(
3638
requests.map(
37-
qlimit(this.options.parallelPush || 1)(request => {
39+
qlimit(this.options.parallelPush || 1)(async request => {
3840
const body = JSON.stringify(request)
39-
return new Promise((resolve, reject) => {
40-
this.client.createMessage(this.queueName, body, option, (error, queueMessageResult) => {
41-
if (error) {
42-
return reject(error)
43-
}
44-
this._log('Queued', request)
45-
resolve(this._buildMessageReceipt(queueMessageResult, request))
46-
})
47-
})
41+
const encoded = this._encodeXMLSafe(body)
42+
const queueMessageResult = await this.queueClient.sendMessage(encoded)
43+
this._log('Queued', request)
44+
return this._buildMessageReceipt(queueMessageResult, request)
4845
})
4946
)
5047
)
@@ -56,47 +53,46 @@ class StorageQueue {
5653
}
5754

5855
async pop() {
59-
const msgOptions = { numOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
60-
return new Promise((resolve, reject) => {
61-
this.client.getMessages(this.queueName, msgOptions, (error, result) => {
62-
if (error) {
63-
return reject(error)
64-
}
65-
const message = result[0]
66-
if (!message) {
67-
this.logger.verbose('No messages to receive')
68-
return resolve(null)
69-
}
70-
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
71-
this.logger.verbose('maxDequeueCount exceeded')
72-
this.client.deleteMessage(this.queueName, message.messageId, message.popReceipt, error => {
73-
if (error) return reject(error)
74-
resolve(null)
75-
})
76-
} else {
77-
message.body = JSON.parse(message.messageText)
78-
const request = this.messageFormatter(message)
79-
request._message = message
80-
this._log('Popped', message.body)
81-
resolve(request)
82-
}
83-
})
84-
})
56+
const msgOptions = { numberOfMessages: 1, visibilityTimeout: this.options.visibilityTimeout || 60 * 60 }
57+
const response = await this.queueClient.receiveMessages(msgOptions)
58+
const message = response.receivedMessageItems[0]
59+
if (!message) {
60+
this.logger.verbose('No messages to receive')
61+
return null
62+
}
63+
if (this.options.maxDequeueCount && message.dequeueCount > this.options.maxDequeueCount) {
64+
this.logger.verbose('maxDequeueCount exceeded')
65+
try {
66+
await this.queueClient.deleteMessage(message.messageId, message.popReceipt)
67+
} catch (error) {
68+
this.logger.error(`Failed to delete message ${message.messageId} in storageQueue, error: ${error.message}`)
69+
throw error
70+
}
71+
return null
72+
} else {
73+
try {
74+
const decodedText = this._decodeXMLSafe(message.messageText)
75+
message.body = JSON.parse(decodedText)
76+
} catch (error) {
77+
this.logger.error(`Failed to parse message ${message.messageId}:`)
78+
this.logger.error(`Raw message: ${message.messageText}`)
79+
this.logger.error(`Parse error: ${error.message}`)
80+
await this.queueClient.deleteMessage(message.messageId, message.popReceipt)
81+
return null
82+
}
83+
const request = this.messageFormatter(message)
84+
request._message = message
85+
this._log('Popped', message.body)
86+
return request
87+
}
8588
}
8689

8790
async done(request) {
8891
if (!request || !request._message) {
8992
return
9093
}
91-
return new Promise((resolve, reject) => {
92-
this.client.deleteMessage(this.queueName, request._message.messageId, request._message.popReceipt, error => {
93-
if (error) {
94-
return reject(error)
95-
}
96-
this._log('ACKed', request._message.body)
97-
resolve()
98-
})
99-
})
94+
await this.queueClient.deleteMessage(request._message.messageId, request._message.popReceipt)
95+
this._log('ACKed', request._message.body)
10096
}
10197

10298
async defer(request) {
@@ -110,47 +106,30 @@ class StorageQueue {
110106
await this.updateVisibilityTimeout(request)
111107
}
112108

113-
updateVisibilityTimeout(request, visibilityTimeout = 0) {
114-
return new Promise((resolve, reject) => {
115-
// visibilityTimeout is updated to 0 to unlock/unlease the message
116-
this.client.updateMessage(
117-
this.queueName,
118-
request._message.messageId,
119-
request._message.popReceipt,
120-
visibilityTimeout,
121-
(error, result) => {
122-
if (error) {
123-
return reject(error)
124-
}
125-
this._log('NAKed', request._message.body)
126-
resolve(this._buildMessageReceipt(result, request._message.body))
127-
}
128-
)
129-
})
109+
async updateVisibilityTimeout(request, visibilityTimeout = 0) {
110+
const response = await this.queueClient.updateMessage(
111+
request._message.messageId,
112+
request._message.popReceipt,
113+
undefined,
114+
visibilityTimeout
115+
)
116+
this._log('NAKed', request._message.body)
117+
return this._buildMessageReceipt({ messageId: request._message.messageId, ...response }, request)
130118
}
131119

132120
async flush() {
133-
return new Promise((resolve, reject) => {
134-
this.client.deleteQueue(this.queueName, error => {
135-
if (error) return reject(error)
136-
this.client.createQueueIfNotExists(this.queueName, error => {
137-
if (error) return reject(error)
138-
resolve()
139-
})
140-
})
141-
})
121+
await this.queueClient.clearMessages()
122+
this.logger.info(`Flushed all messages from ${this.queueName}`)
142123
}
143124

144125
async getInfo() {
145-
return new Promise(resolve => {
146-
this.client.getQueueMetadata(this.queueName, (result, error) => {
147-
if (error) {
148-
this.logger.error(error)
149-
resolve(null)
150-
}
151-
resolve({ count: result[0].approximateMessageCount })
152-
})
153-
})
126+
try {
127+
const properties = await this.queueClient.getProperties()
128+
return { count: properties.approximateMessagesCount }
129+
} catch (error) {
130+
this.logger.error(error)
131+
return null
132+
}
154133
}
155134

156135
getName() {
@@ -164,6 +143,35 @@ class StorageQueue {
164143
isMessageNotFound(error) {
165144
return error?.code === 'MessageNotFound'
166145
}
146+
147+
_encodeXMLSafe(text) {
148+
if (typeof text !== 'string') return text
149+
150+
return (
151+
text
152+
// Handle & first to prevent double-encoding
153+
.replace(/&/g, '&')
154+
.replace(/"/g, '"')
155+
.replace(/'/g, ''')
156+
.replace(/</g, '&lt;')
157+
.replace(/>/g, '&gt;')
158+
)
159+
}
160+
161+
_decodeXMLSafe(text) {
162+
if (typeof text !== 'string') return text
163+
164+
return (
165+
text
166+
// Handle both XML and HTML encodings for quotes and apostrophes
167+
.replace(/&apos;|&#39;|&#x27;/g, "'")
168+
.replace(/&quot;|&#34;|&#x22;/g, '"')
169+
// Handle basic XML entities
170+
.replace(/&lt;|&#60;|&#x3[Cc];/g, '<')
171+
.replace(/&gt;|&#62;|&#x3[Ee];/g, '>')
172+
.replace(/&amp;|&#38;|&#x26;/g, '&') // Must be after other & entities
173+
)
174+
}
167175
}
168176

169177
module.exports = StorageQueue

ghcrawler/providers/queuing/storageQueueManager.js

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,47 @@
22
// SPDX-License-Identifier: MIT
33

44
const AttenuatedQueue = require('./attenuatedQueue')
5-
const AzureStorage = require('azure-storage')
5+
const { QueueServiceClient, StorageRetryPolicyType } = require('@azure/storage-queue')
66
const Request = require('../../lib/request')
77
const StorageQueue = require('./storageQueue')
8+
const { DefaultAzureCredential, ClientSecretCredential } = require('@azure/identity')
89

910
class StorageQueueManager {
10-
constructor(connectionString) {
11-
const retryOperations = new AzureStorage.ExponentialRetryPolicyFilter()
12-
this.client = AzureStorage.createQueueService(connectionString).withFilter(retryOperations)
11+
constructor(connectionString, options) {
12+
const pipelineOptions = {
13+
retryOptions: {
14+
maxTries: 3,
15+
retryDelayInMs: 1000,
16+
maxRetryDelayInMs: 120 * 1000,
17+
tryTimeoutInMs: 30000,
18+
retryPolicyType: StorageRetryPolicyType.EXPONENTIAL
19+
}
20+
}
21+
22+
const { account, spnAuth, isSpnAuth } = options
23+
if (isSpnAuth) {
24+
options.logger.info('using service principal credentials in storageQueueManager')
25+
const authParsed = JSON.parse(spnAuth)
26+
this.client = new QueueServiceClient(
27+
`https://${account}.queue.core.windows.net`,
28+
new ClientSecretCredential(authParsed.tenantId, authParsed.clientId, authParsed.clientSecret),
29+
pipelineOptions
30+
)
31+
return
32+
}
33+
34+
if (connectionString) {
35+
options.logger.info('using connection string in storageQueueManager')
36+
this.client = QueueServiceClient.fromConnectionString(connectionString, pipelineOptions)
37+
return
38+
}
39+
40+
options.logger.info('using default credentials in storageQueueManager')
41+
this.client = new QueueServiceClient(
42+
`https://${account}.queue.core.windows.net`,
43+
new DefaultAzureCredential(),
44+
pipelineOptions
45+
)
1346
}
1447

1548
createQueueClient(name, formatter, options) {

0 commit comments

Comments
 (0)