Skip to content

Commit 5fdb2bb

Browse files
authored
feat: add more fields to eventbus and subscription (#316)
1 parent f14ef91 commit 5fdb2bb

File tree

14 files changed

+1053
-845
lines changed

14 files changed

+1053
-845
lines changed

internal/controller/eventbus/controller.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,13 @@ func (ctrl *controller) CreateEventBus(ctx context.Context,
133133
return nil, err
134134
}
135135
eb := &metadata.Eventbus{
136-
ID: id,
137-
Name: req.Name,
138-
LogNumber: int(logNum),
139-
EventLogs: make([]*metadata.Eventlog, int(logNum)),
136+
ID: id,
137+
Name: req.Name,
138+
LogNumber: int(logNum),
139+
EventLogs: make([]*metadata.Eventlog, int(logNum)),
140+
Description: req.Description,
141+
CreatedAt: time.Now(),
142+
UpdatedAt: time.Now(),
140143
}
141144
exist, err := ctrl.kvStore.Exists(ctx, metadata.GetEventbusMetadataKey(eb.Name))
142145
if err != nil {
@@ -204,8 +207,6 @@ func (ctrl *controller) getEventbus(name string) (*metapb.EventBus, error) {
204207
}
205208

206209
ebMD := metadata.Convert2ProtoEventBus(_eb)[0]
207-
ebMD.Name = _eb.Name
208-
ebMD.Logs = metadata.Convert2ProtoEventLog(_eb.EventLogs...)
209210
addrs := make([]string, 0)
210211
for _, v := range ctrl.cfg.Topology {
211212
addrs = append(addrs, v)

internal/controller/eventbus/controller_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ func TestController_CreateEventBus(t *testing.T) {
6565
})
6666
So(err, ShouldBeNil)
6767
So(res.Name, ShouldEqual, "test-1")
68-
So(res.Id, ShouldEqual, el.EventbusID.Uint64())
68+
So(res.Id, ShouldNotEqual, 0)
6969
So(res.Logs, ShouldHaveLength, 1)
7070
So(res.LogNumber, ShouldEqual, 1)
7171
So(res.Logs[0].EventBusName, ShouldEqual, "test-1")

internal/controller/eventbus/metadata/info.go

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,25 +16,34 @@ package metadata
1616

1717
import (
1818
"encoding/json"
19+
"time"
1920

2021
"github.com/linkall-labs/vanus/internal/primitive/vanus"
2122
"github.com/linkall-labs/vanus/proto/pkg/meta"
2223
)
2324

2425
type Eventbus struct {
25-
ID vanus.ID `json:"id"`
26-
Name string `json:"name"`
27-
LogNumber int `json:"log_number"`
28-
EventLogs []*Eventlog `json:"event_logs"`
26+
ID vanus.ID `json:"id"`
27+
Name string `json:"name"`
28+
LogNumber int `json:"log_number"`
29+
EventLogs []*Eventlog `json:"event_logs"`
30+
Description string `json:"description"`
31+
CreatedAt time.Time `json:"created_at"`
32+
UpdatedAt time.Time `json:"updated_at"`
2933
}
3034

3135
func Convert2ProtoEventBus(ins ...*Eventbus) []*meta.EventBus {
3236
pebs := make([]*meta.EventBus, len(ins))
3337
for idx := 0; idx < len(ins); idx++ {
3438
eb := ins[idx]
3539
pebs[idx] = &meta.EventBus{
36-
Name: eb.Name,
37-
LogNumber: int32(eb.LogNumber),
40+
Name: eb.Name,
41+
LogNumber: int32(eb.LogNumber),
42+
Logs: Convert2ProtoEventLog(eb.EventLogs...),
43+
Id: eb.ID.Uint64(),
44+
Description: eb.Description,
45+
CreatedAt: eb.CreatedAt.UnixMilli(),
46+
UpdatedAt: eb.UpdatedAt.UnixMilli(),
3847
}
3948
}
4049
return pebs

internal/controller/trigger/controller.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,8 @@ func (ctrl *controller) CreateSubscription(ctx context.Context,
141141
}
142142
sub := convert.FromPbSubscriptionRequest(request.Subscription)
143143
sub.ID, err = vanus.NewID()
144+
sub.CreatedAt = time.Now()
145+
sub.UpdatedAt = time.Now()
144146
if err != nil {
145147
return nil, err
146148
}
@@ -186,6 +188,7 @@ func (ctrl *controller) UpdateSubscription(ctx context.Context,
186188
if !change {
187189
return nil, errors.ErrInvalidRequest.WithMessage("no change")
188190
}
191+
sub.UpdatedAt = time.Now()
189192
sub.Phase = metadata.SubscriptionPhasePending
190193
if err := ctrl.subscriptionManager.UpdateSubscription(ctx, sub); err != nil {
191194
return nil, err

internal/controller/trigger/metadata/info.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,11 @@ type Subscription struct {
7575
ProtocolSetting *primitive.ProtocolSetting `json:"protocol_settings,omitempty"`
7676
EventBus string `json:"eventbus"`
7777
Transformer *primitive.Transformer `json:"transformer,omitempty"`
78+
Name string `json:"name"`
79+
Disable bool `json:"disable"`
80+
Description string `json:"description"`
81+
CreatedAt time.Time `json:"created_at"`
82+
UpdatedAt time.Time `json:"updated_at"`
7883

7984
// not from api
8085
Phase SubscriptionPhase `json:"phase"`
@@ -89,6 +94,14 @@ func (s *Subscription) Update(update *Subscription) bool {
8994
change = true
9095
s.Source = update.Source
9196
}
97+
if s.Description != update.Description {
98+
change = true
99+
s.Description = update.Description
100+
}
101+
if s.Disable != update.Disable {
102+
change = true
103+
s.Disable = update.Disable
104+
}
92105
if !reflect.DeepEqual(s.Types, update.Types) {
93106
change = true
94107
s.Types = update.Types

internal/convert/convert.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ func FromPbSubscriptionRequest(sub *ctrl.SubscriptionRequest) *metadata.Subscrip
3838
Filters: fromPbFilters(sub.Filters),
3939
Transformer: fromPbTransformer(sub.Transformer),
4040
EventBus: sub.EventBus,
41+
Name: sub.Name,
42+
Disable: sub.Disable,
43+
Description: sub.Description,
4144
}
4245
return to
4346
}
@@ -282,6 +285,11 @@ func ToPbSubscription(sub *metadata.Subscription, offsets info.ListOffsetInfo) *
282285
Filters: toPbFilters(sub.Filters),
283286
Transformer: ToPbTransformer(sub.Transformer),
284287
Offsets: ToPbOffsetInfos(offsets),
288+
Name: sub.Name,
289+
Disable: sub.Disable,
290+
Description: sub.Description,
291+
CreatedAt: sub.CreatedAt.UnixMilli(),
292+
UpdatedAt: sub.UpdatedAt.UnixMilli(),
285293
}
286294
return to
287295
}

internal/primitive/vanus/id.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ package vanus
1616

1717
import (
1818
"context"
19+
"errors"
1920
"fmt"
2021
"strconv"
2122
"sync"
@@ -222,7 +223,14 @@ func NewIDFromUint64(id uint64) ID {
222223
return ID(id)
223224
}
224225

226+
var (
227+
ErrEmptyID = errors.New("id: empty")
228+
)
229+
225230
func NewIDFromString(id string) (ID, error) {
231+
if id == "" {
232+
return emptyID, ErrEmptyID
233+
}
226234
i, err := strconv.ParseUint(id, base, bitSize)
227235
if err != nil {
228236
return emptyID, err

0 commit comments

Comments
 (0)