Skip to content

Commit 2d24aa5

Browse files
committed
WIP#1
1 parent 4c45835 commit 2d24aa5

File tree

12 files changed

+1932
-866
lines changed

12 files changed

+1932
-866
lines changed
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
golang 1.25.3
Lines changed: 298 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,298 @@
1+
package chipingressset
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"maps"
8+
"os"
9+
"path"
10+
"path/filepath"
11+
"slices"
12+
"strings"
13+
14+
"github.com/jhump/protocompile"
15+
"google.golang.org/protobuf/reflect/protoreflect"
16+
17+
cc "github.com/smartcontractkit/atlas/chip-config/client" // TODO: can we move it to chainlink-common?
18+
"github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
19+
)
20+
21+
// code copied from: https://github.com/smartcontractkit/atlas/blob/master/chip-cli/config/config.go and https://github.com/smartcontractkit/atlas/blob/master/chip-cli/config/proto_validator.go
22+
// reason: avoid dependency on the chip-cli module in the testing framework
23+
func chipConfigClient(ctx context.Context, chipConfigOutput *ChipConfigOutput) (cc.ChipConfigClient, error) {
24+
fmt.Printf("🔌 Initiating connection to Chip Config at \033[1m%s\033[0m...\n\n", chipConfigOutput.GRPCExternalURL)
25+
26+
var clientOpts []cc.ClientOpt
27+
clientOpts = append(clientOpts, cc.WithBasicAuth(chipConfigOutput.Username, chipConfigOutput.Password))
28+
29+
client, err := cc.NewChipConfigClient(chipConfigOutput.GRPCExternalURL, clientOpts...)
30+
if err != nil {
31+
return nil, fmt.Errorf("failed to create Chip Config client: %w", err)
32+
}
33+
34+
// Check we can connect to the server
35+
_, pErr := client.Ping(ctx)
36+
if pErr != nil {
37+
return nil, fmt.Errorf("failed to connect to Chip Config: %w", pErr)
38+
}
39+
40+
fmt.Printf("🔗 Connected to Chip Config\n\n")
41+
42+
return client, nil
43+
}
44+
45+
func convertToPbSchemas(schemas map[string]*Schema, domain string) []*pb.Schema {
46+
pbSchemas := make([]*pb.Schema, len(schemas))
47+
48+
for i, schema := range slices.Collect(maps.Values(schemas)) {
49+
50+
pbReferences := make([]*pb.SchemaReference, len(schema.References))
51+
for j, reference := range schema.References {
52+
pbReferences[j] = &pb.SchemaReference{
53+
Subject: fmt.Sprintf("%s-%s", domain, reference.Entity),
54+
Name: reference.Name,
55+
// Explicitly omit Version, this tells chip-config to use the latest version of the schema for this reference
56+
}
57+
}
58+
59+
pbSchema := &pb.Schema{
60+
Subject: fmt.Sprintf("%s-%s", domain, schema.Entity),
61+
Schema: schema.SchemaContent,
62+
References: pbReferences,
63+
}
64+
65+
// If the schema has metadata, we need to add pb metadata to the schema
66+
if schema.Metadata.Stores != nil {
67+
68+
stores := make(map[string]*pb.Store, len(schema.Metadata.Stores))
69+
for key, store := range schema.Metadata.Stores {
70+
stores[key] = &pb.Store{
71+
Index: store.Index,
72+
Partition: store.Partition,
73+
}
74+
}
75+
76+
pbSchema.Metadata = &pb.MetaData{
77+
Stores: stores,
78+
}
79+
}
80+
81+
pbSchemas[i] = pbSchema
82+
}
83+
84+
return pbSchemas
85+
}
86+
87+
type RegistrationConfig struct {
88+
Domain string `json:"domain"`
89+
Schemas []Schema `json:"schemas"`
90+
}
91+
92+
type Schema struct {
93+
Entity string `json:"entity"`
94+
Path string `json:"path"`
95+
References []SchemaReference `json:"references,omitempty"`
96+
SchemaContent string
97+
Metadata Metadata `json:"metadata,omitempty"`
98+
}
99+
100+
type Metadata struct {
101+
Stores map[string]Store `json:"stores"`
102+
}
103+
104+
type Store struct {
105+
Index []string `json:"index"`
106+
Partition []string `json:"partition"`
107+
}
108+
109+
type SchemaReference struct {
110+
Name string `json:"name"`
111+
Entity string `json:"entity"`
112+
Path string `json:"path"`
113+
}
114+
115+
func parseSchemaConfig(configFilePath, schemaDir string) (*RegistrationConfig, map[string]*Schema, error) {
116+
cfg, err := readConfig(configFilePath)
117+
if err != nil {
118+
return nil, nil, err
119+
}
120+
121+
if err := ValidateEntityNames(cfg, schemaDir); err != nil {
122+
return nil, nil, fmt.Errorf("entity name validation failed: %w", err)
123+
}
124+
125+
// Our end goal is to generate a schema registration request to chip config
126+
// We will use a map to store the schemas by entity and path
127+
// this is because more than one schema may reference the same schema
128+
// technically, since SR is idempotent, this is not strictly necessary, as duplicate registrations are noop
129+
schemas := make(map[string]*Schema)
130+
131+
for _, schema := range cfg.Schemas {
132+
133+
// For each of the schemas, we need to get the references schema content
134+
for _, reference := range schema.References {
135+
136+
// read schema contents
137+
refSchemaContent, err := os.ReadFile(path.Join(schemaDir, reference.Path))
138+
if err != nil {
139+
return nil, nil, fmt.Errorf("error reading schema: %v", err)
140+
}
141+
142+
// generate key with entity and path since other schemas may also reference this schema
143+
key := fmt.Sprintf("%s:%s", reference.Entity, reference.Path)
144+
145+
// if the schema already exists, skip it
146+
if _, ok := schemas[key]; ok {
147+
continue
148+
}
149+
150+
schemas[key] = &Schema{
151+
Entity: reference.Entity,
152+
Path: reference.Path,
153+
SchemaContent: string(refSchemaContent),
154+
}
155+
}
156+
157+
// add the root schema to the map
158+
schemaContent, err := os.ReadFile(path.Join(schemaDir, schema.Path))
159+
if err != nil {
160+
return nil, nil, fmt.Errorf("error reading schema: %v", err)
161+
}
162+
163+
key := fmt.Sprintf("%s:%s", schema.Entity, schema.Path)
164+
// if the schema already exists, that means it is referenced by another schema.
165+
// so we just need to add the references to the existing schema in the map
166+
if s, ok := schemas[key]; ok {
167+
s.References = append(s.References, schema.References...)
168+
continue
169+
}
170+
171+
schemas[key] = &Schema{
172+
Entity: schema.Entity,
173+
Path: schema.Path,
174+
SchemaContent: string(schemaContent),
175+
References: schema.References,
176+
}
177+
178+
}
179+
180+
return cfg, schemas, nil
181+
}
182+
183+
func readConfig(path string) (*RegistrationConfig, error) {
184+
f, err := os.Open(path)
185+
if err != nil {
186+
return nil, fmt.Errorf("failed to open config file '%s': %w", path, err)
187+
}
188+
defer f.Close()
189+
190+
var cfg RegistrationConfig
191+
192+
dErr := json.NewDecoder(f).Decode(&cfg)
193+
if dErr != nil {
194+
return nil, fmt.Errorf("failed to decode config: %w", dErr)
195+
}
196+
197+
return &cfg, nil
198+
}
199+
200+
// ValidateEntityNames validates that all entity names in the config match the fully qualified
201+
// protobuf names (package.MessageName) from their corresponding proto files.
202+
// It collects all validation errors and returns them together for better user experience.
203+
func ValidateEntityNames(cfg *RegistrationConfig, schemaDir string) error {
204+
var errors []string
205+
206+
for _, schema := range cfg.Schemas {
207+
if err := validateEntityName(schema.Entity, schema.Path, schemaDir); err != nil {
208+
errors = append(errors, fmt.Sprintf(" - schema '%s': %s", schema.Path, err))
209+
}
210+
211+
for _, ref := range schema.References {
212+
if err := validateEntityName(ref.Entity, ref.Path, schemaDir); err != nil {
213+
errors = append(errors, fmt.Sprintf(" - referenced schema '%s': %s", ref.Path, err))
214+
}
215+
}
216+
}
217+
218+
if len(errors) > 0 {
219+
return fmt.Errorf("entity name validation failed with %d error(s):\n%s", len(errors), strings.Join(errors, "\n"))
220+
}
221+
222+
return nil
223+
}
224+
225+
func validateEntityName(entityName, protoPath, schemaDir string) error {
226+
fullPath := path.Join(schemaDir, protoPath)
227+
228+
// Find the message descriptor that matches the entity name
229+
msgDesc, err := findMessageDescriptor(fullPath, entityName)
230+
if err != nil {
231+
return fmt.Errorf("failed to find message descriptor in '%s': %w", protoPath, err)
232+
}
233+
234+
// Extract the expected entity name from the message descriptor
235+
expectedEntity := string(msgDesc.FullName())
236+
if entityName != expectedEntity {
237+
return fmt.Errorf(
238+
"entity name mismatch in chip.json:\n"+
239+
" Proto file: %s\n"+
240+
" Expected: %s\n"+
241+
" Got: %s\n"+
242+
" \n"+
243+
" The entity name must be the fully qualified protobuf name: {package}.{MessageName}",
244+
protoPath,
245+
expectedEntity,
246+
entityName,
247+
)
248+
}
249+
250+
return nil
251+
}
252+
253+
// findMessageDescriptor finds a message descriptor by name (either full name or short name)
254+
// This matches the logic in chip-ingress/internal/serde/message.go
255+
func findMessageDescriptor(filePath, targetMessageName string) (protoreflect.MessageDescriptor, error) {
256+
compiler := protocompile.Compiler{
257+
Resolver: &protocompile.SourceResolver{
258+
ImportPaths: getImportPaths(filePath, 3),
259+
},
260+
}
261+
262+
filename := filepath.Base(filePath)
263+
fds, err := compiler.Compile(context.Background(), filename)
264+
if err != nil {
265+
return nil, fmt.Errorf("failed to compile proto file: %w", err)
266+
}
267+
268+
if len(fds) == 0 {
269+
return nil, fmt.Errorf("no file descriptors found")
270+
}
271+
272+
// Search through all file descriptors for the target message
273+
for _, fd := range fds {
274+
messages := fd.Messages()
275+
for i := range messages.Len() {
276+
msgDesc := messages.Get(i)
277+
278+
// Match by full name (e.g., "package.MessageName") or short name (e.g., "MessageName")
279+
if string(msgDesc.FullName()) == targetMessageName || string(msgDesc.Name()) == targetMessageName {
280+
return msgDesc, nil
281+
}
282+
}
283+
}
284+
285+
return nil, fmt.Errorf("message descriptor not found for name: %s", targetMessageName)
286+
}
287+
288+
func getImportPaths(path string, depth int) []string {
289+
paths := make([]string, 0, depth+1)
290+
paths = append(paths, filepath.Dir(path))
291+
292+
currentPath := path
293+
for i := 0; i < depth; i++ {
294+
currentPath = filepath.Dir(currentPath)
295+
paths = append(paths, currentPath)
296+
}
297+
return paths
298+
}

