@@ -244,52 +244,22 @@ func TestReconnectConsumer(t *testing.T) {
244244}
245245
246246func TestReconnectedBrokerSendPermits (t * testing.T ) {
247- req := testcontainers.ContainerRequest {
248- Name : "pulsar-test" ,
249- Image : getPulsarTestImage (),
250- ExposedPorts : []string {"6650/tcp" , "8080/tcp" },
251- WaitingFor : wait .ForExposedPort (),
252- HostConfigModifier : func (config * container.HostConfig ) {
253- config .PortBindings = map [nat.Port ][]nat.PortBinding {
254- "6650/tcp" : {{HostIP : "0.0.0.0" , HostPort : "6659" }},
255- "8080/tcp" : {{HostIP : "0.0.0.0" , HostPort : "8089" }},
256- }
257- },
258- Cmd : []string {"bin/pulsar" , "standalone" , "-nfw" },
259- }
260- c , err := testcontainers .GenericContainer (context .Background (), testcontainers.GenericContainerRequest {
261- ContainerRequest : req ,
262- Started : true ,
263- Reuse : true ,
264- })
265- require .NoError (t , err , "Failed to start the pulsar container" )
266- endpoint , err := c .PortEndpoint (context .Background (), "6650" , "pulsar" )
267- require .NoError (t , err , "Failed to get the pulsar endpoint" )
268-
269247 sLogger := slog .New (slog .NewTextHandler (os .Stdout , & slog.HandlerOptions {Level : slog .LevelDebug }))
270248 client , err := NewClient (ClientOptions {
271- URL : endpoint ,
249+ URL : lookupURL ,
272250 Logger : plog .NewLoggerWithSlog (sLogger ),
273251 })
274252 assert .Nil (t , err )
275- adminEndpoint , err := c .PortEndpoint (context .Background (), "8080" , "http" )
276- assert .Nil (t , err )
277- admin , err := pulsaradmin .NewClient (& config.Config {
278- WebServiceURL : adminEndpoint ,
253+ topic := newTopicName ()
254+ consumer , err := client .Subscribe (ConsumerOptions {
255+ Topic : topic ,
256+ SubscriptionName : "my-sub" ,
257+ EnableZeroQueueConsumer : true ,
258+ Type : Shared , // using Shared subscription type to support unack subscription stats
279259 })
280260 assert .Nil (t , err )
281-
282- topic := newTopicName ()
283- var consumer Consumer
284- require .Eventually (t , func () bool {
285- consumer , err = client .Subscribe (ConsumerOptions {
286- Topic : topic ,
287- SubscriptionName : "my-sub" ,
288- EnableZeroQueueConsumer : true ,
289- Type : Shared , // using Shared subscription type to support unack subscription stats
290- })
291- return err == nil
292- }, 30 * time .Second , 1 * time .Second )
261+ admin , err := pulsaradmin .NewClient (& config.Config {})
262+ assert .Nil (t , err )
293263 ctx := context .Background ()
294264
295265 // create producer
@@ -320,11 +290,11 @@ func TestReconnectedBrokerSendPermits(t *testing.T) {
320290 log .Println ("unloaded topic" )
321291 zc , ok := consumer .(* zeroQueueConsumer )
322292 assert .True (t , ok )
293+ // wait for reconnect
323294 require .EventuallyWithT (t , func (c * assert.CollectT ) {
324295 reconnectCount := zc .pc .reconnectCount .Load ()
325296 require .Equal (c , reconnectCount , int32 (1 ))
326297 }, 30 * time .Second , 1 * time .Second )
327- //time.Sleep(1 * time.Minute)
328298
329299 // receive 10 messages
330300 for i := 0 ; i < 10 ; i ++ {
0 commit comments