@@ -671,3 +671,45 @@ def test_ovsdb_connections(self):
671
671
** kwargs )
672
672
self .assertEqual (set (idlutils .parse_connection (conns )),
673
673
set (idl ._session .remotes ))
674
+
675
+
676
+ class TestPortBindingChassisEvent (base .TestOVNFunctionalBase ,
677
+ test_l3 .L3NatTestCaseMixin ):
678
+
679
+ def setUp (self , ** kwargs ):
680
+ super ().setUp (** kwargs )
681
+ self .chassis = self .add_fake_chassis ('ovs-host1' )
682
+ self .l3_plugin = directory .get_plugin (plugin_constants .L3 )
683
+ kwargs = {'arg_list' : (external_net .EXTERNAL ,),
684
+ external_net .EXTERNAL : True }
685
+ self .net = self ._make_network (
686
+ self .fmt , 'ext_net' , True , as_admin = True , ** kwargs )
687
+ self ._make_subnet (self .fmt , self .net , '20.0.10.1' , '20.0.10.0/24' )
688
+ port_res = self ._create_port (self .fmt , self .net ['network' ]['id' ])
689
+ self .port = self .deserialize (self .fmt , port_res )['port' ]
690
+
691
+ self .ext_api = test_extensions .setup_extensions_middleware (
692
+ test_l3 .L3TestExtensionManager ())
693
+ self .pb_event_match = mock .patch .object (
694
+ self .sb_api .idl ._portbinding_event , 'match_fn' ).start ()
695
+
696
+ def _check_pb_type (self , _type ):
697
+ def check_pb_type (_type ):
698
+ if len (self .pb_event_match .call_args_list ) < 1 :
699
+ return False
700
+
701
+ pb_row = self .pb_event_match .call_args_list [0 ].args [1 ]
702
+ return _type == pb_row .type
703
+
704
+ n_utils .wait_until_true (lambda : check_pb_type (_type ), timeout = 5 )
705
+
706
+ def test_pb_type_patch (self ):
707
+ router = self ._make_router (self .fmt , self ._tenant_id )
708
+ self ._add_external_gateway_to_router (router ['router' ]['id' ],
709
+ self .net ['network' ]['id' ])
710
+ self ._check_pb_type ('patch' )
711
+
712
+ def test_pb_type_empty (self ):
713
+ self .sb_api .lsp_bind (self .port ['id' ], self .chassis ,
714
+ may_exist = True ).execute (check_error = True )
715
+ self ._check_pb_type ('' )
0 commit comments