@@ -271,8 +271,12 @@ async def destroy(self):
271271 "factory_name" :self .name
272272 }
273273 factory_name = json .dumps (nameReq , indent = 2 ).encode ('utf-8' )
274- await self .connection .broker_connection .publish ('$memphis_factory_destructions' , factory_name )
275-
274+ res = await self .connection .broker_manager .request ('$memphis_factory_destructions' , factory_name )
275+ error = res .data .decode ('utf-8' )
276+ if "mongo: no documents in result" in error :
277+ print ('Producer already destroyed' )
278+ elif error != "" :
279+ raise Exception (error )
276280 except Exception as e :
277281 raise Exception (e )
278282
@@ -291,7 +295,12 @@ async def destroy(self):
291295 "station_name" :self .name
292296 }
293297 station_name = json .dumps (nameReq , indent = 2 ).encode ('utf-8' )
294- await self .connection .broker_connection .publish ('$memphis_station_destructions' , station_name )
298+ res = await self .connection .broker_manager .request ('$memphis_station_destructions' , station_name )
299+ error = res .data .decode ('utf-8' )
300+ if "mongo: no documents in result" in error :
301+ print ('Producer already destroyed' )
302+ elif error != "" :
303+ raise Exception (error )
295304
296305 except Exception as e :
297306 raise Exception (e )
@@ -328,11 +337,16 @@ async def destroy(self):
328337 try :
329338 destroyProducerReq = {
330339 "name" : self .producer_name ,
331- "station_name" :self .station_name
340+ "station_name" : self .station_name
332341 }
333- producer_name = json .dumps (destroyProducerReq , indent = 2 ).encode ('utf-8' )
334- await self .connection .broker_connection .publish ('$memphis_producer_destructions' , producer_name )
335-
342+
343+ producer_name = json .dumps (destroyProducerReq ).encode ('utf-8' )
344+ res = await self .connection .broker_manager .request ('$memphis_producer_destructions' , producer_name )
345+ error = res .data .decode ('utf-8' )
346+ if "mongo: no documents in result" in error :
347+ print ('Producer already destroyed' )
348+ elif error != "" :
349+ raise Exception (error )
336350 except Exception as e :
337351 raise Exception (e )
338352
@@ -402,7 +416,12 @@ async def destroy(self):
402416 "station_name" :self .station_name
403417 }
404418 consumer_name = json .dumps (destroyConsumerReq , indent = 2 ).encode ('utf-8' )
405- await self .connection .broker_connection .publish ('$memphis_consumer_destructions' , consumer_name )
419+ res = await self .connection .broker_manager .request ('$memphis_consumer_destructions' , consumer_name )
420+ error = res .data .decode ('utf-8' )
421+ if "mongo: no documents in result" in error :
422+ print ('Producer already destroyed' )
423+ elif error != "" :
424+ raise Exception (error )
406425
407426 except Exception as e :
408427 raise Exception (e )
0 commit comments