Skip to content

Commit 20fabba

Browse files
committed
feat(frontend): support autoforwarding through headers
Signed-off-by: Shijie Sheng <liouvetren@gmail.com>
1 parent 103d8b7 commit 20fabba

File tree

9 files changed

+242
-39
lines changed

9 files changed

+242
-39
lines changed

common/headers.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,4 +59,7 @@ const (
5959

6060
// CallerTypeHeaderName refers to the name of the header that contains the caller type (CLI, UI, SDK, internal, etc.)
6161
CallerTypeHeaderName = types.CallerTypeHeaderName
62+
63+
// QueryConsistencyLevelHeaderName refers to the name of the header that contains the query consistency level (EVENTUAL, STRONG)
64+
QueryConsistencyLevelHeaderName = "cadence-query-consistency-level"
6265
)

common/rpc/middleware.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,12 @@ package rpc
2323
import (
2424
"context"
2525
"encoding/json"
26+
"fmt"
2627
"io"
2728

2829
"go.uber.org/cadence/worker"
2930
"go.uber.org/yarpc"
31+
"go.uber.org/yarpc/api/middleware"
3032
"go.uber.org/yarpc/api/transport"
3133

3234
"github.com/uber/cadence/common"
@@ -36,6 +38,8 @@ import (
3638
"github.com/uber/cadence/common/types"
3739
)
3840

41+
var _ middleware.UnaryInbound = &QueryConsistencyLevelInboundMiddleware{}
42+
3943
type authOutboundMiddleware struct {
4044
authProvider worker.AuthorizationProvider
4145
}
@@ -247,3 +251,18 @@ func (m *ClientPartitionConfigMiddleware) Handle(ctx context.Context, req *trans
247251
}
248252
return h.Handle(ctx, req, resw)
249253
}
254+
255+
type QueryConsistencyLevelInboundMiddleware struct{}
256+
257+
func (m *QueryConsistencyLevelInboundMiddleware) Handle(ctx context.Context, req *transport.Request, resw transport.ResponseWriter, h transport.UnaryHandler) error {
258+
queryConsistencyLevel, ok := req.Headers.Get(common.QueryConsistencyLevelHeaderName)
259+
if ok {
260+
var level types.QueryConsistencyLevel
261+
err := level.UnmarshalText([]byte(queryConsistencyLevel))
262+
if err != nil {
263+
return fmt.Errorf("failed to parse query consistency level: %w", err)
264+
}
265+
ctx = context.WithValue(ctx, common.QueryConsistencyLevelHeaderName, level)
266+
}
267+
return h.Handle(ctx, req, resw)
268+
}

common/rpc/middleware_test.go

Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -381,6 +381,99 @@ func TestCallerInfoMiddleware(t *testing.T) {
381381
})
382382
}
383383

