3
3
#
4
4
# Copyright © 2019, SAS Institute Inc., Cary, NC, USA. All Rights Reserved.
5
5
# SPDX-License-Identifier: Apache-2.0
6
+ import datetime
6
7
8
+ from ..core import PagedItemIterator
7
9
from .service import Service
8
10
9
11
10
12
class Workflow (Service ):
11
13
"""The Workflow API provides basic resources for list, prompt details,
12
14
and running workflow processes.
15
+
16
+ Warnings
17
+ --------
18
+ Note that this service is intentionally not included in the sasctl documentation.
19
+ As of 2022.09 this service is not publicly documented on developer.sas.com and is
20
+ only partially implemented here in order to provide functionality for the Model
21
+ Manager service. All methods included in this service should be treated as
22
+ experimental and are subject to change without notice.
23
+
13
24
"""
14
25
15
26
_SERVICE_ROOT = "/workflow"
16
27
17
- def list_definitions (self , include_enabled = True , include_disabled = False ):
28
+ @classmethod
29
+ def list_definitions (cls , include_enabled = True , include_disabled = False ):
18
30
"""List workflow definitions.
19
31
20
32
Parameters
@@ -40,9 +52,16 @@ def list_definitions(self, include_enabled=True, include_disabled=False):
40
52
return []
41
53
42
54
# Header required to prevent 400 ERROR Bad Request
43
- return self .get (url , headers = {"Accept-Language" : "en-US" })
55
+ results = cls .get (url , headers = {"Accept-Language" : "en-US" })
44
56
45
- def list_enabled_definitions (self ):
57
+ if results is None :
58
+ return []
59
+ if isinstance (results , (list , PagedItemIterator )):
60
+ return results
61
+ return [results ]
62
+
63
+ @classmethod
64
+ def list_enabled_definitions (cls ):
46
65
"""List process definitions that are currently enabled.
47
66
48
67
Returns
@@ -51,9 +70,10 @@ def list_enabled_definitions(self):
51
70
The list of definitions.
52
71
53
72
"""
54
- return self .list_definitions (include_enabled = True , include_disabled = False )
73
+ return cls .list_definitions (include_enabled = True , include_disabled = False )
55
74
56
- def list_workflow_prompt (self , name ):
75
+ @classmethod
76
+ def list_workflow_prompt (cls , name ):
57
77
"""List prompt Workflow Processes Definitions.
58
78
59
79
Parameters
@@ -68,7 +88,7 @@ def list_workflow_prompt(self, name):
68
88
69
89
"""
70
90
71
- ret = self ._find_specific_workflow (name )
91
+ ret = cls ._find_specific_workflow (name )
72
92
if ret is None :
73
93
raise ValueError ("No Workflow enabled for %s name or id." % name )
74
94
@@ -78,44 +98,86 @@ def list_workflow_prompt(self, name):
78
98
# No prompt inputs on workflow
79
99
return None
80
100
81
- def run_workflow_definition (self , name , input = None ):
101
+ @classmethod
102
+ def run_workflow_definition (cls , name , prompts = None ):
82
103
"""Runs specific Workflow Processes Definitions.
83
104
84
105
Parameters
85
106
----------
86
107
name : str
87
108
Name or ID of an enabled workflow to execute
88
- input : dict, optional
89
- Input values for the workflow for initial workflow prompt
109
+ prompts : dict, optional
110
+ Input values to provide for the initial workflow prompts. Should be
111
+ specified as name:value pairs.
90
112
91
113
Returns
92
114
-------
93
115
RestObj
94
116
The executing workflow
95
117
96
118
"""
97
-
98
- workflow = self ._find_specific_workflow (name )
119
+ workflow = cls ._find_specific_workflow (name )
99
120
if workflow is None :
100
121
raise ValueError ("No Workflow enabled for %s name or id." % name )
101
122
102
- if input is None :
103
- return self .post (
123
+ if prompts is None :
124
+ return cls .post (
104
125
"/processes?definitionId=" + workflow ["id" ],
105
126
headers = {"Content-Type" : "application/vnd.sas.workflow.variables+json" },
106
127
)
107
- if isinstance (input , dict ):
108
- return self .post (
128
+ if isinstance (prompts , dict ):
129
+
130
+ variables = []
131
+
132
+ # For each prompt defined in the workflow, check if a value was provided.
133
+ for prompt in workflow .prompts :
134
+ if prompt ["variableName" ] in prompts :
135
+ name = prompt ["variableName" ]
136
+ value = prompts [name ]
137
+
138
+ if type (value ) == datetime .datetime :
139
+ # NOTE: do not use isinstance() to compare types as
140
+ # datetime.date will also evaluate as True.
141
+
142
+ # Explicitly convert to local time zone if not set.
143
+ if value .tzinfo is None :
144
+ try :
145
+ value = value .astimezone ()
146
+ except OSError :
147
+ # On Windows pre-1970 dates will cause an issue.
148
+ # See https://bugs.python.org/issue36759
149
+ pass
150
+
151
+ if value .tzinfo is None :
152
+ # Failed to convert to local time. Have to just assume it's UTC.
153
+ # Example: 2023-01-25T13:49:40.726162Z
154
+ value = value .isoformat () + "Z"
155
+ else :
156
+ # Example: 2023-01-25T13:49:40.726162-05:00
157
+ value = value .isoformat ()
158
+
159
+ variables .append (
160
+ {
161
+ "name" : name ,
162
+ "value" : value ,
163
+ "scope" : "local" ,
164
+ "type" : prompt ["variableType" ],
165
+ "version" : prompt ["version" ],
166
+ }
167
+ )
168
+
169
+ return cls .post (
109
170
"/processes?definitionId=" + workflow ["id" ],
110
- json = input ,
171
+ json = { "variables" : variables } ,
111
172
headers = {"Content-Type" : "application/vnd.sas.workflow.variables+json" },
112
173
)
113
174
114
- def _find_specific_workflow (self , name ):
175
+ @classmethod
176
+ def _find_specific_workflow (cls , name ):
115
177
# Internal helper method
116
178
# Finds a workflow with the name (can be a name or id)
117
179
# Returns a dict objects of the workflow
118
- listendef = self .list_enabled_definitions ()
180
+ listendef = cls .list_enabled_definitions ()
119
181
for tmp in listendef :
120
182
if tmp ["name" ] == name :
121
183
return tmp
0 commit comments