framework/components/dockercompose/chip_ingress_set/chip_ingress.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,15 @@ import (
1414
networkTypes "github.com/docker/docker/api/types/network"
1515
"github.com/docker/docker/client"
1616
"github.com/pkg/errors"
17-
"github.com/smartcontractkit/chainlink-testing-framework/framework"
1817
"github.com/testcontainers/testcontainers-go/modules/compose"
1918
"github.com/testcontainers/testcontainers-go/wait"
19+
20+
"github.com/smartcontractkit/chainlink-testing-framework/framework"
2021
)
2122

2223
type Output struct {
2324
ChipIngress *ChipIngressOutput `toml:"chip_ingress"`
25+
ChipConfig *ChipConfigOutput `toml:"chip_config"`
2426
RedPanda *RedPandaOutput `toml:"redpanda"`
2527
}
2628

@@ -29,6 +31,13 @@ type ChipIngressOutput struct {
2931
GRPCExternalURL string `toml:"grpc_external_url"`
3032
}
3133

34+
type ChipConfigOutput struct {
35+
GRPCInternalURL string `toml:"grpc_internal_url"`
36+
GRPCExternalURL string `toml:"grpc_external_url"`
37+
Username string `toml:"username"`
38+
Password string `toml:"password"`
39+
}
40+
3241
type RedPandaOutput struct {
3342
SchemaRegistryInternalURL string `toml:"schema_registry_internal_url"`
3443
SchemaRegistryExternalURL string `toml:"schema_registry_external_url"`
@@ -57,6 +66,12 @@ const (
5766
DEFAULT_CHIP_INGRESS_GRPC_PORT = "50051"
5867
DEFAULT_CHIP_INGRESS_SERVICE_NAME = "chip-ingress"
5968

69+
DEFAULT_CHIP_CONFIG_EXTERNAL_PORT = "50052"
70+
DEFAULT_CHIP_CONFIG_INTERNAL_PORT = "50051"
71+
DEFAULT_CHIP_CONFIG_SERVICE_NAME = "chip-config"
72+
DEFAULT_CHIP_CONFIG_USERNAME = "admin"
73+
DEFAULT_CHIP_CONFIG_PASSWORD = "password"
74+
6075
DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT = "18081"
6176
DEFAULT_RED_PANDA_KAFKA_PORT = "19092"
6277
DEFAULT_RED_PANDA_SERVICE_NAME = "redpanda-0"
@@ -138,6 +153,11 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
138153
wait.ForListeningPort(DEFAULT_RED_PANDA_CONSOLE_PORT).WithPollInterval(100*time.Millisecond),
139154
wait.NewHostPortStrategy(DEFAULT_RED_PANDA_CONSOLE_PORT).WithPollInterval(100*time.Millisecond),
140155
).WithDeadline(2*time.Minute),
156+
).WaitForService(DEFAULT_CHIP_CONFIG_SERVICE_NAME,
157+
wait.ForAll(
158+
wait.ForListeningPort(DEFAULT_CHIP_CONFIG_INTERNAL_PORT).WithPollInterval(100*time.Millisecond),
159+
wait.NewHostPortStrategy(DEFAULT_CHIP_CONFIG_EXTERNAL_PORT).WithPollInterval(100*time.Millisecond),
160+
),
141161
)
142162

