Skip to content

Commit e696fa7

Browse files
authored
feat: abstract cluster operation (#338)
* feat: abstract cluster operation Signed-off-by: wenfeng <[email protected]> * fix ut&lint Signed-off-by: wenfeng <[email protected]> * update Signed-off-by: wenfeng <[email protected]> * add waiting into trigger Signed-off-by: wenfeng <[email protected]> Signed-off-by: wenfeng <[email protected]>
1 parent b8ab5ef commit e696fa7

File tree

44 files changed

+1767
-702
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+1767
-702
lines changed

client/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type client struct {
4040
}
4141

4242
func (c *client) Eventbus(ctx context.Context, ebName string) api.Eventbus {
43-
_, span := c.tracer.Start(ctx, "Eventbus")
43+
_, span := c.tracer.Start(ctx, "EventbusService")
4444
defer span.End()
4545

4646
bus := func() api.Eventbus {

client/internal/vanus/eventbus/name_service.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,11 @@ package eventbus
1717
import (
1818
// standard libraries
1919
"context"
20+
"github.com/linkall-labs/vanus/pkg/cluster"
2021

2122
"github.com/linkall-labs/vanus/observability/tracing"
2223
"go.opentelemetry.io/otel/trace"
2324

24-
// third-party libraries
25-
"github.com/linkall-labs/vanus/pkg/controller"
2625
"google.golang.org/grpc/credentials/insecure"
2726

2827
// first-party libraries
@@ -33,7 +32,7 @@ import (
3332

3433
func NewNameService(endpoints []string) *NameService {
3534
return &NameService{
36-
client: controller.NewEventbusClient(endpoints, insecure.NewCredentials()),
35+
client: cluster.NewClusterController(endpoints, insecure.NewCredentials()).EventbusService().RawClient(),
3736
tracer: tracing.NewTracer("internal.discovery.eventbus", trace.SpanKindClient),
3837
}
3938
}

client/internal/vanus/eventlog/name_service.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import (
2727
// first-party libraries.
2828
"github.com/linkall-labs/vanus/observability/log"
2929
"github.com/linkall-labs/vanus/observability/tracing"
30-
"github.com/linkall-labs/vanus/pkg/controller"
30+
"github.com/linkall-labs/vanus/pkg/cluster"
3131
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
3232
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
3333

@@ -38,7 +38,7 @@ import (
3838

3939
func NewNameService(endpoints []string) *NameService {
4040
return &NameService{
41-
client: controller.NewEventlogClient(endpoints, insecure.NewCredentials()),
41+
client: cluster.NewClusterController(endpoints, insecure.NewCredentials()).EventlogService().RawClient(),
4242
tracer: tracing.NewTracer("internal.discovery.eventlog", trace.SpanKindClient),
4343
}
4444
}

client/mock_client.go

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

cmd/controller/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ func main() {
8787

8888
segmentCtrl := eventbus.NewController(cfg.GetEventbusCtrlConfig(), etcd)
8989
if err = segmentCtrl.Start(ctx); err != nil {
90-
log.Error(ctx, "start Eventbus Controller failed", map[string]interface{}{
90+
log.Error(ctx, "start EventbusService Controller failed", map[string]interface{}{
9191
log.KeyError: err,
9292
})
9393
os.Exit(-1)

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ require (
2323
github.com/iceber/iouring-go v0.0.0-20220609112130-b1dc8dd9fbfd
2424
github.com/jedib0t/go-pretty/v6 v6.3.1
2525
github.com/json-iterator/go v1.1.12
26-
github.com/linkall-labs/embed-etcd v0.1.1
26+
github.com/linkall-labs/embed-etcd v0.1.2
2727
github.com/linkall-labs/vanus/client v0.5.1
2828
github.com/linkall-labs/vanus/observability v0.5.1
2929
github.com/linkall-labs/vanus/pkg v0.5.1

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
292292
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
293293
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
294294
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
295-
github.com/linkall-labs/embed-etcd v0.1.1 h1:WxV9wbnRtNf7DMW8SJauVYqhFLXzRfY5wpplFypXK9k=
296-
github.com/linkall-labs/embed-etcd v0.1.1/go.mod h1:dmleSy0Myllw6W5awwjyDMipgICVDHTHuTcRT4cqaIc=
295+
github.com/linkall-labs/embed-etcd v0.1.2 h1:1mTdXLwVvn9gi3XWh/PGhaEAfG8Zmxvjqwnfontb+fA=
296+
github.com/linkall-labs/embed-etcd v0.1.2/go.mod h1:QnecHaKt3WQBO9YGBckCDUTBd44VBR2VO8220BtWZ5U=
297297
github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
298298
github.com/mattn/go-colorable v0.1.11 h1:nQ+aFkoE2TMGc0b68U2OKSexC+eq46+XwZzWXHRmPYs=
299299
github.com/mattn/go-colorable v0.1.11/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=

internal/controller/eventbus/controller.go

Lines changed: 53 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"github.com/linkall-labs/vanus/internal/controller/eventbus/volume"
3333
"github.com/linkall-labs/vanus/internal/kv"
3434
"github.com/linkall-labs/vanus/internal/kv/etcd"
35+
"github.com/linkall-labs/vanus/internal/primitive"
3536
"github.com/linkall-labs/vanus/internal/primitive/vanus"
3637
"github.com/linkall-labs/vanus/observability/log"
3738
"github.com/linkall-labs/vanus/observability/metrics"
@@ -94,7 +95,6 @@ func (ctrl *controller) Start(_ context.Context) error {
9495
return err
9596
}
9697
ctrl.kvStore = store
97-
9898
ctrl.cancelCtx, ctrl.cancelFunc = context.WithCancel(context.Background())
9999
go ctrl.member.RegisterMembershipChangedProcessor(ctrl.membershipChangedProcessor)
100100
return nil
@@ -113,9 +113,48 @@ func (ctrl *controller) StopNotify() <-chan error {
113113
}
114114

115115
func (ctrl *controller) CreateEventBus(ctx context.Context,
116+
req *ctrlpb.CreateEventBusRequest) (*metapb.EventBus, error) {
117+
if err := isValidEventbusName(req.Name); err != nil {
118+
return nil, err
119+
}
120+
return ctrl.createEventBus(ctx, req)
121+
}
122+
123+
func isValidEventbusName(name string) error {
124+
name = strings.ToLower(name)
125+
for _, v := range name {
126+
if v == '.' || v == '_' || v == '-' {
127+
continue
128+
}
129+
c := v - 'a'
130+
if c >= 0 || c <= 26 {
131+
continue
132+
} else {
133+
c = v - '0'
134+
if c >= 0 || c <= 9 {
135+
continue
136+
}
137+
return errors.ErrInvalidRequest.WithMessage("eventbus name must be insist of 0-9a-zA-Z.-_")
138+
}
139+
}
140+
return nil
141+
}
142+
143+
func (ctrl *controller) CreateSystemEventBus(ctx context.Context,
144+
req *ctrlpb.CreateEventBusRequest) (*metapb.EventBus, error) {
145+
if !strings.HasPrefix(req.Name, primitive.SystemEventbusNamePrefix) {
146+
return nil, errors.ErrInvalidRequest.WithMessage("system eventbus must start with __")
147+
}
148+
return ctrl.createEventBus(ctx, req)
149+
}
150+
151+
func (ctrl *controller) createEventBus(ctx context.Context,
116152
req *ctrlpb.CreateEventBusRequest) (*metapb.EventBus, error) {
117153
ctrl.mutex.Lock()
118154
defer ctrl.mutex.Unlock()
155+
if !ctrl.isReady(ctx) {
156+
return nil, errors.ErrResourceCanNotOp.WithMessage("the cluster isn't ready to create eventbus")
157+
}
119158
logNum := req.LogNumber
120159
if logNum == 0 {
121160
logNum = 1
@@ -451,12 +490,23 @@ func (ctrl *controller) ReportSegmentBlockIsFull(ctx context.Context,
451490
return &emptypb.Empty{}, nil
452491
}
453492

454-
func (ctrl *controller) Ping(_ context.Context, _ *emptypb.Empty) (*ctrlpb.PingResponse, error) {
493+
func (ctrl *controller) Ping(ctx context.Context, _ *emptypb.Empty) (*ctrlpb.PingResponse, error) {
455494
return &ctrlpb.PingResponse{
456-
LeaderAddr: ctrl.member.GetLeaderAddr(),
495+
LeaderAddr: ctrl.member.GetLeaderAddr(),
496+
IsEventbusReady: ctrl.isReady(ctx),
457497
}, nil
458498
}
459499

500+
func (ctrl *controller) isReady(ctx context.Context) bool {
501+
if ctrl.member == nil {
502+
return false
503+
}
504+
if !ctrl.member.IsLeader() && !ctrl.member.IsReady() || ctrl.member.GetLeaderAddr() == "" {
505+
return false
506+
}
507+
return ctrl.ssMgr.CanCreateEventbus(ctx, int(ctrl.cfg.Replicas))
508+
}
509+
460510
func (ctrl *controller) ReportSegmentLeader(ctx context.Context,
461511
req *ctrlpb.ReportSegmentLeaderRequest) (*emptypb.Empty, error) {
462512
err := ctrl.eventLogMgr.UpdateSegmentReplicas(ctx, vanus.NewIDFromUint64(req.LeaderId), req.Term)

internal/controller/eventbus/controller_test.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,15 @@ import (
2020
"sort"
2121
"testing"
2222

23+
"github.com/golang/mock/gomock"
24+
embedetcd "github.com/linkall-labs/embed-etcd"
2325
"github.com/linkall-labs/vanus/internal/controller/eventbus/eventlog"
2426
"github.com/linkall-labs/vanus/internal/controller/eventbus/metadata"
2527
"github.com/linkall-labs/vanus/internal/kv"
2628
"github.com/linkall-labs/vanus/internal/primitive/vanus"
2729
"github.com/linkall-labs/vanus/pkg/errors"
2830
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
2931
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
30-
31-
"github.com/golang/mock/gomock"
3232
. "github.com/smartystreets/goconvey/convey"
3333
)
3434

@@ -44,6 +44,12 @@ func TestController_CreateEventBus(t *testing.T) {
4444
ctrl.eventLogMgr = elMgr
4545
ctx := stdCtx.Background()
4646

47+
mockMember := embedetcd.NewMockMember(mockCtrl)
48+
ctrl.member = mockMember
49+
mockMember.EXPECT().IsLeader().AnyTimes().Return(true)
50+
mockMember.EXPECT().IsReady().AnyTimes().Return(true)
51+
mockMember.EXPECT().GetLeaderAddr().AnyTimes().Return("test")
52+
4753
Convey("test create a eventbus two times", func() {
4854
kvCli.EXPECT().Exists(ctx, metadata.GetEventbusMetadataKey("test-1")).Times(1).Return(false, nil)
4955
kvCli.EXPECT().Set(ctx, metadata.GetEventbusMetadataKey("test-1"), gomock.Any()).

internal/controller/eventbus/server/manager.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,13 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
//go:generate mockgen -source=manager.go -destination=mock_manager.go -package=server
1516
package server
1617

1718
import (
1819
"context"
1920
"sync"
21+
"sync/atomic"
2022
"time"
2123

2224
"github.com/golang/protobuf/ptypes/empty"
@@ -36,6 +38,7 @@ type Manager interface {
3638
GetServerByServerID(id vanus.ID) Server
3739
Run(ctx context.Context) error
3840
Stop(ctx context.Context)
41+
CanCreateEventbus(ctx context.Context, replicaNum int) bool
3942
}
4043

4144
const (
@@ -59,6 +62,7 @@ type segmentServerManager struct {
5962
cancelCtx context.Context
6063
cancel func()
6164
ticker *time.Ticker
65+
onlineServerNumber int64
6266
}
6367

6468
func (mgr *segmentServerManager) AddServer(ctx context.Context, srv Server) error {
@@ -78,9 +82,11 @@ func (mgr *segmentServerManager) AddServer(ctx context.Context, srv Server) erro
7882
}
7983
mgr.segmentServerMapByIP.Store(srv.Address(), srv)
8084
mgr.segmentServerMapByID.Store(srv.ID().Key(), srv)
85+
atomic.AddInt64(&mgr.onlineServerNumber, 1)
8186
log.Info(ctx, "the segment server added", map[string]interface{}{
8287
"server_id": srv.ID(),
8388
"addr": srv.Address(),
89+
"online": atomic.LoadInt64(&mgr.onlineServerNumber),
8490
})
8591
return nil
8692
}
@@ -93,6 +99,12 @@ func (mgr *segmentServerManager) RemoveServer(ctx context.Context, srv Server) e
9399
defer mgr.mutex.Unlock()
94100
mgr.segmentServerMapByIP.Delete(srv.Address())
95101
mgr.segmentServerMapByID.Delete(srv.ID().Key())
102+
atomic.AddInt64(&mgr.onlineServerNumber, -1)
103+
log.Info(ctx, "the segment server was removed", map[string]interface{}{
104+
"server_id": srv.ID(),
105+
"addr": srv.Address(),
106+
"online": atomic.LoadInt64(&mgr.onlineServerNumber),
107+
})
96108
return nil
97109
}
98110

@@ -171,6 +183,18 @@ func (mgr *segmentServerManager) Stop(ctx context.Context) {
171183
})
172184
}
173185

186+
func (mgr *segmentServerManager) CanCreateEventbus(ctx context.Context, replicaNum int) bool {
187+
activeNum := 0
188+
mgr.segmentServerMapByID.Range(func(_, value any) bool {
189+
s, _ := value.(Server)
190+
if s.IsActive(ctx) {
191+
activeNum++
192+
}
193+
return true
194+
})
195+
return activeNum >= replicaNum
196+
}
197+
174198
type Server interface {
175199
RemoteStart(ctx context.Context) error
176200
RemoteStop(ctx context.Context)

0 commit comments

Comments
 (0)