diff --git a/.gen/proto/sharddistributor/v1/canary.pb.go b/.gen/proto/sharddistributor/v1/canary.pb.go new file mode 100644 index 00000000000..4748270ee0b --- /dev/null +++ b/.gen/proto/sharddistributor/v1/canary.pb.go @@ -0,0 +1,661 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: uber/cadence/sharddistributor/v1/canary.proto + +package sharddistributorv1 + +import ( + fmt "fmt" + io "io" + math "math" + math_bits "math/bits" + + proto "github.com/gogo/protobuf/proto" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type PingRequest struct { + ShardKey string `protobuf:"bytes,1,opt,name=shard_key,json=shardKey,proto3" json:"shard_key,omitempty"` + Namespace string `protobuf:"bytes,2,opt,name=namespace,proto3" json:"namespace,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PingRequest) Reset() { *m = PingRequest{} } +func (m *PingRequest) String() string { return proto.CompactTextString(m) } +func (*PingRequest) ProtoMessage() {} +func (*PingRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_03b11524fa4f4f94, []int{0} +} +func (m *PingRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PingRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PingRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PingRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_PingRequest.Merge(m, src) +} +func (m *PingRequest) XXX_Size() int { + return m.Size() +} +func (m *PingRequest) XXX_DiscardUnknown() { + xxx_messageInfo_PingRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_PingRequest proto.InternalMessageInfo + +func (m *PingRequest) GetShardKey() string { + if m != nil { + return m.ShardKey + } + return "" +} + +func (m *PingRequest) GetNamespace() string { + if m != nil { + return m.Namespace + } + return "" +} + +type PingResponse struct { + ExecutorId string `protobuf:"bytes,1,opt,name=executor_id,json=executorId,proto3" json:"executor_id,omitempty"` + OwnsShard bool `protobuf:"varint,2,opt,name=owns_shard,json=ownsShard,proto3" json:"owns_shard,omitempty"` + ShardKey string `protobuf:"bytes,3,opt,name=shard_key,json=shardKey,proto3" json:"shard_key,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *PingResponse) Reset() { *m = PingResponse{} } +func (m *PingResponse) String() string { return proto.CompactTextString(m) } +func (*PingResponse) ProtoMessage() {} +func (*PingResponse) Descriptor() ([]byte, []int) { + return fileDescriptor_03b11524fa4f4f94, []int{1} +} +func (m *PingResponse) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *PingResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_PingResponse.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := 
m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *PingResponse) XXX_Merge(src proto.Message) { + xxx_messageInfo_PingResponse.Merge(m, src) +} +func (m *PingResponse) XXX_Size() int { + return m.Size() +} +func (m *PingResponse) XXX_DiscardUnknown() { + xxx_messageInfo_PingResponse.DiscardUnknown(m) +} + +var xxx_messageInfo_PingResponse proto.InternalMessageInfo + +func (m *PingResponse) GetExecutorId() string { + if m != nil { + return m.ExecutorId + } + return "" +} + +func (m *PingResponse) GetOwnsShard() bool { + if m != nil { + return m.OwnsShard + } + return false +} + +func (m *PingResponse) GetShardKey() string { + if m != nil { + return m.ShardKey + } + return "" +} + +func init() { + proto.RegisterType((*PingRequest)(nil), "uber.cadence.sharddistributor.v1.PingRequest") + proto.RegisterType((*PingResponse)(nil), "uber.cadence.sharddistributor.v1.PingResponse") +} + +func init() { + proto.RegisterFile("uber/cadence/sharddistributor/v1/canary.proto", fileDescriptor_03b11524fa4f4f94) +} + +var fileDescriptor_03b11524fa4f4f94 = []byte{ + // 290 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xbb, 0x4e, 0xc3, 0x30, + 0x14, 0x86, 0x65, 0x40, 0xa8, 0x39, 0x65, 0xf2, 0x54, 0x71, 0x29, 0xa5, 0x13, 0x4b, 0x6d, 0x15, + 0x46, 0x26, 0x6e, 0x12, 0x11, 0x4b, 0x15, 0x06, 0x24, 0x96, 0xc8, 0x71, 0x8e, 0xd2, 0xa8, 0xaa, + 0x1d, 0xec, 0x24, 0x90, 0x57, 0xe0, 0xc9, 0x18, 0x79, 0x04, 0x94, 0x27, 0x41, 0x71, 0x83, 0x4a, + 0x2a, 0x24, 0x58, 0x3f, 0x7f, 0xfa, 0x7d, 0xce, 0xf9, 0x61, 0x52, 0x44, 0x68, 0xb8, 0x14, 0x31, + 0x2a, 0x89, 0xdc, 0xce, 0x85, 0x89, 0xe3, 0xd4, 0xe6, 0x26, 0x8d, 0x8a, 0x5c, 0x1b, 0x5e, 0x4e, + 0xb9, 0x14, 0x4a, 0x98, 0x8a, 0x65, 0x46, 0xe7, 0x9a, 0x8e, 0x1a, 0x9d, 0xb5, 0x3a, 0xdb, 0xd4, + 0x59, 0x39, 0x1d, 0xdf, 0x41, 0x7f, 0x96, 0xaa, 0x24, 0xc0, 0xe7, 0x02, 0x6d, 0x4e, 0x0f, 0xc0, + 0x73, 0x56, 0xb8, 0xc0, 0x6a, 0x40, 0x46, 0xe4, 0xd4, 0x0b, 0x7a, 0x0e, 0xdc, 0x63, 0x45, 0x0f, + 0xc1, 0x53, 0x62, 0x89, 0x36, 0x13, 0x12, 0x07, 0x5b, 0xee, 0x71, 0x0d, 0xc6, 0x0b, 0xd8, 0x5b, + 0x25, 0xd9, 0x4c, 0x2b, 0x8b, 0xf4, 0x18, 0xfa, 0xf8, 0x8a, 0xb2, 0xf9, 0x28, 0x4c, 0xe3, 0x36, + 0x0c, 0xbe, 0x91, 0x1f, 0xd3, 0x23, 0x00, 0xfd, 0xa2, 0x6c, 0xe8, 0xf2, 0x5d, 0x5e, 0x2f, 0xf0, + 0x1a, 0xf2, 0xd0, 0x80, 0xee, 0x28, 0xdb, 0xdd, 0x51, 0xce, 0xde, 0x08, 0x9c, 0x38, 0xed, 0x66, + 0xbd, 0xce, 0x6d, 0x1b, 0x7d, 0xed, 0x2e, 0x70, 0x39, 0xf3, 0x29, 0xc2, 0x4e, 0x33, 0x12, 0x9d, + 0xb0, 0xbf, 0xee, 0xc0, 0x7e, 0x1c, 0x61, 0x9f, 0xfd, 0x57, 0x5f, 0x6d, 0x7a, 0xf5, 0xf8, 0x5e, + 0x0f, 0xc9, 0x47, 0x3d, 0x24, 0x9f, 0xf5, 0x90, 0x3c, 0xf9, 0x49, 0x9a, 0xcf, 0x8b, 0x88, 0x49, + 0xbd, 0xe4, 0x9d, 0xb6, 0x58, 0x82, 0x8a, 0xbb, 0x5e, 0x7e, 0x2b, 0xee, 0x62, 0x93, 0x95, 0xd3, + 0x68, 0xd7, 0xd9, 0xe7, 0x5f, 0x01, 0x00, 0x00, 0xff, 0xff, 0x9d, 0xd8, 0xdd, 0xdd, 0xf6, 0x01, + 0x00, 0x00, +} + +func (m *PingRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PingRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PingRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Namespace) > 0 { + i -= 
len(m.Namespace) + copy(dAtA[i:], m.Namespace) + i = encodeVarintCanary(dAtA, i, uint64(len(m.Namespace))) + i-- + dAtA[i] = 0x12 + } + if len(m.ShardKey) > 0 { + i -= len(m.ShardKey) + copy(dAtA[i:], m.ShardKey) + i = encodeVarintCanary(dAtA, i, uint64(len(m.ShardKey))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *PingResponse) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *PingResponse) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *PingResponse) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.ShardKey) > 0 { + i -= len(m.ShardKey) + copy(dAtA[i:], m.ShardKey) + i = encodeVarintCanary(dAtA, i, uint64(len(m.ShardKey))) + i-- + dAtA[i] = 0x1a + } + if m.OwnsShard { + i-- + if m.OwnsShard { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x10 + } + if len(m.ExecutorId) > 0 { + i -= len(m.ExecutorId) + copy(dAtA[i:], m.ExecutorId) + i = encodeVarintCanary(dAtA, i, uint64(len(m.ExecutorId))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintCanary(dAtA []byte, offset int, v uint64) int { + offset -= sovCanary(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *PingRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ShardKey) + if l > 0 { + n += 1 + l + sovCanary(uint64(l)) + } + l = len(m.Namespace) + if l > 0 { + n += 1 + l + sovCanary(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *PingResponse) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ExecutorId) + if l > 0 { + n += 1 + l + sovCanary(uint64(l)) + } + if m.OwnsShard { + n += 2 + } + l = len(m.ShardKey) + if l > 0 { + n += 1 + l + sovCanary(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovCanary(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozCanary(x uint64) (n int) { + return sovCanary(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *PingRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCanary + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PingRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PingRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ShardKey", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCanary + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + 
intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCanary + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCanary + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ShardKey = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Namespace", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCanary + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCanary + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCanary + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Namespace = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCanary(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthCanary + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *PingResponse) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCanary + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: PingResponse: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: PingResponse: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ExecutorId", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCanary + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCanary + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCanary + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ExecutorId = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field OwnsShard", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCanary + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.OwnsShard = bool(v != 0) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ShardKey", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCanary + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + 
intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCanary + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCanary + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ShardKey = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipCanary(dAtA[iNdEx:]) + if err != nil { + return err + } + if (skippy < 0) || (iNdEx+skippy) < 0 { + return ErrInvalidLengthCanary + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipCanary(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + depth := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCanary + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCanary + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + case 1: + iNdEx += 8 + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowCanary + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthCanary + } + iNdEx += length + case 3: + depth++ + case 4: + if depth == 0 { + return 0, ErrUnexpectedEndOfGroupCanary + } + depth-- + case 5: + iNdEx += 4 + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + if iNdEx < 0 { + return 0, ErrInvalidLengthCanary + } + if depth == 0 { + return iNdEx, nil + } + } + return 0, io.ErrUnexpectedEOF +} + +var ( + ErrInvalidLengthCanary = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowCanary = fmt.Errorf("proto: integer overflow") + ErrUnexpectedEndOfGroupCanary = fmt.Errorf("proto: unexpected end of group") +) diff --git a/.gen/proto/sharddistributor/v1/canary.pb.yarpc.go b/.gen/proto/sharddistributor/v1/canary.pb.yarpc.go new file mode 100644 index 00000000000..64f3827d1e9 --- /dev/null +++ b/.gen/proto/sharddistributor/v1/canary.pb.yarpc.go @@ -0,0 +1,263 @@ +// Code generated by protoc-gen-yarpc-go. DO NOT EDIT. +// source: uber/cadence/sharddistributor/v1/canary.proto + +package sharddistributorv1 + +import ( + "context" + "io/ioutil" + "reflect" + + "github.com/gogo/protobuf/jsonpb" + "github.com/gogo/protobuf/proto" + "go.uber.org/fx" + "go.uber.org/yarpc" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/api/x/restriction" + "go.uber.org/yarpc/encoding/protobuf" + "go.uber.org/yarpc/encoding/protobuf/reflection" +) + +var _ = ioutil.NopCloser + +// ShardDistributorExecutorCanaryAPIYARPCClient is the YARPC client-side interface for the ShardDistributorExecutorCanaryAPI service. 
+type ShardDistributorExecutorCanaryAPIYARPCClient interface { + Ping(context.Context, *PingRequest, ...yarpc.CallOption) (*PingResponse, error) +} + +func newShardDistributorExecutorCanaryAPIYARPCClient(clientConfig transport.ClientConfig, anyResolver jsonpb.AnyResolver, options ...protobuf.ClientOption) ShardDistributorExecutorCanaryAPIYARPCClient { + return &_ShardDistributorExecutorCanaryAPIYARPCCaller{protobuf.NewStreamClient( + protobuf.ClientParams{ + ServiceName: "uber.cadence.sharddistributor.v1.ShardDistributorExecutorCanaryAPI", + ClientConfig: clientConfig, + AnyResolver: anyResolver, + Options: options, + }, + )} +} + +// NewShardDistributorExecutorCanaryAPIYARPCClient builds a new YARPC client for the ShardDistributorExecutorCanaryAPI service. +func NewShardDistributorExecutorCanaryAPIYARPCClient(clientConfig transport.ClientConfig, options ...protobuf.ClientOption) ShardDistributorExecutorCanaryAPIYARPCClient { + return newShardDistributorExecutorCanaryAPIYARPCClient(clientConfig, nil, options...) +} + +// ShardDistributorExecutorCanaryAPIYARPCServer is the YARPC server-side interface for the ShardDistributorExecutorCanaryAPI service. +type ShardDistributorExecutorCanaryAPIYARPCServer interface { + Ping(context.Context, *PingRequest) (*PingResponse, error) +} + +type buildShardDistributorExecutorCanaryAPIYARPCProceduresParams struct { + Server ShardDistributorExecutorCanaryAPIYARPCServer + AnyResolver jsonpb.AnyResolver +} + +func buildShardDistributorExecutorCanaryAPIYARPCProcedures(params buildShardDistributorExecutorCanaryAPIYARPCProceduresParams) []transport.Procedure { + handler := &_ShardDistributorExecutorCanaryAPIYARPCHandler{params.Server} + return protobuf.BuildProcedures( + protobuf.BuildProceduresParams{ + ServiceName: "uber.cadence.sharddistributor.v1.ShardDistributorExecutorCanaryAPI", + UnaryHandlerParams: []protobuf.BuildProceduresUnaryHandlerParams{ + { + MethodName: "Ping", + Handler: protobuf.NewUnaryHandler( + protobuf.UnaryHandlerParams{ + Handle: handler.Ping, + NewRequest: newShardDistributorExecutorCanaryAPIServicePingYARPCRequest, + AnyResolver: params.AnyResolver, + }, + ), + }, + }, + OnewayHandlerParams: []protobuf.BuildProceduresOnewayHandlerParams{}, + StreamHandlerParams: []protobuf.BuildProceduresStreamHandlerParams{}, + }, + ) +} + +// BuildShardDistributorExecutorCanaryAPIYARPCProcedures prepares an implementation of the ShardDistributorExecutorCanaryAPI service for YARPC registration. +func BuildShardDistributorExecutorCanaryAPIYARPCProcedures(server ShardDistributorExecutorCanaryAPIYARPCServer) []transport.Procedure { + return buildShardDistributorExecutorCanaryAPIYARPCProcedures(buildShardDistributorExecutorCanaryAPIYARPCProceduresParams{Server: server}) +} + +// FxShardDistributorExecutorCanaryAPIYARPCClientParams defines the input +// for NewFxShardDistributorExecutorCanaryAPIYARPCClient. It provides the +// paramaters to get a ShardDistributorExecutorCanaryAPIYARPCClient in an +// Fx application. +type FxShardDistributorExecutorCanaryAPIYARPCClientParams struct { + fx.In + + Provider yarpc.ClientConfig + AnyResolver jsonpb.AnyResolver `name:"yarpcfx" optional:"true"` + Restriction restriction.Checker `optional:"true"` +} + +// FxShardDistributorExecutorCanaryAPIYARPCClientResult defines the output +// of NewFxShardDistributorExecutorCanaryAPIYARPCClient. It provides a +// ShardDistributorExecutorCanaryAPIYARPCClient to an Fx application. 
+type FxShardDistributorExecutorCanaryAPIYARPCClientResult struct { + fx.Out + + Client ShardDistributorExecutorCanaryAPIYARPCClient + + // We are using an fx.Out struct here instead of just returning a client + // so that we can add more values or add named versions of the client in + // the future without breaking any existing code. +} + +// NewFxShardDistributorExecutorCanaryAPIYARPCClient provides a ShardDistributorExecutorCanaryAPIYARPCClient +// to an Fx application using the given name for routing. +// +// fx.Provide( +// sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCClient("service-name"), +// ... +// ) +func NewFxShardDistributorExecutorCanaryAPIYARPCClient(name string, options ...protobuf.ClientOption) interface{} { + return func(params FxShardDistributorExecutorCanaryAPIYARPCClientParams) FxShardDistributorExecutorCanaryAPIYARPCClientResult { + cc := params.Provider.ClientConfig(name) + + if params.Restriction != nil { + if namer, ok := cc.GetUnaryOutbound().(transport.Namer); ok { + if err := params.Restriction.Check(protobuf.Encoding, namer.TransportName()); err != nil { + panic(err.Error()) + } + } + } + + return FxShardDistributorExecutorCanaryAPIYARPCClientResult{ + Client: newShardDistributorExecutorCanaryAPIYARPCClient(cc, params.AnyResolver, options...), + } + } +} + +// FxShardDistributorExecutorCanaryAPIYARPCProceduresParams defines the input +// for NewFxShardDistributorExecutorCanaryAPIYARPCProcedures. It provides the +// paramaters to get ShardDistributorExecutorCanaryAPIYARPCServer procedures in an +// Fx application. +type FxShardDistributorExecutorCanaryAPIYARPCProceduresParams struct { + fx.In + + Server ShardDistributorExecutorCanaryAPIYARPCServer + AnyResolver jsonpb.AnyResolver `name:"yarpcfx" optional:"true"` +} + +// FxShardDistributorExecutorCanaryAPIYARPCProceduresResult defines the output +// of NewFxShardDistributorExecutorCanaryAPIYARPCProcedures. It provides +// ShardDistributorExecutorCanaryAPIYARPCServer procedures to an Fx application. +// +// The procedures are provided to the "yarpcfx" value group. +// Dig 1.2 or newer must be used for this feature to work. +type FxShardDistributorExecutorCanaryAPIYARPCProceduresResult struct { + fx.Out + + Procedures []transport.Procedure `group:"yarpcfx"` + ReflectionMeta reflection.ServerMeta `group:"yarpcfx"` +} + +// NewFxShardDistributorExecutorCanaryAPIYARPCProcedures provides ShardDistributorExecutorCanaryAPIYARPCServer procedures to an Fx application. +// It expects a ShardDistributorExecutorCanaryAPIYARPCServer to be present in the container. +// +// fx.Provide( +// sharddistributorv1.NewFxShardDistributorExecutorCanaryAPIYARPCProcedures(), +// ... +// ) +func NewFxShardDistributorExecutorCanaryAPIYARPCProcedures() interface{} { + return func(params FxShardDistributorExecutorCanaryAPIYARPCProceduresParams) FxShardDistributorExecutorCanaryAPIYARPCProceduresResult { + return FxShardDistributorExecutorCanaryAPIYARPCProceduresResult{ + Procedures: buildShardDistributorExecutorCanaryAPIYARPCProcedures(buildShardDistributorExecutorCanaryAPIYARPCProceduresParams{ + Server: params.Server, + AnyResolver: params.AnyResolver, + }), + ReflectionMeta: ShardDistributorExecutorCanaryAPIReflectionMeta, + } + } +} + +// ShardDistributorExecutorCanaryAPIReflectionMeta is the reflection server metadata +// required for using the gRPC reflection protocol with YARPC. +// +// See https://github.com/grpc/grpc/blob/master/doc/server-reflection.md. 
+var ShardDistributorExecutorCanaryAPIReflectionMeta = reflection.ServerMeta{ + ServiceName: "uber.cadence.sharddistributor.v1.ShardDistributorExecutorCanaryAPI", + FileDescriptors: yarpcFileDescriptorClosure03b11524fa4f4f94, +} + +type _ShardDistributorExecutorCanaryAPIYARPCCaller struct { + streamClient protobuf.StreamClient +} + +func (c *_ShardDistributorExecutorCanaryAPIYARPCCaller) Ping(ctx context.Context, request *PingRequest, options ...yarpc.CallOption) (*PingResponse, error) { + responseMessage, err := c.streamClient.Call(ctx, "Ping", request, newShardDistributorExecutorCanaryAPIServicePingYARPCResponse, options...) + if responseMessage == nil { + return nil, err + } + response, ok := responseMessage.(*PingResponse) + if !ok { + return nil, protobuf.CastError(emptyShardDistributorExecutorCanaryAPIServicePingYARPCResponse, responseMessage) + } + return response, err +} + +type _ShardDistributorExecutorCanaryAPIYARPCHandler struct { + server ShardDistributorExecutorCanaryAPIYARPCServer +} + +func (h *_ShardDistributorExecutorCanaryAPIYARPCHandler) Ping(ctx context.Context, requestMessage proto.Message) (proto.Message, error) { + var request *PingRequest + var ok bool + if requestMessage != nil { + request, ok = requestMessage.(*PingRequest) + if !ok { + return nil, protobuf.CastError(emptyShardDistributorExecutorCanaryAPIServicePingYARPCRequest, requestMessage) + } + } + response, err := h.server.Ping(ctx, request) + if response == nil { + return nil, err + } + return response, err +} + +func newShardDistributorExecutorCanaryAPIServicePingYARPCRequest() proto.Message { + return &PingRequest{} +} + +func newShardDistributorExecutorCanaryAPIServicePingYARPCResponse() proto.Message { + return &PingResponse{} +} + +var ( + emptyShardDistributorExecutorCanaryAPIServicePingYARPCRequest = &PingRequest{} + emptyShardDistributorExecutorCanaryAPIServicePingYARPCResponse = &PingResponse{} +) + +var yarpcFileDescriptorClosure03b11524fa4f4f94 = [][]byte{ + // uber/cadence/sharddistributor/v1/canary.proto + []byte{ + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x91, 0xbd, 0x4e, 0xf3, 0x30, + 0x14, 0x86, 0x95, 0xef, 0x43, 0xa8, 0x39, 0x65, 0xf2, 0x54, 0xf1, 0x23, 0x4a, 0x27, 0x96, 0xda, + 0x0a, 0x8c, 0x4c, 0xfc, 0x49, 0x44, 0x5d, 0xaa, 0xb0, 0xb1, 0x44, 0x8e, 0x73, 0x94, 0x5a, 0x55, + 0xed, 0x60, 0x3b, 0x81, 0xdc, 0x02, 0x57, 0x8d, 0xec, 0x06, 0x95, 0x54, 0x48, 0xb0, 0x3e, 0x7e, + 0xf4, 0xfa, 0x9c, 0xf3, 0xc2, 0xbc, 0x29, 0xd0, 0x30, 0xc1, 0x4b, 0x54, 0x02, 0x99, 0x5d, 0x71, + 0x53, 0x96, 0xd2, 0x3a, 0x23, 0x8b, 0xc6, 0x69, 0xc3, 0xda, 0x84, 0x09, 0xae, 0xb8, 0xe9, 0x68, + 0x6d, 0xb4, 0xd3, 0x64, 0xea, 0x75, 0xda, 0xeb, 0x74, 0x5f, 0xa7, 0x6d, 0x32, 0x7b, 0x82, 0xf1, + 0x52, 0xaa, 0x2a, 0xc3, 0xd7, 0x06, 0xad, 0x23, 0x27, 0x10, 0x07, 0x2b, 0x5f, 0x63, 0x37, 0x89, + 0xa6, 0xd1, 0x65, 0x9c, 0x8d, 0x02, 0x58, 0x60, 0x47, 0x4e, 0x21, 0x56, 0x7c, 0x83, 0xb6, 0xe6, + 0x02, 0x27, 0xff, 0xc2, 0xe3, 0x0e, 0xcc, 0xd6, 0x70, 0xb4, 0x4d, 0xb2, 0xb5, 0x56, 0x16, 0xc9, + 0x39, 0x8c, 0xf1, 0x1d, 0x85, 0xff, 0x28, 0x97, 0x65, 0x1f, 0x06, 0x5f, 0x28, 0x2d, 0xc9, 0x19, + 0x80, 0x7e, 0x53, 0x36, 0x0f, 0xf9, 0x21, 0x6f, 0x94, 0xc5, 0x9e, 0x3c, 0x7b, 0x30, 0x1c, 0xe5, + 0xff, 0x70, 0x94, 0xab, 0x8f, 0x08, 0x2e, 0x82, 0xf6, 0xb0, 0x5b, 0xe7, 0xb1, 0x8f, 0xbe, 0x0f, + 0x17, 0xb8, 0x5d, 0xa6, 0x04, 0xe1, 0xc0, 0x8f, 0x44, 0xe6, 0xf4, 0xb7, 0x3b, 0xd0, 0x6f, 0x47, + 0x38, 0xa6, 0x7f, 0xd5, 0xb7, 0x9b, 0xde, 0x2d, 0x5e, 0xd2, 0x4a, 0xba, 0x55, 0x53, 0x50, 0xa1, + 0x37, 0x6c, 0xd0, 0x10, 0xad, 0x50, 
0xb1, 0xd0, 0xc5, 0x4f, 0x65, 0xdd, 0xec, 0xb3, 0x36, 0x29, + 0x0e, 0x83, 0x7d, 0xfd, 0x19, 0x00, 0x00, 0xff, 0xff, 0xc5, 0x61, 0x98, 0x91, 0xea, 0x01, 0x00, + 0x00, + }, +} + +func init() { + yarpc.RegisterClientBuilder( + func(clientConfig transport.ClientConfig, structField reflect.StructField) ShardDistributorExecutorCanaryAPIYARPCClient { + return NewShardDistributorExecutorCanaryAPIYARPCClient(clientConfig, protobuf.ClientBuilderOptions(clientConfig, structField)...) + }, + ) +} diff --git a/proto/internal/uber/cadence/sharddistributor/v1/canary.proto b/proto/internal/uber/cadence/sharddistributor/v1/canary.proto new file mode 100644 index 00000000000..f85568e2949 --- /dev/null +++ b/proto/internal/uber/cadence/sharddistributor/v1/canary.proto @@ -0,0 +1,42 @@ +// Copyright (c) 2020 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +syntax = "proto3"; + +package uber.cadence.sharddistributor.v1; + +option go_package = "github.com/uber/cadence/.gen/proto/sharddistributor/v1;sharddistributorv1"; + +// ShardDistributorExecutorCanaryAPI is used for canary testing executor-to-executor communication +service ShardDistributorExecutorCanaryAPI { + // Ping allows one executor to ping another executor that owns a specific shard + rpc Ping(PingRequest) returns (PingResponse); +} + +message PingRequest { + string shard_key = 1; + string namespace = 2; +} + +message PingResponse { + string executor_id = 1; + bool owns_shard = 2; + string shard_key = 3; +} diff --git a/service/sharddistributor/canary/pinger/canary_client_mock.go b/service/sharddistributor/canary/pinger/canary_client_mock.go new file mode 100644 index 00000000000..1f6fc9bb8a8 --- /dev/null +++ b/service/sharddistributor/canary/pinger/canary_client_mock.go @@ -0,0 +1,63 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/uber/cadence/.gen/proto/sharddistributor/v1 (interfaces: ShardDistributorExecutorCanaryAPIYARPCClient) +// +// Generated by this command: +// +// mockgen -package pinger -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient +// + +// Package pinger is a generated GoMock package. 
+package pinger + +import ( + context "context" + reflect "reflect" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + gomock "go.uber.org/mock/gomock" + yarpc "go.uber.org/yarpc" +) + +// MockShardDistributorExecutorCanaryAPIYARPCClient is a mock of ShardDistributorExecutorCanaryAPIYARPCClient interface. +type MockShardDistributorExecutorCanaryAPIYARPCClient struct { + ctrl *gomock.Controller + recorder *MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder + isgomock struct{} +} + +// MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder is the mock recorder for MockShardDistributorExecutorCanaryAPIYARPCClient. +type MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder struct { + mock *MockShardDistributorExecutorCanaryAPIYARPCClient +} + +// NewMockShardDistributorExecutorCanaryAPIYARPCClient creates a new mock instance. +func NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl *gomock.Controller) *MockShardDistributorExecutorCanaryAPIYARPCClient { + mock := &MockShardDistributorExecutorCanaryAPIYARPCClient{ctrl: ctrl} + mock.recorder = &MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockShardDistributorExecutorCanaryAPIYARPCClient) EXPECT() *MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder { + return m.recorder +} + +// Ping mocks base method. +func (m *MockShardDistributorExecutorCanaryAPIYARPCClient) Ping(arg0 context.Context, arg1 *sharddistributorv1.PingRequest, arg2 ...yarpc.CallOption) (*sharddistributorv1.PingResponse, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Ping", varargs...) + ret0, _ := ret[0].(*sharddistributorv1.PingResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Ping indicates an expected call of Ping. +func (mr *MockShardDistributorExecutorCanaryAPIYARPCClientMockRecorder) Ping(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Ping", reflect.TypeOf((*MockShardDistributorExecutorCanaryAPIYARPCClient)(nil).Ping), varargs...) 
+} diff --git a/service/sharddistributor/canary/pinger/pinger.go b/service/sharddistributor/canary/pinger/pinger.go new file mode 100644 index 00000000000..04b4f54f644 --- /dev/null +++ b/service/sharddistributor/canary/pinger/pinger.go @@ -0,0 +1,142 @@ +package pinger + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "go.uber.org/fx" + "go.uber.org/yarpc" + "go.uber.org/zap" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + "github.com/uber/cadence/common/backoff" + "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/service/sharddistributor/client/spectatorclient" +) + +//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient + +const ( + pingInterval = 1 * time.Second + pingJitterCoeff = 0.1 // 10% jitter + pingTimeout = 5 * time.Second +) + +// Pinger periodically pings shard owners in the fixed namespace +type Pinger struct { + logger *zap.Logger + timeSource clock.TimeSource + canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient + namespace string + numShards int + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup +} + +// Params are the parameters for creating a Pinger +type Params struct { + fx.In + + Logger *zap.Logger + TimeSource clock.TimeSource + CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient +} + +// NewPinger creates a new Pinger for the fixed namespace +func NewPinger(params Params, namespace string, numShards int) *Pinger { + return &Pinger{ + logger: params.Logger, + timeSource: params.TimeSource, + canaryClient: params.CanaryClient, + namespace: namespace, + numShards: numShards, + } +} + +// Start begins the periodic ping loop +func (p *Pinger) Start(ctx context.Context) { + p.logger.Info("Starting canary pinger", zap.String("namespace", p.namespace), zap.Int("num_shards", p.numShards)) + p.ctx, p.cancel = context.WithCancel(context.WithoutCancel(ctx)) + p.wg.Add(1) + go p.pingLoop() +} + +// Stop stops the ping loop +func (p *Pinger) Stop() { + if p.cancel != nil { + p.cancel() + } + p.wg.Wait() +} + +func (p *Pinger) pingLoop() { + defer p.wg.Done() + + ticker := p.timeSource.NewTicker(backoff.JitDuration(pingInterval, pingJitterCoeff)) + defer ticker.Stop() + + p.logger.Info("Starting canary pinger", + zap.String("namespace", p.namespace), + zap.Int("num_shards", p.numShards)) + + for { + select { + case <-p.ctx.Done(): + p.logger.Info("Pinger context done, stopping") + return + case <-ticker.Chan(): + p.pingRandomShard() + ticker.Reset(backoff.JitDuration(pingInterval, pingJitterCoeff)) + } + } +} + +func (p *Pinger) pingRandomShard() { + // Pick a random shard number + shardNum := rand.Intn(p.numShards) + shardKey := fmt.Sprintf("%d", shardNum) + + if err := p.pingShard(shardKey); err != nil { + p.logger.Error("Failed to ping shard", + zap.String("namespace", p.namespace), + zap.String("shard_key", shardKey), + zap.Error(err)) + } +} + +func (p *Pinger) pingShard(shardKey string) error { + // Create ping request + request := &sharddistributorv1.PingRequest{ + ShardKey: shardKey, + Namespace: p.namespace, + } + + // Create context with deadline for the RPC call + ctx, cancel := context.WithTimeout(p.ctx, pingTimeout) + defer cancel() + + response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace)) + if err != nil { + return 
fmt.Errorf("ping rpc failed: %w", err) + } + + // Verify response + if !response.GetOwnsShard() { + p.logger.Warn("Executor does not own shard", + zap.String("namespace", p.namespace), + zap.String("shard_key", shardKey), + zap.String("executor_id", response.GetExecutorId())) + return fmt.Errorf("executor %s does not own shard %s", response.GetExecutorId(), shardKey) + } + + p.logger.Info("Successfully pinged shard owner", + zap.String("namespace", p.namespace), + zap.String("shard_key", shardKey), + zap.String("executor_id", response.GetExecutorId())) + + return nil +} diff --git a/service/sharddistributor/canary/pinger/pinger_test.go b/service/sharddistributor/canary/pinger/pinger_test.go new file mode 100644 index 00000000000..250a53840e0 --- /dev/null +++ b/service/sharddistributor/canary/pinger/pinger_test.go @@ -0,0 +1,115 @@ +package pinger + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + "go.uber.org/mock/gomock" + "go.uber.org/zap" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + "github.com/uber/cadence/common/clock" +) + +func TestPingerStartStop(t *testing.T) { + defer goleak.VerifyNone(t) + + ctrl := gomock.NewController(t) + mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl) + + pinger := NewPinger(Params{ + Logger: zap.NewNop(), + TimeSource: clock.NewRealTimeSource(), + CanaryClient: mockClient, + }, "test-ns", 10) + + pinger.Start(context.Background()) + pinger.Stop() +} + +func TestPingShard_Success(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl) + + pinger := NewPinger(Params{ + Logger: zap.NewNop(), + TimeSource: clock.NewRealTimeSource(), + CanaryClient: mockClient, + }, "test-ns", 10) + pinger.ctx, pinger.cancel = context.WithCancel(context.Background()) + defer pinger.cancel() + + mockClient.EXPECT(). + Ping(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&sharddistributorv1.PingResponse{ + OwnsShard: true, + ExecutorId: "127.0.0.1:7953", + }, nil) + + err := pinger.pingShard("5") + assert.NoError(t, err) +} + +func TestPingShard_DoesNotOwnShard(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl) + + pinger := NewPinger(Params{ + Logger: zap.NewNop(), + TimeSource: clock.NewRealTimeSource(), + CanaryClient: mockClient, + }, "test-ns", 10) + pinger.ctx, pinger.cancel = context.WithCancel(context.Background()) + defer pinger.cancel() + + mockClient.EXPECT(). + Ping(gomock.Any(), gomock.Any(), gomock.Any()). + Return(&sharddistributorv1.PingResponse{ + OwnsShard: false, + ExecutorId: "127.0.0.1:7953", + }, nil) + + err := pinger.pingShard("5") + assert.Error(t, err) + assert.Contains(t, err.Error(), "does not own shard") +} + +func TestPingShard_RPCError(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl) + + pinger := NewPinger(Params{ + Logger: zap.NewNop(), + TimeSource: clock.NewRealTimeSource(), + CanaryClient: mockClient, + }, "test-ns", 10) + pinger.ctx, pinger.cancel = context.WithCancel(context.Background()) + defer pinger.cancel() + + mockClient.EXPECT(). + Ping(gomock.Any(), gomock.Any(), gomock.Any()). 
+ Return(nil, errors.New("network error")) + + err := pinger.pingShard("5") + assert.Error(t, err) + assert.Contains(t, err.Error(), "ping rpc failed") +} + +func TestNewPinger(t *testing.T) { + ctrl := gomock.NewController(t) + mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl) + + pinger := NewPinger(Params{ + Logger: zap.NewNop(), + TimeSource: clock.NewRealTimeSource(), + CanaryClient: mockClient, + }, "test-ns", 100) + + require.NotNil(t, pinger) + assert.Equal(t, "test-ns", pinger.namespace) + assert.Equal(t, 100, pinger.numShards) +} diff --git a/service/sharddistributor/client/spectatorclient/client.go b/service/sharddistributor/client/spectatorclient/client.go index c4b19eb094a..8f6e2f1e8b3 100644 --- a/service/sharddistributor/client/spectatorclient/client.go +++ b/service/sharddistributor/client/spectatorclient/client.go @@ -20,13 +20,52 @@ import ( //go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . Spectator +type Spectators struct { + spectators map[string]Spectator +} + +func (s *Spectators) ForNamespace(namespace string) (Spectator, error) { + spectator, ok := s.spectators[namespace] + if !ok { + return nil, fmt.Errorf("spectator not found for namespace %s", namespace) + } + return spectator, nil +} + +func (s *Spectators) Start(ctx context.Context) error { + for namespace, spectator := range s.spectators { + if err := spectator.Start(ctx); err != nil { + return fmt.Errorf("start spectator for namespace %s: %w", namespace, err) + } + } + return nil +} + +func (s *Spectators) Stop() { + for _, spectator := range s.spectators { + spectator.Stop() + } +} + +func NewSpectators(params Params) (*Spectators, error) { + spectators := make(map[string]Spectator) + for _, namespace := range params.Config.Namespaces { + spectator, err := NewSpectatorWithNamespace(params, namespace.Namespace) + if err != nil { + return nil, fmt.Errorf("create spectator for namespace %s: %w", namespace.Namespace, err) + } + + spectators[namespace.Namespace] = spectator + } + return &Spectators{spectators: spectators}, nil +} + type Spectator interface { Start(ctx context.Context) error Stop() - // GetShardOwner returns the owner of a shard. It first checks the local cache, - // and if not found, falls back to querying the shard distributor directly. 
-	GetShardOwner(ctx context.Context, shardKey string) (string, error)
+	// GetShardOwner returns the owner of a shard.
+	GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error)
 }
 
 type Params struct {
@@ -109,21 +148,9 @@ func createShardDistributorClient(yarpcClient sharddistributorv1.ShardDistributo
-// Module creates a spectator module using auto-selection (single namespace only)
+// Module creates a spectator module that runs a Spectator for every configured namespace
 func Module() fx.Option {
 	return fx.Module("shard-distributor-spectator-client",
-		fx.Provide(NewSpectator),
-		fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
-			lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
-		}),
-	)
-}
-
-// ModuleWithNamespace creates a spectator module for a specific namespace
-func ModuleWithNamespace(namespace string) fx.Option {
-	return fx.Module(fmt.Sprintf("shard-distributor-spectator-client-%s", namespace),
-		fx.Provide(func(params Params) (Spectator, error) {
-			return NewSpectatorWithNamespace(params, namespace)
-		}),
-		fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
-			lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
+		fx.Provide(NewSpectators),
+		fx.Invoke(func(spectators *Spectators, lc fx.Lifecycle) {
+			lc.Append(fx.StartStopHook(spectators.Start, spectators.Stop))
 		}),
 	)
 }
diff --git a/service/sharddistributor/client/spectatorclient/clientimpl.go b/service/sharddistributor/client/spectatorclient/clientimpl.go
index 88934cdadf6..15c6d32d8b5 100644
--- a/service/sharddistributor/client/spectatorclient/clientimpl.go
+++ b/service/sharddistributor/client/spectatorclient/clientimpl.go
@@ -103,6 +103,7 @@ func (s *spectatorImpl) watchLoop() {
 
 			// Server shutdown or network issue - recreate stream (load balancer will route to new server)
 			s.logger.Info("Stream ended, reconnecting", tag.ShardNamespace(s.namespace))
+			s.timeSource.Sleep(backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff))
 		}
 	}
 
@@ -163,10 +164,10 @@ func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateRespon
 		tag.Counter(len(response.Executors)))
 }
 
-// GetShardOwner returns the executor ID for a given shard.
+// GetShardOwner returns the full owner information including metadata for a given shard.
 // It first waits for the initial state to be received, then checks the cache.
 // If not found in cache, it falls back to querying the shard distributor directly.
-func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (string, error) { +func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) { // Wait for first state to be received to avoid flooding shard distributor on startup s.firstStateWG.Wait() @@ -176,7 +177,7 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str s.stateMu.RUnlock() if owner != nil { - return owner.ExecutorID, nil + return owner, nil } // Cache miss - fall back to RPC call @@ -189,8 +190,11 @@ func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (str ShardKey: shardKey, }) if err != nil { - return "", fmt.Errorf("get shard owner from shard distributor: %w", err) + return nil, fmt.Errorf("get shard owner from shard distributor: %w", err) } - return response.Owner, nil + return &ShardOwner{ + ExecutorID: response.Owner, + Metadata: response.Metadata, + }, nil } diff --git a/service/sharddistributor/client/spectatorclient/clientimpl_test.go b/service/sharddistributor/client/spectatorclient/clientimpl_test.go index a0c60aa2706..637883b1647 100644 --- a/service/sharddistributor/client/spectatorclient/clientimpl_test.go +++ b/service/sharddistributor/client/spectatorclient/clientimpl_test.go @@ -44,6 +44,9 @@ func TestWatchLoopBasicFlow(t *testing.T) { Executors: []*types.ExecutorShardAssignment{ { ExecutorID: "executor-1", + Metadata: map[string]string{ + "grpc_address": "127.0.0.1:7953", + }, AssignedShards: []*types.Shard{ {ShardKey: "shard-1"}, {ShardKey: "shard-2"}, @@ -72,11 +75,12 @@ func TestWatchLoopBasicFlow(t *testing.T) { // Query shard owner owner, err := spectator.GetShardOwner(context.Background(), "shard-1") assert.NoError(t, err) - assert.Equal(t, "executor-1", owner) + assert.Equal(t, "executor-1", owner.ExecutorID) + assert.Equal(t, "127.0.0.1:7953", owner.Metadata["grpc_address"]) owner, err = spectator.GetShardOwner(context.Background(), "shard-2") assert.NoError(t, err) - assert.Equal(t, "executor-1", owner) + assert.Equal(t, "executor-1", owner.ExecutorID) } func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { @@ -103,7 +107,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { // First Recv returns state mockStream.EXPECT().Recv().Return(&types.WatchNamespaceStateResponse{ Executors: []*types.ExecutorShardAssignment{ - {ExecutorID: "executor-1", AssignedShards: []*types.Shard{{ShardKey: "shard-1"}}}, + { + ExecutorID: "executor-1", + Metadata: map[string]string{ + "grpc_address": "127.0.0.1:7953", + }, + AssignedShards: []*types.Shard{{ShardKey: "shard-1"}}, + }, }, }, nil) @@ -122,7 +132,12 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { Namespace: "test-ns", ShardKey: "unknown-shard", }). 
- Return(&types.GetShardOwnerResponse{Owner: "executor-2"}, nil) + Return(&types.GetShardOwnerResponse{ + Owner: "executor-2", + Metadata: map[string]string{ + "grpc_address": "127.0.0.1:7954", + }, + }, nil) spectator.Start(context.Background()) defer spectator.Stop() @@ -132,12 +147,13 @@ func TestGetShardOwner_CacheMiss_FallbackToRPC(t *testing.T) { // Cache hit owner, err := spectator.GetShardOwner(context.Background(), "shard-1") assert.NoError(t, err) - assert.Equal(t, "executor-1", owner) + assert.Equal(t, "executor-1", owner.ExecutorID) // Cache miss - should trigger RPC owner, err = spectator.GetShardOwner(context.Background(), "unknown-shard") assert.NoError(t, err) - assert.Equal(t, "executor-2", owner) + assert.Equal(t, "executor-2", owner.ExecutorID) + assert.Equal(t, "127.0.0.1:7954", owner.Metadata["grpc_address"]) } func TestStreamReconnection(t *testing.T) { @@ -188,7 +204,9 @@ func TestStreamReconnection(t *testing.T) { spectator.Start(context.Background()) defer spectator.Stop() - // Advance time for retry + // Wait for the goroutine to be blocked in Sleep, then advance time + mockTimeSource.BlockUntil(1) // Wait for 1 goroutine to be blocked in Sleep mockTimeSource.Advance(2 * time.Second) + spectator.firstStateWG.Wait() } diff --git a/service/sharddistributor/client/spectatorclient/interface_mock.go b/service/sharddistributor/client/spectatorclient/interface_mock.go index 5b1eaaa5500..0e68d476608 100644 --- a/service/sharddistributor/client/spectatorclient/interface_mock.go +++ b/service/sharddistributor/client/spectatorclient/interface_mock.go @@ -41,10 +41,10 @@ func (m *MockSpectator) EXPECT() *MockSpectatorMockRecorder { } // GetShardOwner mocks base method. -func (m *MockSpectator) GetShardOwner(ctx context.Context, shardKey string) (string, error) { +func (m *MockSpectator) GetShardOwner(ctx context.Context, shardKey string) (*ShardOwner, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "GetShardOwner", ctx, shardKey) - ret0, _ := ret[0].(string) + ret0, _ := ret[0].(*ShardOwner) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/service/sharddistributor/client/spectatorclient/peer_chooser.go b/service/sharddistributor/client/spectatorclient/peer_chooser.go new file mode 100644 index 00000000000..4deb8f60d46 --- /dev/null +++ b/service/sharddistributor/client/spectatorclient/peer_chooser.go @@ -0,0 +1,176 @@ +package spectatorclient + +import ( + "context" + "fmt" + "sync" + + "go.uber.org/fx" + "go.uber.org/yarpc/api/peer" + "go.uber.org/yarpc/api/transport" + "go.uber.org/yarpc/peer/hostport" + "go.uber.org/yarpc/yarpcerrors" + + "github.com/uber/cadence/common/log" + "github.com/uber/cadence/common/log/tag" +) + +const ( + NamespaceHeader = "x-shard-distributor-namespace" + grpcAddressMetadataKey = "grpc_address" +) + +// SpectatorPeerChooserInterface extends peer.Chooser with SetSpectators method +type SpectatorPeerChooserInterface interface { + peer.Chooser + SetSpectators(spectators *Spectators) +} + +// SpectatorPeerChooser is a peer.Chooser that uses the Spectator to route requests +// to the correct executor based on shard ownership. +// This is the shard distributor equivalent of Cadence's RingpopPeerChooser. +// +// Flow: +// 1. Client calls RPC with yarpc.WithShardKey("shard-key") +// 2. Choose() is called with req.ShardKey = "shard-key" +// 3. Query Spectator for shard owner +// 4. Extract grpc_address from owner metadata +// 5. Create/reuse peer for that address +// 6. 
Return peer to YARPC for connection +type SpectatorPeerChooser struct { + spectators *Spectators + transport peer.Transport + logger log.Logger + namespace string + + mu sync.RWMutex + peers map[string]peer.Peer // grpc_address -> peer +} + +type SpectatorPeerChooserParams struct { + fx.In + Transport peer.Transport + Logger log.Logger +} + +// NewSpectatorPeerChooser creates a new peer chooser that routes based on shard distributor ownership +func NewSpectatorPeerChooser( + params SpectatorPeerChooserParams, +) SpectatorPeerChooserInterface { + return &SpectatorPeerChooser{ + transport: params.Transport, + logger: params.Logger, + peers: make(map[string]peer.Peer), + } +} + +// Start satisfies the peer.Chooser interface +func (c *SpectatorPeerChooser) Start() error { + c.logger.Info("Starting shard distributor peer chooser", tag.ShardNamespace(c.namespace)) + return nil +} + +// Stop satisfies the peer.Chooser interface +func (c *SpectatorPeerChooser) Stop() error { + c.logger.Info("Stopping shard distributor peer chooser", tag.ShardNamespace(c.namespace)) + + // Release all peers + c.mu.Lock() + defer c.mu.Unlock() + + for addr, p := range c.peers { + if err := c.transport.ReleasePeer(p, &noOpSubscriber{}); err != nil { + c.logger.Error("Failed to release peer", tag.Error(err), tag.Address(addr)) + } + } + c.peers = make(map[string]peer.Peer) + + return nil +} + +// IsRunning satisfies the peer.Chooser interface +func (c *SpectatorPeerChooser) IsRunning() bool { + return true +} + +// Choose returns a peer for the given shard key by: +// 0. Looking up the spectator for the namespace using the x-shard-distributor-namespace header +// 1. Looking up the shard owner via the Spectator +// 2. Extracting the grpc_address from the owner's metadata +// 3. Creating/reusing a peer for that address +// +// The ShardKey in the request is the actual shard key (e.g., workflow ID, shard ID), +// NOT the ip:port address. This is the key distinction from directPeerChooser. 
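+//
+// For illustration, a call that ends up routed by this chooser (values are
+// hypothetical; the call shape mirrors how the canary pinger issues its RPC):
+//
+//	response, err := canaryClient.Ping(ctx, request,
+//	    yarpc.WithShardKey("42"),
+//	    yarpc.WithHeader(NamespaceHeader, "canary-namespace"))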
+func (c *SpectatorPeerChooser) Choose(ctx context.Context, req *transport.Request) (peer.Peer, func(error), error) {
+	if req.ShardKey == "" {
+		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires ShardKey to be non-empty")
+	}
+
+	// Get the spectator for the namespace
+	namespace, ok := req.Headers.Get(NamespaceHeader)
+	if !ok || namespace == "" {
+		return nil, nil, yarpcerrors.InvalidArgumentErrorf("chooser requires x-shard-distributor-namespace header to be non-empty")
+	}
+
+	spectator, err := c.spectators.ForNamespace(namespace)
+	if err != nil {
+		return nil, nil, yarpcerrors.InvalidArgumentErrorf("failed to get spectator for namespace %s: %v", namespace, err)
+	}
+
+	// Query spectator for shard owner
+	owner, err := spectator.GetShardOwner(ctx, req.ShardKey)
+	if err != nil {
+		return nil, nil, yarpcerrors.UnavailableErrorf("failed to get shard owner for key %s: %v", req.ShardKey, err)
+	}
+
+	// Extract GRPC address from owner metadata
+	grpcAddress, ok := owner.Metadata[grpcAddressMetadataKey]
+	if !ok || grpcAddress == "" {
+		return nil, nil, yarpcerrors.InternalErrorf("no grpc_address in metadata for executor %s owning shard %s", owner.ExecutorID, req.ShardKey)
+	}
+
+	// Check if we already have a peer for this address
+	c.mu.RLock()
+	p, ok := c.peers[grpcAddress]
+	if ok {
+		c.mu.RUnlock()
+		return p, func(error) {}, nil
+	}
+	c.mu.RUnlock()
+
+	// Create new peer for this address
+	p, err = c.addPeer(grpcAddress)
+	if err != nil {
+		return nil, nil, yarpcerrors.InternalErrorf("failed to add peer for address %s: %v", grpcAddress, err)
+	}
+
+	return p, func(error) {}, nil
+}
+
+func (c *SpectatorPeerChooser) SetSpectators(spectators *Spectators) {
+	c.spectators = spectators
+}
+
+func (c *SpectatorPeerChooser) addPeer(grpcAddress string) (peer.Peer, error) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+
+	// Check again in case another goroutine added it
+	if p, ok := c.peers[grpcAddress]; ok {
+		return p, nil
+	}
+
+	p, err := c.transport.RetainPeer(hostport.Identify(grpcAddress), &noOpSubscriber{})
+	if err != nil {
+		return nil, fmt.Errorf("retain peer failed: %w", err)
+	}
+
+	c.peers[grpcAddress] = p
+	c.logger.Info("Added peer to shard distributor peer chooser", tag.Address(grpcAddress))
+	return p, nil
+}
+
+// noOpSubscriber is a no-op implementation of peer.Subscriber
+type noOpSubscriber struct{}
+
+func (*noOpSubscriber) NotifyStatusChanged(peer.Identifier) {}
diff --git a/service/sharddistributor/client/spectatorclient/peer_chooser_test.go b/service/sharddistributor/client/spectatorclient/peer_chooser_test.go
new file mode 100644
index 00000000000..569c354d01f
--- /dev/null
+++ b/service/sharddistributor/client/spectatorclient/peer_chooser_test.go
@@ -0,0 +1,189 @@
+package spectatorclient
+
+import (
+	"context"
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"go.uber.org/mock/gomock"
+	"go.uber.org/yarpc/api/peer"
+	"go.uber.org/yarpc/api/transport"
+	"go.uber.org/yarpc/transport/grpc"
+
+	"github.com/uber/cadence/common/log/testlogger"
+)
+
+func TestSpectatorPeerChooser_Choose_MissingShardKey(t *testing.T) {
+	chooser := &SpectatorPeerChooser{
+		logger: testlogger.New(t),
+		peers:  make(map[string]peer.Peer),
+	}
+
+	req := &transport.Request{
+		ShardKey: "",
+		Headers:  transport.NewHeaders(),
+	}
+
+	p, onFinish, err := chooser.Choose(context.Background(), req)
+
+	assert.Error(t, err)
+	assert.Nil(t, p)
+	assert.Nil(t, onFinish)
+	assert.Contains(t, err.Error(), "ShardKey")
+}
+
+func
TestSpectatorPeerChooser_Choose_MissingNamespaceHeader(t *testing.T) { + chooser := &SpectatorPeerChooser{ + logger: testlogger.New(t), + peers: make(map[string]peer.Peer), + } + + req := &transport.Request{ + ShardKey: "shard-1", + Headers: transport.NewHeaders(), + } + + p, onFinish, err := chooser.Choose(context.Background(), req) + + assert.Error(t, err) + assert.Nil(t, p) + assert.Nil(t, onFinish) + assert.Contains(t, err.Error(), "x-shard-distributor-namespace") +} + +func TestSpectatorPeerChooser_Choose_SpectatorNotFound(t *testing.T) { + chooser := &SpectatorPeerChooser{ + logger: testlogger.New(t), + peers: make(map[string]peer.Peer), + spectators: &Spectators{spectators: make(map[string]Spectator)}, + } + + req := &transport.Request{ + ShardKey: "shard-1", + Headers: transport.NewHeaders().With(NamespaceHeader, "unknown-namespace"), + } + + p, onFinish, err := chooser.Choose(context.Background(), req) + + assert.Error(t, err) + assert.Nil(t, p) + assert.Nil(t, onFinish) + assert.Contains(t, err.Error(), "spectator not found") +} + +func TestSpectatorPeerChooser_StartStop(t *testing.T) { + chooser := &SpectatorPeerChooser{ + logger: testlogger.New(t), + peers: make(map[string]peer.Peer), + } + + err := chooser.Start() + require.NoError(t, err) + + assert.True(t, chooser.IsRunning()) + + err = chooser.Stop() + assert.NoError(t, err) +} + +func TestSpectatorPeerChooser_SetSpectators(t *testing.T) { + chooser := &SpectatorPeerChooser{ + logger: testlogger.New(t), + } + + spectators := &Spectators{spectators: make(map[string]Spectator)} + chooser.SetSpectators(spectators) + + assert.Equal(t, spectators, chooser.spectators) +} + +func TestSpectatorPeerChooser_Choose_Success(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockSpectator := NewMockSpectator(ctrl) + peerTransport := grpc.NewTransport() + + chooser := &SpectatorPeerChooser{ + transport: peerTransport, + logger: testlogger.New(t), + peers: make(map[string]peer.Peer), + spectators: &Spectators{ + spectators: map[string]Spectator{ + "test-namespace": mockSpectator, + }, + }, + } + + ctx := context.Background() + req := &transport.Request{ + ShardKey: "shard-1", + Headers: transport.NewHeaders().With(NamespaceHeader, "test-namespace"), + } + + // Mock spectator to return shard owner with grpc_address + mockSpectator.EXPECT(). + GetShardOwner(ctx, "shard-1"). + Return(&ShardOwner{ + ExecutorID: "executor-1", + Metadata: map[string]string{ + grpcAddressMetadataKey: "127.0.0.1:7953", + }, + }, nil) + + // Execute + p, onFinish, err := chooser.Choose(ctx, req) + + // Assert + assert.NoError(t, err) + assert.NotNil(t, p) + assert.NotNil(t, onFinish) + assert.Equal(t, "127.0.0.1:7953", p.Identifier()) + assert.Len(t, chooser.peers, 1) +} + +func TestSpectatorPeerChooser_Choose_ReusesPeer(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + mockSpectator := NewMockSpectator(ctrl) + peerTransport := grpc.NewTransport() + + chooser := &SpectatorPeerChooser{ + transport: peerTransport, + logger: testlogger.New(t), + peers: make(map[string]peer.Peer), + spectators: &Spectators{ + spectators: map[string]Spectator{ + "test-namespace": mockSpectator, + }, + }, + } + + req := &transport.Request{ + ShardKey: "shard-1", + Headers: transport.NewHeaders().With(NamespaceHeader, "test-namespace"), + } + + // First call creates the peer + mockSpectator.EXPECT(). + GetShardOwner(gomock.Any(), "shard-1"). 
+ Return(&ShardOwner{ + ExecutorID: "executor-1", + Metadata: map[string]string{ + grpcAddressMetadataKey: "127.0.0.1:7953", + }, + }, nil).Times(2) + + firstPeer, _, err := chooser.Choose(context.Background(), req) + require.NoError(t, err) + + // Second call should reuse the same peer + secondPeer, _, err := chooser.Choose(context.Background(), req) + + // Assert - should reuse existing peer + assert.NoError(t, err) + assert.Equal(t, firstPeer, secondPeer) + assert.Len(t, chooser.peers, 1) +}
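Note on the serving side: this diff adds only the calling half of the canary; an implementation of ShardDistributorExecutorCanaryAPIYARPCServer is not included here. A minimal sketch of what a responder could look like, assuming a hypothetical pingHandler that tracks its assigned shards (pingHandler, executorID, and ownedShards are illustrative names, not from this change):

	package canary // hypothetical package for the sketch

	import (
		"context"
		"sync"

		"go.uber.org/yarpc/api/transport"

		sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
	)

	// pingHandler answers canary pings by reporting whether this executor
	// currently owns the requested shard.
	type pingHandler struct {
		executorID string

		mu          sync.RWMutex
		ownedShards map[string]struct{} // assumed: kept in sync with shard assignments
	}

	// Ping implements ShardDistributorExecutorCanaryAPIYARPCServer.
	func (h *pingHandler) Ping(ctx context.Context, req *sharddistributorv1.PingRequest) (*sharddistributorv1.PingResponse, error) {
		h.mu.RLock()
		_, owns := h.ownedShards[req.GetShardKey()]
		h.mu.RUnlock()

		// Echo the shard key so the pinger can correlate request and response.
		return &sharddistributorv1.PingResponse{
			ExecutorId: h.executorID,
			OwnsShard:  owns,
			ShardKey:   req.GetShardKey(),
		}, nil
	}

	// procedures registers the handler via the builder generated in this diff.
	func procedures(h *pingHandler) []transport.Procedure {
		return sharddistributorv1.BuildShardDistributorExecutorCanaryAPIYARPCProcedures(h)
	}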
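Similarly, a rough sketch of wiring the canary client through the new SpectatorPeerChooser outside of Fx (the outbound and dispatcher names are illustrative; grpc.Transport.NewOutbound accepts any peer.Chooser, which is what makes the shard-key routing work):

	package canary // hypothetical, continues the sketch above

	import (
		"go.uber.org/yarpc"
		"go.uber.org/yarpc/transport/grpc"

		sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
		"github.com/uber/cadence/common/log"
		"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
	)

	func newCanaryClient(spectators *spectatorclient.Spectators, logger log.Logger) (sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient, *yarpc.Dispatcher) {
		trans := grpc.NewTransport()

		// Route outbound calls by shard ownership rather than a fixed address.
		chooser := spectatorclient.NewSpectatorPeerChooser(spectatorclient.SpectatorPeerChooserParams{
			Transport: trans,
			Logger:    logger,
		})
		chooser.SetSpectators(spectators)

		dispatcher := yarpc.NewDispatcher(yarpc.Config{
			Name: "shard-distributor-canary", // illustrative caller name
			Outbounds: yarpc.Outbounds{
				"shard-distributor-executor": {Unary: trans.NewOutbound(chooser)},
			},
		})

		client := sharddistributorv1.NewShardDistributorExecutorCanaryAPIYARPCClient(
			dispatcher.ClientConfig("shard-distributor-executor"))
		return client, dispatcher // caller is expected to Start/Stop the dispatcher
	}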