diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index fd5f05828fd0..386801e00123 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -1480,17 +1480,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { contentTypeErr = "malformed header: missing HTTP content-type" grpcMessage string recvCompress string - httpStatusCode *int httpStatusErr string - rawStatusCode = codes.Unknown + // the code from the grpc-status header, if present + grpcStatusCode = codes.Internal // headerError is set if an error is encountered while parsing the headers headerError string + httpStatus string ) - if initialHeader { - httpStatusErr = "malformed header: missing HTTP status" - } - for _, hf := range frame.Fields { switch hf.Name { case "content-type": @@ -1510,69 +1507,71 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } - rawStatusCode = codes.Code(uint32(code)) + grpcStatusCode = codes.Code(uint32(code)) case "grpc-message": grpcMessage = decodeGrpcMessage(hf.Value) case ":status": - c, err := strconv.ParseInt(hf.Value, 10, 32) + httpStatus = hf.Value + default: + if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { + break + } + v, err := decodeMetadataHeader(hf.Name, hf.Value) if err != nil { - se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err)) + headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err) + logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) + break + } + mdata[hf.Name] = append(mdata[hf.Name], v) + } + } + + // If a non-gRPC response is received, then evaluate the HTTP status to + // process the response and close the stream. + // In case http status doesn't provide any error information (status : 200), + // then evalute response code to be Unknown. + if !isGRPC { + var grpcErrorCode = codes.Internal + if httpStatus == "" { + httpStatusErr = "malformed header: missing HTTP status" + } else { + // Parse the status codes (e.g. "200", 404"). + statusCode, err := strconv.Atoi(httpStatus) + if err != nil { + se := status.New(grpcErrorCode, fmt.Sprintf("transport: malformed http-status: %v", err)) t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } - statusCode := int(c) if statusCode >= 100 && statusCode < 200 { if endStream { se := status.New(codes.Internal, fmt.Sprintf( "protocol error: informational header with status code %d must not have END_STREAM set", statusCode)) t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) } + // In case of informational headers, return. return } - httpStatusCode = &statusCode - if statusCode == 200 { - httpStatusErr = "" - break - } - httpStatusErr = fmt.Sprintf( "unexpected HTTP status code received from server: %d (%s)", statusCode, http.StatusText(statusCode), ) - default: - if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) { - break - } - v, err := decodeMetadataHeader(hf.Name, hf.Value) - if err != nil { - headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err) - logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err) - break - } - mdata[hf.Name] = append(mdata[hf.Name], v) - } - } - - if !isGRPC || httpStatusErr != "" { - var code = codes.Internal // when header does not include HTTP status, return INTERNAL - - if httpStatusCode != nil { var ok bool - code, ok = HTTPStatusConvTab[*httpStatusCode] + grpcErrorCode, ok = HTTPStatusConvTab[statusCode] if !ok { - code = codes.Unknown + grpcErrorCode = codes.Unknown } } var errs []string if httpStatusErr != "" { errs = append(errs, httpStatusErr) } + if contentTypeErr != "" { errs = append(errs, contentTypeErr) } - // Verify the HTTP response is a 200. - se := status.New(code, strings.Join(errs, "; ")) + + se := status.New(grpcErrorCode, strings.Join(errs, "; ")) t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream) return } @@ -1626,7 +1625,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) { return } - status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader]) + status := istatus.NewWithProto(grpcStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader]) // If client received END_STREAM from server while stream was still active, // send RST_STREAM. diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index a85a4673c3d7..4e46e267afe8 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -2586,51 +2586,48 @@ func (s) TestClientHandshakeInfoDialer(t *testing.T) { } } -func (s) TestClientDecodeHeaderStatusErr(t *testing.T) { - testStream := func() *ClientStream { - return &ClientStream{ - Stream: Stream{ - buf: recvBuffer{ - c: make(chan recvMsg), - mu: sync.Mutex{}, - }, +func newTestClientStream() *ClientStream { + return &ClientStream{ + Stream: Stream{ + buf: recvBuffer{ + c: make(chan recvMsg), }, - done: make(chan struct{}), - headerChan: make(chan struct{}), - } + }, + done: make(chan struct{}), + headerChan: make(chan struct{}), } +} - testClient := func(ts *ClientStream) *http2Client { - return &http2Client{ - mu: sync.Mutex{}, - activeStreams: map[uint32]*ClientStream{ - 0: ts, - }, - controlBuf: newControlBuffer(make(<-chan struct{})), - } +func newTestHTTP2Client(cs *ClientStream) *http2Client { + return &http2Client{ + activeStreams: map[uint32]*ClientStream{ + 1: cs, + }, + controlBuf: newControlBuffer(make(<-chan struct{})), } +} - for _, test := range []struct { - name string - // input +// TestClientDecodeHeader validates the handling of initial header frames that +// do not signal the end of a stream. For all headers that indicate grpc content +// type, http status will be ignored. +func (s) TestClientDecodeHeader(t *testing.T) { + tests := []struct { + name string metaHeaderFrame *http2.MetaHeadersFrame - // output - wantStatus *status.Status + wantStatus *status.Status }{ { - name: "valid header", + name: "valid_header", metaHeaderFrame: &http2.MetaHeadersFrame{ Fields: []hpack.HeaderField{ {Name: "content-type", Value: "application/grpc"}, - {Name: "grpc-status", Value: "0"}, {Name: ":status", Value: "200"}, }, }, - // no error wantStatus: status.New(codes.OK, ""), }, { - name: "missing content-type header", + name: "missing_content_type_header", metaHeaderFrame: &http2.MetaHeadersFrame{ Fields: []hpack.HeaderField{ {Name: "grpc-status", Value: "0"}, @@ -2639,11 +2636,11 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) { }, wantStatus: status.New( codes.Unknown, - "malformed header: missing HTTP content-type", + "unexpected HTTP status code received from server: 200 (OK); malformed header: missing HTTP content-type", ), }, { - name: "invalid grpc status header field", + name: "invalid_grpc_status", metaHeaderFrame: &http2.MetaHeadersFrame{ Fields: []hpack.HeaderField{ {Name: "content-type", Value: "application/grpc"}, @@ -2657,7 +2654,7 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) { ), }, { - name: "invalid http content type", + name: "invalid_content_type", metaHeaderFrame: &http2.MetaHeadersFrame{ Fields: []hpack.HeaderField{ {Name: "content-type", Value: "application/json"}, @@ -2669,22 +2666,33 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) { ), }, { - name: "http fallback and invalid http status", + name: "invalid_content_type_with_http_status_504", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Fields: []hpack.HeaderField{ + {Name: "content-type", Value: "application/json"}, + {Name: ":status", Value: "504"}, + }, + }, + wantStatus: status.New( + codes.Unavailable, + "unexpected HTTP status code received from server: 504 (Gateway Timeout); transport: received unexpected content-type \"application/json\"", + ), + }, + { + name: "http_fallback_and_invalid_http_status", metaHeaderFrame: &http2.MetaHeadersFrame{ Fields: []hpack.HeaderField{ - // No content type provided then fallback into handling http error. {Name: ":status", Value: "xxxx"}, }, }, wantStatus: status.New( codes.Internal, - "transport: malformed http-status: strconv.ParseInt: parsing \"xxxx\": invalid syntax", + "transport: malformed http-status: strconv.Atoi: parsing \"xxxx\": invalid syntax", ), }, { - name: "http2 frame size exceeds", + name: "http2_frame_size_exceeds", metaHeaderFrame: &http2.MetaHeadersFrame{ - Fields: nil, Truncated: true, }, wantStatus: status.New( @@ -2693,68 +2701,152 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) { ), }, { - name: "bad status in grpc mode", + name: "missing_http_status_and_grpc_status", metaHeaderFrame: &http2.MetaHeadersFrame{ Fields: []hpack.HeaderField{ {Name: "content-type", Value: "application/grpc"}, - {Name: "grpc-status", Value: "0"}, - {Name: ":status", Value: "504"}, }, }, - wantStatus: status.New( - codes.Unavailable, - "unexpected HTTP status code received from server: 504 (Gateway Timeout)", - ), + wantStatus: status.New(codes.OK, ""), }, { - name: "missing http status", + name: "ignore_http_status_for_grpc", metaHeaderFrame: &http2.MetaHeadersFrame{ Fields: []hpack.HeaderField{ {Name: "content-type", Value: "application/grpc"}, + {Name: ":status", Value: "504"}, }, }, - wantStatus: status.New( - codes.Internal, - "malformed header: missing HTTP status", - ), + wantStatus: status.New(codes.OK, ""), }, - } { + } - t.Run(test.name, func(t *testing.T) { - ts := testStream() - s := testClient(ts) + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cs := newTestClientStream() + s := newTestHTTP2Client(cs) - test.metaHeaderFrame.HeadersFrame = &http2.HeadersFrame{ + tc.metaHeaderFrame.HeadersFrame = &http2.HeadersFrame{ FrameHeader: http2.FrameHeader{ - StreamID: 0, + StreamID: 1, }, } - s.operateHeaders(test.metaHeaderFrame) - - got := ts.status - want := test.wantStatus + s.operateHeaders(tc.metaHeaderFrame) + got := cs.status + want := tc.wantStatus if got.Code() != want.Code() || got.Message() != want.Message() { - t.Fatalf("operateHeaders(%v); status = \ngot: %s\nwant: %s", test.metaHeaderFrame, got, want) + t.Errorf("operateHeaders(%v) got status %q, want %q", tc.metaHeaderFrame, got, want) } }) - t.Run(fmt.Sprintf("%s-end_stream", test.name), func(t *testing.T) { - ts := testStream() - s := testClient(ts) + } +} - test.metaHeaderFrame.HeadersFrame = &http2.HeadersFrame{ +// TestClientDecodeTrailer validates the handling of trailer frames, which may +// or may not also be the initial header frame (header-only response). +func (s) TestClientDecodeTrailer(t *testing.T) { + tests := []struct { + name string + metaHeaderFrame *http2.MetaHeadersFrame + wantEndStreamStatus *status.Status + }{ + { + name: "valid_trailer", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Fields: []hpack.HeaderField{ + {Name: "content-type", Value: "application/grpc"}, + {Name: "grpc-status", Value: "0"}, + {Name: ":status", Value: "200"}, + }, + }, + wantEndStreamStatus: status.New(codes.OK, ""), + }, + { + name: "missing_content_type_in_grpc_mode", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Fields: []hpack.HeaderField{ + {Name: "grpc-status", Value: "0"}, + {Name: ":status", Value: "200"}, + }, + }, + wantEndStreamStatus: status.New(codes.OK, ""), + }, + { + name: "invalid_grpc_status", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Fields: []hpack.HeaderField{ + {Name: "content-type", Value: "application/grpc"}, + {Name: "grpc-status", Value: "xxxx"}, + {Name: ":status", Value: "200"}, + }, + }, + wantEndStreamStatus: status.New( + codes.Internal, + "transport: malformed grpc-status: strconv.ParseInt: parsing \"xxxx\": invalid syntax", + ), + }, + { + name: "missing_grpc_status_in_grpc_mode", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Fields: []hpack.HeaderField{ + {Name: ":status", Value: "xxxx"}, + }, + }, + wantEndStreamStatus: status.New(codes.Internal, ""), + }, + { + name: "http2_frame_size_exceeds", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Truncated: true, + }, + wantEndStreamStatus: status.New( + codes.Internal, + "peer header list size exceeded limit", + ), + }, + { + name: "missing_grpc_status_in_trailer", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Fields: []hpack.HeaderField{ + {Name: "content-type", Value: "application/grpc"}, + }, + }, + wantEndStreamStatus: status.New(codes.Internal, ""), + }, + { + name: "deadline_exceeded_status", + metaHeaderFrame: &http2.MetaHeadersFrame{ + Fields: []hpack.HeaderField{ + {Name: "content-type", Value: "application/grpc"}, + {Name: "grpc-status", Value: "4"}, + {Name: "grpc-message", Value: "Request timed out: Internal error"}, + {Name: ":status", Value: "200"}, + }, + }, + wantEndStreamStatus: status.New(codes.DeadlineExceeded, "Request timed out: Internal error"), + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + cs := newTestClientStream() + // Mark headerChanClosed to indicate trailer frames. + cs.headerChanClosed = 1 + // Simulate the state where the initial headers have already been processed. + s := newTestHTTP2Client(cs) + + tc.metaHeaderFrame.HeadersFrame = &http2.HeadersFrame{ FrameHeader: http2.FrameHeader{ - StreamID: 0, + StreamID: 1, Flags: http2.FlagHeadersEndStream, }, } - s.operateHeaders(test.metaHeaderFrame) - - got := ts.status - want := test.wantStatus + s.operateHeaders(tc.metaHeaderFrame) + got := cs.status + want := tc.wantEndStreamStatus if got.Code() != want.Code() || got.Message() != want.Message() { - t.Fatalf("operateHeaders(%v); status = \ngot: %s\nwant: %s", test.metaHeaderFrame, got, want) + t.Errorf("operateHeaders(%v) got status %q, want %q", tc.metaHeaderFrame, got, want) } }) } diff --git a/test/http_header_end2end_test.go b/test/http_header_end2end_test.go index 0044a9f7ebf9..9ad33ef3c88a 100644 --- a/test/http_header_end2end_test.go +++ b/test/http_header_end2end_test.go @@ -103,10 +103,9 @@ func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) { errCode codes.Code }{ { - name: "missing gRPC status", + name: "missing gRPC content type", header: []string{ ":status", "403", - "content-type", "application/grpc", }, errCode: codes.PermissionDenied, }, @@ -120,23 +119,23 @@ func (s) TestHTTPHeaderFrameErrorHandlingInitialHeader(t *testing.T) { errCode: codes.Internal, }, { - name: "Malformed grpc-tags-bin field", + name: "Malformed grpc-tags-bin field ignores http status", header: []string{ ":status", "502", "content-type", "application/grpc", "grpc-status", "0", "grpc-tags-bin", "???", }, - errCode: codes.Unavailable, + errCode: codes.Internal, }, { - name: "gRPC status error", + name: "gRPC status error ignoring http status", header: []string{ ":status", "502", "content-type", "application/grpc", "grpc-status", "3", }, - errCode: codes.Unavailable, + errCode: codes.InvalidArgument, }, } { t.Run(test.name, func(t *testing.T) { @@ -161,7 +160,7 @@ func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { errCode codes.Code }{ { - name: "trailer missing grpc-status", + name: "trailer missing grpc-status to ignore http status", responseHeader: []string{ ":status", "200", "content-type", "application/grpc", @@ -170,10 +169,10 @@ func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { // trailer missing grpc-status ":status", "502", }, - errCode: codes.Unavailable, + errCode: codes.Internal, }, { - name: "malformed grpc-status-details-bin field with status 404", + name: "malformed grpc-status-details-bin field with status 404 to be ignored due to content type", responseHeader: []string{ ":status", "404", "content-type", "application/grpc", @@ -183,20 +182,19 @@ func (s) TestHTTPHeaderFrameErrorHandlingNormalTrailer(t *testing.T) { "grpc-status", "0", "grpc-status-details-bin", "????", }, - errCode: codes.Unimplemented, + errCode: codes.Internal, }, { - name: "malformed grpc-status-details-bin field with status 200", + name: "malformed grpc-status-details-bin field with status 404 and no content type", responseHeader: []string{ - ":status", "200", - "content-type", "application/grpc", + ":status", "404", }, trailer: []string{ // malformed grpc-status-details-bin field "grpc-status", "0", "grpc-status-details-bin", "????", }, - errCode: codes.Internal, + errCode: codes.Unimplemented, }, } for _, test := range tests {