Skip to content

Commit a858a6e

Browse files
committed
Add support for arbitrary headers
1 parent 279aba3 commit a858a6e

File tree

2 files changed

+43
-6
lines changed

2 files changed

+43
-6
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,9 @@ Flags:
166166
--external-label=KEY=VALUE;...
167167
Label(s) to attach to all profiles in
168168
scraper-only mode.
169+
--grpc-headers=KEY=VALUE;...
170+
Additional gRPC headers to send with each
171+
request to the remote store (key=value pairs).
169172
```
170173
<!-- prettier-ignore-end -->
171174

pkg/parca/parca.go

Lines changed: 40 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ import (
5757
"google.golang.org/grpc"
5858
"google.golang.org/grpc/credentials"
5959
"google.golang.org/grpc/credentials/insecure"
60+
"google.golang.org/grpc/metadata"
6061
"gopkg.in/yaml.v3"
6162

6263
debuginfopb "github.com/parca-dev/parca/gen/proto/go/parca/debuginfo/v1alpha1"
@@ -125,6 +126,7 @@ type Flags struct {
125126
Insecure bool `kong:"help='Send gRPC requests via plaintext instead of TLS.'"`
126127
InsecureSkipVerify bool `kong:"help='Skip TLS certificate verification.'"`
127128
ExternalLabel map[string]string `kong:"help='Label(s) to attach to all profiles in scraper-only mode.'"`
129+
GRPCHeaders map[string]string `kong:"help='Additional gRPC headers to send with each request to the remote store (key=value pairs).'"`
128130

129131
Hidden FlagsHidden `embed:"" prefix:""`
130132
}
@@ -661,6 +663,28 @@ func Run(ctx context.Context, logger log.Logger, reg *prometheus.Registry, flags
661663
return nil
662664
}
663665

666+
// customHeadersUnaryInterceptor adds custom headers to all unary RPC calls.
667+
func customHeadersUnaryInterceptor(headers map[string]string) grpc.UnaryClientInterceptor {
668+
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
669+
// Add headers to outgoing context
670+
for key, value := range headers {
671+
ctx = metadata.AppendToOutgoingContext(ctx, key, value)
672+
}
673+
return invoker(ctx, method, req, reply, cc, opts...)
674+
}
675+
}
676+
677+
// customHeadersStreamInterceptor adds custom headers to all streaming RPC calls.
678+
func customHeadersStreamInterceptor(headers map[string]string) grpc.StreamClientInterceptor {
679+
return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
680+
// Add headers to outgoing context
681+
for key, value := range headers {
682+
ctx = metadata.AppendToOutgoingContext(ctx, key, value)
683+
}
684+
return streamer(ctx, desc, cc, method, opts...)
685+
}
686+
}
687+
664688
func runForwarder(
665689
ctx context.Context,
666690
logger log.Logger,
@@ -686,17 +710,27 @@ func runForwarder(
686710

687711
propagators := propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{})
688712

713+
// Build interceptor chains
714+
unaryInterceptors := []grpc.UnaryClientInterceptor{
715+
metrics.UnaryClientInterceptor(),
716+
}
717+
streamInterceptors := []grpc.StreamClientInterceptor{
718+
metrics.StreamClientInterceptor(),
719+
}
720+
721+
// Add custom headers interceptor if headers are configured
722+
if len(flags.GRPCHeaders) > 0 {
723+
unaryInterceptors = append([]grpc.UnaryClientInterceptor{customHeadersUnaryInterceptor(flags.GRPCHeaders)}, unaryInterceptors...)
724+
streamInterceptors = append([]grpc.StreamClientInterceptor{customHeadersStreamInterceptor(flags.GRPCHeaders)}, streamInterceptors...)
725+
}
726+
689727
opts := []grpc.DialOption{
690728
grpc.WithStatsHandler(otelgrpc.NewServerHandler(
691729
otelgrpc.WithTracerProvider(tracer),
692730
otelgrpc.WithPropagators(propagators),
693731
)),
694-
grpc.WithChainUnaryInterceptor(
695-
metrics.UnaryClientInterceptor(),
696-
),
697-
grpc.WithChainStreamInterceptor(
698-
metrics.StreamClientInterceptor(),
699-
),
732+
grpc.WithChainUnaryInterceptor(unaryInterceptors...),
733+
grpc.WithChainStreamInterceptor(streamInterceptors...),
700734
}
701735
if flags.Insecure {
702736
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))

0 commit comments

Comments
 (0)