diff --git a/distribution/engine.go b/distribution/engine.go new file mode 100644 index 0000000..024f5d8 --- /dev/null +++ b/distribution/engine.go @@ -0,0 +1,121 @@ +package distribution + +import ( + "sync" +) + +// Range represents a key range and its statistics. +type Range struct { + Start string + End string + RequestCount int +} + +// RangeStat mirrors Range for external consumption. +type RangeStat struct { + Start string + End string + Count int +} + +// Engine tracks ranges and their statistics. +type Engine struct { + mu sync.Mutex + ranges []Range + splitThreshold int + notify func(left, right Range) +} + +// NewEngine creates a new Engine with a split threshold. The notify +// function is invoked whenever a range split occurs. +func NewEngine(threshold int, notify func(left, right Range)) *Engine { + return &Engine{ + ranges: []Range{{Start: "", End: string(rune(0xffff))}}, + splitThreshold: threshold, + notify: notify, + } +} + +// RecordRequest registers a request for the given key and triggers a split +// when the request count for the containing range exceeds the threshold. +func (e *Engine) RecordRequest(key string) { + e.mu.Lock() + defer e.mu.Unlock() + + idx := e.findRange(key) + e.ranges[idx].RequestCount++ + if e.ranges[idx].RequestCount >= e.splitThreshold { + e.splitRange(idx) + } +} + +// SplitRange splits the range containing the supplied key. +func (e *Engine) SplitRange(key string) { + e.mu.Lock() + defer e.mu.Unlock() + idx := e.findRange(key) + e.splitRange(idx) +} + +// GetStats returns the statistics for all ranges. +func (e *Engine) GetStats() []RangeStat { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]RangeStat, len(e.ranges)) + for i, r := range e.ranges { + out[i] = RangeStat{Start: r.Start, End: r.End, Count: r.RequestCount} + } + return out +} + +// Ranges returns a copy of the current range metadata. Primarily used for tests. +func (e *Engine) Ranges() []Range { + e.mu.Lock() + defer e.mu.Unlock() + out := make([]Range, len(e.ranges)) + copy(out, e.ranges) + return out +} + +func (e *Engine) findRange(key string) int { + for i, r := range e.ranges { + if (r.Start == "" || key >= r.Start) && (r.End == "" || key < r.End) { + return i + } + } + // default to last range + return len(e.ranges) - 1 +} + +func (e *Engine) splitRange(idx int) { + r := e.ranges[idx] + mid := midpoint(r.Start, r.End) + left := Range{Start: r.Start, End: mid} + right := Range{Start: mid, End: r.End} + + // replace range with two new ranges + newRanges := make([]Range, 0, len(e.ranges)+1) + newRanges = append(newRanges, e.ranges[:idx]...) + newRanges = append(newRanges, left, right) + newRanges = append(newRanges, e.ranges[idx+1:]...) + e.ranges = newRanges + + if e.notify != nil { + e.notify(left, right) + } +} + +func midpoint(start, end string) string { + // simplistic midpoint calculation based on the first byte of the bounds. + s := byte('a') + if len(start) > 0 { + s = start[0] + } + e := byte('z') + if len(end) > 0 && end != string(rune(0xffff)) { + e = end[0] + } + m := s + (e-s)/2 + return string([]byte{m}) +} + diff --git a/distribution/split_test.go b/distribution/split_test.go new file mode 100644 index 0000000..1d82a21 --- /dev/null +++ b/distribution/split_test.go @@ -0,0 +1,55 @@ +package distribution + +import "testing" + +// Test range statistics collection and split triggering. +func TestRangeStatsAndSplit(t *testing.T) { + var notified bool + eng := NewEngine(3, func(left, right Range) { + notified = true + }) + + eng.RecordRequest("a") + eng.RecordRequest("a") + + stats := eng.GetStats() + if len(stats) != 1 || stats[0].Count != 2 { + t.Fatalf("expected single range count 2, got %+v", stats) + } + + // Third request should trigger a split + eng.RecordRequest("a") + if !notified { + t.Fatalf("expected split notification") + } + if len(eng.Ranges()) != 2 { + t.Fatalf("expected 2 ranges after split, got %d", len(eng.Ranges())) + } + + // Stats should now have two ranges with reset counts + stats = eng.GetStats() + if len(stats) != 2 { + t.Fatalf("expected stats for 2 ranges, got %d", len(stats)) + } + + // find range containing "a" + var count int + for _, s := range stats { + if (s.Start == "" || "a" >= s.Start) && (s.End == "" || "a" < s.End) { + count = s.Count + } + } + if count != 0 { + t.Fatalf("expected count reset after split, got %d", count) + } +} + +// Ensure metadata updates after manual SplitRange call. +func TestManualSplitRange(t *testing.T) { + eng := NewEngine(100, nil) + eng.SplitRange("a") + if len(eng.Ranges()) != 2 { + t.Fatalf("expected 2 ranges after manual split, got %d", len(eng.Ranges())) + } +} + diff --git a/proto/distribution.pb.go b/proto/distribution.pb.go new file mode 100644 index 0000000..3050c99 --- /dev/null +++ b/proto/distribution.pb.go @@ -0,0 +1,405 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.31.0 +// protoc v3.21.12 +// source: distribution.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type SplitRangeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` +} + +func (x *SplitRangeRequest) Reset() { + *x = SplitRangeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SplitRangeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SplitRangeRequest) ProtoMessage() {} + +func (x *SplitRangeRequest) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SplitRangeRequest.ProtoReflect.Descriptor instead. +func (*SplitRangeRequest) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{0} +} + +func (x *SplitRangeRequest) GetKey() string { + if x != nil { + return x.Key + } + return "" +} + +type SplitRangeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *SplitRangeResponse) Reset() { + *x = SplitRangeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *SplitRangeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*SplitRangeResponse) ProtoMessage() {} + +func (x *SplitRangeResponse) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use SplitRangeResponse.ProtoReflect.Descriptor instead. +func (*SplitRangeResponse) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{1} +} + +type GetStatsRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetStatsRequest) Reset() { + *x = GetStatsRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetStatsRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetStatsRequest) ProtoMessage() {} + +func (x *GetStatsRequest) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetStatsRequest.ProtoReflect.Descriptor instead. +func (*GetStatsRequest) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{2} +} + +type RangeStat struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Start string `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` + End string `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` + Count int64 `protobuf:"varint,3,opt,name=count,proto3" json:"count,omitempty"` +} + +func (x *RangeStat) Reset() { + *x = RangeStat{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *RangeStat) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RangeStat) ProtoMessage() {} + +func (x *RangeStat) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RangeStat.ProtoReflect.Descriptor instead. +func (*RangeStat) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{3} +} + +func (x *RangeStat) GetStart() string { + if x != nil { + return x.Start + } + return "" +} + +func (x *RangeStat) GetEnd() string { + if x != nil { + return x.End + } + return "" +} + +func (x *RangeStat) GetCount() int64 { + if x != nil { + return x.Count + } + return 0 +} + +type GetStatsResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Stats []*RangeStat `protobuf:"bytes,1,rep,name=stats,proto3" json:"stats,omitempty"` +} + +func (x *GetStatsResponse) Reset() { + *x = GetStatsResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetStatsResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetStatsResponse) ProtoMessage() {} + +func (x *GetStatsResponse) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetStatsResponse.ProtoReflect.Descriptor instead. +func (*GetStatsResponse) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{4} +} + +func (x *GetStatsResponse) GetStats() []*RangeStat { + if x != nil { + return x.Stats + } + return nil +} + +var File_distribution_proto protoreflect.FileDescriptor + +var file_distribution_proto_rawDesc = []byte{ + 0x0a, 0x12, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x25, 0x0a, 0x11, 0x53, 0x70, 0x6c, 0x69, 0x74, 0x52, 0x61, 0x6e, + 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x14, 0x0a, 0x12, 0x53, + 0x70, 0x6c, 0x69, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x11, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x22, 0x49, 0x0a, 0x09, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x61, + 0x74, 0x12, 0x14, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x65, 0x6e, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, + 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, + 0x34, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x20, 0x0a, 0x05, 0x73, 0x74, 0x61, 0x74, 0x73, 0x18, 0x01, 0x20, 0x03, + 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x53, 0x74, 0x61, 0x74, 0x52, 0x05, + 0x73, 0x74, 0x61, 0x74, 0x73, 0x32, 0x7f, 0x0a, 0x11, 0x44, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, + 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x41, 0x64, 0x6d, 0x69, 0x6e, 0x12, 0x37, 0x0a, 0x0a, 0x53, 0x70, + 0x6c, 0x69, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x12, 0x12, 0x2e, 0x53, 0x70, 0x6c, 0x69, 0x74, + 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x13, 0x2e, 0x53, + 0x70, 0x6c, 0x69, 0x74, 0x52, 0x61, 0x6e, 0x67, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x31, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x12, + 0x10, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x11, 0x2e, 0x47, 0x65, 0x74, 0x53, 0x74, 0x61, 0x74, 0x73, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x6f, 0x6f, 0x74, 0x6a, 0x70, 0x2f, 0x65, 0x6c, 0x61, 0x73, + 0x74, 0x69, 0x63, 0x6b, 0x76, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, +} + +var ( + file_distribution_proto_rawDescOnce sync.Once + file_distribution_proto_rawDescData = file_distribution_proto_rawDesc +) + +func file_distribution_proto_rawDescGZIP() []byte { + file_distribution_proto_rawDescOnce.Do(func() { + file_distribution_proto_rawDescData = protoimpl.X.CompressGZIP(file_distribution_proto_rawDescData) + }) + return file_distribution_proto_rawDescData +} + +var file_distribution_proto_msgTypes = make([]protoimpl.MessageInfo, 5) +var file_distribution_proto_goTypes = []interface{}{ + (*SplitRangeRequest)(nil), // 0: SplitRangeRequest + (*SplitRangeResponse)(nil), // 1: SplitRangeResponse + (*GetStatsRequest)(nil), // 2: GetStatsRequest + (*RangeStat)(nil), // 3: RangeStat + (*GetStatsResponse)(nil), // 4: GetStatsResponse +} +var file_distribution_proto_depIdxs = []int32{ + 3, // 0: GetStatsResponse.stats:type_name -> RangeStat + 0, // 1: DistributionAdmin.SplitRange:input_type -> SplitRangeRequest + 2, // 2: DistributionAdmin.GetStats:input_type -> GetStatsRequest + 1, // 3: DistributionAdmin.SplitRange:output_type -> SplitRangeResponse + 4, // 4: DistributionAdmin.GetStats:output_type -> GetStatsResponse + 3, // [3:5] is the sub-list for method output_type + 1, // [1:3] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_distribution_proto_init() } +func file_distribution_proto_init() { + if File_distribution_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_distribution_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SplitRangeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distribution_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*SplitRangeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distribution_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetStatsRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distribution_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*RangeStat); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distribution_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetStatsResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_distribution_proto_rawDesc, + NumEnums: 0, + NumMessages: 5, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_distribution_proto_goTypes, + DependencyIndexes: file_distribution_proto_depIdxs, + MessageInfos: file_distribution_proto_msgTypes, + }.Build() + File_distribution_proto = out.File + file_distribution_proto_rawDesc = nil + file_distribution_proto_goTypes = nil + file_distribution_proto_depIdxs = nil +} diff --git a/proto/distribution.proto b/proto/distribution.proto new file mode 100644 index 0000000..c1b58ec --- /dev/null +++ b/proto/distribution.proto @@ -0,0 +1,26 @@ +syntax = "proto3"; + +option go_package = "github.com/bootjp/elastickv/proto"; + +service DistributionAdmin { + rpc SplitRange(SplitRangeRequest) returns (SplitRangeResponse) {} + rpc GetStats(GetStatsRequest) returns (GetStatsResponse) {} +} + +message SplitRangeRequest { + string key = 1; +} + +message SplitRangeResponse {} + +message GetStatsRequest {} + +message RangeStat { + string start = 1; + string end = 2; + int64 count = 3; +} + +message GetStatsResponse { + repeated RangeStat stats = 1; +} diff --git a/proto/distribution_grpc.pb.go b/proto/distribution_grpc.pb.go new file mode 100644 index 0000000..4732907 --- /dev/null +++ b/proto/distribution_grpc.pb.go @@ -0,0 +1,146 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc v3.21.12 +// source: distribution.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + DistributionAdmin_SplitRange_FullMethodName = "/DistributionAdmin/SplitRange" + DistributionAdmin_GetStats_FullMethodName = "/DistributionAdmin/GetStats" +) + +// DistributionAdminClient is the client API for DistributionAdmin service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DistributionAdminClient interface { + SplitRange(ctx context.Context, in *SplitRangeRequest, opts ...grpc.CallOption) (*SplitRangeResponse, error) + GetStats(ctx context.Context, in *GetStatsRequest, opts ...grpc.CallOption) (*GetStatsResponse, error) +} + +type distributionAdminClient struct { + cc grpc.ClientConnInterface +} + +func NewDistributionAdminClient(cc grpc.ClientConnInterface) DistributionAdminClient { + return &distributionAdminClient{cc} +} + +func (c *distributionAdminClient) SplitRange(ctx context.Context, in *SplitRangeRequest, opts ...grpc.CallOption) (*SplitRangeResponse, error) { + out := new(SplitRangeResponse) + err := c.cc.Invoke(ctx, DistributionAdmin_SplitRange_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *distributionAdminClient) GetStats(ctx context.Context, in *GetStatsRequest, opts ...grpc.CallOption) (*GetStatsResponse, error) { + out := new(GetStatsResponse) + err := c.cc.Invoke(ctx, DistributionAdmin_GetStats_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DistributionAdminServer is the server API for DistributionAdmin service. +// All implementations must embed UnimplementedDistributionAdminServer +// for forward compatibility +type DistributionAdminServer interface { + SplitRange(context.Context, *SplitRangeRequest) (*SplitRangeResponse, error) + GetStats(context.Context, *GetStatsRequest) (*GetStatsResponse, error) + mustEmbedUnimplementedDistributionAdminServer() +} + +// UnimplementedDistributionAdminServer must be embedded to have forward compatible implementations. +type UnimplementedDistributionAdminServer struct { +} + +func (UnimplementedDistributionAdminServer) SplitRange(context.Context, *SplitRangeRequest) (*SplitRangeResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method SplitRange not implemented") +} +func (UnimplementedDistributionAdminServer) GetStats(context.Context, *GetStatsRequest) (*GetStatsResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetStats not implemented") +} +func (UnimplementedDistributionAdminServer) mustEmbedUnimplementedDistributionAdminServer() {} + +// UnsafeDistributionAdminServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DistributionAdminServer will +// result in compilation errors. +type UnsafeDistributionAdminServer interface { + mustEmbedUnimplementedDistributionAdminServer() +} + +func RegisterDistributionAdminServer(s grpc.ServiceRegistrar, srv DistributionAdminServer) { + s.RegisterService(&DistributionAdmin_ServiceDesc, srv) +} + +func _DistributionAdmin_SplitRange_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(SplitRangeRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DistributionAdminServer).SplitRange(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DistributionAdmin_SplitRange_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DistributionAdminServer).SplitRange(ctx, req.(*SplitRangeRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _DistributionAdmin_GetStats_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetStatsRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DistributionAdminServer).GetStats(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: DistributionAdmin_GetStats_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DistributionAdminServer).GetStats(ctx, req.(*GetStatsRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// DistributionAdmin_ServiceDesc is the grpc.ServiceDesc for DistributionAdmin service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var DistributionAdmin_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "DistributionAdmin", + HandlerType: (*DistributionAdminServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "SplitRange", + Handler: _DistributionAdmin_SplitRange_Handler, + }, + { + MethodName: "GetStats", + Handler: _DistributionAdmin_GetStats_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "distribution.proto", +}