143163
chipIngressContainer, ingressErr := stack.ServiceContainer(ctx, DEFAULT_CHIP_INGRESS_SERVICE_NAME)
@@ -181,17 +201,25 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
181201
framework.L.Debug().Msgf("Container %s is connected to network %s", chipIngressContainer.ID, networkName)
182202
}
183203

184-
// get hosts and ports for chip-ingress and redpanda
204+
// get hosts and ports for chip ingress, chip config and redpanda
185205
chipIngressExternalHost, chipIngressExternalHostErr := chipIngressContainer.Host(ctx)
186206
if chipIngressExternalHostErr != nil {
187207
return nil, errors.Wrap(chipIngressExternalHostErr, "failed to get host for Chip Ingress")
188208
}
189209

210+
chipConfigContainer, chipConfigErr := stack.ServiceContainer(ctx, DEFAULT_CHIP_CONFIG_SERVICE_NAME)
211+
if chipConfigErr != nil {
212+
return nil, errors.Wrap(chipConfigErr, "failed to get chip-config container")
213+
}
214+
chipConfigExternalHost, chipConfigExternalHostErr := chipConfigContainer.Host(ctx)
215+
if chipConfigExternalHostErr != nil {
216+
return nil, errors.Wrap(chipConfigExternalHostErr, "failed to get host for Chip Config")
217+
}
218+
190219
redpandaContainer, redpandaErr := stack.ServiceContainer(ctx, DEFAULT_RED_PANDA_SERVICE_NAME)
191220
if redpandaErr != nil {
192221
return nil, errors.Wrap(redpandaErr, "failed to get redpanda container")
193222
}
194-
195223
redpandaExternalHost, redpandaExternalHostErr := redpandaContainer.Host(ctx)
196224
if redpandaExternalHostErr != nil {
197225
return nil, errors.Wrap(redpandaExternalHostErr, "failed to get host for Red Panda")
@@ -211,6 +239,12 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
211239
GRPCInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_CHIP_INGRESS_SERVICE_NAME, DEFAULT_CHIP_INGRESS_GRPC_PORT),
212240
GRPCExternalURL: fmt.Sprintf("http://%s:%s", chipIngressExternalHost, DEFAULT_CHIP_INGRESS_GRPC_PORT),
213241
},
242+
ChipConfig: &ChipConfigOutput{
243+
GRPCInternalURL: fmt.Sprintf("%s:%s", DEFAULT_CHIP_CONFIG_SERVICE_NAME, DEFAULT_CHIP_CONFIG_INTERNAL_PORT),
244+
GRPCExternalURL: fmt.Sprintf("%s:%s", chipConfigExternalHost, DEFAULT_CHIP_CONFIG_EXTERNAL_PORT),
245+
Username: DEFAULT_CHIP_CONFIG_USERNAME,
246+
Password: DEFAULT_CHIP_CONFIG_PASSWORD,
247+
},
214248
RedPanda: &RedPandaOutput{
215249
SchemaRegistryInternalURL: fmt.Sprintf("http://%s:%s", DEFAULT_RED_PANDA_SERVICE_NAME, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT),
216250
SchemaRegistryExternalURL: fmt.Sprintf("http://%s:%s", redpandaExternalHost, DEFAULT_RED_PANDA_SCHEMA_REGISTRY_PORT),

0 commit comments

Comments
 (0)