|
| 1 | +package chipingressset |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "fmt" |
| 6 | + "strings" |
| 7 | + "time" |
| 8 | + |
| 9 | + networkTypes "github.com/docker/docker/api/types/network" |
| 10 | + "github.com/docker/docker/client" |
| 11 | + "github.com/pkg/errors" |
| 12 | + "github.com/smartcontractkit/chainlink-testing-framework/framework" |
| 13 | + "github.com/testcontainers/testcontainers-go/modules/compose" |
| 14 | + "github.com/testcontainers/testcontainers-go/wait" |
| 15 | +) |
| 16 | + |
| 17 | +type Output struct { |
| 18 | + ChipIngress *ChipIngressOutput |
| 19 | + RedPanda *RedPandaOutput |
| 20 | +} |
| 21 | + |
| 22 | +type ChipIngressOutput struct { |
| 23 | + GRPCInternalURL string |
| 24 | + GRPCExternalURL string |
| 25 | +} |
| 26 | + |
| 27 | +type RedPandaOutput struct { |
| 28 | + SchemaRegistryInternalURL string |
| 29 | + SchemaRegistryExternalURL string |
| 30 | + KafkaInternalURL string |
| 31 | + KafkaExternalURL string |
| 32 | +} |
| 33 | + |
| 34 | +type Input struct { |
| 35 | + ComposeFile string `toml:"compose_file"` |
| 36 | + Topics []string `toml:"topics"` |
| 37 | + Output *Output `toml:"output"` |
| 38 | + UseCache bool `toml:"use_cache"` |
| 39 | +} |
| 40 | + |
| 41 | +func defaultChipIngress(in *Input) *Input { |
| 42 | + if in.ComposeFile == "" { |
| 43 | + in.ComposeFile = "./docker-compose.yml" |
| 44 | + } |
| 45 | + return in |
| 46 | +} |
| 47 | + |
| 48 | +const ( |
| 49 | + DEFAULT_CHIP_INGRESS_GRPC_PORT = "50051" |
| 50 | + DEFAULT_CHIP_INGRESS_SERVICE_NAME = "chip-ingress" |
| 51 | + |
| 52 | + DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT = "18081" |
| 53 | + DEFAULT_RED_PANDA_KAFKA_PORT = "19092" |
| 54 | + DEFAULT_RED_PANDA_SERVICE_NAME = "redpanda-0" |
| 55 | +) |
| 56 | + |
| 57 | +func New(in *Input) (*Output, error) { |
| 58 | + if in == nil { |
| 59 | + return nil, errors.New("input is nil") |
| 60 | + } |
| 61 | + |
| 62 | + if in.UseCache { |
| 63 | + if in.Output != nil { |
| 64 | + return in.Output, nil |
| 65 | + } |
| 66 | + } |
| 67 | + |
| 68 | + in = defaultChipIngress(in) |
| 69 | + identifier := framework.DefaultTCName(DEFAULT_CHIP_INGRESS_SERVICE_NAME) |
| 70 | + |
| 71 | + stack, stackErr := compose.NewDockerComposeWith( |
| 72 | + compose.WithStackFiles(in.ComposeFile), |
| 73 | + compose.StackIdentifier(identifier), |
| 74 | + ) |
| 75 | + if stackErr != nil { |
| 76 | + return nil, errors.Wrap(stackErr, "failed to create compose stack for Chip Ingress") |
| 77 | + } |
| 78 | + |
| 79 | + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) |
| 80 | + defer cancel() |
| 81 | + |
| 82 | + upErr := stack. |
| 83 | + WithEnv(map[string]string{ |
| 84 | + "BASIC_AUTH_ENABLED": "false", |
| 85 | + "BASIC_AUTH_PREFIX": "", |
| 86 | + }). |
| 87 | + Up(ctx) |
| 88 | + |
| 89 | + if upErr != nil { |
| 90 | + return nil, errors.Wrap(upErr, "failed to start stack for Chip Ingress") |
| 91 | + } |
| 92 | + |
| 93 | + stack.WaitForService(DEFAULT_CHIP_INGRESS_SERVICE_NAME, |
| 94 | + wait.ForLog("GRPC server is live").WithPollInterval(200*time.Millisecond).WithStartupTimeout(1*time.Minute), |
| 95 | + ) |
| 96 | + |
| 97 | + chipIngressContainer, ingressErr := stack.ServiceContainer(ctx, DEFAULT_CHIP_INGRESS_SERVICE_NAME) |
| 98 | + if ingressErr != nil { |
| 99 | + return nil, errors.Wrap(ingressErr, "failed to get chip-ingress container") |
| 100 | + } |
| 101 | + |
| 102 | + cli, cliErr := client.NewClientWithOpts( |
| 103 | + client.FromEnv, |
| 104 | + client.WithAPIVersionNegotiation(), |
| 105 | + ) |
| 106 | + if cliErr != nil { |
| 107 | + return nil, errors.Wrap(cliErr, "failed to create docker client") |
| 108 | + } |
| 109 | + defer cli.Close() |
| 110 | + |
| 111 | + timeout := time.After(1 * time.Minute) |
| 112 | + tick := time.Tick(500 * time.Millisecond) |
| 113 | + |
| 114 | + // so let's try to connect to the default network a couple of times, there must be a race condition in Docker |
| 115 | + // and even when network sandbox has been created and container is running, this call can still fail |
| 116 | + // retrying is simpler than trying to figure out how to correctly wait for the network sandbox to be ready |
| 117 | + var connectDefaultNetwork = func() error { |
| 118 | + for { |
| 119 | + select { |
| 120 | + case <-timeout: |
| 121 | + return fmt.Errorf("timeout while trying to connect chip-ingress to default network") |
| 122 | + case <-tick: |
| 123 | + if networkErr := cli.NetworkConnect( |
| 124 | + ctx, |
| 125 | + framework.DefaultNetworkName, |
| 126 | + chipIngressContainer.ID, |
| 127 | + &networkTypes.EndpointSettings{ |
| 128 | + Aliases: []string{identifier}, |
| 129 | + }, |
| 130 | + ); networkErr != nil && !strings.Contains(networkErr.Error(), "already exists in network") { |
| 131 | + fmt.Println("failed to connect to default network", networkErr) |
| 132 | + continue |
| 133 | + } |
| 134 | + fmt.Println("connected to default network") |
| 135 | + return nil |
| 136 | + } |
| 137 | + } |
| 138 | + } |
| 139 | + |
| 140 | + if connectErr := connectDefaultNetwork(); connectErr != nil { |
| 141 | + return nil, errors.Wrap(connectErr, "failed to connect chip-ingress to default network") |
| 142 | + } |
| 143 | + |
| 144 | + // verify that the container is connected to framework's network |
| 145 | + inspected, inspectErr := cli.ContainerInspect(ctx, chipIngressContainer.ID) |
| 146 | + if inspectErr != nil { |
| 147 | + return nil, errors.Wrapf(inspectErr, "failed to inspect container %s", chipIngressContainer.ID) |
| 148 | + } |
| 149 | + |
| 150 | + _, ok := inspected.NetworkSettings.Networks[framework.DefaultNetworkName] |
| 151 | + if !ok { |
| 152 | + return nil, fmt.Errorf("container %s is NOT on network %s", chipIngressContainer.ID, framework.DefaultNetworkName) |
| 153 | + } |
| 154 | + |
| 155 | + // get hosts and ports for chip-ingress and redpanda |
| 156 | + chipIngressExternalHost, chipIngressExternalHostErr := chipIngressContainer.Host(ctx) |
| 157 | + if chipIngressExternalHostErr != nil { |
| 158 | + return nil, errors.Wrap(chipIngressExternalHostErr, "failed to get host for Chip Ingress") |
| 159 | + } |
| 160 | + chipIngressExternalPort, chipIngressExternalPortErr := chipIngressContainer.MappedPort(ctx, DEFAULT_CHIP_INGRESS_GRPC_PORT) |
| 161 | + if chipIngressExternalPortErr != nil { |
| 162 | + return nil, errors.Wrap(chipIngressExternalPortErr, "failed to get mapped port for Chip Ingress") |
| 163 | + } |
| 164 | + |
| 165 | + redpandaContainer, redpandaErr := stack.ServiceContainer(ctx, DEFAULT_RED_PANDA_SERVICE_NAME) |
| 166 | + if redpandaErr != nil { |
| 167 | + return nil, errors.Wrap(redpandaErr, "failed to get redpanda container") |
| 168 | + } |
| 169 | + |
| 170 | + redpandaExternalHost, redpandaExternalHostErr := redpandaContainer.Host(ctx) |
| 171 | + if redpandaExternalHostErr != nil { |
| 172 | + return nil, errors.Wrap(redpandaExternalHostErr, "failed to get host for Red Panda") |
| 173 | + } |
| 174 | + redpandaExternalKafkaPort, redpandaExternalKafkaPortErr := redpandaContainer.MappedPort(ctx, DEFAULT_RED_PANDA_KAFKA_PORT) |
| 175 | + if redpandaExternalKafkaPortErr != nil { |
| 176 | + return nil, errors.Wrap(redpandaExternalKafkaPortErr, "failed to get mapped port for Red Panda") |
| 177 | + } |
| 178 | + redpandaExternalSchemaRegistryPort, redpandaExternalSchemaRegistryPortErr := redpandaContainer.MappedPort(ctx, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT) |
| 179 | + if redpandaExternalSchemaRegistryPortErr != nil { |
| 180 | + return nil, errors.Wrap(redpandaExternalSchemaRegistryPortErr, "failed to get mapped port for Red Panda") |
| 181 | + } |
| 182 | + |
| 183 | + output := &Output{ |
| 184 | + ChipIngress: &ChipIngressOutput{ |
| 185 | + GRPCInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_CHIP_INGRESS_SERVICE_NAME, DEFAULT_CHIP_INGRESS_GRPC_PORT), |
| 186 | + GRPCExternalURL: fmt.Sprintf("http://%s:%s", chipIngressExternalHost, chipIngressExternalPort.Port()), |
| 187 | + }, |
| 188 | + RedPanda: &RedPandaOutput{ |
| 189 | + SchemaRegistryInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_RED_PANDA_SERVICE_NAME, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT), |
| 190 | + SchemaRegistryExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalHost, redpandaExternalSchemaRegistryPort.Port()), |
| 191 | + KafkaInternalURL: fmt.Sprintf("%s:%s", DEFAULT_RED_PANDA_SERVICE_NAME, DEFAULT_RED_PANDA_KAFKA_PORT), |
| 192 | + KafkaExternalURL: fmt.Sprintf("%s:%s", redpandaExternalHost, redpandaExternalKafkaPort.Port()), |
| 193 | + }, |
| 194 | + } |
| 195 | + |
| 196 | + return output, nil |
| 197 | +} |
0 commit comments