1
1
import json
2
2
from datetime import datetime
3
- from typing import List , Any , Awaitable , Optional , Dict
3
+ from typing import List , Any , Optional , Dict , Union
4
4
from time import time
5
5
from asyncio import sleep
6
6
from urllib .parse import urlparse , quote
11
11
from .DurableOrchestrationStatus import DurableOrchestrationStatus
12
12
from .RpcManagementOptions import RpcManagementOptions
13
13
from .OrchestrationRuntimeStatus import OrchestrationRuntimeStatus
14
- from ..models import DurableOrchestrationBindings
14
+ from ..models . DurableOrchestrationBindings import DurableOrchestrationBindings
15
15
from .utils .http_utils import get_async_request , post_async_request , delete_async_request
16
16
from azure .functions ._durable_functions import _serialize_custom_object
17
17
@@ -44,8 +44,8 @@ def __init__(self, context: str):
44
44
45
45
async def start_new (self ,
46
46
orchestration_function_name : str ,
47
- instance_id : str = None ,
48
- client_input : object = None ) -> Awaitable [ str ] :
47
+ instance_id : Optional [ str ] = None ,
48
+ client_input : Optional [ Any ] = None ) -> str :
49
49
"""Start a new instance of the specified orchestrator function.
50
50
51
51
If an orchestration instance with the specified ID already exists, the
@@ -55,10 +55,10 @@ async def start_new(self,
55
55
----------
56
56
orchestration_function_name : str
57
57
The name of the orchestrator function to start.
58
- instance_id : str
58
+ instance_id : Optional[ str]
59
59
The ID to use for the new orchestration instance. If no instance id is specified,
60
60
the Durable Functions extension will generate a random GUID (recommended).
61
- client_input : object
61
+ client_input : Optional[Any]
62
62
JSON-serializable input value for the orchestrator function.
63
63
64
64
Returns
@@ -69,22 +69,25 @@ async def start_new(self,
69
69
request_url = self ._get_start_new_url (
70
70
instance_id = instance_id , orchestration_function_name = orchestration_function_name )
71
71
72
- response = await self ._post_async_request (request_url , self ._get_json_input (client_input ))
72
+ response : List [Any ] = await self ._post_async_request (
73
+ request_url , self ._get_json_input (client_input ))
73
74
74
- if response [0 ] <= 202 and response [1 ]:
75
+ status_code : int = response [0 ]
76
+ if status_code <= 202 and response [1 ]:
75
77
return response [1 ]["id" ]
76
- elif response [ 0 ] == 400 :
78
+ elif status_code == 400 :
77
79
# Orchestrator not found, report clean exception
78
- exception_data = response [1 ]
80
+ exception_data : Dict [ str , str ] = response [1 ]
79
81
exception_message = exception_data ["ExceptionMessage" ]
80
82
raise Exception (exception_message )
81
83
else :
82
84
# Catch all: simply surfacing the durable-extension exception
83
85
# we surface the stack trace too, since this may be a more involed exception
84
- exception_message = response [1 ]
85
- raise Exception (exception_message )
86
+ ex_message : Any = response [1 ]
87
+ raise Exception (ex_message )
86
88
87
- def create_check_status_response (self , request , instance_id ):
89
+ def create_check_status_response (
90
+ self , request : func .HttpRequest , instance_id : str ) -> func .HttpResponse :
88
91
"""Create a HttpResponse that contains useful information for \
89
92
checking the status of the specified instance.
90
93
@@ -148,16 +151,16 @@ def get_client_response_links(
148
151
payload = self ._orchestration_bindings .management_urls .copy ()
149
152
150
153
for key , _ in payload .items ():
151
- request_is_not_none = not (request is None )
152
- if request_is_not_none and request .url :
154
+ if not (request is None ) and request .url :
153
155
payload [key ] = self ._replace_url_origin (request .url , payload [key ])
154
156
payload [key ] = payload [key ].replace (
155
157
self ._orchestration_bindings .management_urls ["id" ], instance_id )
156
158
157
159
return payload
158
160
159
- async def raise_event (self , instance_id , event_name , event_data = None ,
160
- task_hub_name = None , connection_name = None ):
161
+ async def raise_event (
162
+ self , instance_id : str , event_name : str , event_data : Any = None ,
163
+ task_hub_name : str = None , connection_name : str = None ) -> None :
161
164
"""Send an event notification message to a waiting orchestration instance.
162
165
163
166
In order to handle the event, the target orchestration instance must be
@@ -169,7 +172,7 @@ async def raise_event(self, instance_id, event_name, event_data=None,
169
172
The ID of the orchestration instance that will handle the event.
170
173
event_name : str
171
174
The name of the event.
172
- event_data : any , optional
175
+ event_data : Any , optional
173
176
The JSON-serializable data associated with the event.
174
177
task_hub_name : str, optional
175
178
The TaskHubName of the orchestration that will handle the event.
@@ -183,8 +186,8 @@ async def raise_event(self, instance_id, event_name, event_data=None,
183
186
Exception
184
187
Raises an exception if the status code is 404 or 400 when raising the event.
185
188
"""
186
- if not event_name :
187
- raise ValueError ("event_name must be a valid string." )
189
+ if event_name == "" :
190
+ raise ValueError ("event_name must be a non-empty string." )
188
191
189
192
request_url = self ._get_raise_event_url (
190
193
instance_id , event_name , task_hub_name , connection_name )
@@ -203,9 +206,9 @@ async def raise_event(self, instance_id, event_name, event_data=None,
203
206
if error_message :
204
207
raise Exception (error_message )
205
208
206
- async def get_status (self , instance_id : str , show_history : bool = None ,
207
- show_history_output : bool = None ,
208
- show_input : bool = None ) -> DurableOrchestrationStatus :
209
+ async def get_status (self , instance_id : str , show_history : bool = False ,
210
+ show_history_output : bool = False ,
211
+ show_input : bool = False ) -> DurableOrchestrationStatus :
209
212
"""Get the status of the specified orchestration instance.
210
213
211
214
Parameters
@@ -268,7 +271,8 @@ async def get_status_all(self) -> List[DurableOrchestrationStatus]:
268
271
if error_message :
269
272
raise Exception (error_message )
270
273
else :
271
- return [DurableOrchestrationStatus .from_json (o ) for o in response [1 ]]
274
+ statuses : List [Any ] = response [1 ]
275
+ return [DurableOrchestrationStatus .from_json (o ) for o in statuses ]
272
276
273
277
async def get_status_by (self , created_time_from : datetime = None ,
274
278
created_time_to : datetime = None ,
@@ -291,6 +295,7 @@ async def get_status_by(self, created_time_from: datetime = None,
291
295
DurableOrchestrationStatus
292
296
The status of the requested orchestration instances
293
297
"""
298
+ # TODO: do we really want folks to us this without specifying all the args?
294
299
options = RpcManagementOptions (created_time_from = created_time_from ,
295
300
created_time_to = created_time_to ,
296
301
runtime_status = runtime_status )
@@ -326,19 +331,20 @@ async def purge_instance_history(self, instance_id: str) -> PurgeHistoryResult:
326
331
response = await self ._delete_async_request (request_url )
327
332
return self ._parse_purge_instance_history_response (response )
328
333
329
- async def purge_instance_history_by (self , created_time_from : datetime = None ,
330
- created_time_to : datetime = None ,
331
- runtime_status : List [OrchestrationRuntimeStatus ] = None ) \
334
+ async def purge_instance_history_by (
335
+ self , created_time_from : Optional [datetime ] = None ,
336
+ created_time_to : Optional [datetime ] = None ,
337
+ runtime_status : Optional [List [OrchestrationRuntimeStatus ]] = None ) \
332
338
-> PurgeHistoryResult :
333
339
"""Delete the history of all orchestration instances that match the specified conditions.
334
340
335
341
Parameters
336
342
----------
337
- created_time_from : datetime
343
+ created_time_from : Optional[ datetime]
338
344
Delete orchestration history which were created after this Date.
339
- created_time_to: datetime
345
+ created_time_to: Optional[ datetime]
340
346
Delete orchestration history which were created before this Date.
341
- runtime_status: List[OrchestrationRuntimeStatus]
347
+ runtime_status: Optional[ List[OrchestrationRuntimeStatus] ]
342
348
Delete orchestration instances which match any of the runtimeStatus values
343
349
in this list.
344
350
@@ -347,14 +353,15 @@ async def purge_instance_history_by(self, created_time_from: datetime = None,
347
353
PurgeHistoryResult
348
354
The results of the request to purge history
349
355
"""
356
+ # TODO: do we really want folks to us this without specifying all the args?
350
357
options = RpcManagementOptions (created_time_from = created_time_from ,
351
358
created_time_to = created_time_to ,
352
359
runtime_status = runtime_status )
353
360
request_url = options .to_url (self ._orchestration_bindings .rpc_base_url )
354
361
response = await self ._delete_async_request (request_url )
355
362
return self ._parse_purge_instance_history_response (response )
356
363
357
- async def terminate (self , instance_id : str , reason : str ):
364
+ async def terminate (self , instance_id : str , reason : str ) -> None :
358
365
"""Terminate the specified orchestration instance.
359
366
360
367
Parameters
@@ -364,6 +371,11 @@ async def terminate(self, instance_id: str, reason: str):
364
371
reason: str
365
372
The reason for terminating the instance.
366
373
374
+ Raises
375
+ ------
376
+ Exception:
377
+ When the terminate call failed with an unexpected status code
378
+
367
379
Returns
368
380
-------
369
381
None
@@ -446,7 +458,8 @@ async def wait_for_completion_or_create_check_status_response(
446
458
return self .create_check_status_response (request , instance_id )
447
459
448
460
@staticmethod
449
- def _create_http_response (status_code : int , body : Any ) -> func .HttpResponse :
461
+ def _create_http_response (
462
+ status_code : int , body : Union [str , Any ]) -> func .HttpResponse :
450
463
body_as_json = body if isinstance (body , str ) else json .dumps (body )
451
464
response_args = {
452
465
"status_code" : status_code ,
@@ -459,7 +472,7 @@ def _create_http_response(status_code: int, body: Any) -> func.HttpResponse:
459
472
return func .HttpResponse (** response_args )
460
473
461
474
@staticmethod
462
- def _get_json_input (client_input : object ) -> str :
475
+ def _get_json_input (client_input : object ) -> Optional [ str ] :
463
476
"""Serialize the orchestrator input.
464
477
465
478
Parameters
@@ -469,8 +482,10 @@ def _get_json_input(client_input: object) -> str:
469
482
470
483
Returns
471
484
-------
472
- str
473
- A string representing the JSON-serialization of `client_input`
485
+ Optional[str]
486
+ If `client_input` is not None, return a string representing
487
+ the JSON-serialization of `client_input`. Otherwise, returns
488
+ None
474
489
475
490
Exceptions
476
491
----------
@@ -482,7 +497,7 @@ def _get_json_input(client_input: object) -> str:
482
497
return None
483
498
484
499
@staticmethod
485
- def _replace_url_origin (request_url , value_url ) :
500
+ def _replace_url_origin (request_url : str , value_url : str ) -> str :
486
501
request_parsed_url = urlparse (request_url )
487
502
value_parsed_url = urlparse (value_url )
488
503
request_url_origin = '{url.scheme}://{url.netloc}/' .format (url = request_parsed_url )
@@ -491,7 +506,8 @@ def _replace_url_origin(request_url, value_url):
491
506
return value_url
492
507
493
508
@staticmethod
494
- def _parse_purge_instance_history_response (response : [int , Any ]):
509
+ def _parse_purge_instance_history_response (
510
+ response : List [Any ]) -> PurgeHistoryResult :
495
511
switch_statement = {
496
512
200 : lambda : PurgeHistoryResult .from_json (response [1 ]), # instance completed
497
513
404 : lambda : PurgeHistoryResult (instancesDeleted = 0 ), # instance not found
@@ -506,17 +522,20 @@ def _parse_purge_instance_history_response(response: [int, Any]):
506
522
else :
507
523
raise Exception (result )
508
524
509
- def _get_start_new_url (self , instance_id , orchestration_function_name ):
525
+ def _get_start_new_url (
526
+ self , instance_id : Optional [str ], orchestration_function_name : str ) -> str :
510
527
instance_path = f'/{ instance_id } ' if instance_id is not None else ''
511
528
request_url = f'{ self ._orchestration_bindings .rpc_base_url } orchestrators/' \
512
529
f'{ orchestration_function_name } { instance_path } '
513
530
return request_url
514
531
515
- def _get_raise_event_url (self , instance_id , event_name , task_hub_name , connection_name ):
532
+ def _get_raise_event_url (
533
+ self , instance_id : str , event_name : str ,
534
+ task_hub_name : Optional [str ], connection_name : Optional [str ]) -> str :
516
535
request_url = f'{ self ._orchestration_bindings .rpc_base_url } ' \
517
536
f'instances/{ instance_id } /raiseEvent/{ event_name } '
518
537
519
- query = []
538
+ query : List [ str ] = []
520
539
if task_hub_name :
521
540
query .append (f'taskHub={ task_hub_name } ' )
522
541
0 commit comments