Skip to content

Commit 4993f54

Browse files
authored
[DX-1407] Added support for loading protobufs from local filesystem (#1984)
1 parent cac07d4 commit 4993f54

File tree

5 files changed

+163
-31
lines changed

5 files changed

+163
-31
lines changed

book/src/framework/components/chipingresset/chip_ingress.md

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,10 @@ It consists of 3 components:
1212
To add it to your stack use following TOML:
1313
```toml
1414
[chip_ingress]
15-
compose_file='../../components/chip_ingress_set/docker-compose.yml'
15+
# using a local docker-compose file
16+
compose_file='file://../../components/chip_ingress_set/docker-compose.yml'
17+
# using a remote file
18+
# compose_file='https://my.awesome.resource.io/docker-compose.yml'
1619
extra_docker_networks = ["my-existing-network"]
1720
```
1821

@@ -69,8 +72,7 @@ if token := os.Getenv("GITHUB_TOKEN"); token != "" {
6972

7073
protoErr := chipingressset.DefaultRegisterAndFetchProtos(ctx, client, []chipingressset.RepoConfiguration{
7174
{
72-
Owner: "smartcontractkit",
73-
Repo: "chainlink-protos",
75+
URI: "https://github.com/smartcontractkit/chainlink-protostractkit",
7476
Ref: "626c42d55bdcb36dffe0077fff58abba40acc3e5",
7577
Folders: []string{"workflows"},
7678
},
@@ -83,13 +85,21 @@ if protoErr != nil {
8385
Since `ProtoSchemaSet` has TOML tags you can also read it from a TOML file with this content:
8486
```toml
8587
[[proto_schema_set]]
86-
owner = 'smartcontractkit'
87-
repository = 'chainlink-protos'
88+
# reading from remote registry (only github.com supported)
89+
uri = 'https://github.com/smartcontractkit/chainlink-protos'
8890
ref = '626c42d55bdcb36dffe0077fff58abba40acc3e5'
8991
folders = ['workflows']
92+
subject_prefix = 'cre-'
93+
94+
[[proto_schema_set]]
95+
# reading from local folder
96+
uri = 'file://../../chainlink-protos'
97+
# ref is not supported, when reading from local folders
98+
folders = ['workflows']
99+
subject_prefix = 'cre-'
90100
```
91101

92-
using this code:
102+
And then use this Go code to register them:
93103
```go
94104
var protoSchemaSets []chipingressset.ProtoSchemaSet
95105
for _, schemaSet := range configFiles {
@@ -113,4 +123,4 @@ for _, schemaSet := range configFiles {
113123

114124
Registration logic is very simple and should handle cases of protos that import other protos as long they are all available in the `ProtoSchemaSet`s provided to the registration function. That function uses an algorithm called "topological sorting by trail", which will try to register all protos in a loop until it cannot register any more protos or it has registered all of them. That allows us to skip dependency parsing completely.
115125

116-
Since Kafka doesn't have any automatic discoverability mechanism for subject - schema relationship (it has to be provided out-of-band) code currently only knows how to correctly register protos from [chainlink-protos](https://github.com/smartcontractkit/chainlink-protos) repository.
126+
Kafka doesn't have any automatic discoverability mechanism for subject - schema relationship (it has to be provided out-of-band). Currenly, we create the subject in the following way: <subject_prefix>.<package>.<1st-message-name>. Subject prefix is optional and if it's not present, then subject is equal to: <package>.<1st-message-name>. Only the first message in the `.proto` file is ever registered.
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
- Added support for loading protobufs from local filesystem
2+
- Require 'file://' prefix when using local docker-compose file

framework/components/dockercompose/chip_ingress_set/chip_ingress.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,15 +217,18 @@ func New(in *Input) (*Output, error) {
217217
},
218218
}
219219

220-
framework.L.Info().Msg("Chip Ingress stack start")
220+
framework.L.Info().Msg("Chip Ingress stack started")
221221

222222
return output, nil
223223
}
224224

225225
func composeFilePath(rawFilePath string) (string, error) {
226226
// if it's not a URL, return it as is and assume it's a local file
227227
if !strings.HasPrefix(rawFilePath, "http") {
228-
return rawFilePath, nil
228+
if !strings.HasPrefix(rawFilePath, "file://") {
229+
return "", fmt.Errorf("docker compose URI must start either with 'file://', 'http://' or 'https://', but '%s' was found", rawFilePath)
230+
}
231+
return strings.TrimPrefix(rawFilePath, "file://"), nil
229232
}
230233

231234
resp, respErr := http.Get(rawFilePath)

framework/components/dockercompose/chip_ingress_set/protos.go

Lines changed: 118 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"net/http"
10+
"os"
1011
"path/filepath"
1112
"regexp"
1213
"strings"
@@ -23,8 +24,7 @@ type protoFile struct {
2324
}
2425

2526
type ProtoSchemaSet struct {
26-
Owner string `toml:"owner"`
27-
Repository string `toml:"repository"`
27+
URI string `toml:"uri"`
2828
Ref string `toml:"ref"` // ref or tag or commit SHA
2929
Folders []string `toml:"folders"` // if not provided, all protos will be fetched, otherwise only protos in these folders will be fetched
3030
SubjectPrefix string `toml:"subject_prefix"` // optional prefix for subjects
@@ -36,16 +36,31 @@ type SubjectNamingStrategyFn func(subjectPrefix string, protoFile protoFile, rep
3636
// RepositoryToSubjectNamingStrategyFn is a map of repository names to SubjectNamingStrategyFn functions
3737
type RepositoryToSubjectNamingStrategyFn map[string]SubjectNamingStrategyFn
3838

39-
func ValidateRepoConfiguration(repoConfig ProtoSchemaSet) error {
40-
if repoConfig.Owner == "" {
41-
return errors.New("owner is required")
39+
func validateRepoConfiguration(repoConfig ProtoSchemaSet) error {
40+
if repoConfig.URI == "" {
41+
return errors.New("uri is required")
4242
}
43-
if repoConfig.Repository == "" {
44-
return errors.New("repo is required")
43+
44+
if !strings.HasPrefix(repoConfig.URI, "https://") && !strings.HasPrefix(repoConfig.URI, "file://") {
45+
return errors.New("uri has to start with either 'file://' or 'https://'")
46+
}
47+
48+
trimmedURI := strings.TrimPrefix(repoConfig.URI, "https://")
49+
if !strings.HasPrefix(trimmedURI, "github.com") {
50+
return fmt.Errorf("only repositories hosted at github.com are supported, but %s was found", repoConfig.URI)
51+
}
52+
53+
parts := strings.Split(trimmedURI, "/")
54+
if len(parts) < 3 {
55+
return fmt.Errorf("URI should have following format: 'https://github.com/<OWNER>/<REPOSITORY>', but %s was found", repoConfig.URI)
56+
}
57+
58+
if repoConfig.Ref == "" && strings.HasPrefix(repoConfig.URI, "https://") {
59+
return errors.New("ref is required, when fetching protos from Github")
4560
}
4661

47-
if repoConfig.Ref == "" {
48-
return errors.New("ref is required")
62+
if repoConfig.Ref != "" && strings.HasPrefix(repoConfig.URI, "file://") {
63+
return errors.New("ref is not supported with local protos")
4964
}
5065

5166
return nil
@@ -59,9 +74,15 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch
5974
framework.L.Debug().Msgf("Registering and fetching protos from %d repositories", len(protoSchemaSets))
6075

6176
for _, protoSchemaSet := range protoSchemaSets {
62-
protos, protosErr := fetchProtoFilesInFolders(ctx, client, protoSchemaSet.Owner, protoSchemaSet.Repository, protoSchemaSet.Ref, protoSchemaSet.Folders)
77+
if valErr := validateRepoConfiguration(protoSchemaSet); valErr != nil {
78+
return errors.Wrapf(valErr, "invalid repo configuration for schema set: %v", protoSchemaSet)
79+
}
80+
}
81+
82+
for _, protoSchemaSet := range protoSchemaSets {
83+
protos, protosErr := fetchProtoFilesInFolders(ctx, client, protoSchemaSet.URI, protoSchemaSet.Ref, protoSchemaSet.Folders)
6384
if protosErr != nil {
64-
return errors.Wrapf(protosErr, "failed to fetch protos from %s/%s", protoSchemaSet.Owner, protoSchemaSet.Repository)
85+
return errors.Wrapf(protosErr, "failed to fetch protos from %s", protoSchemaSet.URI)
6586
}
6687

6788
protoMap := make(map[string]string)
@@ -71,7 +92,7 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch
7192
protoMap[proto.Path] = proto.Content
7293

7394
var subjectStrategy SubjectNamingStrategyFn
74-
if strategy, ok := repoToSubjectNamingStrategy[protoSchemaSet.Owner+"/"+protoSchemaSet.Repository]; ok {
95+
if strategy, ok := repoToSubjectNamingStrategy[protoSchemaSet.URI]; ok {
7596
subjectStrategy = strategy
7697
} else {
7798
subjectStrategy = DefaultSubjectNamingStrategy
@@ -86,7 +107,7 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch
86107

87108
registerErr := registerAllWithTopologicalSortingByTrial(schemaRegistryURL, protoMap, subjects)
88109
if registerErr != nil {
89-
return errors.Wrapf(registerErr, "failed to register protos from %s/%s", protoSchemaSet.Owner, protoSchemaSet.Repository)
110+
return errors.Wrapf(registerErr, "failed to register protos from %s", protoSchemaSet.URI)
90111
}
91112
}
92113

@@ -142,8 +163,22 @@ func extractTopLevelMessageNamesWithRegex(protoSrc string) ([]string, error) {
142163
}
143164

144165
// Fetches .proto files from a GitHub repo optionally scoped to specific folders. It is recommended to use `*github.Client` with auth token to avoid rate limiting.
145-
func fetchProtoFilesInFolders(ctx context.Context, client *github.Client, owner, repository, ref string, folders []string) ([]protoFile, error) {
146-
framework.L.Debug().Msgf("Fetching proto files from %s/%s in folders: %s", owner, repository, strings.Join(folders, ", "))
166+
func fetchProtoFilesInFolders(ctx context.Context, client *github.Client, uri, ref string, folders []string) ([]protoFile, error) {
167+
framework.L.Debug().Msgf("Fetching proto files from %s in folders: %s", uri, strings.Join(folders, ", "))
168+
169+
if strings.HasPrefix(uri, "file://") {
170+
return fetchProtosFromFilesystem(uri, folders)
171+
}
172+
173+
parts := strings.Split(strings.TrimPrefix(uri, "https://"), "/")
174+
175+
return fetchProtosFromGithub(ctx, client, parts[1], parts[2], ref, folders)
176+
}
177+
178+
func fetchProtosFromGithub(ctx context.Context, client *github.Client, owner, repository, ref string, folders []string) ([]protoFile, error) {
179+
if client == nil {
180+
return nil, errors.New("github client cannot be nil")
181+
}
147182

148183
var files []protoFile
149184

@@ -210,7 +245,7 @@ searchLoop:
210245
})
211246
}
212247

213-
framework.L.Debug().Msgf("Fetched %d proto files from %s/%s", len(files), owner, repository)
248+
framework.L.Debug().Msgf("Fetched %d proto files from Github's %s/%s", len(files), owner, repository)
214249

215250
if len(files) == 0 {
216251
return nil, fmt.Errorf("no proto files found in %s/%s in folders %s", owner, repository, strings.Join(folders, ", "))
@@ -219,6 +254,73 @@ searchLoop:
219254
return files, nil
220255
}
221256

257+
func fetchProtosFromFilesystem(uri string, folders []string) ([]protoFile, error) {
258+
var files []protoFile
259+
260+
protoDirPath := strings.TrimPrefix(uri, "file://")
261+
walkErr := filepath.Walk(protoDirPath, func(path string, info os.FileInfo, err error) error {
262+
if err != nil {
263+
return err
264+
}
265+
266+
if info.IsDir() {
267+
return nil
268+
}
269+
270+
var folderFound string
271+
if len(folders) > 0 {
272+
matched := false
273+
for _, folder := range folders {
274+
if strings.HasPrefix(strings.TrimPrefix(strings.TrimPrefix(path, protoDirPath), "/"), folder) {
275+
matched = true
276+
folderFound = folder
277+
break
278+
}
279+
}
280+
281+
if !matched {
282+
return nil
283+
}
284+
}
285+
286+
if !strings.HasSuffix(path, ".proto") {
287+
return nil
288+
}
289+
290+
content, contentErr := os.ReadFile(path)
291+
if contentErr != nil {
292+
return errors.Wrapf(contentErr, "failed to read file at %s", path)
293+
}
294+
295+
// subtract the folder from the path if it was provided, because if it is imported by some other protos
296+
// most probably it will be imported as a relative path, so we need to remove the folder from the path
297+
protoPath := strings.TrimPrefix(strings.TrimPrefix(path, protoDirPath), "/")
298+
if folderFound != "" {
299+
protoPath = strings.TrimPrefix(strings.TrimPrefix(protoPath, folderFound), strings.TrimSuffix(folderFound, "/"))
300+
protoPath = strings.TrimPrefix(protoPath, "/")
301+
}
302+
303+
files = append(files, protoFile{
304+
Name: filepath.Base(path),
305+
Path: protoPath,
306+
Content: string(content),
307+
})
308+
309+
return nil
310+
})
311+
if walkErr != nil {
312+
return nil, errors.Wrapf(walkErr, "failed to walk through directory %s", protoDirPath)
313+
}
314+
315+
framework.L.Debug().Msgf("Fetched %d proto files from local %s", len(files), protoDirPath)
316+
317+
if len(files) == 0 {
318+
return nil, fmt.Errorf("no proto files found in '%s' in folders %s", protoDirPath, strings.Join(folders, ", "))
319+
}
320+
321+
return files, nil
322+
}
323+
222324
func resolveRefSHA(ctx context.Context, client *github.Client, owner, repository, ref string) (string, error) {
223325
if refObj, _, err := client.Git.GetRef(ctx, owner, repository, "refs/tags/"+ref); err == nil {
224326
return refObj.GetObject().GetSHA(), nil

framework/examples/myproject/smoke_chip_ingress_test.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,10 @@ func TestChipIngressSmoke(t *testing.T) {
2626

2727
out, err := chipingressset.New(in.ChipIngress)
2828
require.NoError(t, err, "failed to create chip ingress set")
29+
require.NotEmpty(t, out.ChipIngress.GRPCExternalURL, "GRPCExternalURL is not set")
30+
require.NotEmpty(t, out.RedPanda.SchemaRegistryExternalURL, "SchemaRegistryExternalURL is not set")
2931

30-
t.Run("chainlink-protos can be registered", func(t *testing.T) {
31-
require.NotEmpty(t, out.ChipIngress.GRPCExternalURL, "GRPCExternalURL is not set")
32-
require.NotEmpty(t, out.RedPanda.SchemaRegistryExternalURL, "SchemaRegistryExternalURL is not set")
33-
32+
t.Run("remote chainlink-protos can be registered", func(t *testing.T) {
3433
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
3534
defer cancel()
3635

@@ -48,13 +47,29 @@ func TestChipIngressSmoke(t *testing.T) {
4847

4948
err := chipingressset.DefaultRegisterAndFetchProtos(ctx, client, []chipingressset.ProtoSchemaSet{
5049
{
51-
Owner: "smartcontractkit",
52-
Repository: "chainlink-protos",
50+
URI: "https://github.com/smartcontractkit/chainlink-protos",
5351
Ref: "95decc005a91a1fd2621af9d9f00cb36d8061067",
5452
Folders: []string{"workflows"},
5553
SubjectPrefix: "cre-",
5654
},
5755
}, out.RedPanda.SchemaRegistryExternalURL)
5856
require.NoError(t, err, "failed to register protos")
5957
})
58+
59+
t.Run("local protos can be registered", func(t *testing.T) {
60+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
61+
defer cancel()
62+
63+
createTopicsErr := chipingressset.CreateTopics(ctx, out.RedPanda.KafkaExternalURL, []string{"cre"})
64+
require.NoError(t, createTopicsErr, "failed to create topics")
65+
66+
err := chipingressset.DefaultRegisterAndFetchProtos(ctx, nil, []chipingressset.ProtoSchemaSet{
67+
{
68+
URI: "file://../../../../chainlink-protos", // works also with absolute path
69+
Folders: []string{"workflows"},
70+
SubjectPrefix: "cre-",
71+
},
72+
}, out.RedPanda.SchemaRegistryExternalURL)
73+
require.NoError(t, err, "failed to register protos")
74+
})
6075
}

0 commit comments

Comments
 (0)