@@ -577,3 +577,45 @@ def test_ovsdb_probe_interval(self):
577
577
interval = ovn_conf .get_ovn_ovsdb_probe_interval ()
578
578
for idl in idls :
579
579
self .assertEqual (interval , idl ._session .reconnect .probe_interval )
580
+
581
+
582
+ class TestPortBindingChassisEvent (base .TestOVNFunctionalBase ,
583
+ test_l3 .L3NatTestCaseMixin ):
584
+
585
+ def setUp (self , ** kwargs ):
586
+ super ().setUp (** kwargs )
587
+ self .chassis = self .add_fake_chassis ('ovs-host1' )
588
+ self .l3_plugin = directory .get_plugin (plugin_constants .L3 )
589
+ kwargs = {'arg_list' : (external_net .EXTERNAL ,),
590
+ external_net .EXTERNAL : True }
591
+ self .net = self ._make_network (
592
+ self .fmt , 'ext_net' , True , as_admin = True , ** kwargs )
593
+ self ._make_subnet (self .fmt , self .net , '20.0.10.1' , '20.0.10.0/24' )
594
+ port_res = self ._create_port (self .fmt , self .net ['network' ]['id' ])
595
+ self .port = self .deserialize (self .fmt , port_res )['port' ]
596
+
597
+ self .ext_api = test_extensions .setup_extensions_middleware (
598
+ test_l3 .L3TestExtensionManager ())
599
+ self .pb_event_match = mock .patch .object (
600
+ self .sb_api .idl ._portbinding_event , 'match_fn' ).start ()
601
+
602
+ def _check_pb_type (self , _type ):
603
+ def check_pb_type (_type ):
604
+ if len (self .pb_event_match .call_args_list ) < 1 :
605
+ return False
606
+
607
+ pb_row = self .pb_event_match .call_args_list [0 ].args [1 ]
608
+ return _type == pb_row .type
609
+
610
+ n_utils .wait_until_true (lambda : check_pb_type (_type ), timeout = 5 )
611
+
612
+ def test_pb_type_patch (self ):
613
+ router = self ._make_router (self .fmt , self ._tenant_id )
614
+ self ._add_external_gateway_to_router (router ['router' ]['id' ],
615
+ self .net ['network' ]['id' ])
616
+ self ._check_pb_type ('patch' )
617
+
618
+ def test_pb_type_empty (self ):
619
+ self .sb_api .lsp_bind (self .port ['id' ], self .chassis ,
620
+ may_exist = True ).execute (check_error = True )
621
+ self ._check_pb_type ('' )
0 commit comments