Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions book/src/SUMMARY.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
- [NodeSet](framework/components/chainlink/nodeset.md)
- [Storage](framework/components/storage.md)
- [S3](framework/components/storage/s3.md)
- [Chip Ingress Set](framework/components/chipingresset/chip_ingress.md)
- [Clients]()
- [Chainlink]()
- [RPC]()
Expand Down
116 changes: 116 additions & 0 deletions book/src/framework/components/chipingresset/chip_ingress.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
# Chip Ingress Set

Chip Ingress Set is a composite component that collects Beholder events. It is a thin `testcontainers-go` wrapper over a Docker Compose file copied from the [Atlas](https://github.com/smartcontractkit/atlas/blob/master/chip-ingress/docker-compose.yml) repo.

It consists of 3 components:
- Chip Ingress
- Red Panda
- Red Panda Console

## Configuration

To add it to your stack use following TOML:
```toml
[chip_ingress]
compose_file='../../components/chip_ingress_set/docker-compose.yml'
extra_docker_networks = ["my-existing-network"]
```

Where compose file indicates the location of the `docker-compose.yml` file (remote URLs are supported) and `extra_docker_networks` an optional slice of existing Docker networks, to which whole stack should be connected to.

## Exposed ports

These 3 components expose a variety of ports, but the most important ones from the point of view of user interaction are:
- schema registry port: `18081`
- Kafka port: `19092`
- Red Panda console port: `8080`

## Useful helper methods

Packge contains also a bunch of helper functions tha can:
- create and delete Kafka topics
- fetch `.proto` files from remote repositories and register them with Red Panda


### Topic management
```go
import chipingressset "github.com/smartcontractkit/chainlink-testing-framework/framework/components/dockercompose/chip_ingress_set"

topicsErr := chipingressset.DeleteAllTopics(cmd.Context(), redPandaKafkaURLFlag)
if topicsErr != nil {
panic(topicsErr)
}

createTopicsErr := chipingressset.CreateTopics(ctx, out.RedPanda.KafkaExternalURL, []string{"cre"})
if createTopicsErr != nil {
panic(createTopicsErr)
}
```

### Protobuf schema registration
```go
out, outErr := chipingressset.New(in.ChipIngress)
if outErr != nil {
panic(outErr)
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

// we recommend to use GITHUB_TOKEN with read access to repositories with protos to avoid heavy rate limiting
var client *github.Client
if token := os.Getenv("GITHUB_TOKEN"); token != "" {
ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: token})
tc := oauth2.NewClient(ctx, ts)
client = github.NewClient(tc)
} else {
client = github.NewClient(nil)
}

protoErr := chipingressset.DefaultRegisterAndFetchProtos(ctx, client, []chipingressset.RepoConfiguration{
{
Owner: "smartcontractkit",
Repo: "chainlink-protos",
Ref: "626c42d55bdcb36dffe0077fff58abba40acc3e5",
Folders: []string{"workflows"},
},
}, out.RedPanda.SchemaRegistryExternalURL)
if protoErr != nil {
panic(protoErr)
}
```

Since `ProtoSchemaSet` has TOML tags you can also read it from a TOML file with this content:
```toml
[[proto_schema_set]]
owner = 'smartcontractkit'
repository = 'chainlink-protos'
ref = '626c42d55bdcb36dffe0077fff58abba40acc3e5'
folders = ['workflows']
```

using this code:
```go
var protoSchemaSets []chipingressset.ProtoSchemaSet
for _, schemaSet := range configFiles {
file, fileErr := os.ReadFile(schemaSet)
if fileErr != nil {
return errors.Wrapf(fileErr, "failed to read proto schema set config file: %s", schemaSet)
}

type protoSchemaSets struct {
Sets []chipingressset.ProtoSchemaSet `toml:"proto_schema_set"`
}

var sets protoSchemaSets
if err := toml.Unmarshal(file, &sets); err != nil {
return errors.Wrapf(err, "failed to unmarshal proto config file: %s", protoConfig)
}

protoSchemaSets = append(reposConfigs, sets.Sets...)
}
```

Registration logic is very simple and should handle cases of protos that import other protos as long they are all available in the `ProtoSchemaSet`s provided to the registration function. That function uses an algorithm called "topological sorting by trail", which will try to register all protos in a loop until it cannot register any more protos or it has registered all of them. That allows us to skip dependency parsing completely.

Since Kafka doesn't have any automatic discoverability mechanism for subject - schema relationship (it has to be provided out-of-band) code currently only knows how to correctly register protos from [chainlink-protos](https://github.com/smartcontractkit/chainlink-protos) repository.
5 changes: 5 additions & 0 deletions framework/components/dockercompose/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# Docker Compose

Go module for `framework` packages that wrap `docker-compose.yml` files.

This module is separated from the `framework`, because `testcontainers-go` module that adds Docker Compose support pulls in a lot of dependencies and we want to limit the blast radius as much as possible.
261 changes: 261 additions & 0 deletions framework/components/dockercompose/chip_ingress_set/chip_ingress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
package chipingressset

import (
"context"
"fmt"
"io"
"net/http"
"os"
"strings"
"time"

networkTypes "github.com/docker/docker/api/types/network"
"github.com/docker/docker/client"
"github.com/pkg/errors"
"github.com/smartcontractkit/chainlink-testing-framework/framework"
"github.com/testcontainers/testcontainers-go/modules/compose"
"github.com/testcontainers/testcontainers-go/wait"
)

type Output struct {
ChipIngress *ChipIngressOutput
RedPanda *RedPandaOutput
}

type ChipIngressOutput struct {
GRPCInternalURL string
GRPCExternalURL string
}

type RedPandaOutput struct {
SchemaRegistryInternalURL string
SchemaRegistryExternalURL string
KafkaInternalURL string
KafkaExternalURL string
ConsoleExternalURL string
}

type Input struct {
ComposeFile string `toml:"compose_file"`
ExtraDockerNetworks []string `toml:"extra_docker_networks"`
Output *Output `toml:"output"`
UseCache bool `toml:"use_cache"`
}

func defaultChipIngress(in *Input) *Input {
if in.ComposeFile == "" {
in.ComposeFile = "./docker-compose.yml"
}
return in
}

const (
DEFAULT_STACK_NAME = "chip-ingress"

DEFAULT_CHIP_INGRESS_GRPC_PORT = "50051"
DEFAULT_CHIP_INGRESS_SERVICE_NAME = "chip-ingress"

DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT = "18081"
DEFAULT_RED_PANDA_KAFKA_PORT = "19092"
DEFAULT_RED_PANDA_SERVICE_NAME = "redpanda-0"

DEFAULT_RED_PANDA_CONSOLE_SERVICE_NAME = "redpanda-console"
DEFAULT_RED_PANDA_CONSOLE_PORT = "8080"
)

func New(in *Input) (*Output, error) {
if in == nil {
return nil, errors.New("input is nil")
}

if in.UseCache {
if in.Output != nil {
return in.Output, nil
}
}

in = defaultChipIngress(in)
identifier := framework.DefaultTCName(DEFAULT_STACK_NAME)
framework.L.Debug().Str("Compose file", in.ComposeFile).Msgf("Starting Chip Ingress stack with identifier %s", framework.DefaultTCName(DEFAULT_STACK_NAME))

composeFilePath, fileErr := composeFilePath(in.ComposeFile)
if fileErr != nil {
return nil, errors.Wrap(fileErr, "failed to get compose file path")
}

stack, stackErr := compose.NewDockerComposeWith(
compose.WithStackFiles(composeFilePath),
compose.StackIdentifier(identifier),
)
if stackErr != nil {
return nil, errors.Wrap(stackErr, "failed to create compose stack for Chip Ingress")
}

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
defer cancel()

upErr := stack.
WithEnv(map[string]string{
"BASIC_AUTH_ENABLED": "false",
"BASIC_AUTH_PREFIX": "",
}).
Up(ctx)

if upErr != nil {
return nil, errors.Wrap(upErr, "failed to start stack for Chip Ingress")
}

stack.WaitForService(DEFAULT_CHIP_INGRESS_SERVICE_NAME,
wait.ForLog("GRPC server is live").WithPollInterval(200*time.Millisecond).WithStartupTimeout(1*time.Minute),
)

chipIngressContainer, ingressErr := stack.ServiceContainer(ctx, DEFAULT_CHIP_INGRESS_SERVICE_NAME)
if ingressErr != nil {
return nil, errors.Wrap(ingressErr, "failed to get chip-ingress container")
}

cli, cliErr := client.NewClientWithOpts(
client.FromEnv,
client.WithAPIVersionNegotiation(),
)
if cliErr != nil {
return nil, errors.Wrap(cliErr, "failed to create docker client")
}
defer cli.Close()

timeout := time.After(1 * time.Minute)
tick := time.Tick(500 * time.Millisecond)

// so let's try to connect to a Docker network a couple of times, there must be a race condition in Docker
// and even when network sandbox has been created and container is running, this call can still fail
// retrying is simpler than trying to figure out how to correctly wait for the network sandbox to be ready
var connectNetwork = func(networkName string) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we just be on the same network? Seems like we either need to remove the default network from CTF or to configure this compose to use the same CTF network

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and what's the problem with this approach?

for {
select {
case <-timeout:
return fmt.Errorf("timeout while trying to connect chip-ingress to default network")
case <-tick:
if networkErr := cli.NetworkConnect(
ctx,
networkName,
chipIngressContainer.ID,
&networkTypes.EndpointSettings{
Aliases: []string{identifier},
},
); networkErr != nil && !strings.Contains(networkErr.Error(), "already exists in network") {
framework.L.Trace().Msgf("failed to connect to default network: %v", networkErr)
continue
}
framework.L.Trace().Msgf("connected to %s network", networkName)
return nil
}
}
}

networks := []string{framework.DefaultNetworkName}
networks = append(networks, in.ExtraDockerNetworks...)

for _, networkName := range networks {
framework.L.Debug().Msgf("Connecting chip-ingress to %s network", networkName)
if connectErr := connectNetwork(networkName); connectErr != nil {
return nil, errors.Wrapf(connectErr, "failed to connect chip-ingress to %s network", networkName)
}
// verify that the container is connected to framework's network
inspected, inspectErr := cli.ContainerInspect(ctx, chipIngressContainer.ID)
if inspectErr != nil {
return nil, errors.Wrapf(inspectErr, "failed to inspect container %s", chipIngressContainer.ID)
}

_, ok := inspected.NetworkSettings.Networks[networkName]
if !ok {
return nil, fmt.Errorf("container %s is NOT on network %s", chipIngressContainer.ID, networkName)
}
}

// get hosts and ports for chip-ingress and redpanda
chipIngressExternalHost, chipIngressExternalHostErr := chipIngressContainer.Host(ctx)
if chipIngressExternalHostErr != nil {
return nil, errors.Wrap(chipIngressExternalHostErr, "failed to get host for Chip Ingress")
}
chipIngressExternalPort, chipIngressExternalPortErr := chipIngressContainer.MappedPort(ctx, DEFAULT_CHIP_INGRESS_GRPC_PORT)
if chipIngressExternalPortErr != nil {
return nil, errors.Wrap(chipIngressExternalPortErr, "failed to get mapped port for Chip Ingress")
}

redpandaContainer, redpandaErr := stack.ServiceContainer(ctx, DEFAULT_RED_PANDA_SERVICE_NAME)
if redpandaErr != nil {
return nil, errors.Wrap(redpandaErr, "failed to get redpanda container")
}

redpandaExternalHost, redpandaExternalHostErr := redpandaContainer.Host(ctx)
if redpandaExternalHostErr != nil {
return nil, errors.Wrap(redpandaExternalHostErr, "failed to get host for Red Panda")
}
redpandaExternalKafkaPort, redpandaExternalKafkaPortErr := redpandaContainer.MappedPort(ctx, DEFAULT_RED_PANDA_KAFKA_PORT)
if redpandaExternalKafkaPortErr != nil {
return nil, errors.Wrap(redpandaExternalKafkaPortErr, "failed to get mapped port for Red Panda")
}
redpandaExternalSchemaRegistryPort, redpandaExternalSchemaRegistryPortErr := redpandaContainer.MappedPort(ctx, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT)
if redpandaExternalSchemaRegistryPortErr != nil {
return nil, errors.Wrap(redpandaExternalSchemaRegistryPortErr, "failed to get mapped port for Red Panda")
}

redpandaConsoleContainer, redpandaConsoleErr := stack.ServiceContainer(ctx, DEFAULT_RED_PANDA_CONSOLE_SERVICE_NAME)
if redpandaConsoleErr != nil {
return nil, errors.Wrap(redpandaConsoleErr, "failed to get redpanda-console container")
}

redpandaExternalConsoleHost, redpandaExternalConsoleHostErr := redpandaConsoleContainer.Host(ctx)
if redpandaExternalConsoleHostErr != nil {
return nil, errors.Wrap(redpandaExternalConsoleHostErr, "failed to get host for Red Panda Console")
}
redpandaExternalConsolePort, redpandaExternalConsolePortErr := redpandaConsoleContainer.MappedPort(ctx, DEFAULT_RED_PANDA_CONSOLE_PORT)
if redpandaExternalConsolePortErr != nil {
return nil, errors.Wrap(redpandaExternalConsolePortErr, "failed to get mapped port for Red Panda Console")
}

output := &Output{
ChipIngress: &ChipIngressOutput{
GRPCInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_CHIP_INGRESS_SERVICE_NAME, DEFAULT_CHIP_INGRESS_GRPC_PORT),
GRPCExternalURL: fmt.Sprintf("http://%s:%s", chipIngressExternalHost, chipIngressExternalPort.Port()),
},
RedPanda: &RedPandaOutput{
SchemaRegistryInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_RED_PANDA_SERVICE_NAME, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT),
SchemaRegistryExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalHost, redpandaExternalSchemaRegistryPort.Port()),
KafkaInternalURL: fmt.Sprintf("%s:%s", DEFAULT_RED_PANDA_SERVICE_NAME, DEFAULT_RED_PANDA_KAFKA_PORT),
KafkaExternalURL: fmt.Sprintf("%s:%s", redpandaExternalHost, redpandaExternalKafkaPort.Port()),
ConsoleExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalConsoleHost, redpandaExternalConsolePort.Port()),
},
}

framework.L.Info().Msg("Chip Ingress stack start")

return output, nil
}

func composeFilePath(rawFilePath string) (string, error) {
// if it's not a URL, return it as is and assume it's a local file
if !strings.HasPrefix(rawFilePath, "http") {
return rawFilePath, nil
}

resp, respErr := http.Get(rawFilePath)
if respErr != nil {
return "", errors.Wrap(respErr, "failed to download docker-compose file")
}
defer resp.Body.Close()

tempFile, tempErr := os.CreateTemp("", "chip-ingress-docker-compose-*.yml")
if tempErr != nil {
return "", errors.Wrap(tempErr, "failed to create temp file")
}
defer tempFile.Close()

_, copyErr := io.Copy(tempFile, resp.Body)
if copyErr != nil {
tempFile.Close()
return "", errors.Wrap(copyErr, "failed to write compose file")
}

return tempFile.Name(), nil
}
Loading
Loading