1
1
import inspect
2
2
import time
3
3
import unittest
4
+ from dataclasses import dataclass
4
5
5
6
from iwf .client import Client
6
7
from iwf .command_request import CommandRequest , InternalChannelCommand
22
23
idle_channel_name = "test-3"
23
24
24
25
26
+ @dataclass
27
+ class Mydata :
28
+ strdata : str
29
+ intdata : int
30
+
31
+
25
32
class WaitState (WorkflowState [None ]):
26
33
def wait_until (
27
34
self ,
@@ -102,6 +109,12 @@ def test_rpc_publish_to_idle_channel(self, com: Communication, data: str):
102
109
com .publish_to_internal_channel (idle_channel_name , data )
103
110
return com .get_internal_channel_size (idle_channel_name )
104
111
112
+ @rpc ()
113
+ def test_rpc_input_type (self , input : Mydata ) -> Mydata :
114
+ if input .intdata != 100 or input .strdata != "test" :
115
+ raise Exception ("input type test failed" )
116
+ return input
117
+
105
118
106
119
class TestRPCs (unittest .TestCase ):
107
120
@classmethod
@@ -113,6 +126,13 @@ def setUpClass(cls):
113
126
def test_simple_rpc (self ):
114
127
wf_id = f"{ inspect .currentframe ().f_code .co_name } -{ time .time_ns ()} "
115
128
self .client .start_workflow (RPCWorkflow , wf_id , 10 )
129
+
130
+ input = Mydata ("test" , 100 )
131
+ output = self .client .invoke_rpc (
132
+ wf_id , RPCWorkflow .test_rpc_input_type , input , Mydata
133
+ )
134
+ assert output == input
135
+
116
136
output = self .client .invoke_rpc (wf_id , RPCWorkflow .test_simple_rpc )
117
137
assert output == 123
118
138
wf = RPCWorkflow ()
0 commit comments