diff --git a/adapter/distribution_server.go b/adapter/distribution_server.go new file mode 100644 index 0000000..17b61eb --- /dev/null +++ b/adapter/distribution_server.go @@ -0,0 +1,43 @@ +package adapter + +import ( + "context" + + "github.com/bootjp/elastickv/distribution" + pb "github.com/bootjp/elastickv/proto" +) + +// DistributionServer serves distribution related gRPC APIs. +type DistributionServer struct { + engine *distribution.Engine + pb.UnimplementedDistributionServer +} + +// NewDistributionServer creates a new server. +func NewDistributionServer(e *distribution.Engine) *DistributionServer { + return &DistributionServer{engine: e} +} + +// UpdateRoute allows updating route information. +func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64) { + s.engine.UpdateRoute(start, end, group) +} + +// GetRoute returns route for a key. +func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error) { + r, ok := s.engine.GetRoute(req.Key) + if !ok { + return &pb.GetRouteResponse{}, nil + } + return &pb.GetRouteResponse{ + Start: r.Start, + End: r.End, + RaftGroupId: r.GroupID, + }, nil +} + +// GetTimestamp returns monotonically increasing timestamp. +func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error) { + ts := s.engine.NextTimestamp() + return &pb.GetTimestampResponse{Timestamp: ts}, nil +} diff --git a/cmd/server/demo.go b/cmd/server/demo.go index d4d4c3b..8c23d70 100644 --- a/cmd/server/demo.go +++ b/cmd/server/demo.go @@ -11,6 +11,7 @@ import ( transport "github.com/Jille/raft-grpc-transport" "github.com/Jille/raftadmin" "github.com/bootjp/elastickv/adapter" + "github.com/bootjp/elastickv/distribution" "github.com/bootjp/elastickv/kv" pb "github.com/bootjp/elastickv/proto" "github.com/bootjp/elastickv/store" @@ -89,10 +90,15 @@ func run(eg *errgroup.Group) error { trx := kv.NewTransaction(r) coordinator := kv.NewCoordinator(trx, r) gs := adapter.NewGRPCServer(st, coordinator) + distEngine := distribution.NewEngine() + distSrv := adapter.NewDistributionServer(distEngine) + // example route for demo purposes + distSrv.UpdateRoute([]byte("a"), []byte("z"), uint64(i)) tm.Register(s) pb.RegisterRawKVServer(s, gs) pb.RegisterTransactionalKVServer(s, gs) pb.RegisterInternalServer(s, adapter.NewInternal(trx, r)) + pb.RegisterDistributionServer(s, distSrv) leaderhealth.Setup(r, s, []string{"RawKV"}) raftadmin.Register(s, r) diff --git a/distribution/engine.go b/distribution/engine.go new file mode 100644 index 0000000..1ce6a6e --- /dev/null +++ b/distribution/engine.go @@ -0,0 +1,50 @@ +package distribution + +import ( + "bytes" + "sync" + "sync/atomic" +) + +// Route represents mapping from key range to raft group. +type Route struct { + Start []byte + End []byte + GroupID uint64 +} + +// Engine holds in-memory metadata of routes and provides timestamp generation. +type Engine struct { + mu sync.RWMutex + routes []Route + ts uint64 +} + +// NewEngine creates an Engine. +func NewEngine() *Engine { + return &Engine{routes: make([]Route, 0)} +} + +// UpdateRoute registers or updates a route. +func (e *Engine) UpdateRoute(start, end []byte, group uint64) { + e.mu.Lock() + defer e.mu.Unlock() + e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group}) +} + +// GetRoute finds a route for the given key. +func (e *Engine) GetRoute(key []byte) (Route, bool) { + e.mu.RLock() + defer e.mu.RUnlock() + for _, r := range e.routes { + if bytes.Compare(key, r.Start) >= 0 && bytes.Compare(key, r.End) < 0 { + return r, true + } + } + return Route{}, false +} + +// NextTimestamp returns a monotonic increasing timestamp. +func (e *Engine) NextTimestamp() uint64 { + return atomic.AddUint64(&e.ts, 1) +} diff --git a/distribution/engine_test.go b/distribution/engine_test.go new file mode 100644 index 0000000..91f4df0 --- /dev/null +++ b/distribution/engine_test.go @@ -0,0 +1,31 @@ +package distribution + +import "testing" + +func TestEngineRouteLookup(t *testing.T) { + e := NewEngine() + e.UpdateRoute([]byte("a"), []byte("m"), 1) + e.UpdateRoute([]byte("m"), []byte("z"), 2) + + r, ok := e.GetRoute([]byte("b")) + if !ok || r.GroupID != 1 { + t.Fatalf("expected group 1, got %v", r.GroupID) + } + + r, ok = e.GetRoute([]byte("x")) + if !ok || r.GroupID != 2 { + t.Fatalf("expected group 2, got %v", r.GroupID) + } +} + +func TestEngineTimestampMonotonic(t *testing.T) { + e := NewEngine() + last := e.NextTimestamp() + for i := 0; i < 100; i++ { + ts := e.NextTimestamp() + if ts <= last { + t.Fatalf("timestamp not monotonic: %d <= %d", ts, last) + } + last = ts + } +} diff --git a/proto/Makefile b/proto/Makefile index ab03f07..d23f322 100644 --- a/proto/Makefile +++ b/proto/Makefile @@ -1,10 +1,12 @@ +all: gen + gen: protoc --go_out=. --go_opt=paths=source_relative \ - --go-grpc_out=. --go-grpc_opt=paths=source_relative \ - service.proto + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + service.proto protoc --go_out=. --go_opt=paths=source_relative \ - --go-grpc_out=. --go-grpc_opt=paths=source_relative \ - internal.proto - - - + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + internal.proto + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + distribution.proto diff --git a/proto/distribution.pb.go b/proto/distribution.pb.go new file mode 100644 index 0000000..44bda0c --- /dev/null +++ b/proto/distribution.pb.go @@ -0,0 +1,279 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.36.7 +// protoc v3.21.12 +// source: distribution.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetRouteRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRouteRequest) Reset() { + *x = GetRouteRequest{} + mi := &file_distribution_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRouteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRouteRequest) ProtoMessage() {} + +func (x *GetRouteRequest) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[0] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRouteRequest.ProtoReflect.Descriptor instead. +func (*GetRouteRequest) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{0} +} + +func (x *GetRouteRequest) GetKey() []byte { + if x != nil { + return x.Key + } + return nil +} + +type GetRouteResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` + RaftGroupId uint64 `protobuf:"varint,3,opt,name=raft_group_id,json=raftGroupId,proto3" json:"raft_group_id,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetRouteResponse) Reset() { + *x = GetRouteResponse{} + mi := &file_distribution_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetRouteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRouteResponse) ProtoMessage() {} + +func (x *GetRouteResponse) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[1] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRouteResponse.ProtoReflect.Descriptor instead. +func (*GetRouteResponse) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{1} +} + +func (x *GetRouteResponse) GetStart() []byte { + if x != nil { + return x.Start + } + return nil +} + +func (x *GetRouteResponse) GetEnd() []byte { + if x != nil { + return x.End + } + return nil +} + +func (x *GetRouteResponse) GetRaftGroupId() uint64 { + if x != nil { + return x.RaftGroupId + } + return 0 +} + +type GetTimestampRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetTimestampRequest) Reset() { + *x = GetTimestampRequest{} + mi := &file_distribution_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetTimestampRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetTimestampRequest) ProtoMessage() {} + +func (x *GetTimestampRequest) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[2] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetTimestampRequest.ProtoReflect.Descriptor instead. +func (*GetTimestampRequest) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{2} +} + +type GetTimestampResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetTimestampResponse) Reset() { + *x = GetTimestampResponse{} + mi := &file_distribution_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetTimestampResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetTimestampResponse) ProtoMessage() {} + +func (x *GetTimestampResponse) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[3] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetTimestampResponse.ProtoReflect.Descriptor instead. +func (*GetTimestampResponse) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{3} +} + +func (x *GetTimestampResponse) GetTimestamp() uint64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +var File_distribution_proto protoreflect.FileDescriptor + +const file_distribution_proto_rawDesc = "" + + "\n" + + "\x12distribution.proto\"#\n" + + "\x0fGetRouteRequest\x12\x10\n" + + "\x03key\x18\x01 \x01(\fR\x03key\"^\n" + + "\x10GetRouteResponse\x12\x14\n" + + "\x05start\x18\x01 \x01(\fR\x05start\x12\x10\n" + + "\x03end\x18\x02 \x01(\fR\x03end\x12\"\n" + + "\rraft_group_id\x18\x03 \x01(\x04R\vraftGroupId\"\x15\n" + + "\x13GetTimestampRequest\"4\n" + + "\x14GetTimestampResponse\x12\x1c\n" + + "\ttimestamp\x18\x01 \x01(\x04R\ttimestamp2\x80\x01\n" + + "\fDistribution\x121\n" + + "\bGetRoute\x12\x10.GetRouteRequest\x1a\x11.GetRouteResponse\"\x00\x12=\n" + + "\fGetTimestamp\x12\x14.GetTimestampRequest\x1a\x15.GetTimestampResponse\"\x00B#Z!github.com/bootjp/elastickv/protob\x06proto3" + +var ( + file_distribution_proto_rawDescOnce sync.Once + file_distribution_proto_rawDescData []byte +) + +func file_distribution_proto_rawDescGZIP() []byte { + file_distribution_proto_rawDescOnce.Do(func() { + file_distribution_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_distribution_proto_rawDesc), len(file_distribution_proto_rawDesc))) + }) + return file_distribution_proto_rawDescData +} + +var file_distribution_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_distribution_proto_goTypes = []any{ + (*GetRouteRequest)(nil), // 0: GetRouteRequest + (*GetRouteResponse)(nil), // 1: GetRouteResponse + (*GetTimestampRequest)(nil), // 2: GetTimestampRequest + (*GetTimestampResponse)(nil), // 3: GetTimestampResponse +} +var file_distribution_proto_depIdxs = []int32{ + 0, // 0: Distribution.GetRoute:input_type -> GetRouteRequest + 2, // 1: Distribution.GetTimestamp:input_type -> GetTimestampRequest + 1, // 2: Distribution.GetRoute:output_type -> GetRouteResponse + 3, // 3: Distribution.GetTimestamp:output_type -> GetTimestampResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_distribution_proto_init() } +func file_distribution_proto_init() { + if File_distribution_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: unsafe.Slice(unsafe.StringData(file_distribution_proto_rawDesc), len(file_distribution_proto_rawDesc)), + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_distribution_proto_goTypes, + DependencyIndexes: file_distribution_proto_depIdxs, + MessageInfos: file_distribution_proto_msgTypes, + }.Build() + File_distribution_proto = out.File + file_distribution_proto_goTypes = nil + file_distribution_proto_depIdxs = nil +} diff --git a/proto/distribution.proto b/proto/distribution.proto new file mode 100644 index 0000000..2010f67 --- /dev/null +++ b/proto/distribution.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +option go_package = "github.com/bootjp/elastickv/proto"; + +service Distribution { + rpc GetRoute (GetRouteRequest) returns (GetRouteResponse) {} + rpc GetTimestamp (GetTimestampRequest) returns (GetTimestampResponse) {} +} + +message GetRouteRequest { + bytes key = 1; +} + +message GetRouteResponse { + bytes start = 1; + bytes end = 2; + uint64 raft_group_id = 3; +} + +message GetTimestampRequest {} + +message GetTimestampResponse { + uint64 timestamp = 1; +} + diff --git a/proto/distribution_grpc.pb.go b/proto/distribution_grpc.pb.go new file mode 100644 index 0000000..49415c9 --- /dev/null +++ b/proto/distribution_grpc.pb.go @@ -0,0 +1,159 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.5.1 +// - protoc v3.21.12 +// source: distribution.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.64.0 or later. +const _ = grpc.SupportPackageIsVersion9 + +const ( + Distribution_GetRoute_FullMethodName = "/Distribution/GetRoute" + Distribution_GetTimestamp_FullMethodName = "/Distribution/GetTimestamp" +) + +// DistributionClient is the client API for Distribution service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DistributionClient interface { + GetRoute(ctx context.Context, in *GetRouteRequest, opts ...grpc.CallOption) (*GetRouteResponse, error) + GetTimestamp(ctx context.Context, in *GetTimestampRequest, opts ...grpc.CallOption) (*GetTimestampResponse, error) +} + +type distributionClient struct { + cc grpc.ClientConnInterface +} + +func NewDistributionClient(cc grpc.ClientConnInterface) DistributionClient { + return &distributionClient{cc} +} + +func (c *distributionClient) GetRoute(ctx context.Context, in *GetRouteRequest, opts ...grpc.CallOption) (*GetRouteResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetRouteResponse) + err := c.cc.Invoke(ctx, Distribution_GetRoute_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *distributionClient) GetTimestamp(ctx context.Context, in *GetTimestampRequest, opts ...grpc.CallOption) (*GetTimestampResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetTimestampResponse) + err := c.cc.Invoke(ctx, Distribution_GetTimestamp_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DistributionServer is the server API for Distribution service. +// All implementations must embed UnimplementedDistributionServer +// for forward compatibility. +type DistributionServer interface { + GetRoute(context.Context, *GetRouteRequest) (*GetRouteResponse, error) + GetTimestamp(context.Context, *GetTimestampRequest) (*GetTimestampResponse, error) + mustEmbedUnimplementedDistributionServer() +} + +// UnimplementedDistributionServer must be embedded to have +// forward compatible implementations. +// +// NOTE: this should be embedded by value instead of pointer to avoid a nil +// pointer dereference when methods are called. +type UnimplementedDistributionServer struct{} + +func (UnimplementedDistributionServer) GetRoute(context.Context, *GetRouteRequest) (*GetRouteResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetRoute not implemented") +} +func (UnimplementedDistributionServer) GetTimestamp(context.Context, *GetTimestampRequest) (*GetTimestampResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetTimestamp not implemented") +} +func (UnimplementedDistributionServer) mustEmbedUnimplementedDistributionServer() {} +func (UnimplementedDistributionServer) testEmbeddedByValue() {} + +// UnsafeDistributionServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DistributionServer will +// result in compilation errors. +type UnsafeDistributionServer interface { + mustEmbedUnimplementedDistributionServer() +} + +func RegisterDistributionServer(s grpc.ServiceRegistrar, srv DistributionServer) { + // If the following call pancis, it indicates UnimplementedDistributionServer was + // embedded by pointer and is nil. This will cause panics if an + // unimplemented method is ever invoked, so we test this at initialization + // time to prevent it from happening at runtime later due to I/O. + if t, ok := srv.(interface{ testEmbeddedByValue() }); ok { + t.testEmbeddedByValue() + } + s.RegisterService(&Distribution_ServiceDesc, srv) +} + +func _Distribution_GetRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRouteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DistributionServer).GetRoute(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Distribution_GetRoute_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DistributionServer).GetRoute(ctx, req.(*GetRouteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Distribution_GetTimestamp_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetTimestampRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DistributionServer).GetTimestamp(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Distribution_GetTimestamp_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DistributionServer).GetTimestamp(ctx, req.(*GetTimestampRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Distribution_ServiceDesc is the grpc.ServiceDesc for Distribution service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Distribution_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Distribution", + HandlerType: (*DistributionServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetRoute", + Handler: _Distribution_GetRoute_Handler, + }, + { + MethodName: "GetTimestamp", + Handler: _Distribution_GetTimestamp_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "distribution.proto", +}