Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ use_repo(
"com_github_gorilla_mux",
"com_github_grpc_ecosystem_go_grpc_middleware",
"com_github_grpc_ecosystem_go_grpc_prometheus",
"com_github_jhump_protoreflect_v2",
"com_github_jmespath_go_jmespath",
"com_github_klauspost_compress",
"com_github_lazybeaver_xorshift",
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/grpc-ecosystem/go-grpc-middleware v1.4.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/jhump/protoreflect/v2 v2.0.0-beta.2
github.com/jmespath/go-jmespath v0.4.0
github.com/klauspost/compress v1.18.1
github.com/lazybeaver/xorshift v0.0.0-20170702203709-ce511d4823dd
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 h1:Ovs26xHkKqVztRpIrF/92Bcuy
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3 h1:NmZ1PKzSTQbuGHw9DGPFomqkkLWMC+vZCkfs+FHv1Vg=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.3/go.mod h1:zQrxl1YP88HQlA6i9c63DSVPFklWpGX4OWAc9bFuaH4=
github.com/jhump/protoreflect/v2 v2.0.0-beta.2 h1:qZU+rEZUOYTz1Bnhi3xbwn+VxdXkLVeEpAeZzVXLY88=
github.com/jhump/protoreflect/v2 v2.0.0-beta.2/go.mod h1:4tnOYkB/mq7QTyS3YKtVtNrJv4Psqout8HA1U+hZtgM=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8=
Expand Down
1 change: 1 addition & 0 deletions internal/mock/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -260,6 +260,7 @@ gomock(
"ClientConnInterface",
"ClientStream",
"ServerStream",
"ServerTransportStream",
"StreamHandler",
"Streamer",
"UnaryHandler",
Expand Down
12 changes: 12 additions & 0 deletions pkg/grpc/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ go_library(
"client_factory.go",
"deduplicating_client_factory.go",
"deny_authenticator.go",
"forwarding_stream_handler.go",
"jmespath_extractor.go",
"lazy_client_dialer.go",
"metadata_adding_interceptor.go",
Expand All @@ -26,9 +27,12 @@ go_library(
"peer_transport_credentials_linux.go",
"proto_trace_attributes_extractor.go",
"proxy_dialer.go",
"reflection_relay.go",
"request_headers_authenticator.go",
"request_metadata_tracing_interceptor.go",
"routing_stream_handler.go",
"server.go",
"server_transport_stream_context.go",
"tls_client_certificate_authenticator.go",
],
importpath = "github.com/buildbarn/bb-storage/pkg/grpc",
Expand All @@ -49,6 +53,8 @@ go_library(
"@bazel_remote_apis//build/bazel/remote/execution/v2:remote_execution_go_proto",
"@com_github_grpc_ecosystem_go_grpc_middleware//:go-grpc-middleware",
"@com_github_grpc_ecosystem_go_grpc_prometheus//:go-grpc-prometheus",
"@com_github_jhump_protoreflect_v2//grpcreflect",
"@com_github_jhump_protoreflect_v2//protoresolve",
"@io_opentelemetry_go_contrib_instrumentation_google_golang_org_grpc_otelgrpc//:otelgrpc",
"@io_opentelemetry_go_otel//attribute",
"@io_opentelemetry_go_otel_trace//:trace",
Expand All @@ -63,11 +69,15 @@ go_library(
"@org_golang_google_grpc//metadata",
"@org_golang_google_grpc//peer",
"@org_golang_google_grpc//reflection",
"@org_golang_google_grpc//reflection/grpc_reflection_v1",
"@org_golang_google_grpc//reflection/grpc_reflection_v1alpha",
"@org_golang_google_grpc//status",
"@org_golang_google_grpc_security_advancedtls//:advancedtls",
"@org_golang_google_protobuf//encoding/prototext",
"@org_golang_google_protobuf//proto",
"@org_golang_google_protobuf//reflect/protoreflect",
"@org_golang_google_protobuf//types/known/emptypb",
"@org_golang_x_sync//errgroup",
"@org_golang_x_sync//semaphore",
] + select({
"@rules_go//go/platform:android": [
Expand Down Expand Up @@ -98,6 +108,7 @@ go_test(
"authenticating_interceptor_test.go",
"deduplicating_client_factory_test.go",
"deny_authenticator_test.go",
"forwarding_stream_handler_test.go",
"jmespath_extractor_test.go",
"lazy_client_dialer_test.go",
"metadata_adding_interceptor_test.go",
Expand All @@ -107,6 +118,7 @@ go_test(
"proto_trace_attributes_extractor_test.go",
"request_headers_authenticator_test.go",
"request_metadata_tracing_interceptor_test.go",
"routing_stream_handler_test.go",
] + select({
"@rules_go//go/platform:android": [
"peer_transport_credentials_test.go",
Expand Down
91 changes: 91 additions & 0 deletions pkg/grpc/forwarding_stream_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package grpc

import (
"io"

"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/emptypb"
)

// NewForwardingStreamHandler creates a grpc.StreamHandler that forwards gRPC
// calls to a grpc.ClientConnInterface backend.
func NewForwardingStreamHandler(client grpc.ClientConnInterface) grpc.StreamHandler {
forwarder := &forwardingStreamHandler{
backend: client,
}
return forwarder.HandleStream
}

type forwardingStreamHandler struct {
backend grpc.ClientConnInterface
}

// HandleStream creates a new stream to the backend. Requests from
// incomingStream are forwarded to the backend stream and responses from the
// backend stream are sent back in the incomingStream.
func (s *forwardingStreamHandler) HandleStream(srv any, incomingStream grpc.ServerStream) error {
method := MustStreamMethodFromContext(incomingStream.Context())
desc := grpc.StreamDesc{
// According to grpc.StreamDesc documentation, StreamName and Handler
// are only used when registering handlers on a server.
StreamName: "",
Handler: nil,
// Streaming behaviour is wanted, single message is treated the same on
// transport level, the application just closes the stream after the
// first message.
ServerStreams: true,
ClientStreams: true,
}
group, groupCtx := errgroup.WithContext(incomingStream.Context())
group.Go(func() error {
// groupCtx is guaranteed to be canceled before returning from this method, so outgoingStream will not leak resources.
outgoingStream, err := s.backend.NewStream(groupCtx, &desc, method)
if err != nil {
return err
}
// Avoid group.Go because incomingStream.RecvMsg might block returning
// an error from the outgoingStream and getting the context for
// incomingStream canceled.
go func() {
for {
msg := &emptypb.Empty{}
if err := incomingStream.RecvMsg(msg); err != nil {
if err == io.EOF {
// Let's continue to receive on outgoingStream, so don't
// cancel grouptCtx.
outgoingStream.CloseSend()
return
}
// Cancel groupCtx immediately.
group.Go(func() error { return err })
return
}
if err := outgoingStream.SendMsg(msg); err != nil {
if err == io.EOF {
// The error will be returned by outgoingStream.RecvMsg(),
// no need to cancel groupCtx now.
return
}
// Cancel groupCtx immediately.
group.Go(func() error { return err })
return
}
}
}()

for {
msg := &emptypb.Empty{}
if err := outgoingStream.RecvMsg(msg); err != nil {
if err == io.EOF {
return nil
}
return err
}
if err := incomingStream.SendMsg(msg); err != nil {
return err
}
}
})
return group.Wait()
}
Loading