Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions gateway/dataimpl/dataimpl.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Servers struct {
AdminQueryIndexV1Server *server_v1.QueryIndexAdminServer
AdminSearchIndexV1Server *server_v1.SearchIndexAdminServer
TransactionsV1Server *server_v1.TransactionsServer
RoutingServer *server_v1.RoutingServer
}

func New(opts *NewOptions) *Servers {
Expand Down Expand Up @@ -87,5 +88,11 @@ func New(opts *NewOptions) *Servers {
v1ErrHandler,
v1AuthHandler,
),
RoutingServer: server_v1.NewRoutingServer(
opts.Logger.Named("routing"),
v1ErrHandler,
v1AuthHandler,
opts.CbClient,
),
}
}
107 changes: 107 additions & 0 deletions gateway/dataimpl/server_v1/routingserver.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package server_v1

import (
"context"
"time"

"github.com/couchbase/gocbcorex"
"github.com/couchbase/gocbcorex/cbmgmtx"
"github.com/couchbase/gocbcorex/contrib/cbconfig"
"github.com/couchbase/stellar-gateway/contrib/goprotostellar/genproto/routing_v1"
"go.uber.org/zap"
)

type RoutingServer struct {
routing_v1.UnimplementedRoutingServiceServer

logger *zap.Logger
errorHandler *ErrorHandler
authHandler *AuthHandler
cbClient *gocbcorex.BucketsTrackingAgentManager
}

func NewRoutingServer(
logger *zap.Logger,
errorHandler *ErrorHandler,
authHandler *AuthHandler,
cbClient *gocbcorex.BucketsTrackingAgentManager,
) *RoutingServer {
return &RoutingServer{
logger: logger,
errorHandler: errorHandler,
authHandler: authHandler,
cbClient: cbClient,
}
}

func (s *RoutingServer) WatchRouting(
in *routing_v1.WatchRoutingRequest,
out routing_v1.RoutingService_WatchRoutingServer,
) error {
bucketAgent, _, errSt := s.authHandler.GetMemdOboAgent(out.Context(), *in.BucketName)
if errSt != nil {
return errSt.Err()
}

ticker := time.NewTicker(5 * time.Second)
var prevVBucketMap cbconfig.VBucketServerMapJson
for {
bucket, err := bucketAgent.GetBucket(context.Background(), &cbmgmtx.GetBucketOptions{BucketName: *in.BucketName})
if err != nil {
return err
}

if !prevVBucketMap.Equal(*bucket.RawConfig.VBucketServerMap) {

Check failure on line 54 in gateway/dataimpl/server_v1/routingserver.go

View workflow job for this annotation

GitHub Actions / Lint

prevVBucketMap.Equal undefined (type cbconfig.VBucketServerMapJson has no field or method Equal) (typecheck)
prevVBucketMap = *bucket.RawConfig.VBucketServerMap

var resp routing_v1.WatchRoutingResponse
var dataRoutingEps []*routing_v1.DataRoutingEndpoint
for i, addr := range bucket.RawConfig.VBucketServerMap.ServerList {
resp.Endpoints = append(resp.Endpoints, &routing_v1.RoutingEndpoint{
// TO DO -figure out what this ID is supposed to be
Id: "",
// TODO - get the server group from somewhere
ServerGroup: "",
Address: addr,
})

localVBuckets := localVBuckets(i, bucket.RawConfig.VBucketServerMap.VBucketMap)

dataRoutingEps = append(dataRoutingEps, &routing_v1.DataRoutingEndpoint{
EndpointIdx: uint32(i),
LocalVbuckets: localVBuckets,
// TODO - implement funcitonality to get group vBuckets
GroupVbuckets: []uint32{},
})
}
resp.DataRouting = &routing_v1.WatchRoutingResponse_VbucketDataRouting{
VbucketDataRouting: &routing_v1.VbucketDataRoutingStrategy{
Endpoints: dataRoutingEps,
},
}

err = out.Send(&resp)
if err != nil {
return err
}
}

select {
case <-out.Context().Done():
return nil
case <-ticker.C:
}
}
}

// At the moment this will just return the vBuckets that are local for the master
// copy, need to consider what to do about other replicas
func localVBuckets(serverIndex int, vbMap [][]int) []uint32 {
var vbIds []uint32
for i, vBucket := range vbMap {
if vBucket[0] == serverIndex {
vbIds = append(vbIds, uint32(i))
}
}
return vbIds
}
2 changes: 2 additions & 0 deletions gateway/system/system.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/couchbase/goprotostellar/genproto/kv_v1"
"github.com/couchbase/goprotostellar/genproto/query_v1"
"github.com/couchbase/goprotostellar/genproto/search_v1"
"github.com/couchbase/stellar-gateway/contrib/goprotostellar/genproto/routing_v1"
"github.com/couchbase/stellar-gateway/contrib/oapimetrics"
"github.com/couchbase/stellar-gateway/dataapiv1"
"github.com/couchbase/stellar-gateway/gateway/apiversion"
Expand Down Expand Up @@ -131,6 +132,7 @@ func NewSystem(opts *SystemOptions) (*System, error) {
admin_collection_v1.RegisterCollectionAdminServiceServer(dataSrv, dataImpl.AdminCollectionV1Server)
admin_query_v1.RegisterQueryAdminServiceServer(dataSrv, dataImpl.AdminQueryIndexV1Server)
admin_search_v1.RegisterSearchAdminServiceServer(dataSrv, dataImpl.AdminSearchIndexV1Server)
routing_v1.RegisterRoutingServiceServer(dataSrv, dataImpl.RoutingServer)

// health check
healthServer := health.NewServer()
Expand Down
Loading