Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 39 additions & 38 deletions internal/transport/http2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -1465,17 +1465,14 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
contentTypeErr = "malformed header: missing HTTP content-type"
grpcMessage string
recvCompress string
httpStatusCode *int
httpStatusErr string
rawStatusCode = codes.Unknown
// the code from the grpc-status header, if present
grpcStatusCode = codes.Internal
// headerError is set if an error is encountered while parsing the headers
headerError string
httpStatus string
)

if initialHeader {
httpStatusErr = "malformed header: missing HTTP status"
}

for _, hf := range frame.Fields {
switch hf.Name {
case "content-type":
Expand All @@ -1495,13 +1492,39 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
rawStatusCode = codes.Code(uint32(code))
grpcStatusCode = codes.Code(uint32(code))
case "grpc-message":
grpcMessage = decodeGrpcMessage(hf.Value)
case ":status":
c, err := strconv.ParseInt(hf.Value, 10, 32)
httpStatus = hf.Value
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}
}

// If a non gRPC response is received, then evaluate entire http status and
// process close stream / response.
// In case http status doesn't provide any error information (status : 200),
// evalute response code to be Unknown.
if !isGRPC {
var grpcErrorCode = codes.Internal
switch httpStatus {
case "":
httpStatusErr = "malformed header: missing HTTP status"
default:
// Any other status code (e.g., "404", "503"). We must parse it.
c, err := strconv.ParseInt(httpStatus, 10, 32)
if err != nil {
se := status.New(grpcErrorCode, fmt.Sprintf("transport: malformed http-status: %v", err))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
Expand All @@ -1512,52 +1535,30 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
"protocol error: informational header with status code %d must not have END_STREAM set", statusCode))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
}
// In case of informational headers return
return
}
httpStatusCode = &statusCode
if statusCode == 200 {
httpStatusErr = ""
break
}

httpStatusErr = fmt.Sprintf(
"unexpected HTTP status code received from server: %d (%s)",
statusCode,
http.StatusText(statusCode),
)
default:
if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
break
}
v, err := decodeMetadataHeader(hf.Name, hf.Value)
if err != nil {
headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
break
}
mdata[hf.Name] = append(mdata[hf.Name], v)
}
}

if !isGRPC || httpStatusErr != "" {
var code = codes.Internal // when header does not include HTTP status, return INTERNAL

if httpStatusCode != nil {
var ok bool
code, ok = HTTPStatusConvTab[*httpStatusCode]
grpcErrorCode, ok = HTTPStatusConvTab[statusCode]
if !ok {
code = codes.Unknown
grpcErrorCode = codes.Unknown
}
}
var errs []string
if httpStatusErr != "" {
errs = append(errs, httpStatusErr)
}

if contentTypeErr != "" {
errs = append(errs, contentTypeErr)
}
// Verify the HTTP response is a 200.
se := status.New(code, strings.Join(errs, "; "))

se := status.New(grpcErrorCode, strings.Join(errs, "; "))
t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
return
}
Expand Down Expand Up @@ -1611,7 +1612,7 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
return
}

status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])
status := istatus.NewWithProto(grpcStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])

