Skip to content

Commit 8c9ce85

Browse files
updates fake server to wait for confirmation message before sending new message
Signed-off-by: Elena Kolevska <[email protected]>
1 parent da09a54 commit 8c9ce85

File tree

3 files changed

+33
-20
lines changed

3 files changed

+33
-20
lines changed

tests/clients/fake_dapr_server.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -179,14 +179,15 @@ def PublishEvent(self, request, context):
179179
return empty_pb2.Empty()
180180

181181
def SubscribeTopicEventsAlpha1(self, request_iterator, context):
182-
yield api_v1.SubscribeTopicEventsResponseAlpha1(
183-
initial_response=api_v1.SubscribeTopicEventsResponseInitialAlpha1()
184-
)
182+
for request in request_iterator:
183+
if request.HasField('initial_request'):
184+
yield api_v1.SubscribeTopicEventsResponseAlpha1(
185+
initial_response=api_v1.SubscribeTopicEventsResponseInitialAlpha1()
186+
)
187+
break
185188

186189
extensions = struct_pb2.Struct()
187-
extensions['field1'] = 'value1'
188-
extensions['field2'] = 42
189-
extensions['field3'] = True
190+
extensions.update({'field1': 'value1', 'field2': 42, 'field3': True})
190191

191192
msg1 = appcallback_v1.TopicEventRequest(
192193
id='111',
@@ -201,6 +202,10 @@ def SubscribeTopicEventsAlpha1(self, request_iterator, context):
201202
)
202203
yield api_v1.SubscribeTopicEventsResponseAlpha1(event_message=msg1)
203204

205+
for request in request_iterator:
206+
if request.HasField('event_processed'):
207+
break
208+
204209
msg2 = appcallback_v1.TopicEventRequest(
205210
id='222',
206211
topic='TOPIC_A',
@@ -214,9 +219,16 @@ def SubscribeTopicEventsAlpha1(self, request_iterator, context):
214219
)
215220
yield api_v1.SubscribeTopicEventsResponseAlpha1(event_message=msg2)
216221

222+
for request in request_iterator:
223+
if request.HasField('event_processed'):
224+
break
225+
217226
# On the third message simulate a disconnection
218-
status = status_pb2.Status(code=code_pb2.UNAVAILABLE, message='Simulated disconnection')
219-
context.abort_with_status(rpc_status.to_status(status))
227+
context.abort_with_status(
228+
rpc_status.to_status(
229+
status_pb2.Status(code=code_pb2.UNAVAILABLE, message='Simulated disconnection')
230+
)
231+
)
220232

221233
def SaveState(self, request, context):
222234
self.check_for_exception(context)

tests/clients/test_dapr_grpc_client_async.py

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -305,18 +305,18 @@ async def test_subscribe_topic(self):
305305

306306
# # The client already reconnected and will start reading the messages again
307307
# # Since we're working with a fake server, the messages will be the same
308-
# message4 = await subscription.next_message()
309-
# await subscription.respond_success(message4)
310-
# self.assertEqual('111', message4.id())
311-
# self.assertEqual('app1', message4.source())
312-
# self.assertEqual('com.example.type2', message4.type())
313-
# self.assertEqual('1.0', message4.spec_version())
314-
# self.assertEqual('text/plain', message4.data_content_type())
315-
# self.assertEqual('TOPIC_A', message4.topic())
316-
# self.assertEqual('pubsub', message4.pubsub_name())
317-
# self.assertEqual(b'hello2', message4.raw_data())
318-
# self.assertEqual('text/plain', message4.data_content_type())
319-
# self.assertEqual('hello2', message4.data())
308+
message4 = await subscription.next_message()
309+
await subscription.respond_success(message4)
310+
self.assertEqual('111', message4.id())
311+
self.assertEqual('app1', message4.source())
312+
self.assertEqual('com.example.type2', message4.type())
313+
self.assertEqual('1.0', message4.spec_version())
314+
self.assertEqual('text/plain', message4.data_content_type())
315+
self.assertEqual('TOPIC_A', message4.topic())
316+
self.assertEqual('pubsub', message4.pubsub_name())
317+
self.assertEqual(b'hello2', message4.raw_data())
318+
self.assertEqual('text/plain', message4.data_content_type())
319+
self.assertEqual('hello2', message4.data())
320320

321321
await subscription.close()
322322

tox.ini

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ commands =
5151
./validate.sh error_handling
5252
./validate.sh pubsub-simple
5353
./validate.sh pubsub-streaming
54+
./validate.sh pubsub-streaming-async
5455
./validate.sh state_store
5556
./validate.sh state_store_query
5657
./validate.sh secret_store

0 commit comments

Comments
 (0)