384+
func TestQueryConsistencyLevelInboundMiddleware(t *testing.T) {
385+
tests := []struct {
386+
name string
387+
headerValue string
388+
hasHeader bool
389+
expectedLevel types.QueryConsistencyLevel
390+
expectError bool
391+
}{
392+
{
393+
name: "parses EVENTUAL consistency level",
394+
headerValue: "EVENTUAL",
395+
hasHeader: true,
396+
expectedLevel: types.QueryConsistencyLevelEventual,
397+
expectError: false,
398+
},
399+
{
400+
name: "parses STRONG consistency level",
401+
headerValue: "STRONG",
402+
hasHeader: true,
403+
expectedLevel: types.QueryConsistencyLevelStrong,
404+
expectError: false,
405+
},
406+
{
407+
name: "parses lowercase eventual",
408+
headerValue: "eventual",
409+
hasHeader: true,
410+
expectedLevel: types.QueryConsistencyLevelEventual,
411+
expectError: false,
412+
},
413+
{
414+
name: "parses lowercase strong",
415+
headerValue: "strong",
416+
hasHeader: true,
417+
expectedLevel: types.QueryConsistencyLevelStrong,
418+
expectError: false,
419+
},
420+
{
421+
name: "parses numeric value 0",
422+
headerValue: "0",
423+
hasHeader: true,
424+
expectedLevel: types.QueryConsistencyLevelEventual,
425+
expectError: false,
426+
},
427+
{
428+
name: "parses numeric value 1",
429+
headerValue: "1",
430+
hasHeader: true,
431+
expectedLevel: types.QueryConsistencyLevelStrong,
432+
expectError: false,
433+
},
434+
{
435+
name: "returns error for invalid value",
436+
headerValue: "INVALID",
437+
hasHeader: true,
438+
expectError: true,
439+
},
440+
{
441+
name: "no header - noop",
442+
hasHeader: false,
443+
expectError: false,
444+
},
445+
}
446+
447+
for _, tt := range tests {
448+
t.Run(tt.name, func(t *testing.T) {
449+
m := &QueryConsistencyLevelInboundMiddleware{}
450+
h := &fakeHandler{}
451+
headers := transport.NewHeaders()
452+
if tt.hasHeader {
453+
headers = headers.With(common.QueryConsistencyLevelHeaderName, tt.headerValue)
454+
}
455+
456+
err := m.Handle(context.Background(), &transport.Request{Headers: headers}, nil, h)
457+
458+
if tt.expectError {
459+
assert.Error(t, err)
460+
assert.Contains(t, err.Error(), "failed to parse query consistency level")
461+
return
462+
}
463+
464+
assert.NoError(t, err)
465+
if tt.hasHeader {
466+
level, ok := h.ctx.Value(common.QueryConsistencyLevelHeaderName).(types.QueryConsistencyLevel)
467+
assert.True(t, ok, "expected query consistency level to be set in context")
468+
assert.Equal(t, tt.expectedLevel, level)
469+
} else {
470+
// When no header is present, context value should not be set
471+
assert.Nil(t, h.ctx.Value(common.QueryConsistencyLevelHeaderName))
472+
}
473+
})
474+
}
475+
}
476+
384477
type fakeHandler struct {
385478
ctx context.Context
386479
}

common/rpc/params.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -170,7 +170,7 @@ func NewParams(serviceName string, config *config.Config, dc *dynamicconfig.Coll
170170
OutboundTLS: outboundTLS,
171171
InboundMiddleware: yarpc.InboundMiddleware{
172172
// order matters: ForwardPartitionConfigMiddleware must be applied after ClientPartitionConfigMiddleware
173-
Unary: yarpc.UnaryInboundMiddleware(&InboundMetricsMiddleware{}, &CallerInfoMiddleware{}, &ClientPartitionConfigMiddleware{}, &ForwardPartitionConfigMiddleware{}),
173+
Unary: yarpc.UnaryInboundMiddleware(&InboundMetricsMiddleware{}, &CallerInfoMiddleware{}, &ClientPartitionConfigMiddleware{}, &ForwardPartitionConfigMiddleware{}, &QueryConsistencyLevelInboundMiddleware{}),
174174
},
175175
OutboundMiddleware: yarpc.OutboundMiddleware{
176176
Unary: yarpc.UnaryOutboundMiddleware(&HeaderForwardingMiddleware{

host/onebox.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1159,7 +1159,7 @@ func (c *cadenceImpl) newRPCFactory(serviceName string, host membership.HostInfo
11591159
TChannelAddress: tchannelAddress,
11601160
GRPCAddress: grpcAddress,
11611161
InboundMiddleware: yarpc.InboundMiddleware{
1162-
Unary: yarpc.UnaryInboundMiddleware(&versionMiddleware{}, &rpc.ClientPartitionConfigMiddleware{}, &rpc.ForwardPartitionConfigMiddleware{}),
1162+
Unary: yarpc.UnaryInboundMiddleware(&versionMiddleware{}, &rpc.ClientPartitionConfigMiddleware{}, &rpc.ForwardPartitionConfigMiddleware{}, &rpc.QueryConsistencyLevelInboundMiddleware{}),
11631163
},
11641164
OutboundMiddleware: yarpc.OutboundMiddleware{
11651165
Unary: &rpc.ForwardPartitionConfigMiddleware{},

service/frontend/templates/clusterredirection.tmpl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func (handler *clusterRedirectionHandler) {{$method.Declaration}} {
7070
var (
7171
apiName = "{{$method.Name}}"
7272
cluster string
73-
requestedConsistencyLevel types.QueryConsistencyLevel = types.QueryConsistencyLevelEventual
73+
requestedConsistencyLevel types.QueryConsistencyLevel = getRequestedConsistencyLevelFromContext(ctx)
7474
)
7575

7676
{{- if has $method.Name $readAPIsWithStrongConsistency}}

0 commit comments

Comments
 (0)