@@ -57,6 +57,7 @@ import (
5757 "google.golang.org/grpc"
5858 "google.golang.org/grpc/credentials"
5959 "google.golang.org/grpc/credentials/insecure"
60+ "google.golang.org/grpc/metadata"
6061 "gopkg.in/yaml.v3"
6162
6263 debuginfopb "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1"
@@ -125,6 +126,7 @@ type Flags struct {
125126 Insecure bool `kong:"help='Send gRPC requests via plaintext instead of TLS.'"`
126127 InsecureSkipVerify bool `kong:"help='Skip TLS certificate verification.'"`
127128 ExternalLabel map [string ]string `kong:"help='Label(s) to attach to all profiles in scraper-only mode.'"`
129+ GRPCHeaders map [string ]string `kong:"help='Additional gRPC headers to send with each request to the remote store (key=value pairs).'"`
128130
129131 Hidden FlagsHidden `embed:"" prefix:""`
130132}
@@ -661,6 +663,28 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
661663 return nil
662664}
663665
666+ // customHeadersUnaryInterceptor adds custom headers to all unary RPC calls.
667+ func customHeadersUnaryInterceptor (headers map [string ]string ) grpc.UnaryClientInterceptor {
668+ return func (ctx context.Context , method string , req , reply interface {}, cc * grpc.ClientConn , invoker grpc.UnaryInvoker , opts ... grpc.CallOption ) error {
669+ // Add headers to outgoing context
670+ for key , value := range headers {
671+ ctx = metadata .AppendToOutgoingContext (ctx , key , value )
672+ }
673+ return invoker (ctx , method , req , reply , cc , opts ... )
674+ }
675+ }
676+
677+ // customHeadersStreamInterceptor adds custom headers to all streaming RPC calls.
678+ func customHeadersStreamInterceptor (headers map [string ]string ) grpc.StreamClientInterceptor {
679+ return func (ctx context.Context , desc * grpc.StreamDesc , cc * grpc.ClientConn , method string , streamer grpc.Streamer , opts ... grpc.CallOption ) (grpc.ClientStream , error ) {
680+ // Add headers to outgoing context
681+ for key , value := range headers {
682+ ctx = metadata .AppendToOutgoingContext (ctx , key , value )
683+ }
684+ return streamer (ctx , desc , cc , method , opts ... )
685+ }
686+ }
687+
664688func runForwarder (
665689 ctx context.Context ,
666690 logger log.Logger ,
@@ -686,17 +710,27 @@ func runForwarder(
686710
687711 propagators := propagation .NewCompositeTextMapPropagator (propagation.TraceContext {}, propagation.Baggage {})
688712
713+ // Build interceptor chains
714+ unaryInterceptors := []grpc.UnaryClientInterceptor {
715+ metrics .UnaryClientInterceptor (),
716+ }
717+ streamInterceptors := []grpc.StreamClientInterceptor {
718+ metrics .StreamClientInterceptor (),
719+ }
720+
721+ // Add custom headers interceptor if headers are configured
722+ if len (flags .GRPCHeaders ) > 0 {
723+ unaryInterceptors = append ([]grpc.UnaryClientInterceptor {customHeadersUnaryInterceptor (flags .GRPCHeaders )}, unaryInterceptors ... )
724+ streamInterceptors = append ([]grpc.StreamClientInterceptor {customHeadersStreamInterceptor (flags .GRPCHeaders )}, streamInterceptors ... )
725+ }
726+
689727 opts := []grpc.DialOption {
690728 grpc .WithStatsHandler (otelgrpc .NewServerHandler (
691729 otelgrpc .WithTracerProvider (tracer ),
692730 otelgrpc .WithPropagators (propagators ),
693731 )),
694- grpc .WithChainUnaryInterceptor (
695- metrics .UnaryClientInterceptor (),
696- ),
697- grpc .WithChainStreamInterceptor (
698- metrics .StreamClientInterceptor (),
699- ),
732+ grpc .WithChainUnaryInterceptor (unaryInterceptors ... ),
733+ grpc .WithChainStreamInterceptor (streamInterceptors ... ),
700734 }
701735 if flags .Insecure {
702736 opts = append (opts , grpc .WithTransportCredentials (insecure .NewCredentials ()))
0 commit comments