// If client received END_STREAM from server while stream was still active,
// send RST_STREAM.
Expand Down
117 changes: 102 additions & 15 deletions internal/transport/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2618,6 +2618,10 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
metaHeaderFrame *http2.MetaHeadersFrame
// output
wantStatus *status.Status
// end stream output
wantStatusEndStream *status.Status
// head channel closed
headerChanClosedForEndStream bool
}{
{
name: "valid header",
Expand All @@ -2629,7 +2633,8 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
},
},
// no error
wantStatus: status.New(codes.OK, ""),
wantStatus: status.New(codes.OK, ""),
wantStatusEndStream: status.New(codes.OK, ""),
},
{
name: "missing content-type header",
Expand All @@ -2641,7 +2646,14 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
},
wantStatus: status.New(
codes.Unknown,
"malformed header: missing HTTP content-type",
"unexpected HTTP status code received from server: 200 (OK); malformed header: missing HTTP content-type",
),
headerChanClosedForEndStream: true,
// when headerChan is closed, it is already gRPC and we just need
// grpc-status
wantStatusEndStream: status.New(
codes.OK,
"",
),
},
{
Expand All @@ -2657,6 +2669,10 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
codes.Internal,
"transport: malformed grpc-status: strconv.ParseInt: parsing \"xxxx\": invalid syntax",
),
wantStatusEndStream: status.New(
codes.Internal,
"transport: malformed grpc-status: strconv.ParseInt: parsing \"xxxx\": invalid syntax",
),
},
{
name: "invalid http content type",
Expand All @@ -2669,6 +2685,31 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
codes.Internal,
"malformed header: missing HTTP status; transport: received unexpected content-type \"application/json\"",
),
// This content type will only come when end stream is also initial
// header, otherwise we would already be talking gRPC
wantStatusEndStream: status.New(
codes.Internal,
"malformed header: missing HTTP status; transport: received unexpected content-type \"application/json\"",
),
},
{
name: "invalid http content type with http status 504 translation",
metaHeaderFrame: &http2.MetaHeadersFrame{
Fields: []hpack.HeaderField{
{Name: "content-type", Value: "application/json"},
{Name: ":status", Value: "504"},
},
},
wantStatus: status.New(
codes.Unavailable,
"unexpected HTTP status code received from server: 504 (Gateway Timeout); transport: received unexpected content-type \"application/json\"",
),
// This content type will only come when end stream is also initial
// header, otherwise we would already be talking gRPC
wantStatusEndStream: status.New(
codes.Unavailable,
"unexpected HTTP status code received from server: 504 (Gateway Timeout); transport: received unexpected content-type \"application/json\"",
),
},
{
name: "http fallback and invalid http status",
Expand All @@ -2682,6 +2723,13 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
codes.Internal,
"transport: malformed http-status: strconv.ParseInt: parsing \"xxxx\": invalid syntax",
),
// for end stream, when we are already talking gRPC, we expect
// grpc - status and fail for it, we will ignore http status.
headerChanClosedForEndStream: true,
wantStatusEndStream: status.New(
codes.Internal,
"",
),
},
{
name: "http2 frame size exceeds",
Expand All @@ -2693,32 +2741,70 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
codes.Internal,
"peer header list size exceeded limit",
),
wantStatusEndStream: status.New(
codes.Internal,
"peer header list size exceeded limit",
),
},
{
name: "bad status in grpc mode",
name: "ignoring bad http status in grpc mode",
metaHeaderFrame: &http2.MetaHeadersFrame{
Fields: []hpack.HeaderField{
{Name: "content-type", Value: "application/grpc"},
{Name: "grpc-status", Value: "0"},
{Name: ":status", Value: "504"},
},
},
wantStatus: status.New(
codes.Unavailable,
"unexpected HTTP status code received from server: 504 (Gateway Timeout)",
),
wantStatus: status.New(codes.OK, ""),
wantStatusEndStream: status.New(codes.OK, ""),
},
{
name: "missing http status",
name: "missing http status and grpc status",
metaHeaderFrame: &http2.MetaHeadersFrame{
Fields: []hpack.HeaderField{
{Name: "content-type", Value: "application/grpc"},
},
},
wantStatus: status.New(
codes.Internal,
"malformed header: missing HTTP status",
),
wantStatus: status.New(codes.OK, ""),
wantStatusEndStream: status.New(codes.Internal, ""),
},
{
name: "ignore http status and fail for grpc status missing in trailer",
metaHeaderFrame: &http2.MetaHeadersFrame{
Fields: []hpack.HeaderField{
{Name: "content-type", Value: "application/grpc"},
{Name: ":status", Value: "504"},
},
},
wantStatus: status.New(codes.OK, ""),
headerChanClosedForEndStream: true,
wantStatusEndStream: status.New(codes.Internal, ""),
},
{
name: "ignore valid http status for grpc",
metaHeaderFrame: &http2.MetaHeadersFrame{
Fields: []hpack.HeaderField{
{Name: "content-type", Value: "application/grpc"},
{Name: "grpc-status", Value: "4"},
{Name: "grpc-message", Value: "Request timed out: Internal error"},
{Name: ":status", Value: "200"},
},
},
wantStatus: status.New(codes.OK, ""),
wantStatusEndStream: status.New(codes.DeadlineExceeded, "Request timed out: Internal error"),
},
{
name: "ignore illegal http status for grpc",
metaHeaderFrame: &http2.MetaHeadersFrame{
Fields: []hpack.HeaderField{
{Name: "content-type", Value: "application/grpc"},
{Name: "grpc-status", Value: "4"},
{Name: "grpc-message", Value: "Request timed out: Internal error"},
{Name: ":status", Value: "thisIsIllegal"},
},
},
wantStatus: status.New(codes.OK, ""),
wantStatusEndStream: status.New(codes.DeadlineExceeded, "Request timed out: Internal error"),
},
} {

Expand All @@ -2733,7 +2819,6 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
}

s.operateHeaders(test.metaHeaderFrame)

got := ts.status
want := test.wantStatus
if got.Code() != want.Code() || got.Message() != want.Message() {
Expand All @@ -2742,6 +2827,9 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
})
t.Run(fmt.Sprintf("%s-end_stream", test.name), func(t *testing.T) {
ts := testStream()
if test.headerChanClosedForEndStream {
ts.headerChanClosed = 1
}
s := testClient(ts)

test.metaHeaderFrame.HeadersFrame = &http2.HeadersFrame{
Expand All @@ -2752,9 +2840,8 @@ func (s) TestClientDecodeHeaderStatusErr(t *testing.T) {
}

s.operateHeaders(test.metaHeaderFrame)

got := ts.status
want := test.wantStatus
want := test.wantStatusEndStream
if got.Code() != want.Code() || got.Message() != want.Message() {
t.Fatalf("operateHeaders(%v); status = \ngot: %s\nwant: %s", test.metaHeaderFrame, got, want)
}
Expand Down
Loading