@@ -138,7 +138,7 @@ def declare_subscriptions(self):
138138 self .manoconn .subscribe (self .function_instance_scale , t .VNF_SCALE )
139139
140140 # The topic on which terminate requests are posted.
141- self .manoconn .subscribe (self .function_instance_kill , t .VNF_KILL )
141+ self .manoconn .subscribe (self .function_instance_remove , t .VNF_REMOVE )
142142
143143 def on_lifecycle_start (self , ch , mthd , prop , msg ):
144144 """
@@ -495,9 +495,7 @@ def function_instance_scale(self, ch, method, properties, payload):
495495 add_schedule = []
496496
497497 add_schedule .append ("trigger_scale_fsm" )
498- # TODO: add interaction with Mistral when FSM responds (using the
499- # content of the response)
500- add_schedule .append ("update_vnfr_after_scale" )
498+ add_schedule .append ("update_record" )
501499 add_schedule .append ("respond_to_request" )
502500
503501 self .functions [func_id ]['schedule' ].extend (add_schedule )
@@ -509,30 +507,76 @@ def function_instance_scale(self, ch, method, properties, payload):
509507
510508 return self .functions [func_id ]['schedule' ]
511509
512- def function_instance_kill (self , ch , method , properties , payload ):
510+ def function_instance_remove (self , ch , method , properties , payload ):
513511 """
514- This method starts the vnf kill workflow
512+ This method starts the vnf remove workflow
515513 """
514+ def send_error_response (error , func_id , scaling_type = None ):
515+ response = {}
516+ response ['error' ] = error
517+
518+ response ['status' ] = 'ERROR'
519+
520+ msg = ' Response on remove request: ' + str (response )
521+ LOG .info ('Function ' + str (func_id ) + msg )
522+ self .manoconn .notify (t .VNF_REMOVE ,
523+ yaml .dump (response ),
524+ correlation_id = corr_id )
516525
517526 # Don't trigger on self created messages
518527 if self .name == properties .app_id :
519528 return
520529
521- LOG .info ("Function instance kill request received." )
530+ LOG .info ("Function instance remove request received." )
522531 message = yaml .load (payload )
523532
533+ # Check if payload is ok.
534+
524535 # Extract the correlation id
525536 corr_id = properties .correlation_id
526537
527- func_id = message ['id' ]
538+ if corr_id is None :
539+ error = 'No correlation id provided in header of request'
540+ send_error_response (error , None )
541+ return
542+
543+ if not isinstance (message , dict ):
544+ error = 'Payload is not a dictionary'
545+ send_error_response (error , None )
546+ return
547+
548+ if 'vnf_id' not in message .keys ():
549+ error = 'vnf_uuid key not provided'
550+ send_error_response (error , None )
551+ return
552+
553+ func_id = message ['vnf_id' ]
554+
555+ if 'serv_id' not in message .keys ():
556+ error = 'serv_id key not provided'
557+ send_error_response (error , func_id )
558+
559+ if 'vim_id' not in message .keys ():
560+ error = 'vim_id key not provided'
561+ send_error_response (error , func_id )
528562
529563 # recreate the ledger
530- self .recreate_ledger (message , corr_id , func_id , t .VNF_KILL )
564+ self .recreate_ledger (message , corr_id , func_id , t .VNF_REMOVE )
565+
566+ vnf = self .functions [func_id ]
567+ if vnf ['error' ] is not None :
568+ send_error_response (vnf ['error' ], func_id )
569+
570+ if vnf ['vnfr' ]['status' ] == 'terminated' :
571+ error = 'VNF is already terminated'
572+ send_error_response (error , func_id )
531573
532574 # Schedule the tasks that the FLM should do for this request.
533575 add_schedule = []
534-
535- # TODO: add the relevant methods for the kill workflow
576+ add_schedule .append ('remove_vnf' )
577+ add_schedule .append ('update_record' )
578+ add_schedule .append ('terminate_fsms' )
579+ add_schedule .append ('respond_to_request' )
536580
537581 self .functions [func_id ]['schedule' ].extend (add_schedule )
538582
@@ -663,6 +707,38 @@ def resp_instant(self, ch, method, prop, payload):
663707 # Continue with the scheduled tasks
664708 self .start_next_task (func_id )
665709
710+ def terminate_fsms (self , func_id ):
711+ """
712+ This method requests the termination of all fsms associated with a VNF.
713+ """
714+
715+ vnf = self .functions [func_id ]
716+ if 'function_specific_managers' in vnf ['vnfd' ].keys ():
717+ corr_id = str (uuid .uuid4 ())
718+ self .functions [func_id ]['act_corr_id' ] = corr_id
719+
720+ LOG .info ("Function " + func_id +
721+ ": Setting termination flag for fsms." )
722+ for fsm in vnf ['vnfd' ]['function_specific_managers' ]:
723+ if 'options' not in fsm .keys ():
724+ fsm ['options' ] = []
725+ fsm ['options' ].append ({'key' : 'termination' ,
726+ 'value' : 'true' })
727+
728+ payload = yaml .dump ({'VNFD' : vnf ['vnfd' ], 'UUID' : func_id })
729+
730+ self .manoconn .call_async (self .no_resp_needed ,
731+ t .FSM_TERM ,
732+ payload )
733+
734+ def no_resp_needed (self , ch , method , prop , payload ):
735+ """
736+ Dummy response method when other component will send a response, but
737+ FLM does not need it
738+ """
739+
740+ pass
741+
666742 def deploy_vnf (self , func_id ):
667743 """
668744 This methods requests the deployment of a vnf
@@ -695,6 +771,34 @@ def deploy_vnf(self, func_id):
695771 # Pause the chain of tasks to wait for response
696772 self .functions [func_id ]['pause_chain' ] = True
697773
774+ def remove_vnf (self , func_id ):
775+ """
776+ This method requets the removal of a vnf
777+ """
778+
779+ function = self .functions [func_id ]
780+ outg_message = {}
781+ outg_message ["service_instance_id" ] = function ['serv_id' ]
782+ outg_message ['vim_uuid' ] = function ['vim_uuid' ]
783+ outg_message ['vnf_uuid' ] = func_id
784+
785+ payload = yaml .dump (outg_message )
786+
787+ corr_id = str (uuid .uuid4 ())
788+ self .functions [func_id ]['act_corr_id' ] = corr_id
789+
790+ msg = ": IA contacted for function removal."
791+ LOG .info ("Function " + func_id + msg )
792+ LOG .debug ("Payload of request: " + payload )
793+ # Contact the IA
794+ self .manoconn .call_async (self .ia_remove_response ,
795+ t .IA_REMOVE ,
796+ payload ,
797+ correlation_id = corr_id )
798+
799+ # Pause the chain of tasks to wait for response
800+ self .functions [func_id ]['pause_chain' ] = True
801+
698802 def IA_deploy_response (self , ch , method , prop , payload ):
699803 """
700804 This method handles the response from the IA on the
@@ -733,6 +837,30 @@ def IA_deploy_response(self, ch, method, prop, payload):
733837
734838 self .start_next_task (func_id )
735839
840+ def ia_remove_response (self , ch , method , prop , payload ):
841+ """
842+ This method handles responses on IA VNF remove requests.
843+ """
844+ inc_message = yaml .load (payload )
845+
846+ func_id = tools .funcid_from_corrid (self .functions , prop .correlation_id )
847+
848+ msg = "Response from IA on vnf remove call received."
849+ LOG .info ("Function " + func_id + msg )
850+
851+ if inc_message ['request_status' ] == "COMPLETED" :
852+ LOG .info ("Vnf removal successful" )
853+ self .functions [func_id ]["vnfr" ]["status" ] = "terminated"
854+
855+ else :
856+ msg = "Removal failed: " + inc_message ["message" ]
857+ LOG .info ("Function " + func_id + msg )
858+ self .functions [func_id ]["error" ] = inc_message ["message" ]
859+ self .flm_error (func_id , self .functions [func_id ]['topic' ])
860+ return
861+
862+ self .start_next_task (func_id )
863+
736864 def store_vnfr (self , func_id ):
737865 """
738866 This method stores the vnfr in the repository
@@ -772,40 +900,23 @@ def store_vnfr(self, func_id):
772900
773901 return
774902
775- def update_vnfr_after_scale (self , func_id ):
903+ def update_record (self , func_id ):
776904 """
777905 This method updates the vnfr after a vnf scale event
778906 """
779-
780- # TODO: for now, this method only updates the version
781- # number of the record. Once the mistral interaction
782- # is added, other fields of the record might need upates
783- # as well
784-
785907 error = None
786- vnfr = self .functions [func_id ]['vnfr' ]
787- vnfr_id = func_id
788908
789909 # Updating version number
790- old_version = int (vnfr ['version' ])
791- cur_version = old_version + 1
792- vnfr ['version' ] = str (cur_version )
910+ version = int (self . functions [ func_id ][ ' vnfr' ] ['version' ])
911+ version = version + 1
912+ self . functions [ func_id ][ ' vnfr' ] ['version' ] = str (version )
793913
794- # Updating the record
795- vnfr ["id" ] = vnfr_id
796- try :
797- del vnfr ["uuid" ]
798- del vnfr ["updated_at" ]
799- del vnfr ["created_at" ]
800- except :
801- pass
914+ vnfr = self .functions [func_id ]['vnfr' ]
802915
803916 # Put it
804- url = t .vnfr_path + '/' + vnfr_id
917+ url = t .vnfr_path + '/' + func_id
805918 header = {'Content-Type' : 'application/json' }
806919
807- LOG .info ("Service " + serv_id + ": VNFR update: " + url )
808-
809920 try :
810921 vnfr_resp = requests .put (url ,
811922 data = json .dumps (vnfr ),
@@ -814,19 +925,19 @@ def update_vnfr_after_scale(self, func_id):
814925 vnfr_resp_json = str (vnfr_resp .json ())
815926
816927 if (vnfr_resp .status_code == 200 ):
817- msg = ": VNFR update accepted for " + vnfr_id
818- LOG .info ("Service " + serv_id + msg )
928+ msg = ": VNFR update accepted for " + func_id
929+ LOG .info ("Function " + func_id + msg )
819930 else :
820931 msg = ": VNFR update not accepted: " + vnfr_resp_json
821- LOG .info ("Service " + serv_id + msg )
822- error = {'http_code' : vnfr_resp .status_code ,
823- 'message' : vnfr_resp_json }
932+ LOG .info ("Function " + func_id + msg )
933+ error = str (vnfr_resp .status_code ) + ': ' + str (vnfr_resp_json )
824934 except :
825- error = {'http_code' : '0' ,
826- 'message' : 'Timeout when contacting VNFR repo' }
935+ error = '400: timeout on contacting repo for VNFR update'
827936
828937 if error is not None :
829- LOG .info ("record update failed: " + str (error ))
938+ LOG .info ()
939+ msg = ": record update failed: " + str (error )
940+ LOG .info ("Function " + func_id + msg )
830941 self .functions [func_id ]["error" ] = error
831942 self .flm_error (func_id )
832943
@@ -1067,7 +1178,7 @@ def add_function_to_ledger(self, payload, corr_id, func_id, topic):
10671178 # Add keys
10681179 self .functions [func_id ]['public_key' ] = payload ['public_key' ]
10691180 self .functions [func_id ]['private_key' ] = payload ['private_key' ]
1070-
1181+
10711182 LOG .info (str (payload ['public_key' ]))
10721183
10731184 return func_id
@@ -1084,17 +1195,33 @@ def recreate_ledger(self, payload, corr_id, func_id, topic):
10841195
10851196 # Add the function to the ledger and add instance ids
10861197 self .functions [func_id ] = {}
1198+ self .functions [func_id ]['act_corr_id' ] = None
1199+ self .functions [func_id ]['error' ] = None
10871200
1088- # TODO: add the real vnfr here
1089- vnfr = {}
1090- self .functions [func_id ]['vnfr' ] = vnfr
1091-
1092- if 'vnfd' in payload .keys ():
1093- vnfd = payload ['vnfd' ]
1094- else :
1095- # TODO: retrieve VNFD from CAT based on func_id
1096- vnfd = {}
1097- self .functions [func_id ]['vnfd' ] = vnfd
1201+ # Get VNFR
1202+ get_vnfr = tools .getRestData (t .vnfr_path + '/' , func_id )
1203+ if get_vnfr ['error' ] is not None :
1204+ error = get_vnfr ['error' ] + ': ' + get_vnfr ['content' ]
1205+ self .functions [func_id ]['error' ] = error
1206+ return False
1207+ self .functions [func_id ]['vnfr' ] = get_vnfr ['content' ]
1208+ self .functions [func_id ]['vnfr' ]['id' ] = func_id
1209+ del self .functions [func_id ]['vnfr' ]['created_at' ]
1210+ del self .functions [func_id ]['vnfr' ]['updated_at' ]
1211+ del self .functions [func_id ]['vnfr' ]['uuid' ]
1212+
1213+ # GET VNFD
1214+ vnfd_id = self .functions [func_id ]['vnfr' ]['descriptor_reference' ]
1215+ LOG .info (str (t .vnfd_path + '/' + vnfd_id ))
1216+ head = {"Content-type" : "application/json" }
1217+ get_vnfd = tools .getRestData (t .vnfd_path + '/' , vnfd_id , head = head )
1218+ if get_vnfd ['error' ] is not None :
1219+ error = str (get_vnfd ['error' ]) + ': ' + str (get_vnfd ['content' ])
1220+ LOG .info (str (error ))
1221+ self .functions [func_id ]['error' ] = error
1222+ return False
1223+ self .functions [func_id ]['vnfd' ] = get_vnfd ['content' ]['vnfd' ]
1224+ LOG .info (str (self .functions [func_id ]['vnfd' ]))
10981225
10991226 self .functions [func_id ]['id' ] = func_id
11001227
@@ -1111,15 +1238,29 @@ def recreate_ledger(self, payload, corr_id, func_id, topic):
11111238 self .functions [func_id ]['serv_id' ] = payload ['serv_id' ]
11121239
11131240 # Add the VIM uuid
1114- self .functions [func_id ]['vim_uuid' ] = ''
1241+ if 'vim_id' in payload .keys ():
1242+ self .functions [func_id ]['vim_uuid' ] = payload ['vim_id' ]
1243+ else :
1244+ vdus = self .functions [func_id ]['vnfr' ]['virtual_deployment_units' ]
1245+ vim_id = vdus [0 ]['vnfc_instance' ][0 ]['vim_id' ]
1246+ self .functions [func_id ]['vim_uuid' ] = vim_id
11151247
11161248 # Create the function schedule
11171249 self .functions [func_id ]['schedule' ] = []
11181250
11191251 # Create the FSM dict if FSMs are defined in VNFD
1120- fsm_dict = tools .get_fsm_from_vnfd (vnfd )
1252+ fsm_dict = tools .get_fsm_from_vnfd (self .functions [func_id ]['vnfd' ])
1253+ LOG .info (str (fsm_dict ))
11211254 self .functions [func_id ]['fsm' ] = fsm_dict
11221255
1256+ # Setup broker connection with the SSMs of this service.
1257+ if bool (fsm_dict ) and func_id not in self .fsm_connections .keys ():
1258+ url = self .fsm_url_base + 'fsm-' + func_id
1259+ fsm_conn = messaging .ManoBrokerRequestResponseConnection (self .name ,
1260+ url = url )
1261+
1262+ self .fsm_connections [func_id ] = fsm_conn
1263+
11231264 # Create the chain pause and kill flag
11241265
11251266 self .functions [func_id ]['pause_chain' ] = False
@@ -1129,13 +1270,9 @@ def recreate_ledger(self, payload, corr_id, func_id, topic):
11291270 self .functions [func_id ]['start' ] = None
11301271 self .functions [func_id ]['stop' ] = None
11311272 self .functions [func_id ]['configure' ] = None
1132- self .functions [func_id ]['act_corr_id' ] = None
11331273 self .functions [func_id ]['message' ] = None
11341274
1135- # Add error field
1136- self .functions [func_id ]['error' ] = None
1137-
1138- return func_id
1275+ return True
11391276
11401277
11411278def main ():
0 commit comments