@@ -131,6 +131,20 @@ func RegisterAndFetchProtos(ctx context.Context, client *github.Client, protoSch
131131 subjects [proto .Path ] = subjectMessage
132132 }
133133
134+ // Determine which folder prefixes should be stripped based on configuration, i.e. if our proto is located in `workflows/workflows/v1/metadata.proto`
135+ // and folders=["workflows"], then we should strip "workflows/" from both the path and the subject name, so that the path becomes `workflows/v1/metadata.proto`
136+ // and the subject name becomes `workflows.v1.metadata`.
137+ // or in other words, we treat "workflows/" folder as the root folder for all protos in this schema set and strip it from the paths derived from the repository structure.
138+ prefixesToStrip := determineFolderPrefixesToStrip (protoSchemaSet .Folders )
139+
140+ for path := range protoMap {
141+ strippedPath := stripFolderPrefix (path , prefixesToStrip )
142+ protoMap [strippedPath ] = protoMap [path ]
143+ subjects [strippedPath ] = subjects [path ]
144+ delete (protoMap , path )
145+ delete (subjects , path )
146+ }
147+
134148 registerErr := registerAllWithTopologicalSorting (schemaRegistryURL , protoMap , subjects , protoSchemaSet .Folders )
135149 if registerErr != nil {
136150 return errors .Wrapf (registerErr , "failed to register protos from %s" , protoSchemaSet .URI )
@@ -498,21 +512,13 @@ func registerAllWithTopologicalSorting(
498512 return fmt .Errorf ("no subject found for %s" , path )
499513 }
500514
501- // Determine which folder prefixes should be stripped based on configuration
502- prefixesToStrip := determineFolderPrefixesToStrip (folders )
503-
504515 // Build references only for files that have dependencies
505516 var fileRefs []map [string ]any
506517 if deps , hasDeps := dependencies [path ]; hasDeps && len (deps ) > 0 {
507518 for _ , dep := range deps {
508519 if depSubject , depExists := subjectMap [dep ]; depExists {
509- // The schema registry expects import names without the configured folder prefixes
510- // So if folders=["workflows"] and the import is "workflows/v1/metadata.proto",
511- // the name should be "v1/metadata.proto"
512- importName := stripFolderPrefix (dep , prefixesToStrip )
513-
514520 fileRefs = append (fileRefs , map [string ]any {
515- "name" : importName ,
521+ "name" : dep ,
516522 "subject" : depSubject ,
517523 "version" : 1 ,
518524 })
@@ -528,11 +534,7 @@ func registerAllWithTopologicalSorting(
528534 continue
529535 }
530536
531- // The schema registry expects import statements without the configured folder prefixes
532- // Transform the schema content to remove these prefixes from import statements
533- modifiedSchema := transformSchemaContent (schema .Source , prefixesToStrip )
534-
535- _ , registerErr := registerSingleProto (schemaRegistryURL , subject , modifiedSchema , fileRefs )
537+ _ , registerErr := registerSingleProto (schemaRegistryURL , subject , schema .Source , fileRefs )
536538 if registerErr != nil {
537539 return errors .Wrapf (registerErr , "failed to register %s as %s" , path , subject )
538540 }
@@ -551,24 +553,42 @@ func registerAllWithTopologicalSorting(
551553func checkSchemaExists (registryURL , subject string ) (int , bool ) {
552554 url := fmt .Sprintf ("%s/subjects/%s/versions" , registryURL , subject )
553555
554- resp , err := http .Get (url )
555- if err != nil {
556- framework .L .Debug ().Msgf ("Failed to check schema existence for %s: %v" , subject , err )
556+ maxAttempts := uint (10 )
557+ var resp * http.Response
558+ existErr := retry .Do (func () error {
559+ var err error
560+ resp , err = http .Get (url )
561+ if err != nil {
562+ framework .L .Debug ().Msgf ("Failed to check schema existence for %s: %v" , subject , err )
563+ return err
564+ }
565+
566+ if resp .StatusCode == 200 {
567+ return nil
568+ }
569+
570+ return nil
571+ }, retry .Attempts (10 ), retry .Delay (100 * time .Millisecond ), retry .DelayType (retry .BackOffDelay ), retry .OnRetry (func (n uint , err error ) {
572+ framework .L .Debug ().Str ("attempt/max" , fmt .Sprintf ("%d/%d" , n , maxAttempts )).Msgf ("Retrying to check schema existence for %s: %v" , subject , err )
573+ }), retry .RetryIf (func (err error ) bool {
574+ return strings .Contains (err .Error (), "connection reset by peer" )
575+ }))
576+
577+ if existErr != nil {
557578 return 0 , false
558579 }
580+
559581 defer resp .Body .Close ()
560582
561- if resp .StatusCode == 200 {
562- var versions []struct {
563- ID int `json:"id"`
564- }
565- if err := json .NewDecoder (resp .Body ).Decode (& versions ); err != nil {
566- framework .L .Debug ().Msgf ("Failed to decode versions for %s: %v" , subject , err )
567- return 0 , false
568- }
569- if len (versions ) > 0 {
570- return versions [len (versions )- 1 ].ID , true
571- }
583+ var versions []struct {
584+ ID int `json:"id"`
585+ }
586+ if err := json .NewDecoder (resp .Body ).Decode (& versions ); err != nil {
587+ framework .L .Debug ().Msgf ("Failed to decode versions for %s: %v" , subject , err )
588+ return 0 , false
589+ }
590+ if len (versions ) > 0 {
591+ return versions [len (versions )- 1 ].ID , true
572592 }
573593
574594 return 0 , false
@@ -597,7 +617,7 @@ func registerSingleProto(
597617 maxAttempts := uint (10 )
598618
599619 var resp * http.Response
600- retry .Do (func () error {
620+ registerErr := retry .Do (func () error {
601621 var respErr error
602622 resp , respErr = http .Post (url , "application/vnd.schemaregistry.v1+json" , bytes .NewReader (payload ))
603623 if respErr != nil {
@@ -620,6 +640,10 @@ func registerSingleProto(
620640 // and will be handled by higher-level code
621641 return strings .Contains (err .Error (), "connection reset by peer" )
622642 }))
643+ if registerErr != nil {
644+ return 0 , errors .Wrapf (registerErr , "failed to register schema for subject %s" , subject )
645+ }
646+
623647 defer resp .Body .Close ()
624648
625649 var result struct {
@@ -649,8 +673,8 @@ func determineFolderPrefixesToStrip(folders []string) []string {
649673// stripFolderPrefix removes any configured folder prefixes from the given path
650674func stripFolderPrefix (path string , prefixes []string ) string {
651675 for _ , prefix := range prefixes {
652- if strings .HasPrefix (path , prefix ) {
653- return strings . TrimPrefix ( path , prefix )
676+ if after , ok := strings .CutPrefix (path , prefix ); ok {
677+ return after
654678 }
655679 }
656680 return path
0 commit comments