@@ -17,6 +17,8 @@ import (
1717 "github.com/testcontainers/testcontainers-go/modules/compose"
1818 "github.com/testcontainers/testcontainers-go/wait"
1919
20+ "github.com/smartcontractkit/chainlink-common/pkg/chipingress"
21+ "github.com/smartcontractkit/chainlink-common/pkg/chipingress/pb"
2022 "github.com/smartcontractkit/chainlink-testing-framework/framework"
2123)
2224
@@ -236,8 +238,8 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
236238
237239 output := & Output {
238240 ChipIngress : & ChipIngressOutput {
239- GRPCInternalURL : fmt .Sprintf ("http:// %s:%s" , DEFAULT_CHIP_INGRESS_SERVICE_NAME , DEFAULT_CHIP_INGRESS_GRPC_PORT ),
240- GRPCExternalURL : fmt .Sprintf ("http:// %s:%s" , chipIngressExternalHost , DEFAULT_CHIP_INGRESS_GRPC_PORT ),
241+ GRPCInternalURL : fmt .Sprintf ("%s:%s" , DEFAULT_CHIP_INGRESS_SERVICE_NAME , DEFAULT_CHIP_INGRESS_GRPC_PORT ),
242+ GRPCExternalURL : fmt .Sprintf ("%s:%s" , chipIngressExternalHost , DEFAULT_CHIP_INGRESS_GRPC_PORT ),
241243 },
242244 ChipConfig : & ChipConfigOutput {
243245 GRPCInternalURL : fmt .Sprintf ("%s:%s" , DEFAULT_CHIP_CONFIG_SERVICE_NAME , DEFAULT_CHIP_CONFIG_INTERNAL_PORT ),
@@ -258,7 +260,12 @@ func NewWithContext(ctx context.Context, in *Input) (*Output, error) {
258260 in .UseCache = true
259261 framework .L .Info ().Msg ("Chip Ingress stack started" )
260262
261- return output , checkSchemaRegistryReadiness (ctx , 2 * time .Minute , 300 * time .Millisecond , output .RedPanda .SchemaRegistryExternalURL , 3 )
263+ rpReadyErr := checkSchemaRegistryReadiness (ctx , 2 * time .Minute , 300 * time .Millisecond , output .RedPanda .SchemaRegistryExternalURL , 3 )
264+ if rpReadyErr != nil {
265+ return nil , errors .Wrap (rpReadyErr , "redpanda not ready" )
266+ }
267+
268+ return output , checkChipIngressReadiness (ctx , 2 * time .Minute , 300 * time .Millisecond , 3 , output .ChipIngress )
262269}
263270
264271func composeFilePath (rawFilePath string ) (string , error ) {
@@ -323,9 +330,11 @@ func connectNetwork(connCtx context.Context, timeout time.Duration, dockerClient
323330// checkSchemaRegistryReadiness verifies that the Schema Registry answers 2xx on GET /subjects
324331// for minSuccessCount *consecutive* attempts, polling every `interval`, with an overall `timeout`.
325332func checkSchemaRegistryReadiness (ctx context.Context , timeout , interval time.Duration , registryURL string , minSuccessCount int ) error {
326- if minSuccessCount < 1 {
327- minSuccessCount = 1
328- }
333+ framework .L .Info ().Msg ("Starting Schema Registry readiness check" )
334+ defer func () {
335+ framework .L .Info ().Msg ("Schema Registry readiness check finished" )
336+ }()
337+
329338 u , uErr := url .Parse (registryURL )
330339 if uErr != nil {
331340 return fmt .Errorf ("parse registry URL: %w" , uErr )
@@ -356,10 +365,7 @@ func checkSchemaRegistryReadiness(ctx context.Context, timeout, interval time.Du
356365 t := time .NewTicker (interval )
357366 defer t .Stop ()
358367
359- consecutive := 0
360- var lastErr error
361-
362- for {
368+ return readinessCheck (ctx , timeout , interval , minSuccessCount , func (ctx context.Context ) error {
363369 req , _ := http .NewRequestWithContext (ctx , http .MethodGet , u .String (), nil )
364370 // small belt-and-suspenders to ensure no reuse even if transport changes
365371 req .Close = true
@@ -372,21 +378,99 @@ func checkSchemaRegistryReadiness(ctx context.Context, timeout, interval time.Du
372378 }
373379
374380 if err == nil && resp .StatusCode / 100 == 2 {
375- framework .L .Debug ().Msgf ("schema registry ready check succeeded with status %d (%d/%d)" , resp .StatusCode , consecutive + 1 , minSuccessCount )
381+ return nil
382+ } else {
383+ if err != nil {
384+ return fmt .Errorf ("GET /subjects failed: %w" , err )
385+ } else {
386+ return fmt .Errorf ("GET /subjects status %d" , resp .StatusCode )
387+ }
388+ }
389+ })
390+ }
391+
392+ // checkChipIngressReadiness verifies that the Chip Ingress can register a schema
393+ // for minSuccessCount *consecutive* attempts, polling every `interval`, with an overall `timeout`.
394+ func checkChipIngressReadiness (ctx context.Context , timeout , interval time.Duration , minSuccessCount int , chipIngressOutput * ChipIngressOutput ) error {
395+ client , cErr := chipingress .NewClient (chipIngressOutput .GRPCExternalURL )
396+ if cErr != nil {
397+ return fmt .Errorf ("failed to create Chip client: %w" , cErr )
398+ }
399+
400+ framework .L .Info ().Msg ("Starting Chip Ingress readiness check" )
401+ defer func () {
402+ framework .L .Info ().Msg ("Chip Ingress readiness check finished" )
403+ }()
404+
405+ ctx , cancel := context .WithTimeout (ctx , timeout )
406+ defer cancel ()
407+
408+ t := time .NewTicker (interval )
409+ defer t .Stop ()
410+
411+ // raw schema from demo_client_payload.proto with one new field
412+ schema := `
413+ syntax = "proto3";
414+
415+ option go_package = "github.com/smartcontractkit/atlas/chip-ingress/cmd/demo_client/pb";
416+
417+ package pb;
418+
419+ // Used for testing
420+ message DemoClientPayload {
421+ string id=1;
422+ string domain=2;
423+ string entity=3;
424+ int64 batch_num=4;
425+ int64 message_num=5;
426+ int64 batch_position=6;
427+ optional string new_field=7; // New field added
428+ }
429+ `
430+
431+ subject := "platform-demo"
432+ schemaObj := & pb.Schema {
433+ Subject : subject ,
434+ Format : pb .SchemaType_PROTOBUF ,
435+ Schema : schema ,
436+ }
437+
438+ request := & pb.RegisterSchemaRequest {
439+ Schemas : []* pb.Schema {schemaObj },
440+ }
441+
442+ return readinessCheck (ctx , timeout , interval , minSuccessCount , func (ctx context.Context ) error {
443+ _ , err := client .RegisterSchema (ctx , request )
444+ return err
445+ })
446+ }
447+
448+ func readinessCheck (ctx context.Context , timeout , interval time.Duration , minSuccessCount int , checkFunc func (context.Context ) error ) error {
449+ if minSuccessCount < 1 {
450+ minSuccessCount = 1
451+ }
452+
453+ ctx , cancel := context .WithTimeout (ctx , timeout )
454+ defer cancel ()
455+
456+ t := time .NewTicker (interval )
457+ defer t .Stop ()
458+
459+ consecutive := 0
460+ var lastErr error
461+
462+ for {
463+ if err := checkFunc (ctx ); err == nil {
464+ framework .L .Debug ().Msgf ("readiness check succeeded (%d/%d)" , consecutive + 1 , minSuccessCount )
376465 consecutive ++
377466 if consecutive >= minSuccessCount {
378- framework .L .Debug ().Msg ("schema registry is ready" )
467+ framework .L .Debug ().Msg ("service is ready" )
379468 return nil
380469 }
381470 } else {
382471 consecutive = 0
383- if err != nil {
384- framework .L .Debug ().Msgf ("schema registry ready check failed with error %v (need %d/%d consecutive successes)" , err , consecutive , minSuccessCount )
385- lastErr = fmt .Errorf ("GET /subjects failed: %w" , err )
386- } else {
387- framework .L .Debug ().Msgf ("schema registry ready check failed with error %v and status code %d (need %d/%d consecutive successes)" , err , resp .StatusCode , consecutive , minSuccessCount )
388- lastErr = fmt .Errorf ("GET /subjects status %d" , resp .StatusCode )
389- }
472+ framework .L .Debug ().Msgf ("readiness check failed with error %v (need %d/%d consecutive successes)" , err , consecutive , minSuccessCount )
473+ lastErr = fmt .Errorf ("readiness check failed: %w" , err )
390474 }
391475
392476 select {
@@ -397,7 +481,7 @@ func checkSchemaRegistryReadiness(ctx context.Context, timeout, interval time.Du
397481 return fmt .Errorf ("schema registry not ready after %s; needed %d consecutive successes (got %d): %w" ,
398482 timeout , minSuccessCount , consecutive , lastErr )
399483 case <- t .C :
400- framework .L .Debug ().Msg ("schema registry not ready yet, retrying..." )
484+ framework .L .Debug ().Msg ("Schema registry not ready yet, retrying..." )
401485 // poll again
402486 }
403487 }
0 commit comments