@@ -16,7 +16,7 @@ package trigger
1616
1717import (
1818 "context"
19- stdErr "errors"
19+ stderr "errors"
2020 "fmt"
2121 "io"
2222 "os"
@@ -52,14 +52,15 @@ var _ ctrlpb.TriggerControllerServer = &controller{}
5252
5353const (
5454 defaultGcSubscriptionInterval = time .Second * 10
55+ waitEventbusReadyTime = time .Minute * 3
56+ waitEventbusCheckPeriod = time .Second * 2
5557)
5658
5759func NewController (config Config , mem member.Member ) * controller {
5860 ctrl := & controller {
5961 config : config ,
6062 member : mem ,
6163 needCleanSubscription : map [vanus.ID ]string {},
62- state : primitive .ServerStateCreated ,
6364 cl : cluster .NewClusterController (config .ControllerAddr , insecure .NewCredentials ()),
6465 ebClient : eb .Connect (config .ControllerAddr ),
6566 }
@@ -81,17 +82,13 @@ type controller struct {
8182 isLeader bool
8283 ctx context.Context
8384 stopFunc context.CancelFunc
84- state primitive.ServerState
8585 cl cluster.Cluster
8686 ebClient eb.Client
8787}
8888
8989func (ctrl * controller ) SetDeadLetterEventOffset (
9090 ctx context.Context , request * ctrlpb.SetDeadLetterEventOffsetRequest ,
9191) (* emptypb.Empty , error ) {
92- if ctrl .state != primitive .ServerStateRunning {
93- return nil , errors .ErrServerNotStart
94- }
9592 subID := vanus .ID (request .SubscriptionId )
9693 err := ctrl .subscriptionManager .SaveDeadLetterOffset (ctx , subID , request .GetOffset ())
9794 if err != nil {
@@ -103,9 +100,6 @@ func (ctrl *controller) SetDeadLetterEventOffset(
103100func (ctrl * controller ) GetDeadLetterEventOffset (
104101 ctx context.Context , request * ctrlpb.GetDeadLetterEventOffsetRequest ,
105102) (* ctrlpb.GetDeadLetterEventOffsetResponse , error ) {
106- if ctrl .state != primitive .ServerStateRunning {
107- return nil , errors .ErrServerNotStart
108- }
109103 subID := vanus .ID (request .SubscriptionId )
110104 offset , err := ctrl .subscriptionManager .GetDeadLetterOffset (ctx , subID )
111105 if err != nil {
@@ -117,9 +111,6 @@ func (ctrl *controller) GetDeadLetterEventOffset(
117111func (ctrl * controller ) CommitOffset (
118112 ctx context.Context , request * ctrlpb.CommitOffsetRequest ,
119113) (* ctrlpb.CommitOffsetResponse , error ) {
120- if ctrl .state != primitive .ServerStateRunning {
121- return nil , errors .ErrServerNotStart
122- }
123114 resp := new (ctrlpb.CommitOffsetResponse )
124115 for _ , subInfo := range request .SubscriptionInfo {
125116 if len (subInfo .Offsets ) == 0 {
@@ -142,9 +133,6 @@ func (ctrl *controller) CommitOffset(
142133func (ctrl * controller ) ResetOffsetToTimestamp (
143134 ctx context.Context , request * ctrlpb.ResetOffsetToTimestampRequest ,
144135) (* ctrlpb.ResetOffsetToTimestampResponse , error ) {
145- if ctrl .state != primitive .ServerStateRunning {
146- return nil , errors .ErrServerNotStart
147- }
148136 if request .Timestamp == 0 {
149137 return nil , errors .ErrInvalidRequest .WithMessage ("timestamp is invalid" )
150138 }
@@ -169,9 +157,6 @@ func (ctrl *controller) ResetOffsetToTimestamp(
169157func (ctrl * controller ) CreateSubscription (
170158 ctx context.Context , request * ctrlpb.CreateSubscriptionRequest ,
171159) (* metapb.Subscription , error ) {
172- if ctrl .state != primitive .ServerStateRunning {
173- return nil , errors .ErrServerNotStart
174- }
175160 err := validation .ValidateSubscriptionRequest (ctx , request .Subscription )
176161 if err != nil {
177162 log .Info (ctx , "create subscription validate fail" , map [string ]interface {}{
@@ -180,6 +165,14 @@ func (ctrl *controller) CreateSubscription(
180165 return nil , err
181166 }
182167 sub := convert .FromPbSubscriptionRequest (request .Subscription )
168+ _ , err = ctrl .cl .NamespaceService ().GetNamespace (ctx , sub .NamespaceID .Uint64 ())
169+ if err != nil {
170+ return nil , err
171+ }
172+ _ , err = ctrl .cl .EventbusService ().GetEventbus (ctx , sub .EventbusID .Uint64 ())
173+ if err != nil {
174+ return nil , err
175+ }
183176 sub .ID , err = vanus .NewID ()
184177 sub .CreatedAt = time .Now ()
185178 sub .UpdatedAt = time .Now ()
@@ -205,9 +198,6 @@ func (ctrl *controller) CreateSubscription(
205198func (ctrl * controller ) UpdateSubscription (
206199 ctx context.Context , request * ctrlpb.UpdateSubscriptionRequest ,
207200) (* metapb.Subscription , error ) {
208- if ctrl .state != primitive .ServerStateRunning {
209- return nil , errors .ErrServerNotStart
210- }
211201 subID := vanus .ID (request .Id )
212202 sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
213203 if sub == nil {
@@ -248,9 +238,6 @@ func (ctrl *controller) UpdateSubscription(
248238func (ctrl * controller ) DeleteSubscription (
249239 ctx context.Context , request * ctrlpb.DeleteSubscriptionRequest ,
250240) (* emptypb.Empty , error ) {
251- if ctrl .state != primitive .ServerStateRunning {
252- return nil , errors .ErrServerNotStart
253- }
254241 subID := vanus .ID (request .Id )
255242 sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
256243 if sub != nil {
@@ -274,9 +261,6 @@ func (ctrl *controller) DeleteSubscription(
274261func (ctrl * controller ) DisableSubscription (
275262 ctx context.Context , request * ctrlpb.DisableSubscriptionRequest ,
276263) (* emptypb.Empty , error ) {
277- if ctrl .state != primitive .ServerStateRunning {
278- return nil , errors .ErrServerNotStart
279- }
280264 subID := vanus .ID (request .Id )
281265 sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
282266 if sub == nil {
@@ -307,9 +291,6 @@ func (ctrl *controller) DisableSubscription(
307291func (ctrl * controller ) ResumeSubscription (
308292 ctx context.Context , request * ctrlpb.ResumeSubscriptionRequest ,
309293) (* emptypb.Empty , error ) {
310- if ctrl .state != primitive .ServerStateRunning {
311- return nil , errors .ErrServerNotStart
312- }
313294 subID := vanus .ID (request .Id )
314295 sub := ctrl .subscriptionManager .GetSubscription (ctx , subID )
315296 if sub == nil {
@@ -331,9 +312,6 @@ func (ctrl *controller) ResumeSubscription(
331312func (ctrl * controller ) GetSubscription (
332313 ctx context.Context , request * ctrlpb.GetSubscriptionRequest ,
333314) (* metapb.Subscription , error ) {
334- if ctrl .state != primitive .ServerStateRunning {
335- return nil , errors .ErrServerNotStart
336- }
337315 sub := ctrl .subscriptionManager .GetSubscription (ctx , vanus .ID (request .Id ))
338316 if sub == nil {
339317 return nil , errors .ErrResourceNotFound .WithMessage ("subscription not exist" )
@@ -360,7 +338,7 @@ func (ctrl *controller) TriggerWorkerHeartbeat(
360338 }
361339 req , err := heartbeat .Recv ()
362340 if err != nil {
363- if ! stdErr .Is (err , io .EOF ) {
341+ if ! stderr .Is (err , io .EOF ) {
364342 log .Warning (ctx , "heartbeat recv error" , map [string ]interface {}{
365343 log .KeyError : err ,
366344 })
@@ -578,7 +556,6 @@ func (ctrl *controller) membershipChangedProcessor(
578556 ctrl .subscriptionManager .Start ()
579557 ctrl .scheduler .Run ()
580558 go ctrl .gcSubscriptions (ctx )
581- ctrl .state = primitive .ServerStateRunning
582559 ctrl .isLeader = true
583560 case member .EventBecomeFollower :
584561 if ! ctrl .isLeader {
@@ -597,13 +574,11 @@ func (ctrl *controller) membershipChangedProcessor(
597574
598575func (ctrl * controller ) stop (_ context.Context ) error {
599576 ctrl .member .ResignIfLeader ()
600- ctrl .state = primitive .ServerStateStopping
601577 ctrl .stopFunc ()
602578 ctrl .scheduler .Stop ()
603579 ctrl .workerManager .Stop ()
604580 ctrl .subscriptionManager .Stop ()
605581 ctrl .storage .Close ()
606- ctrl .state = primitive .ServerStateStopped
607582 return nil
608583}
609584
@@ -641,14 +616,48 @@ func (ctrl *controller) initTriggerSystemEventbus() {
641616 go func () {
642617 ctx := context .Background ()
643618 log .Info (ctx , "trigger controller is ready to check system eventbus" , nil )
619+ if err := ctrl .cl .WaitForControllerReady (false ); err != nil {
620+ log .Error (ctx , "trigger controller check system eventbus, " +
621+ "but Vanus cluster hasn't ready, exit" , map [string ]interface {}{
622+ log .KeyError : err ,
623+ })
624+ os .Exit (- 1 )
625+ }
626+ ready := util .WaitReady (func () bool {
627+ exist , err := ctrl .cl .EventbusService ().IsSystemEventbusExistByName (ctx , primitive .TimerEventbusName )
628+ if err != nil {
629+ log .Error (ctx , "check TimerEventbus exist has error" , map [string ]interface {}{
630+ log .KeyError : err ,
631+ })
632+ return false
633+ }
634+ return exist
635+ }, waitEventbusReadyTime , waitEventbusCheckPeriod )
636+ if ! ready {
637+ log .Error (ctx , "check TimerEventbus timeout no exist, will exist" , nil )
638+ os .Exit (- 1 )
639+ }
640+
641+ // wait TimerEventbus
642+ exist , err := ctrl .cl .EventbusService ().IsSystemEventbusExistByName (ctx , primitive .RetryEventbusName )
643+ if err != nil {
644+ log .Error (ctx , "failed to check RetryEventbus exist, exit" , map [string ]interface {}{
645+ log .KeyError : err ,
646+ })
647+ os .Exit (- 1 )
648+ }
649+ if exist {
650+ log .Info (ctx , "trigger controller check RetryEventbus exist" , nil )
651+ return
652+ }
653+ log .Info (ctx , "trigger controller check RetryEventbus no exist, will create" , nil )
644654 if err := ctrl .cl .WaitForControllerReady (true ); err != nil {
645655 log .Error (ctx , "trigger controller try to create system eventbus, " +
646656 "but Vanus cluster hasn't ready, exit" , map [string ]interface {}{
647657 log .KeyError : err ,
648658 })
649659 os .Exit (- 1 )
650660 }
651-
652661 if _ , err := ctrl .cl .EventbusService ().CreateSystemEventbusIfNotExist (ctx , primitive .RetryEventbusName ,
653662 "System Eventbus For Trigger Service" ); err != nil {
654663 log .Error (ctx , "failed to create RetryEventbus, exit" , map [string ]interface {}{
0 commit comments