Skip to content
This repository was archived by the owner on Jan 17, 2025. It is now read-only.

Commit a145960

Browse files
authored
Support new openwhisk notification for action completion (#21)
1 parent 31e0975 commit a145960

File tree

1 file changed

+11
-8
lines changed

1 file changed

+11
-8
lines changed

conductor.js

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,10 @@ const main = (() => {
117117
const session = params.$sessionId || process.env.__OW_ACTIVATION_ID
118118

119119
// initialize openwhisk instance
120-
if (!wsk) wsk = openwhisk({ ignore_certs: true })
120+
if (!wsk) {
121+
wsk = openwhisk({ ignore_certs: true })
122+
if (!notify) wsk.actions.qs_options.invoke = ['blocking', 'notify', 'cause']
123+
}
121124

122125
// redis keys
123126
const apiKey = process.env.__OW_API_KEY.substring(0, process.env.__OW_API_KEY.indexOf(':'))
@@ -298,14 +301,14 @@ const main = (() => {
298301
break
299302
case 'Task':
300303
if (typeof json.Action === 'string') { // invoke user action
304+
const invocation = notify ? { name: json.Action, params, blocking: true } : { name: json.Action, params, notify: process.env.__OW_ACTION_NAME, cause: session }
301305
return persist(fsm, state, stack)
302-
.then(() => wsk.actions.invoke({ name: json.Action, params, blocking: notify })
303-
.catch(error => error.error && error.error.response ? error.error : badRequest(`Failed to invoke action ${json.Action}: ${encodeError(error).error}`)) // catch error reponses
304-
.then(activation => db.rpushxAsync(sessionTraceKey, activation.activationId)
305-
.then(() => activation.response ? activation : new Promise(resolve => poll(activation.activationId, resolve))) // poll if timeout
306-
.then(activation => {
307-
if (notify) return wsk.actions.invoke({ name: process.env.__OW_ACTION_NAME, params: { $activationId: activation.activationId, $sessionId: session, $result: activation.response.result } })
308-
}).then(() => blocking ? getSessionResult() : { $session: session })))
306+
.then(() => wsk.actions.invoke(invocation)
307+
.catch(error => error.error && error.error.response ? error.error : badRequest(`Failed to invoke action ${json.Action}: ${encodeError(error).error}`))) // catch error reponses
308+
.then(activation => db.rpushxAsync(sessionTraceKey, activation.activationId)
309+
.then(() => activation.response || !notify ? activation : new Promise(resolve => poll(activation.activationId, resolve)))) // poll if timeout
310+
.then(activation => notify && wsk.actions.invoke({ name: process.env.__OW_ACTION_NAME, params: { $activationId: activation.activationId, $sessionId: session, $result: activation.response.result } }))
311+
.then(() => blocking ? getSessionResult() : { $session: session })
309312
} else if (typeof json.Value !== 'undefined') { // value
310313
params = JSON.parse(JSON.stringify(json.Value))
311314
inspect()

0 commit comments

Comments
 (0)