Skip to content

Commit 2f5bee0

Browse files
author
Florian Treml
committed
BOT-2097 added queue handling
1 parent 5f925e5 commit 2f5bee0

File tree

3 files changed

+133
-113
lines changed

3 files changed

+133
-113
lines changed

index.js

Lines changed: 132 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ const { DirectLine, ConnectionStatus } = require('botframework-directlinejs')
55
const debug = require('debug')('botium-connector-directline3')
66
const FormData = require('form-data')
77
const fetch = require('node-fetch')
8+
const Queue = require('better-queue')
89
const fs = require('fs')
910
const path = require('path')
1011
const xhr2 = require('xhr2')
@@ -74,6 +75,10 @@ class BotiumConnectorDirectline3 {
7475

7576
async Start () {
7677
debug('Start called')
78+
this.queue = new Queue((input, cb) => {
79+
input().then(result => cb(null, result)).catch(err => cb(err))
80+
})
81+
7782
global.XMLHttpRequest = xhr2
7883
global.WebSocket = ws
7984
if (debug.enabled) {
@@ -261,7 +266,7 @@ class BotiumConnectorDirectline3 {
261266
botMsg.messageText = message.type
262267
}
263268
}
264-
setTimeout(() => this.queueBotSays(botMsg), 0)
269+
this._runInQueue(async () => { this.queueBotSays(botMsg) })
265270
}
266271
},
267272
err => {
@@ -336,131 +341,134 @@ class BotiumConnectorDirectline3 {
336341
return resultPromise
337342
}
338343

339-
UserSays (msg) {
344+
async UserSays (msg) {
340345
debug('UserSays called')
341-
return new Promise(async (resolve, reject) => { // eslint-disable-line no-async-promise-executor
342-
const activity = Object.assign({}, msg.sourceData || this.caps.DIRECTLINE3_ACTIVITY_TEMPLATE || {})
343-
if (msg.buttons && msg.buttons.length > 0 && (msg.buttons[0].text || msg.buttons[0].payload)) {
344-
let payload = msg.buttons[0].payload || msg.buttons[0].text
345-
try {
346-
payload = JSON.parse(payload)
347-
} catch (err) {
348-
}
349-
activity.type = this.caps[Capabilities.DIRECTLINE3_BUTTON_TYPE]
350-
_.set(activity, this.caps[Capabilities.DIRECTLINE3_BUTTON_VALUE_FIELD], payload)
351-
} else {
352-
if (!activity.type) {
353-
activity.type = 'message'
354-
}
355-
if (!_.isUndefined(msg.messageText)) {
356-
activity.text = msg.messageText
357-
}
346+
347+
const activity = Object.assign({}, msg.sourceData || this.caps.DIRECTLINE3_ACTIVITY_TEMPLATE || {})
348+
if (msg.buttons && msg.buttons.length > 0 && (msg.buttons[0].text || msg.buttons[0].payload)) {
349+
let payload = msg.buttons[0].payload || msg.buttons[0].text
350+
try {
351+
payload = JSON.parse(payload)
352+
} catch (err) {
358353
}
359-
if (!activity.from) {
360-
activity.from = { id: this.me }
361-
} else if (!activity.from.id) {
362-
activity.from.id = this.me
354+
activity.type = this.caps[Capabilities.DIRECTLINE3_BUTTON_TYPE]
355+
_.set(activity, this.caps[Capabilities.DIRECTLINE3_BUTTON_VALUE_FIELD], payload)
356+
} else {
357+
if (!activity.type) {
358+
activity.type = 'message'
363359
}
364-
365-
if (msg.forms) {
366-
activity.value = activity.value || {}
367-
msg.forms.forEach(f => {
368-
_.set(activity.value, f.name, f.value)
369-
})
360+
if (!_.isUndefined(msg.messageText)) {
361+
activity.text = msg.messageText
370362
}
363+
}
364+
if (!activity.from) {
365+
activity.from = { id: this.me }
366+
} else if (!activity.from.id) {
367+
activity.from.id = this.me
368+
}
371369

372-
if (msg.SET_ACTIVITY_VALUE) {
373-
_.keys(msg.SET_ACTIVITY_VALUE).forEach(key => {
374-
_.set(activity, key, msg.SET_ACTIVITY_VALUE[key])
375-
})
376-
}
370+
if (msg.forms) {
371+
activity.value = activity.value || {}
372+
msg.forms.forEach(f => {
373+
_.set(activity.value, f.name, f.value)
374+
})
375+
}
377376

378-
// validating the activity
379-
if (activity.text) {
380-
if (_.isObject(activity.text)) {
381-
const msg = `Activity is not correct. There is a JSON ${JSON.stringify(activity.text)} in text field. Check your capabilities`
382-
if (this.caps.DIRECTLINE3_ACTIVITY_VALIDATION === 'error') {
383-
reject(new Error(msg))
384-
} else {
385-
debug(msg)
386-
}
377+
if (msg.SET_ACTIVITY_VALUE) {
378+
_.keys(msg.SET_ACTIVITY_VALUE).forEach(key => {
379+
_.set(activity, key, msg.SET_ACTIVITY_VALUE[key])
380+
})
381+
}
382+
383+
// validating the activity
384+
if (activity.text) {
385+
if (_.isObject(activity.text)) {
386+
const msg = `Activity is not correct. There is a JSON ${JSON.stringify(activity.text)} in text field. Check your capabilities`
387+
if (this.caps.DIRECTLINE3_ACTIVITY_VALIDATION === 'error') {
388+
throw new Error(msg)
389+
} else {
390+
debug(msg)
387391
}
388392
}
393+
}
389394

390-
if (msg.media && msg.media.length > 0) {
391-
debug('Posting activity with attachments ', JSON.stringify(activity, null, 2))
392-
msg.sourceData = Object.assign(msg.sourceData || {}, { activity })
393-
394-
const formData = new FormData()
395-
396-
formData.append('activity', Buffer.from(JSON.stringify(activity)), {
397-
contentType: 'application/vnd.microsoft.activity',
398-
filename: 'blob'
399-
})
400-
401-
for (let i = 0; i < msg.media.length; i++) {
402-
const attachment = msg.media[i]
403-
const attachmentName = path.basename(attachment.mediaUri)
404-
405-
if (attachment.buffer) {
406-
formData.append('file', attachment.buffer, {
407-
filename: attachmentName
408-
})
409-
} else if (attachment.downloadUri && attachment.downloadUri.startsWith('file://')) {
410-
// This check is maybe not required. If possible and safe, MediaImport extracts Buffer from downloadUri.
411-
// This if-case should not be called at all.
412-
if (!this.caps[CoreCapabilities.SECURITY_ALLOW_UNSAFE]) {
413-
return reject(new BotiumError(
414-
'Security Error. Illegal configured MediaInput. Sending attachment using the filesystem is not allowed',
415-
{
416-
type: 'security',
417-
subtype: 'allow unsafe',
418-
source: 'botium-connector-directline',
419-
cause: { attachment }
420-
}
421-
))
422-
}
423-
const filepath = attachment.downloadUri.split('file://')[1]
424-
formData.append('file', fs.createReadStream(filepath), {
425-
filename: attachmentName
426-
})
427-
} else if (attachment.downloadUri) {
428-
const res = await fetch(attachment.downloadUri)
429-
const body = await res.buffer()
430-
431-
formData.append('file', body, {
432-
filename: attachmentName
433-
})
434-
} else {
435-
return reject(new Error(`Media attachment ${attachment.mediaUri} not downloaded`))
395+
if (msg.media && msg.media.length > 0) {
396+
debug('Posting activity with attachments ', JSON.stringify(activity, null, 2))
397+
msg.sourceData = Object.assign(msg.sourceData || {}, { activity })
398+
399+
const formData = new FormData()
400+
401+
formData.append('activity', Buffer.from(JSON.stringify(activity)), {
402+
contentType: 'application/vnd.microsoft.activity',
403+
filename: 'blob'
404+
})
405+
406+
for (let i = 0; i < msg.media.length; i++) {
407+
const attachment = msg.media[i]
408+
const attachmentName = path.basename(attachment.mediaUri)
409+
410+
if (attachment.buffer) {
411+
formData.append('file', attachment.buffer, {
412+
filename: attachmentName
413+
})
414+
} else if (attachment.downloadUri && attachment.downloadUri.startsWith('file://')) {
415+
// This check is maybe not required. If possible and safe, MediaImport extracts Buffer from downloadUri.
416+
// This if-case should not be called at all.
417+
if (!this.caps[CoreCapabilities.SECURITY_ALLOW_UNSAFE]) {
418+
throw new BotiumError(
419+
'Security Error. Illegal configured MediaInput. Sending attachment using the filesystem is not allowed',
420+
{
421+
type: 'security',
422+
subtype: 'allow unsafe',
423+
source: 'botium-connector-directline',
424+
cause: { attachment }
425+
}
426+
)
436427
}
428+
const filepath = attachment.downloadUri.split('file://')[1]
429+
formData.append('file', fs.createReadStream(filepath), {
430+
filename: attachmentName
431+
})
432+
} else if (attachment.downloadUri) {
433+
const res = await fetch(attachment.downloadUri)
434+
const body = await res.buffer()
435+
436+
formData.append('file', body, {
437+
filename: attachmentName
438+
})
439+
} else {
440+
throw new Error(`Media attachment ${attachment.mediaUri} not downloaded`)
437441
}
442+
}
438443

439-
await this.directLine.checkConnection(true)
440-
const uploadUrl = `${this.directLine.domain}/conversations/${this.directLine.conversationId}/upload?userId=${activity.from.id}`
441-
debug(`Uploading attachments to ${uploadUrl}`)
442-
fetch(uploadUrl, {
443-
method: 'POST',
444-
headers: {
445-
Authorization: `Bearer ${this.directLine.token}`
446-
},
447-
body: formData
448-
}).then(async (res) => {
444+
await this.directLine.checkConnection(true)
445+
const uploadUrl = `${this.directLine.domain}/conversations/${this.directLine.conversationId}/upload?userId=${activity.from.id}`
446+
debug(`Uploading attachments to ${uploadUrl}`)
447+
return this._runInQueue(async () => {
448+
try {
449+
const res = await fetch(uploadUrl, {
450+
method: 'POST',
451+
headers: {
452+
Authorization: `Bearer ${this.directLine.token}`
453+
},
454+
body: formData
455+
})
449456
const json = await res.json()
450457
if (json && json.id) {
451458
debug('Posted activity with attachments, assigned ID:', json.id)
452-
resolve()
453459
} else {
454-
reject(new Error('Error posting activity with attachments, no activity id returned'))
460+
throw new Error('No activity id returned')
455461
}
456-
}).catch(err => {
462+
} catch (err) {
457463
debug('Error posting activity with attachments', err)
458-
reject(new Error(`Error posting activity: ${err.message || err}`))
459-
})
460-
} else {
461-
debug('Posting activity ', JSON.stringify(activity, null, 2))
462-
msg.sourceData = Object.assign(msg.sourceData || {}, { activity })
464+
throw new Error(`Error posting activity: ${err.message || err}`)
465+
}
466+
})
467+
} else {
468+
debug('Posting activity ', JSON.stringify(activity, null, 2))
469+
msg.sourceData = Object.assign(msg.sourceData || {}, { activity })
463470

471+
return this._runInQueue(() => new Promise((resolve, reject) => {
464472
this.directLine.postActivity(activity).subscribe(
465473
id => {
466474
debug('Posted activity, assigned ID:', id)
@@ -471,20 +479,19 @@ class BotiumConnectorDirectline3 {
471479
reject(new Error(`Error posting activity: ${err.message || err}`))
472480
}
473481
)
474-
}
475-
})
482+
}))
483+
}
476484
}
477485

478-
Stop () {
486+
async Stop () {
479487
debug('Stop called')
488+
this.queue = null
480489
this._stopSubscription()
481-
return Promise.resolve()
482490
}
483491

484-
Clean () {
492+
async Clean () {
485493
debug('Clean called')
486494
this._stopSubscription()
487-
return Promise.resolve()
488495
}
489496

490497
_stopSubscription () {
@@ -513,6 +520,18 @@ class BotiumConnectorDirectline3 {
513520
}
514521
}
515522

523+
async _runInQueue (fn) {
524+
if (this.queue) {
525+
return new Promise((resolve, reject) => {
526+
this.queue.push(fn)
527+
.on('finish', resolve)
528+
.on('failed', reject)
529+
})
530+
} else {
531+
throw new Error('Connector not yet started')
532+
}
533+
}
534+
516535
_deepFilter (item, selectFn, filterFn) {
517536
if (!item) {
518537
return []

package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
},
3939
"dependencies": {
4040
"@babel/runtime": "^7.8.7",
41+
"better-queue": "^3.8.10",
4142
"botframework-directlinejs": "^0.11.6",
4243
"debug": "^4.1.1",
4344
"form-data": "^3.0.0",

samples/convo/spec/convo/logo.png

3.26 KB
Loading

0 commit comments

Comments
 (0)