Skip to content

Commit ef58b6b

Browse files
authored
fix: controller can not start (#354)
* fix: controller can not start Signed-off-by: wenfeng <[email protected]> * fix lint Signed-off-by: wenfeng <[email protected]> Signed-off-by: wenfeng <[email protected]>
1 parent d8249cc commit ef58b6b

File tree

5 files changed

+18
-12
lines changed

5 files changed

+18
-12
lines changed

cmd/controller/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ func main() {
9494
}
9595

9696
//trigger controller
97-
triggerCtrlStv := trigger.NewController(cfg.GetTriggerConfig(), etcd)
97+
triggerCtrlStv := trigger.NewController(cfg.GetTriggerConfig(), cfg.GetControllerAddrs(), etcd)
9898
if err = triggerCtrlStv.Start(); err != nil {
9999
log.Error(ctx, "start trigger controller fail", map[string]interface{}{
100100
log.KeyError: err,

internal/controller/trigger/controller.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ import (
3939
"github.com/linkall-labs/vanus/pkg/util"
4040
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
4141
"github.com/linkall-labs/vanus/proto/pkg/meta"
42+
"google.golang.org/grpc/credentials/insecure"
4243
"google.golang.org/protobuf/types/known/emptypb"
4344
)
4445

@@ -50,12 +51,13 @@ const (
5051
defaultGcSubscriptionInterval = time.Second * 10
5152
)
5253

53-
func NewController(config Config, member embedetcd.Member) *controller {
54+
func NewController(config Config, controllerAddr []string, member embedetcd.Member) *controller {
5455
ctrl := &controller{
5556
config: config,
5657
member: member,
5758
needCleanSubscription: map[vanus.ID]string{},
5859
state: primitive.ServerStateCreated,
60+
cl: cluster.NewClusterController(controllerAddr, insecure.NewCredentials()),
5961
}
6062
ctrl.ctx, ctrl.stopFunc = context.WithCancel(context.Background())
6163
return ctrl

internal/controller/trigger/controller_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ func TestController_CommitOffset(t *testing.T) {
3838
Convey("test reset offset", t, func() {
3939
mockCtrl := gomock.NewController(t)
4040
defer mockCtrl.Finish()
41-
ctrl := NewController(Config{}, nil)
41+
ctrl := NewController(Config{}, nil, nil)
4242
ctx := context.Background()
4343
workerManager := worker.NewMockManager(mockCtrl)
4444
ctrl.workerManager = workerManager
@@ -77,7 +77,7 @@ func TestController_ResetOffsetToTimestamp(t *testing.T) {
7777
Convey("test reset offset", t, func() {
7878
mockCtrl := gomock.NewController(t)
7979
defer mockCtrl.Finish()
80-
ctrl := NewController(Config{}, nil)
80+
ctrl := NewController(Config{}, nil, nil)
8181
ctx := context.Background()
8282
workerManager := worker.NewMockManager(mockCtrl)
8383
ctrl.workerManager = workerManager
@@ -118,7 +118,7 @@ func TestController_CreateSubscription(t *testing.T) {
118118
Convey("test create subscription", t, func() {
119119
mockCtrl := gomock.NewController(t)
120120
defer mockCtrl.Finish()
121-
ctrl := NewController(Config{}, nil)
121+
ctrl := NewController(Config{}, nil, nil)
122122
ctx := context.Background()
123123
workerManager := worker.NewMockManager(mockCtrl)
124124
ctrl.workerManager = workerManager
@@ -154,7 +154,7 @@ func TestController_UpdateSubscription(t *testing.T) {
154154
Convey("test update subscription", t, func() {
155155
mockCtrl := gomock.NewController(t)
156156
defer mockCtrl.Finish()
157-
ctrl := NewController(Config{}, nil)
157+
ctrl := NewController(Config{}, nil, nil)
158158
ctx := context.Background()
159159
workerManager := worker.NewMockManager(mockCtrl)
160160
ctrl.workerManager = workerManager
@@ -355,7 +355,7 @@ func TestController_DeleteSubscription(t *testing.T) {
355355
Convey("test delete subscription", t, func() {
356356
mockCtrl := gomock.NewController(t)
357357
defer mockCtrl.Finish()
358-
ctrl := NewController(Config{}, nil)
358+
ctrl := NewController(Config{}, nil, nil)
359359
ctx := context.Background()
360360
workerManager := worker.NewMockManager(mockCtrl)
361361
ctrl.workerManager = workerManager
@@ -414,7 +414,7 @@ func TestController_GetSubscription(t *testing.T) {
414414
Convey("test get subscription", t, func() {
415415
mockCtrl := gomock.NewController(t)
416416
defer mockCtrl.Finish()
417-
ctrl := NewController(Config{}, nil)
417+
ctrl := NewController(Config{}, nil, nil)
418418
ctx := context.Background()
419419
workerManager := worker.NewMockManager(mockCtrl)
420420
ctrl.workerManager = workerManager
@@ -451,7 +451,7 @@ func TestController_ListSubscription(t *testing.T) {
451451
Convey("test list subscription", t, func() {
452452
mockCtrl := gomock.NewController(t)
453453
defer mockCtrl.Finish()
454-
ctrl := NewController(Config{}, nil)
454+
ctrl := NewController(Config{}, nil, nil)
455455
ctx := context.Background()
456456
workerManager := worker.NewMockManager(mockCtrl)
457457
ctrl.workerManager = workerManager
@@ -476,7 +476,7 @@ func TestController_TriggerWorkerHeartbeat(t *testing.T) {
476476
Convey("test trigger worker heartbeat", t, func() {
477477
mockCtrl := gomock.NewController(t)
478478
defer mockCtrl.Finish()
479-
ctrl := NewController(Config{}, nil)
479+
ctrl := NewController(Config{}, nil, nil)
480480
ctx := context.Background()
481481
workerManager := worker.NewMockManager(mockCtrl)
482482
ctrl.workerManager = workerManager

internal/store/segment/server.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ func (s *server) reconcileBlocks(ctx context.Context) error {
260260

261261
func (s *server) registerSelf(ctx context.Context) error {
262262
// TODO(james.yin): pass information of blocks.
263+
start := time.Now()
264+
log.Info(ctx, "connecting to controller", nil)
263265
if err := s.ctrl.WaitForControllerReady(false); err != nil {
264266
return err
265267
}
@@ -271,7 +273,9 @@ func (s *server) registerSelf(ctx context.Context) error {
271273
if err != nil {
272274
return err
273275
}
274-
276+
log.Info(ctx, "connected to controller", map[string]interface{}{
277+
"used": time.Since(start),
278+
})
275279
s.id = vanus.NewIDFromUint64(res.ServerId)
276280

277281
// FIXME(james.yin): some blocks may not be bound to segment.

pkg/cluster/controller.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ func (c *cluster) IsReady(createEventbus bool) bool {
142142
if res.LeaderAddr == "" {
143143
return false
144144
}
145-
return createEventbus && res.GetIsEventbusReady()
145+
return !createEventbus || (createEventbus && res.GetIsEventbusReady())
146146
}
147147

148148
func (c *cluster) Status() Topology {

0 commit comments

Comments
 (0)