From fd02456780b8822b6da47e535b18238a69abecb5 Mon Sep 17 00:00:00 2001 From: "Yoshiaki Ueda (bootjp)" Date: Fri, 29 Aug 2025 01:23:50 +0900 Subject: [PATCH] Merge main branch --- .github/workflows/docker-image.yml | 2 +- .github/workflows/go-mod-fix.yaml | 2 +- .github/workflows/go-test.yml | 2 +- .github/workflows/golangci-lint.yml | 2 +- .gitignore | 3 + adapter/distribution_server.go | 43 ++++ adapter/dynamodb.go | 9 +- adapter/dynamodb_test.go | 354 +++++++++++++++++++++------ adapter/grpc.go | 8 +- distribution/engine.go | 71 ++++++ distribution/engine_test.go | 52 ++++ go.mod | 35 ++- go.sum | 91 ++++--- proto/Makefile | 16 +- proto/distribution.pb.go | 355 ++++++++++++++++++++++++++++ proto/distribution.proto | 27 +++ proto/distribution_grpc.pb.go | 141 +++++++++++ 17 files changed, 1074 insertions(+), 139 deletions(-) create mode 100644 adapter/distribution_server.go create mode 100644 distribution/engine.go create mode 100644 distribution/engine_test.go create mode 100644 proto/distribution.pb.go create mode 100644 proto/distribution.proto create mode 100644 proto/distribution_grpc.pb.go diff --git a/.github/workflows/docker-image.yml b/.github/workflows/docker-image.yml index e7517af..ecefde3 100644 --- a/.github/workflows/docker-image.yml +++ b/.github/workflows/docker-image.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 with: submodules: true # - name: Set up QEMU diff --git a/.github/workflows/go-mod-fix.yaml b/.github/workflows/go-mod-fix.yaml index 34ae975..0599082 100644 --- a/.github/workflows/go-mod-fix.yaml +++ b/.github/workflows/go-mod-fix.yaml @@ -9,7 +9,7 @@ jobs: runs-on: ubuntu-latest steps: - name: checkout - uses: actions/checkout@v4 + uses: actions/checkout@v5 with: fetch-depth: 2 - name: fix diff --git a/.github/workflows/go-test.yml b/.github/workflows/go-test.yml index da3468b..5a5812b 100644 --- a/.github/workflows/go-test.yml +++ b/.github/workflows/go-test.yml @@ -15,7 +15,7 @@ jobs: os: [ [ubuntu-latest] ] runs-on: ${{ matrix.os }} steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - uses: actions/setup-go@v5 with: go-version-file: 'go.mod' diff --git a/.github/workflows/golangci-lint.yml b/.github/workflows/golangci-lint.yml index 812fc2e..8bd571d 100644 --- a/.github/workflows/golangci-lint.yml +++ b/.github/workflows/golangci-lint.yml @@ -14,7 +14,7 @@ jobs: runs-on: ubuntu-latest steps: - name: Check out code into the Go module directory - uses: actions/checkout@v4 + uses: actions/checkout@v5 - name: golangci-lint uses: reviewdog/action-golangci-lint@v2 with: diff --git a/.gitignore b/.gitignore index 3b735ec..be60a6a 100644 --- a/.gitignore +++ b/.gitignore @@ -19,3 +19,6 @@ # Go workspace file go.work + +# Built binary +elastickv diff --git a/adapter/distribution_server.go b/adapter/distribution_server.go new file mode 100644 index 0000000..17b61eb --- /dev/null +++ b/adapter/distribution_server.go @@ -0,0 +1,43 @@ +package adapter + +import ( + "context" + + "github.com/bootjp/elastickv/distribution" + pb "github.com/bootjp/elastickv/proto" +) + +// DistributionServer serves distribution related gRPC APIs. +type DistributionServer struct { + engine *distribution.Engine + pb.UnimplementedDistributionServer +} + +// NewDistributionServer creates a new server. +func NewDistributionServer(e *distribution.Engine) *DistributionServer { + return &DistributionServer{engine: e} +} + +// UpdateRoute allows updating route information. +func (s *DistributionServer) UpdateRoute(start, end []byte, group uint64) { + s.engine.UpdateRoute(start, end, group) +} + +// GetRoute returns route for a key. +func (s *DistributionServer) GetRoute(ctx context.Context, req *pb.GetRouteRequest) (*pb.GetRouteResponse, error) { + r, ok := s.engine.GetRoute(req.Key) + if !ok { + return &pb.GetRouteResponse{}, nil + } + return &pb.GetRouteResponse{ + Start: r.Start, + End: r.End, + RaftGroupId: r.GroupID, + }, nil +} + +// GetTimestamp returns monotonically increasing timestamp. +func (s *DistributionServer) GetTimestamp(ctx context.Context, req *pb.GetTimestampRequest) (*pb.GetTimestampResponse, error) { + ts := s.engine.NextTimestamp() + return &pb.GetTimestampResponse{Timestamp: ts}, nil +} diff --git a/adapter/dynamodb.go b/adapter/dynamodb.go index 3cd103b..a831934 100644 --- a/adapter/dynamodb.go +++ b/adapter/dynamodb.go @@ -33,18 +33,19 @@ type DynamoDBServer struct { } func NewDynamoDBServer(listen net.Listener, st store.ScanStore, coordinate *kv.Coordinate) *DynamoDBServer { - return &DynamoDBServer{ + d := &DynamoDBServer{ listen: listen, store: st, coordinator: coordinate, dynamoTranscoder: newDynamoDBTranscoder(), } -} - -func (d *DynamoDBServer) Run() error { mux := http.NewServeMux() mux.HandleFunc("/", d.handle) d.httpServer = &http.Server{Handler: mux, ReadHeaderTimeout: time.Second} + return d +} + +func (d *DynamoDBServer) Run() error { if err := d.httpServer.Serve(d.listen); err != nil && !errors.Is(err, http.ErrServerClosed) { return errors.WithStack(err) } diff --git a/adapter/dynamodb_test.go b/adapter/dynamodb_test.go index f73f0c6..50d1166 100644 --- a/adapter/dynamodb_test.go +++ b/adapter/dynamodb_test.go @@ -2,12 +2,15 @@ package adapter import ( "context" + "strconv" + "sync" "testing" - "github.com/aws/aws-sdk-go/aws" - "github.com/aws/aws-sdk-go/aws/credentials" - "github.com/aws/aws-sdk-go/aws/session" - "github.com/aws/aws-sdk-go/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/credentials" + "github.com/aws/aws-sdk-go-v2/service/dynamodb" + "github.com/aws/aws-sdk-go-v2/service/dynamodb/types" "github.com/stretchr/testify/assert" ) @@ -16,34 +19,38 @@ func TestDynamoDB_PutItem_GetItem(t *testing.T) { nodes, _, _ := createNode(t, 1) defer shutdown(nodes) - sess, err := session.NewSession(&aws.Config{ - Region: aws.String("us-west-2"), - Endpoint: aws.String("http://" + nodes[0].dynamoAddress), - Credentials: credentials.NewStaticCredentials("dummy", "dummy", ""), - DisableSSL: aws.Bool(true), - }) + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-west-2"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")), + ) assert.NoError(t, err) - client := dynamodb.New(sess) + client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://" + nodes[0].dynamoAddress) + }) - _, err = client.PutItemWithContext(context.Background(), &dynamodb.PutItemInput{ + _, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{ TableName: aws.String("t"), - Item: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("test")}, - "value": {S: aws.String("v")}, + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "test"}, + "value": &types.AttributeValueMemberS{Value: "v"}, }, }) assert.NoError(t, err) - out, err := client.GetItemWithContext(context.Background(), &dynamodb.GetItemInput{ + out, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{ TableName: aws.String("t"), - Key: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("test")}, + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "test"}, }, }) assert.NoError(t, err) - assert.Equal(t, "test", aws.StringValue(out.Item["key"].S)) - assert.Equal(t, "v", aws.StringValue(out.Item["value"].S)) + keyAttr, ok := out.Item["key"].(*types.AttributeValueMemberS) + assert.True(t, ok) + valueAttr, ok := out.Item["value"].(*types.AttributeValueMemberS) + assert.True(t, ok) + assert.Equal(t, "test", keyAttr.Value) + assert.Equal(t, "v", valueAttr.Value) } func TestDynamoDB_TransactWriteItems(t *testing.T) { @@ -51,33 +58,33 @@ func TestDynamoDB_TransactWriteItems(t *testing.T) { nodes, _, _ := createNode(t, 1) defer shutdown(nodes) - sess, err := session.NewSession(&aws.Config{ - Region: aws.String("us-west-2"), - Endpoint: aws.String("http://" + nodes[0].dynamoAddress), - Credentials: credentials.NewStaticCredentials("dummy", "dummy", ""), - DisableSSL: aws.Bool(true), - }) + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-west-2"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")), + ) assert.NoError(t, err) - client := dynamodb.New(sess) + client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://" + nodes[0].dynamoAddress) + }) - _, err = client.TransactWriteItemsWithContext(context.Background(), &dynamodb.TransactWriteItemsInput{ - TransactItems: []*dynamodb.TransactWriteItem{ + _, err = client.TransactWriteItems(context.Background(), &dynamodb.TransactWriteItemsInput{ + TransactItems: []types.TransactWriteItem{ { - Put: &dynamodb.Put{ + Put: &types.Put{ TableName: aws.String("t"), - Item: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("k1")}, - "value": {S: aws.String("v1")}, + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "k1"}, + "value": &types.AttributeValueMemberS{Value: "v1"}, }, }, }, { - Put: &dynamodb.Put{ + Put: &types.Put{ TableName: aws.String("t"), - Item: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("k2")}, - "value": {S: aws.String("v2")}, + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "k2"}, + "value": &types.AttributeValueMemberS{Value: "v2"}, }, }, }, @@ -85,23 +92,27 @@ func TestDynamoDB_TransactWriteItems(t *testing.T) { }) assert.NoError(t, err) - out1, err := client.GetItemWithContext(context.Background(), &dynamodb.GetItemInput{ + out1, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{ TableName: aws.String("t"), - Key: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("k1")}, + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "k1"}, }, }) assert.NoError(t, err) - assert.Equal(t, "v1", aws.StringValue(out1.Item["value"].S)) + value1Attr, ok := out1.Item["value"].(*types.AttributeValueMemberS) + assert.True(t, ok) + assert.Equal(t, "v1", value1Attr.Value) - out2, err := client.GetItemWithContext(context.Background(), &dynamodb.GetItemInput{ + out2, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{ TableName: aws.String("t"), - Key: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("k2")}, + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "k2"}, }, }) assert.NoError(t, err) - assert.Equal(t, "v2", aws.StringValue(out2.Item["value"].S)) + value2Attr, ok := out2.Item["value"].(*types.AttributeValueMemberS) + assert.True(t, ok) + assert.Equal(t, "v2", value2Attr.Value) } func TestDynamoDB_UpdateItem_Condition(t *testing.T) { @@ -109,65 +120,256 @@ func TestDynamoDB_UpdateItem_Condition(t *testing.T) { nodes, _, _ := createNode(t, 1) defer shutdown(nodes) - sess, err := session.NewSession(&aws.Config{ - Region: aws.String("us-west-2"), - Endpoint: aws.String("http://" + nodes[0].dynamoAddress), - Credentials: credentials.NewStaticCredentials("dummy", "dummy", ""), - DisableSSL: aws.Bool(true), - }) + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-west-2"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")), + ) assert.NoError(t, err) - client := dynamodb.New(sess) + client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://" + nodes[0].dynamoAddress) + }) - _, err = client.PutItemWithContext(context.Background(), &dynamodb.PutItemInput{ + _, err = client.PutItem(context.Background(), &dynamodb.PutItemInput{ TableName: aws.String("t"), - Item: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("test")}, - "value": {S: aws.String("v1")}, + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "test"}, + "value": &types.AttributeValueMemberS{Value: "v1"}, }, }) assert.NoError(t, err) - _, err = client.UpdateItemWithContext(context.Background(), &dynamodb.UpdateItemInput{ + _, err = client.UpdateItem(context.Background(), &dynamodb.UpdateItemInput{ TableName: aws.String("t"), - Key: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("test")}, + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "test"}, }, UpdateExpression: aws.String("SET #v = :val"), ConditionExpression: aws.String("attribute_exists(#k)"), - ExpressionAttributeNames: map[string]*string{ - "#v": aws.String("value"), - "#k": aws.String("key"), + ExpressionAttributeNames: map[string]string{ + "#v": "value", + "#k": "key", }, - ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{ - ":val": {S: aws.String("v2")}, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":val": &types.AttributeValueMemberS{Value: "v2"}, }, }) assert.NoError(t, err) - out, err := client.GetItemWithContext(context.Background(), &dynamodb.GetItemInput{ + out, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{ TableName: aws.String("t"), - Key: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("test")}, + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "test"}, }, }) assert.NoError(t, err) - assert.Equal(t, "v2", aws.StringValue(out.Item["value"].S)) + valueAttr, ok := out.Item["value"].(*types.AttributeValueMemberS) + assert.True(t, ok) + assert.Equal(t, "v2", valueAttr.Value) - _, err = client.UpdateItemWithContext(context.Background(), &dynamodb.UpdateItemInput{ + _, err = client.UpdateItem(context.Background(), &dynamodb.UpdateItemInput{ TableName: aws.String("t"), - Key: map[string]*dynamodb.AttributeValue{ - "key": {S: aws.String("test")}, + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: "test"}, }, UpdateExpression: aws.String("SET #v = :val"), ConditionExpression: aws.String("attribute_not_exists(#k)"), - ExpressionAttributeNames: map[string]*string{ - "#v": aws.String("value"), - "#k": aws.String("key"), + ExpressionAttributeNames: map[string]string{ + "#v": "value", + "#k": "key", }, - ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{ - ":val": {S: aws.String("v3")}, + ExpressionAttributeValues: map[string]types.AttributeValue{ + ":val": &types.AttributeValueMemberS{Value: "v3"}, }, }) assert.Error(t, err) } + +func TestDynamoDB_TransactWriteItems_Concurrent(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-west-2"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")), + ) + assert.NoError(t, err) + + client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://" + nodes[0].dynamoAddress) + }) + + wg := &sync.WaitGroup{} + numGoroutines := 100 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + keyPrefix := "concurrent-txn-" + strconv.Itoa(i) + key1 := keyPrefix + "-k1" + key2 := keyPrefix + "-k2" + value1 := "v1-" + strconv.Itoa(i) + value2 := "v2-" + strconv.Itoa(i) + + // Perform transaction with two put operations + _, err := client.TransactWriteItems(context.Background(), &dynamodb.TransactWriteItemsInput{ + TransactItems: []types.TransactWriteItem{ + { + Put: &types.Put{ + TableName: aws.String("t"), + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: key1}, + "value": &types.AttributeValueMemberS{Value: value1}, + }, + }, + }, + { + Put: &types.Put{ + TableName: aws.String("t"), + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: key2}, + "value": &types.AttributeValueMemberS{Value: value2}, + }, + }, + }, + }, + }) + assert.NoError(t, err, "Transaction failed for goroutine %d", i) + + // Verify both items were written correctly + out1, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{ + TableName: aws.String("t"), + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: key1}, + }, + }) + assert.NoError(t, err, "Get failed for key1 in goroutine %d", i) + value1Attr, ok := out1.Item["value"].(*types.AttributeValueMemberS) + assert.True(t, ok, "Type assertion failed for key1 in goroutine %d", i) + assert.Equal(t, value1, value1Attr.Value, "Value mismatch for key1 in goroutine %d", i) + + out2, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{ + TableName: aws.String("t"), + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: key2}, + }, + }) + assert.NoError(t, err, "Get failed for key2 in goroutine %d", i) + value2Attr, ok := out2.Item["value"].(*types.AttributeValueMemberS) + assert.True(t, ok, "Type assertion failed for key2 in goroutine %d", i) + assert.Equal(t, value2, value2Attr.Value, "Value mismatch for key2 in goroutine %d", i) + }(i) + } + + wg.Wait() +} + +func TestDynamoDB_TransactWriteItems_Concurrent_Conflicting(t *testing.T) { + t.Parallel() + nodes, _, _ := createNode(t, 1) + defer shutdown(nodes) + + cfg, err := config.LoadDefaultConfig(context.Background(), + config.WithRegion("us-west-2"), + config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider("dummy", "dummy", "")), + ) + assert.NoError(t, err) + + client := dynamodb.NewFromConfig(cfg, func(o *dynamodb.Options) { + o.BaseEndpoint = aws.String("http://" + nodes[0].dynamoAddress) + }) + + // Initialize some base keys that will be updated concurrently + baseKeys := []string{"shared-key-1", "shared-key-2", "shared-key-3"} + for _, key := range baseKeys { + _, err := client.PutItem(context.Background(), &dynamodb.PutItemInput{ + TableName: aws.String("t"), + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: key}, + "value": &types.AttributeValueMemberS{Value: "initial"}, + "counter": &types.AttributeValueMemberN{Value: "0"}, + }, + }) + assert.NoError(t, err) + } + + wg := &sync.WaitGroup{} + numGoroutines := 50 + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(i int) { + defer wg.Done() + + // Each goroutine attempts to update multiple shared keys in a transaction + counterValue := strconv.Itoa(i) + + _, err := client.TransactWriteItems(context.Background(), &dynamodb.TransactWriteItemsInput{ + TransactItems: []types.TransactWriteItem{ + { + Put: &types.Put{ + TableName: aws.String("t"), + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: baseKeys[0]}, + "value": &types.AttributeValueMemberS{Value: "updated-by-" + counterValue}, + "counter": &types.AttributeValueMemberN{Value: counterValue}, + }, + }, + }, + { + Put: &types.Put{ + TableName: aws.String("t"), + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: baseKeys[1]}, + "value": &types.AttributeValueMemberS{Value: "updated-by-" + counterValue}, + "counter": &types.AttributeValueMemberN{Value: counterValue}, + }, + }, + }, + { + Put: &types.Put{ + TableName: aws.String("t"), + Item: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: baseKeys[2]}, + "value": &types.AttributeValueMemberS{Value: "updated-by-" + counterValue}, + "counter": &types.AttributeValueMemberN{Value: counterValue}, + }, + }, + }, + }, + }) + assert.NoError(t, err, "Transaction failed for goroutine %d", i) + }(i) + } + + wg.Wait() + + // Verify that all keys have been updated and have consistent values + // Due to the concurrent nature, we can't predict which goroutine will win, + // but we can verify that each key has valid data + for _, key := range baseKeys { + out, err := client.GetItem(context.Background(), &dynamodb.GetItemInput{ + TableName: aws.String("t"), + Key: map[string]types.AttributeValue{ + "key": &types.AttributeValueMemberS{Value: key}, + }, + }) + assert.NoError(t, err, "Get failed for key %s", key) + assert.NotNil(t, out.Item, "Item should exist for key %s", key) + + if out.Item != nil && out.Item["value"] != nil && out.Item["counter"] != nil { + valueAttr, ok := out.Item["value"].(*types.AttributeValueMemberS) + assert.True(t, ok, "Value type assertion failed for key %s", key) + counterAttr, ok := out.Item["counter"].(*types.AttributeValueMemberN) + assert.True(t, ok, "Counter type assertion failed for key %s", key) + value := valueAttr.Value + counter := counterAttr.Value + + // Verify that the value and counter are consistent (both from the same goroutine) + assert.Contains(t, value, "updated-by-"+counter, "Value and counter should be consistent for key %s", key) + } + } +} diff --git a/adapter/grpc.go b/adapter/grpc.go index ba6419c..d056645 100644 --- a/adapter/grpc.go +++ b/adapter/grpc.go @@ -68,7 +68,7 @@ func (r GRPCServer) RawPut(_ context.Context, req *pb.RawPutRequest) (*pb.RawPut res, err := r.coordinator.Dispatch(m) if err != nil { return &pb.RawPutResponse{ - CommitIndex: res.CommitIndex, + CommitIndex: uint64(0), Success: false, }, errors.WithStack(err) } @@ -88,7 +88,7 @@ func (r GRPCServer) RawDelete(ctx context.Context, req *pb.RawDeleteRequest) (*p res, err := r.coordinator.Dispatch(m) if err != nil { return &pb.RawDeleteResponse{ - CommitIndex: res.CommitIndex, + CommitIndex: uint64(0), Success: false, }, errors.WithStack(err) } @@ -122,7 +122,7 @@ func (r GRPCServer) Put(ctx context.Context, req *pb.PutRequest) (*pb.PutRespons res, err := r.coordinator.Dispatch(reqs) if err != nil { return &pb.PutResponse{ - CommitIndex: res.CommitIndex, + CommitIndex: uint64(0), }, errors.WithStack(err) } @@ -163,7 +163,7 @@ func (r GRPCServer) Delete(ctx context.Context, req *pb.DeleteRequest) (*pb.Dele res, err := r.coordinator.Dispatch(reqs) if err != nil { return &pb.DeleteResponse{ - CommitIndex: res.CommitIndex, + CommitIndex: uint64(0), }, errors.WithStack(err) } diff --git a/distribution/engine.go b/distribution/engine.go new file mode 100644 index 0000000..44fbfb3 --- /dev/null +++ b/distribution/engine.go @@ -0,0 +1,71 @@ +package distribution + +import ( + "bytes" + "sort" + "sync" + "sync/atomic" +) + +// Route represents a mapping from a key range to a raft group. +// Ranges are right half-open intervals: [Start, End). Start is inclusive and +// End is exclusive. A nil End denotes an unbounded interval extending to +// positive infinity. +type Route struct { + // Start marks the inclusive beginning of the range. + Start []byte + // End marks the exclusive end of the range. nil means unbounded. + End []byte + // GroupID identifies the raft group for the range starting at Start. + GroupID uint64 +} + +// Engine holds in-memory metadata of routes and provides timestamp generation. +type Engine struct { + mu sync.RWMutex + routes []Route + ts uint64 +} + +// NewEngine creates an Engine. +func NewEngine() *Engine { + return &Engine{routes: make([]Route, 0)} +} + +// UpdateRoute registers or updates a route for the given key range. +// Routes are stored sorted by Start. +func (e *Engine) UpdateRoute(start, end []byte, group uint64) { + e.mu.Lock() + defer e.mu.Unlock() + e.routes = append(e.routes, Route{Start: start, End: end, GroupID: group}) + sort.Slice(e.routes, func(i, j int) bool { + return bytes.Compare(e.routes[i].Start, e.routes[j].Start) < 0 + }) +} + +// GetRoute finds a route for the given key using right half-open intervals. +func (e *Engine) GetRoute(key []byte) (Route, bool) { + e.mu.RLock() + defer e.mu.RUnlock() + if len(e.routes) == 0 { + return Route{}, false + } + + // Find the first route with Start > key. + i := sort.Search(len(e.routes), func(i int) bool { + return bytes.Compare(e.routes[i].Start, key) > 0 + }) + if i == 0 { + return Route{}, false + } + r := e.routes[i-1] + if r.End != nil && bytes.Compare(key, r.End) >= 0 { + return Route{}, false + } + return r, true +} + +// NextTimestamp returns a monotonic increasing timestamp. +func (e *Engine) NextTimestamp() uint64 { + return atomic.AddUint64(&e.ts, 1) +} diff --git a/distribution/engine_test.go b/distribution/engine_test.go new file mode 100644 index 0000000..3d58200 --- /dev/null +++ b/distribution/engine_test.go @@ -0,0 +1,52 @@ +package distribution + +import "testing" + +func TestEngineRouteLookup(t *testing.T) { + e := NewEngine() + e.UpdateRoute([]byte("a"), []byte("m"), 1) + e.UpdateRoute([]byte("m"), nil, 2) + + cases := []struct { + key []byte + group uint64 + expect bool + }{ + {[]byte("0"), 0, false}, // before first route + {[]byte("a"), 1, true}, // start is inclusive + {[]byte("b"), 1, true}, + {[]byte("m"), 2, true}, // end is exclusive for first route + {[]byte("x"), 2, true}, + {[]byte("za"), 2, true}, // last route is unbounded + } + + for _, c := range cases { + r, ok := e.GetRoute(c.key) + if ok != c.expect { + t.Fatalf("key %q expected ok=%v, got %v", c.key, c.expect, ok) + } + if ok && r.GroupID != c.group { + t.Fatalf("key %q expected group %d, got %d", c.key, c.group, r.GroupID) + } + } +} + +func TestEngineRouteUnmatchedAfterEnd(t *testing.T) { + e := NewEngine() + e.UpdateRoute([]byte("a"), []byte("m"), 1) + if _, ok := e.GetRoute([]byte("x")); ok { + t.Fatalf("expected no route for key beyond end") + } +} + +func TestEngineTimestampMonotonic(t *testing.T) { + e := NewEngine() + last := e.NextTimestamp() + for i := 0; i < 100; i++ { + ts := e.NextTimestamp() + if ts <= last { + t.Fatalf("timestamp not monotonic: %d <= %d", ts, last) + } + last = ts + } +} diff --git a/go.mod b/go.mod index 0fcb9b7..710e89f 100644 --- a/go.mod +++ b/go.mod @@ -2,32 +2,46 @@ module github.com/bootjp/elastickv go 1.23.0 -toolchain go1.24.6 +toolchain go1.25.0 require ( github.com/Jille/grpc-multi-resolver v1.3.0 github.com/Jille/raft-grpc-leader-rpc v1.1.0 github.com/Jille/raft-grpc-transport v1.6.1 github.com/Jille/raftadmin v1.2.1 - github.com/aws/aws-sdk-go v1.55.8 + github.com/aws/aws-sdk-go-v2 v1.38.1 + github.com/aws/aws-sdk-go-v2/config v1.31.2 + github.com/aws/aws-sdk-go-v2/credentials v1.18.6 + github.com/aws/aws-sdk-go-v2/service/dynamodb v1.49.1 github.com/cockroachdb/errors v1.12.0 github.com/emirpasic/gods v1.18.1 github.com/hashicorp/go-hclog v1.6.3 github.com/hashicorp/raft v1.7.3 github.com/hashicorp/raft-boltdb/v2 v2.3.1 github.com/pkg/errors v0.9.1 - github.com/redis/go-redis/v9 v9.12.0 + github.com/redis/go-redis/v9 v9.12.1 github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.10.0 github.com/tidwall/redcon v1.6.2 - go.etcd.io/bbolt v1.4.2 + go.etcd.io/bbolt v1.4.3 golang.org/x/sync v0.16.0 - google.golang.org/grpc v1.74.2 - google.golang.org/protobuf v1.36.7 + google.golang.org/grpc v1.75.0 + google.golang.org/protobuf v1.36.8 ) require ( github.com/armon/go-metrics v0.4.1 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.4 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.4 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.4 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.28.2 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.38.0 // indirect + github.com/aws/smithy-go v1.22.5 // indirect github.com/boltdb/bolt v1.3.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b // indirect @@ -44,18 +58,17 @@ require ( github.com/hashicorp/go-msgpack/v2 v2.1.2 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect - github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.19 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/rogpeppe/go-internal v1.9.0 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/tidwall/btree v1.1.0 // indirect github.com/tidwall/match v1.1.1 // indirect - golang.org/x/net v0.40.0 // indirect + golang.org/x/net v0.41.0 // indirect golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.25.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 3b0677e..c4aa0ee 100644 --- a/go.sum +++ b/go.sum @@ -23,8 +23,36 @@ github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kd github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/armon/go-metrics v0.4.1 h1:hR91U9KYmb6bLBYLQjyM+3j+rcd/UhE+G78SFnF8gJA= github.com/armon/go-metrics v0.4.1/go.mod h1:E6amYzXo6aW1tqzoZGT755KkbgrJsSdpwZ+3JqfkOG4= -github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= -github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= +github.com/aws/aws-sdk-go-v2 v1.38.1 h1:j7sc33amE74Rz0M/PoCpsZQ6OunLqys/m5antM0J+Z8= +github.com/aws/aws-sdk-go-v2 v1.38.1/go.mod h1:9Q0OoGQoboYIAJyslFyF1f5K1Ryddop8gqMhWx/n4Wg= +github.com/aws/aws-sdk-go-v2/config v1.31.2 h1:NOaSZpVGEH2Np/c1toSeW0jooNl+9ALmsUTZ8YvkJR0= +github.com/aws/aws-sdk-go-v2/config v1.31.2/go.mod h1:17ft42Yb2lF6OigqSYiDAiUcX4RIkEMY6XxEMJsrAes= +github.com/aws/aws-sdk-go-v2/credentials v1.18.6 h1:AmmvNEYrru7sYNJnp3pf57lGbiarX4T9qU/6AZ9SucU= +github.com/aws/aws-sdk-go-v2/credentials v1.18.6/go.mod h1:/jdQkh1iVPa01xndfECInp1v1Wnp70v3K4MvtlLGVEc= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.4 h1:lpdMwTzmuDLkgW7086jE94HweHCqG+uOJwHf3LZs7T0= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.4/go.mod h1:9xzb8/SV62W6gHQGC/8rrvgNXU6ZoYM3sAIJCIrXJxY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.4 h1:IdCLsiiIj5YJ3AFevsewURCPV+YWUlOW8JiPhoAy8vg= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.4/go.mod h1:l4bdfCD7XyyZA9BolKBo1eLqgaJxl0/x91PL4Yqe0ao= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.4 h1:j7vjtr1YIssWQOMeOWRbh3z8g2oY/xPjnZH2gLY4sGw= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.4/go.mod h1:yDmJgqOiH4EA8Hndnv4KwAo8jCGTSnM5ASG1nBI+toA= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3 h1:bIqFDwgGXXN1Kpp99pDOdKMTTb5d2KyU5X/BZxjOkRo= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.3/go.mod h1:H5O/EsxDWyU+LP/V8i5sm8cxoZgc2fdNR9bxlOFrQTo= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.49.1 h1:0RqS5X7EodJzOenoY4V3LUSp9PirELO2ZOpOZbMldco= +github.com/aws/aws-sdk-go-v2/service/dynamodb v1.49.1/go.mod h1:VRp/OeQolnQD9GfNgdSf3kU5vbg708PF6oPHh2bq3hc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0 h1:6+lZi2JeGKtCraAj1rpoZfKqnQ9SptseRZioejfUOLM= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.0/go.mod h1:eb3gfbVIxIoGgJsi9pGne19dhCBpK6opTYpQqAmdy44= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.4 h1:upi++G3fQCAUBXQe58TbjXmdVPwrqMnRQMThOAIz7KM= +github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.11.4/go.mod h1:swb+GqWXTZMOyVV9rVePAUu5L80+X5a+Lui1RNOyUFo= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.4 h1:ueB2Te0NacDMnaC+68za9jLwkjzxGWm0KB5HTUHjLTI= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.4/go.mod h1:nLEfLnVMmLvyIG58/6gsSA03F1voKGaCfHV7+lR8S7s= +github.com/aws/aws-sdk-go-v2/service/sso v1.28.2 h1:ve9dYBB8CfJGTFqcQ3ZLAAb/KXWgYlgu/2R2TZL2Ko0= +github.com/aws/aws-sdk-go-v2/service/sso v1.28.2/go.mod h1:n9bTZFZcBa9hGGqVz3i/a6+NG0zmZgtkB9qVVFDqPA8= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.2 h1:pd9G9HQaM6UZAZh19pYOkpKSQkyQQ9ftnl/LttQOcGI= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.33.2/go.mod h1:eknndR9rU8UpE/OmFpqU78V1EcXPKFTTm5l/buZYgvM= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.0 h1:iV1Ko4Em/lkJIsoKyGfc0nQySi+v0Udxr6Igq+y9JZc= +github.com/aws/aws-sdk-go-v2/service/sts v1.38.0/go.mod h1:bEPcjW7IbolPfK67G1nilqWyoxYMSPrDiIQ3RdIdKgo= +github.com/aws/smithy-go v1.22.5 h1:P9ATCXPMb2mPjYBgueqJNCA5S9UfktsW0tTxi+a7eqw= +github.com/aws/smithy-go v1.22.5/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= @@ -161,10 +189,6 @@ github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702 h1:RLKEcCuKc github.com/hashicorp/raft-boltdb v0.0.0-20230125174641-2a8082862702/go.mod h1:nTakvJ4XYq45UXtn0DbwR4aU9ZdjlnIenpbs6Cd+FM0= github.com/hashicorp/raft-boltdb/v2 v2.3.1 h1:ackhdCNPKblmOhjEU9+4lHSJYFkJd6Jqyvj6eW9pwkc= github.com/hashicorp/raft-boltdb/v2 v2.3.1/go.mod h1:n4S+g43dXF1tqDT+yzcXHhXM6y7MrlUd3TTwGRcUvQE= -github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= -github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= -github.com/jmespath/go-jmespath/internal/testify v1.5.1 h1:shLQSRRSCCPj3f2gpwzGwWFoC7ycTf1rcQZHOlsJ6N8= -github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -239,11 +263,12 @@ github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsT github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A= github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU= github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= -github.com/redis/go-redis/v9 v9.12.0 h1:XlVPGlflh4nxfhsNXPA8Qp6EmEfTo0rp8oaBzPipXnU= -github.com/redis/go-redis/v9 v9.12.0/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= +github.com/redis/go-redis/v9 v9.12.1 h1:k5iquqv27aBtnTm2tIkROUDp8JBXhXZIVu1InSgvovg= +github.com/redis/go-redis/v9 v9.12.1/go.mod h1:huWgSWd8mW6+m0VPhJjSSQ+d6Nh1VICQ6Q5lHuCH/Iw= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= -github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= +github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= @@ -278,20 +303,20 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= -go.etcd.io/bbolt v1.4.2 h1:IrUHp260R8c+zYx/Tm8QZr04CX+qWS5PGfPdevhdm1I= -go.etcd.io/bbolt v1.4.2/go.mod h1:Is8rSHO/b4f3XigBC0lL0+4FwAQv3HXEEIgFMuKHceM= +go.etcd.io/bbolt v1.4.3 h1:dEadXpI6G79deX5prL3QRNP6JB8UxVkqo4UPnHaNXJo= +go.etcd.io/bbolt v1.4.3/go.mod h1:tKQlpPaYCVFctUIgFKFnAlvbmB3tpy1vkTnDWohtc0E= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/otel v1.36.0 h1:UumtzIklRBY6cI/lllNZlALOF5nNIzJVb16APdvgTXg= -go.opentelemetry.io/otel v1.36.0/go.mod h1:/TcFMXYjyRNh8khOAO9ybYkqaDBb/70aVwkNML4pP8E= -go.opentelemetry.io/otel/metric v1.36.0 h1:MoWPKVhQvJ+eeXWHFBOPoBOi20jh6Iq2CcCREuTYufE= -go.opentelemetry.io/otel/metric v1.36.0/go.mod h1:zC7Ks+yeyJt4xig9DEw9kuUFe5C3zLbVjV2PzT6qzbs= -go.opentelemetry.io/otel/sdk v1.36.0 h1:b6SYIuLRs88ztox4EyrvRti80uXIFy+Sqzoh9kFULbs= -go.opentelemetry.io/otel/sdk v1.36.0/go.mod h1:+lC+mTgD+MUWfjJubi2vvXWcVxyr9rmlshZni72pXeY= -go.opentelemetry.io/otel/sdk/metric v1.36.0 h1:r0ntwwGosWGaa0CrSt8cuNuTcccMXERFwHX4dThiPis= -go.opentelemetry.io/otel/sdk/metric v1.36.0/go.mod h1:qTNOhFDfKRwX0yXOqJYegL5WRaW376QbB7P4Pb0qva4= -go.opentelemetry.io/otel/trace v1.36.0 h1:ahxWNuqZjpdiFAyrIoQ4GIiAIhxAunQR6MUoKrsNd4w= -go.opentelemetry.io/otel/trace v1.36.0/go.mod h1:gQ+OnDZzrybY4k4seLzPAWNwVBBVlF2szhehOBB/tGA= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/sdk v1.37.0 h1:ItB0QUqnjesGRvNcmAcU0LyvkVyGJ2xftD29bWdDvKI= +go.opentelemetry.io/otel/sdk v1.37.0/go.mod h1:VredYzxUvuo2q3WRcDnKDjbdvmO0sCzOvVAiY+yUkAg= +go.opentelemetry.io/otel/sdk/metric v1.37.0 h1:90lI228XrB9jCMuSdA0673aubgRobVZFhbjxHHspCPc= +go.opentelemetry.io/otel/sdk/metric v1.37.0/go.mod h1:cNen4ZWfiD37l5NhS+Keb5RXVWZWpRE+9WyVCpbo5ps= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= @@ -340,8 +365,8 @@ golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg= golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk= golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= -golang.org/x/net v0.40.0 h1:79Xs7wF06Gbdcg4kdCCIQArK11Z1hr5POQ6+fIYHNuY= -golang.org/x/net v0.40.0/go.mod h1:y0hY0exeL2Pku80/zKK7tpntoX23cqL3Oa6njdgRtds= +golang.org/x/net v0.41.0 h1:vBTly1HeNPEn3wtREYfy4GZ/NECgw2Cnl+nK6Nz3uvw= +golang.org/x/net v0.41.0/go.mod h1:B/K4NNqkfmg07DQYrbwvSluqCJOOXwUjeb/5lOisjbA= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -410,8 +435,8 @@ golang.org/x/text v0.5.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= -golang.org/x/text v0.25.0 h1:qVyWApTSYLk/drJRO5mDlNYskwQznZmkpV2c8q9zls4= -golang.org/x/text v0.25.0/go.mod h1:WEdwpYrmk1qmdHvhkSTNPm3app7v4rsT8F2UD6+VHIA= +golang.org/x/text v0.26.0 h1:P42AVeLghgTYr4+xUnTRKDMqpar+PtX7KWuNQL21L8M= +golang.org/x/text v0.26.0/go.mod h1:QK15LZJUUQVJxhz7wXgxSy/CJaTFjd0G+YLonydOVQA= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= @@ -431,6 +456,8 @@ golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= +gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= google.golang.org/appengine v1.6.7/go.mod h1:8WjMMxjGQR8xUklV/ARdw2HLXBOI7O7uCIDZVag1xfc= @@ -439,8 +466,8 @@ google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98 google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a h1:v2PbRU4K3llS09c7zodFpNePeamkAwG3mPrAery9VeE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250528174236-200df99c418a/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 h1:pFyd6EwwL2TqFf8emdthzeX+gZE1ElRq3iM8pui4KBY= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= @@ -449,8 +476,8 @@ google.golang.org/grpc v1.31.1/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= -google.golang.org/grpc v1.74.2 h1:WoosgB65DlWVC9FqI82dGsZhWFNBSLjQ84bjROOpMu4= -google.golang.org/grpc v1.74.2/go.mod h1:CtQ+BGjaAIXHs/5YS3i473GqwBBa1zGQNevxdeBEXrM= +google.golang.org/grpc v1.75.0 h1:+TW+dqTd2Biwe6KKfhE5JpiYIBWq865PhKGSXiivqt4= +google.golang.org/grpc v1.75.0/go.mod h1:JtPAzKiq4v1xcAB2hydNlWI2RnF85XXcV0mhKXr2ecQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -464,8 +491,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= -google.golang.org/protobuf v1.36.7 h1:IgrO7UwFQGJdRNXH/sQux4R1Dj1WAKcLElzeeRaXV2A= -google.golang.org/protobuf v1.36.7/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= +google.golang.org/protobuf v1.36.8 h1:xHScyCOEuuwZEc6UtSOvPbAT4zRh0xcNRYekJwfqyMc= +google.golang.org/protobuf v1.36.8/go.mod h1:fuxRtAxBytpl4zzqUh6/eyUujkJdNiuEkXntxiD/uRU= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -479,9 +506,7 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= diff --git a/proto/Makefile b/proto/Makefile index ab03f07..d23f322 100644 --- a/proto/Makefile +++ b/proto/Makefile @@ -1,10 +1,12 @@ +all: gen + gen: protoc --go_out=. --go_opt=paths=source_relative \ - --go-grpc_out=. --go-grpc_opt=paths=source_relative \ - service.proto + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + service.proto protoc --go_out=. --go_opt=paths=source_relative \ - --go-grpc_out=. --go-grpc_opt=paths=source_relative \ - internal.proto - - - + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + internal.proto + protoc --go_out=. --go_opt=paths=source_relative \ + --go-grpc_out=. --go-grpc_opt=paths=source_relative \ + distribution.proto diff --git a/proto/distribution.pb.go b/proto/distribution.pb.go new file mode 100644 index 0000000..3d6c779 --- /dev/null +++ b/proto/distribution.pb.go @@ -0,0 +1,355 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.12 +// source: distribution.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetRouteRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` +} + +func (x *GetRouteRequest) Reset() { + *x = GetRouteRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRouteRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRouteRequest) ProtoMessage() {} + +func (x *GetRouteRequest) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRouteRequest.ProtoReflect.Descriptor instead. +func (*GetRouteRequest) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{0} +} + +func (x *GetRouteRequest) GetKey() []byte { + if x != nil { + return x.Key + } + return nil +} + +type GetRouteResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // start is inclusive and end is exclusive. A missing end denotes an + // unbounded range extending to positive infinity. + Start []byte `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"` + End []byte `protobuf:"bytes,2,opt,name=end,proto3" json:"end,omitempty"` + RaftGroupId uint64 `protobuf:"varint,3,opt,name=raft_group_id,json=raftGroupId,proto3" json:"raft_group_id,omitempty"` +} + +func (x *GetRouteResponse) Reset() { + *x = GetRouteResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetRouteResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetRouteResponse) ProtoMessage() {} + +func (x *GetRouteResponse) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetRouteResponse.ProtoReflect.Descriptor instead. +func (*GetRouteResponse) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{1} +} + +func (x *GetRouteResponse) GetStart() []byte { + if x != nil { + return x.Start + } + return nil +} + +func (x *GetRouteResponse) GetEnd() []byte { + if x != nil { + return x.End + } + return nil +} + +func (x *GetRouteResponse) GetRaftGroupId() uint64 { + if x != nil { + return x.RaftGroupId + } + return 0 +} + +type GetTimestampRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *GetTimestampRequest) Reset() { + *x = GetTimestampRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetTimestampRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetTimestampRequest) ProtoMessage() {} + +func (x *GetTimestampRequest) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetTimestampRequest.ProtoReflect.Descriptor instead. +func (*GetTimestampRequest) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{2} +} + +type GetTimestampResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"` +} + +func (x *GetTimestampResponse) Reset() { + *x = GetTimestampResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_distribution_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetTimestampResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetTimestampResponse) ProtoMessage() {} + +func (x *GetTimestampResponse) ProtoReflect() protoreflect.Message { + mi := &file_distribution_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetTimestampResponse.ProtoReflect.Descriptor instead. +func (*GetTimestampResponse) Descriptor() ([]byte, []int) { + return file_distribution_proto_rawDescGZIP(), []int{3} +} + +func (x *GetTimestampResponse) GetTimestamp() uint64 { + if x != nil { + return x.Timestamp + } + return 0 +} + +var File_distribution_proto protoreflect.FileDescriptor + +var file_distribution_proto_rawDesc = []byte{ + 0x0a, 0x12, 0x64, 0x69, 0x73, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x22, 0x23, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0c, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x22, 0x5e, 0x0a, 0x10, 0x47, 0x65, 0x74, + 0x52, 0x6f, 0x75, 0x74, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, + 0x05, 0x73, 0x74, 0x61, 0x72, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x05, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x12, 0x10, 0x0a, 0x03, 0x65, 0x6e, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, + 0x52, 0x03, 0x65, 0x6e, 0x64, 0x12, 0x22, 0x0a, 0x0d, 0x72, 0x61, 0x66, 0x74, 0x5f, 0x67, 0x72, + 0x6f, 0x75, 0x70, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x04, 0x52, 0x0b, 0x72, 0x61, + 0x66, 0x74, 0x47, 0x72, 0x6f, 0x75, 0x70, 0x49, 0x64, 0x22, 0x15, 0x0a, 0x13, 0x47, 0x65, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x22, 0x34, 0x0a, 0x14, 0x47, 0x65, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x09, 0x74, 0x69, 0x6d, + 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x32, 0x80, 0x01, 0x0a, 0x0c, 0x44, 0x69, 0x73, 0x74, 0x72, + 0x69, 0x62, 0x75, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x31, 0x0a, 0x08, 0x47, 0x65, 0x74, 0x52, 0x6f, + 0x75, 0x74, 0x65, 0x12, 0x10, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x11, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x6f, 0x75, 0x74, 0x65, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x0c, 0x47, 0x65, + 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x14, 0x2e, 0x47, 0x65, 0x74, + 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, + 0x1a, 0x15, 0x2e, 0x47, 0x65, 0x74, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x23, 0x5a, 0x21, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x62, 0x6f, 0x6f, 0x74, 0x6a, 0x70, 0x2f, 0x65, + 0x6c, 0x61, 0x73, 0x74, 0x69, 0x63, 0x6b, 0x76, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_distribution_proto_rawDescOnce sync.Once + file_distribution_proto_rawDescData = file_distribution_proto_rawDesc +) + +func file_distribution_proto_rawDescGZIP() []byte { + file_distribution_proto_rawDescOnce.Do(func() { + file_distribution_proto_rawDescData = protoimpl.X.CompressGZIP(file_distribution_proto_rawDescData) + }) + return file_distribution_proto_rawDescData +} + +var file_distribution_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_distribution_proto_goTypes = []interface{}{ + (*GetRouteRequest)(nil), // 0: GetRouteRequest + (*GetRouteResponse)(nil), // 1: GetRouteResponse + (*GetTimestampRequest)(nil), // 2: GetTimestampRequest + (*GetTimestampResponse)(nil), // 3: GetTimestampResponse +} +var file_distribution_proto_depIdxs = []int32{ + 0, // 0: Distribution.GetRoute:input_type -> GetRouteRequest + 2, // 1: Distribution.GetTimestamp:input_type -> GetTimestampRequest + 1, // 2: Distribution.GetRoute:output_type -> GetRouteResponse + 3, // 3: Distribution.GetTimestamp:output_type -> GetTimestampResponse + 2, // [2:4] is the sub-list for method output_type + 0, // [0:2] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_distribution_proto_init() } +func file_distribution_proto_init() { + if File_distribution_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_distribution_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetRouteRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distribution_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetRouteResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distribution_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetTimestampRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_distribution_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetTimestampResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_distribution_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_distribution_proto_goTypes, + DependencyIndexes: file_distribution_proto_depIdxs, + MessageInfos: file_distribution_proto_msgTypes, + }.Build() + File_distribution_proto = out.File + file_distribution_proto_rawDesc = nil + file_distribution_proto_goTypes = nil + file_distribution_proto_depIdxs = nil +} diff --git a/proto/distribution.proto b/proto/distribution.proto new file mode 100644 index 0000000..e8b80c3 --- /dev/null +++ b/proto/distribution.proto @@ -0,0 +1,27 @@ +syntax = "proto3"; + +option go_package = "github.com/bootjp/elastickv/proto"; + +service Distribution { + rpc GetRoute (GetRouteRequest) returns (GetRouteResponse) {} + rpc GetTimestamp (GetTimestampRequest) returns (GetTimestampResponse) {} +} + +message GetRouteRequest { + bytes key = 1; +} + +message GetRouteResponse { + // start is inclusive and end is exclusive. A missing end denotes an + // unbounded range extending to positive infinity. + bytes start = 1; + bytes end = 2; + uint64 raft_group_id = 3; +} + +message GetTimestampRequest {} + +message GetTimestampResponse { + uint64 timestamp = 1; +} + diff --git a/proto/distribution_grpc.pb.go b/proto/distribution_grpc.pb.go new file mode 100644 index 0000000..4c85059 --- /dev/null +++ b/proto/distribution_grpc.pb.go @@ -0,0 +1,141 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.2.0 +// - protoc v3.21.12 +// source: distribution.proto + +package proto + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// DistributionClient is the client API for Distribution service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type DistributionClient interface { + GetRoute(ctx context.Context, in *GetRouteRequest, opts ...grpc.CallOption) (*GetRouteResponse, error) + GetTimestamp(ctx context.Context, in *GetTimestampRequest, opts ...grpc.CallOption) (*GetTimestampResponse, error) +} + +type distributionClient struct { + cc grpc.ClientConnInterface +} + +func NewDistributionClient(cc grpc.ClientConnInterface) DistributionClient { + return &distributionClient{cc} +} + +func (c *distributionClient) GetRoute(ctx context.Context, in *GetRouteRequest, opts ...grpc.CallOption) (*GetRouteResponse, error) { + out := new(GetRouteResponse) + err := c.cc.Invoke(ctx, "/Distribution/GetRoute", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *distributionClient) GetTimestamp(ctx context.Context, in *GetTimestampRequest, opts ...grpc.CallOption) (*GetTimestampResponse, error) { + out := new(GetTimestampResponse) + err := c.cc.Invoke(ctx, "/Distribution/GetTimestamp", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// DistributionServer is the server API for Distribution service. +// All implementations must embed UnimplementedDistributionServer +// for forward compatibility +type DistributionServer interface { + GetRoute(context.Context, *GetRouteRequest) (*GetRouteResponse, error) + GetTimestamp(context.Context, *GetTimestampRequest) (*GetTimestampResponse, error) + mustEmbedUnimplementedDistributionServer() +} + +// UnimplementedDistributionServer must be embedded to have forward compatible implementations. +type UnimplementedDistributionServer struct { +} + +func (UnimplementedDistributionServer) GetRoute(context.Context, *GetRouteRequest) (*GetRouteResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetRoute not implemented") +} +func (UnimplementedDistributionServer) GetTimestamp(context.Context, *GetTimestampRequest) (*GetTimestampResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetTimestamp not implemented") +} +func (UnimplementedDistributionServer) mustEmbedUnimplementedDistributionServer() {} + +// UnsafeDistributionServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to DistributionServer will +// result in compilation errors. +type UnsafeDistributionServer interface { + mustEmbedUnimplementedDistributionServer() +} + +func RegisterDistributionServer(s grpc.ServiceRegistrar, srv DistributionServer) { + s.RegisterService(&Distribution_ServiceDesc, srv) +} + +func _Distribution_GetRoute_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetRouteRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DistributionServer).GetRoute(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Distribution/GetRoute", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DistributionServer).GetRoute(ctx, req.(*GetRouteRequest)) + } + return interceptor(ctx, in, info, handler) +} + +func _Distribution_GetTimestamp_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetTimestampRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DistributionServer).GetTimestamp(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/Distribution/GetTimestamp", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DistributionServer).GetTimestamp(ctx, req.(*GetTimestampRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// Distribution_ServiceDesc is the grpc.ServiceDesc for Distribution service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Distribution_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "Distribution", + HandlerType: (*DistributionServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "GetRoute", + Handler: _Distribution_GetRoute_Handler, + }, + { + MethodName: "GetTimestamp", + Handler: _Distribution_GetTimestamp_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "distribution.proto", +}