diff --git a/modules/pulsar/pulsar.go b/modules/pulsar/pulsar.go index 5aefe2d9df..8d73d2dfb0 100644 --- a/modules/pulsar/pulsar.go +++ b/modules/pulsar/pulsar.go @@ -72,7 +72,9 @@ func (c *Container) resolveURL(ctx context.Context, port nat.Port) (string, erro // and add a waiting strategy for the functions worker func WithFunctionsWorker() testcontainers.CustomizeRequestOption { return func(req *testcontainers.GenericContainerRequest) error { - req.Cmd = []string{"/bin/bash", "-c", defaultPulsarCmd} + if err := testcontainers.WithCmd("/bin/bash", "-c", defaultPulsarCmd)(req); err != nil { + return err + } ss := []wait.Strategy{ wait.ForLog("Function worker service started"), @@ -80,9 +82,7 @@ func WithFunctionsWorker() testcontainers.CustomizeRequestOption { ss = append(ss, defaultWaitStrategies.Strategies...) - req.WaitingFor = wait.ForAll(ss...) - - return nil + return testcontainers.WithWaitStrategy(wait.ForAll(ss...))(req) } } @@ -103,9 +103,7 @@ func (c *Container) WithLogConsumers(ctx context.Context, _ ...testcontainers.Lo // WithPulsarEnv allows to use the native APIs and set each variable with PULSAR_PREFIX_ as prefix. func WithPulsarEnv(configVar string, configValue string) testcontainers.CustomizeRequestOption { return func(req *testcontainers.GenericContainerRequest) error { - req.Env["PULSAR_PREFIX_"+configVar] = configValue - - return nil + return testcontainers.WithEnv(map[string]string{"PULSAR_PREFIX_" + configVar: configValue})(req) } } @@ -124,9 +122,7 @@ func WithTransactions() testcontainers.CustomizeRequestOption { ss = append(ss, defaultWaitStrategies.Strategies...) - req.WaitingFor = wait.ForAll(ss...) - - return nil + return testcontainers.WithWaitStrategy(wait.ForAll(ss...))(req) } } @@ -146,33 +142,22 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize // // - command: "/bin/bash -c /pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf && bin/pulsar standalone --no-functions-worker -nss" func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) { - req := testcontainers.ContainerRequest{ - Image: img, - Env: map[string]string{}, - ExposedPorts: []string{defaultPulsarPort, defaultPulsarAdminPort}, - WaitingFor: defaultWaitStrategies, - Cmd: []string{"/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, defaultPulsarCmdWithoutFunctionsWorker}, " ")}, + moduleOpts := []testcontainers.ContainerCustomizer{ + testcontainers.WithExposedPorts(defaultPulsarPort, defaultPulsarAdminPort), + testcontainers.WithWaitStrategy(defaultWaitStrategies), + testcontainers.WithCmd("/bin/bash", "-c", strings.Join([]string{defaultPulsarCmd, defaultPulsarCmdWithoutFunctionsWorker}, " ")), } - genericContainerReq := testcontainers.GenericContainerRequest{ - ContainerRequest: req, - Started: true, - } - - for _, opt := range opts { - if err := opt.Customize(&genericContainerReq); err != nil { - return nil, err - } - } + moduleOpts = append(moduleOpts, opts...) - container, err := testcontainers.GenericContainer(ctx, genericContainerReq) + ctr, err := testcontainers.Run(ctx, img, moduleOpts...) var c *Container - if container != nil { - c = &Container{Container: container} + if ctr != nil { + c = &Container{Container: ctr} } if err != nil { - return c, fmt.Errorf("generic container: %w", err) + return c, fmt.Errorf("run pulsar: %w", err) } return c, nil