From e6bb45dcd5f65cec9ec9a5d286aeee305b6bd3e5 Mon Sep 17 00:00:00 2001 From: Jack Westwood Date: Fri, 24 Oct 2025 14:57:24 +0100 Subject: [PATCH] ING-1307: Initial implementation of watchRouting --- gateway/dataimpl/dataimpl.go | 7 ++ gateway/dataimpl/server_v1/routingserver.go | 107 ++++++++++++++++++++ gateway/system/system.go | 2 + 3 files changed, 116 insertions(+) create mode 100644 gateway/dataimpl/server_v1/routingserver.go diff --git a/gateway/dataimpl/dataimpl.go b/gateway/dataimpl/dataimpl.go index e93585e7..441de3eb 100644 --- a/gateway/dataimpl/dataimpl.go +++ b/gateway/dataimpl/dataimpl.go @@ -26,6 +26,7 @@ type Servers struct { AdminQueryIndexV1Server *server_v1.QueryIndexAdminServer AdminSearchIndexV1Server *server_v1.SearchIndexAdminServer TransactionsV1Server *server_v1.TransactionsServer + RoutingServer *server_v1.RoutingServer } func New(opts *NewOptions) *Servers { @@ -87,5 +88,11 @@ func New(opts *NewOptions) *Servers { v1ErrHandler, v1AuthHandler, ), + RoutingServer: server_v1.NewRoutingServer( + opts.Logger.Named("routing"), + v1ErrHandler, + v1AuthHandler, + opts.CbClient, + ), } } diff --git a/gateway/dataimpl/server_v1/routingserver.go b/gateway/dataimpl/server_v1/routingserver.go new file mode 100644 index 00000000..badeaa94 --- /dev/null +++ b/gateway/dataimpl/server_v1/routingserver.go @@ -0,0 +1,107 @@ +package server_v1 + +import ( + "context" + "time" + + "github.com/couchbase/gocbcorex" + "github.com/couchbase/gocbcorex/cbmgmtx" + "github.com/couchbase/gocbcorex/contrib/cbconfig" + "github.com/couchbase/stellar-gateway/contrib/goprotostellar/genproto/routing_v1" + "go.uber.org/zap" +) + +type RoutingServer struct { + routing_v1.UnimplementedRoutingServiceServer + + logger *zap.Logger + errorHandler *ErrorHandler + authHandler *AuthHandler + cbClient *gocbcorex.BucketsTrackingAgentManager +} + +func NewRoutingServer( + logger *zap.Logger, + errorHandler *ErrorHandler, + authHandler *AuthHandler, + cbClient *gocbcorex.BucketsTrackingAgentManager, +) *RoutingServer { + return &RoutingServer{ + logger: logger, + errorHandler: errorHandler, + authHandler: authHandler, + cbClient: cbClient, + } +} + +func (s *RoutingServer) WatchRouting( + in *routing_v1.WatchRoutingRequest, + out routing_v1.RoutingService_WatchRoutingServer, +) error { + bucketAgent, _, errSt := s.authHandler.GetMemdOboAgent(out.Context(), *in.BucketName) + if errSt != nil { + return errSt.Err() + } + + ticker := time.NewTicker(5 * time.Second) + var prevVBucketMap cbconfig.VBucketServerMapJson + for { + bucket, err := bucketAgent.GetBucket(context.Background(), &cbmgmtx.GetBucketOptions{BucketName: *in.BucketName}) + if err != nil { + return err + } + + if !prevVBucketMap.Equal(*bucket.RawConfig.VBucketServerMap) { + prevVBucketMap = *bucket.RawConfig.VBucketServerMap + + var resp routing_v1.WatchRoutingResponse + var dataRoutingEps []*routing_v1.DataRoutingEndpoint + for i, addr := range bucket.RawConfig.VBucketServerMap.ServerList { + resp.Endpoints = append(resp.Endpoints, &routing_v1.RoutingEndpoint{ + // TO DO -figure out what this ID is supposed to be + Id: "", + // TODO - get the server group from somewhere + ServerGroup: "", + Address: addr, + }) + + localVBuckets := localVBuckets(i, bucket.RawConfig.VBucketServerMap.VBucketMap) + + dataRoutingEps = append(dataRoutingEps, &routing_v1.DataRoutingEndpoint{ + EndpointIdx: uint32(i), + LocalVbuckets: localVBuckets, + // TODO - implement funcitonality to get group vBuckets + GroupVbuckets: []uint32{}, + }) + } + resp.DataRouting = &routing_v1.WatchRoutingResponse_VbucketDataRouting{ + VbucketDataRouting: &routing_v1.VbucketDataRoutingStrategy{ + Endpoints: dataRoutingEps, + }, + } + + err = out.Send(&resp) + if err != nil { + return err + } + } + + select { + case <-out.Context().Done(): + return nil + case <-ticker.C: + } + } +} + +// At the moment this will just return the vBuckets that are local for the master +// copy, need to consider what to do about other replicas +func localVBuckets(serverIndex int, vbMap [][]int) []uint32 { + var vbIds []uint32 + for i, vBucket := range vbMap { + if vBucket[0] == serverIndex { + vbIds = append(vbIds, uint32(i)) + } + } + return vbIds +} diff --git a/gateway/system/system.go b/gateway/system/system.go index 03729360..d7ea0226 100644 --- a/gateway/system/system.go +++ b/gateway/system/system.go @@ -30,6 +30,7 @@ import ( "github.com/couchbase/goprotostellar/genproto/kv_v1" "github.com/couchbase/goprotostellar/genproto/query_v1" "github.com/couchbase/goprotostellar/genproto/search_v1" + "github.com/couchbase/stellar-gateway/contrib/goprotostellar/genproto/routing_v1" "github.com/couchbase/stellar-gateway/contrib/oapimetrics" "github.com/couchbase/stellar-gateway/dataapiv1" "github.com/couchbase/stellar-gateway/gateway/apiversion" @@ -131,6 +132,7 @@ func NewSystem(opts *SystemOptions) (*System, error) { admin_collection_v1.RegisterCollectionAdminServiceServer(dataSrv, dataImpl.AdminCollectionV1Server) admin_query_v1.RegisterQueryAdminServiceServer(dataSrv, dataImpl.AdminQueryIndexV1Server) admin_search_v1.RegisterSearchAdminServiceServer(dataSrv, dataImpl.AdminSearchIndexV1Server) + routing_v1.RegisterRoutingServiceServer(dataSrv, dataImpl.RoutingServer) // health check healthServer := health.NewServer()