Skip to content

Commit d8249cc

Browse files
authored
feat: add ValidateSubscription in gateway (#351)
* feat: add ValidateSubscription in gateway Signed-off-by: wenfeng <[email protected]> * updating according to code review Signed-off-by: wenfeng <[email protected]> * update test data Signed-off-by: wenfeng <[email protected]> * fix bug Signed-off-by: wenfeng <[email protected]> Signed-off-by: wenfeng <[email protected]>
1 parent e696fa7 commit d8249cc

File tree

4 files changed

+576
-134
lines changed

4 files changed

+576
-134
lines changed

internal/gateway/proxy/proxy.go

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,22 +18,26 @@ import (
1818
"context"
1919
"encoding/base64"
2020
"encoding/binary"
21-
"errors"
2221
"fmt"
2322
"net"
2423
"runtime/debug"
2524
"sync"
2625

26+
v2 "github.com/cloudevents/sdk-go/v2"
2727
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
2828
eb "github.com/linkall-labs/vanus/client"
2929
"github.com/linkall-labs/vanus/client/pkg/api"
3030
"github.com/linkall-labs/vanus/client/pkg/option"
3131
"github.com/linkall-labs/vanus/client/pkg/policy"
32+
"github.com/linkall-labs/vanus/internal/convert"
3233
"github.com/linkall-labs/vanus/internal/primitive/interceptor/errinterceptor"
3334
"github.com/linkall-labs/vanus/internal/primitive/vanus"
35+
"github.com/linkall-labs/vanus/internal/trigger/filter"
36+
"github.com/linkall-labs/vanus/internal/trigger/transform"
3437
"github.com/linkall-labs/vanus/observability/log"
3538
"github.com/linkall-labs/vanus/observability/tracing"
3639
"github.com/linkall-labs/vanus/pkg/cluster"
40+
"github.com/linkall-labs/vanus/pkg/errors"
3741
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
3842
proxypb "github.com/linkall-labs/vanus/proto/pkg/proxy"
3943
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@@ -231,6 +235,57 @@ func (cp *ControllerProxy) GetEvent(ctx context.Context,
231235
}, nil
232236
}
233237

238+
func (cp *ControllerProxy) ValidateSubscription(ctx context.Context,
239+
req *proxypb.ValidateSubscriptionRequest) (*proxypb.ValidateSubscriptionResponse, error) {
240+
if req.GetEvent() == nil {
241+
res, err := cp.GetEvent(ctx, &proxypb.GetEventRequest{
242+
Eventbus: req.Eventbus,
243+
EventlogId: req.Eventlog,
244+
Offset: req.Offset,
245+
Number: 1,
246+
})
247+
if err != nil {
248+
return nil, err
249+
}
250+
req.Event = res.GetEvents()[0].Value
251+
}
252+
253+
e := v2.NewEvent()
254+
if err := e.UnmarshalJSON(req.GetEvent()); err != nil {
255+
return nil, errors.ErrInvalidRequest.WithMessage("failed to unmarshall event to CloudEvent").Wrap(err)
256+
}
257+
258+
if req.GetSubscription() == nil {
259+
sub, err := cp.GetSubscription(ctx, &ctrlpb.GetSubscriptionRequest{Id: req.SubscriptionId})
260+
if err != nil {
261+
return nil, err
262+
}
263+
req.Subscription = &ctrlpb.SubscriptionRequest{
264+
Filters: sub.Filters,
265+
Transformer: sub.Transformer,
266+
}
267+
}
268+
269+
sub := convert.FromPbSubscriptionRequest(req.Subscription)
270+
res := &proxypb.ValidateSubscriptionResponse{}
271+
f := filter.GetFilter(sub.Filters)
272+
r := f.Filter(e)
273+
if !r {
274+
return res, nil
275+
}
276+
277+
res.FilterResult = true
278+
t := transform.NewTransformer(sub.Transformer)
279+
if t != nil {
280+
if err := t.Execute(&e); err != nil {
281+
return nil, errors.ErrTransformInputParse.Wrap(err)
282+
}
283+
}
284+
data, _ := e.MarshalJSON()
285+
res.TransformerResult = data
286+
return res, nil
287+
}
288+
234289
// getByEventID why added this? can it be deleted?
235290
func (cp *ControllerProxy) getByEventID(ctx context.Context,
236291
req *proxypb.GetEventRequest) (*proxypb.GetEventResponse, error) {

internal/gateway/proxy/proxy_test.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
stdCtx "context"
1919
"encoding/base64"
2020
"encoding/binary"
21+
stdJson "encoding/json"
2122
"fmt"
2223
"testing"
2324

@@ -26,9 +27,14 @@ import (
2627
"github.com/linkall-labs/vanus/client"
2728
"github.com/linkall-labs/vanus/client/pkg/api"
2829
"github.com/linkall-labs/vanus/client/pkg/policy"
30+
"github.com/linkall-labs/vanus/internal/convert"
31+
"github.com/linkall-labs/vanus/internal/primitive"
2932
"github.com/linkall-labs/vanus/internal/primitive/vanus"
33+
ctrlpb "github.com/linkall-labs/vanus/proto/pkg/controller"
34+
metapb "github.com/linkall-labs/vanus/proto/pkg/meta"
3035
proxypb "github.com/linkall-labs/vanus/proto/pkg/proxy"
3136
. "github.com/smartystreets/goconvey/convey"
37+
"github.com/tidwall/gjson"
3238
"google.golang.org/grpc"
3339
"google.golang.org/grpc/credentials/insecure"
3440
"google.golang.org/protobuf/types/known/emptypb"
@@ -250,3 +256,129 @@ func TestControllerProxy_LookLookupOffset(t *testing.T) {
250256

251257
})
252258
}
259+
260+
func TestControllerProxy_ValidateSubscription(t *testing.T) {
261+
Convey("test ValidateSubscription", t, func() {
262+
cp := NewControllerProxy(Config{
263+
Endpoints: []string{"127.0.0.1:20001",
264+
"127.0.0.1:20002", "127.0.0.1:20003"},
265+
CloudEventReceiverPort: 18080,
266+
ProxyPort: 18082,
267+
Credentials: insecure.NewCredentials(),
268+
})
269+
270+
data := `{
271+
"id":"13b719a4-ada9-436a-9fb1-fc2bc82dc647",
272+
"source":"prometheus",
273+
"specversion":"1.0",
274+
"type":"naive-http-request",
275+
"datacontenttype":"application/json",
276+
"subject":"operator",
277+
"time":"2022-12-12T08:31:54.936803649Z",
278+
"data":{"body":{"alerts":[{"annotations":{"feishuUrls":[{"URL":"https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx",
279+
"signature":"yyyy"},{"URL":"https://open.feishu.cn/open-apis/bot/v2/hook/yyyyy","signature":""},
280+
{"URL":"https://open.feishu.cn/open-apis/bot/v2/hook/zzzzz","signature":"zzzz"}]},
281+
"labels":{"forward":"test-server","severity":"P1"},"startsAt":"2022-12-12T07:55:24.893471163Z","status":"resolved"}],
282+
"commonLabels":{"cluster":"test","forward":"test-server","groups":"test-bot","severity":"P1"}},
283+
"headers":{"Content-Type":"application/json","Host":"webhook-source.vanus:80","User-Agent":"Alertmanager/0.24.0"},
284+
"method":"POST","query_args":{"source":"prometheus","subject":"operator"}}
285+
}`
286+
e := v2.NewEvent()
287+
_ = e.UnmarshalJSON([]byte(data))
288+
289+
trans := `{"pipeline":[{"command":["create","$.xvfeishuservice","bot"]},{"command":["create",
290+
"$.xvfeishumsgtype","interactive"]},{"command":["join","$.xvfeishuboturls",",",
291+
"$.data.body.alerts[0].annotations.feishuUrls[:].URL"]},{"command":["join",
292+
"$.xvfeishubotsigns",",","$.data.body.alerts[0].annotations.feishuUrls[:].signature"]}]}`
293+
var _transformer *primitive.Transformer
294+
_ = stdJson.Unmarshal([]byte(trans), &_transformer)
295+
transPb := convert.ToPbTransformer(_transformer)
296+
s := &ctrlpb.SubscriptionRequest{
297+
Filters: []*metapb.Filter{
298+
{
299+
Exact: map[string]string{
300+
"source": "test",
301+
},
302+
},
303+
},
304+
Transformer: transPb,
305+
}
306+
Convey("test with event and subscription", func() {
307+
ctx := stdCtx.Background()
308+
res, err := cp.ValidateSubscription(ctx, &proxypb.ValidateSubscriptionRequest{
309+
Event: []byte(data),
310+
Subscription: s,
311+
})
312+
So(err, ShouldBeNil)
313+
So(res.FilterResult, ShouldBeFalse)
314+
315+
s.Filters = []*metapb.Filter{
316+
{
317+
Exact: map[string]string{
318+
"source": "prometheus",
319+
},
320+
},
321+
}
322+
res, err = cp.ValidateSubscription(ctx, &proxypb.ValidateSubscriptionRequest{
323+
Event: []byte(data),
324+
Subscription: s,
325+
})
326+
So(err, ShouldBeNil)
327+
So(res.FilterResult, ShouldBeTrue)
328+
result := gjson.ParseBytes(res.TransformerResult)
329+
urls := "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx,https://open" +
330+
".feishu.cn/open-apis/bot/v2/hook/yyyyy,https://open.feishu.cn/open-apis/bot/v2/hook/zzzzz"
331+
So(result.Get("xvfeishumsgtype").String(), ShouldEqual, "interactive")
332+
So(result.Get("xvfeishuboturls").String(), ShouldEqual, urls)
333+
So(result.Get("xvfeishubotsigns").String(), ShouldEqual, "yyyy,,zzzz")
334+
So(result.Get("xvfeishuservice").String(), ShouldEqual, "bot")
335+
})
336+
337+
ctrl := gomock.NewController(t)
338+
cli := client.NewMockClient(ctrl)
339+
cp.client = cli
340+
eb := api.NewMockEventbus(ctrl)
341+
mockTriggerCtrl := ctrlpb.NewMockTriggerControllerClient(ctrl)
342+
cp.triggerCtrl = mockTriggerCtrl
343+
Convey("test with eventlog, offset and subscriptionID", func() {
344+
ctx := stdCtx.Background()
345+
346+
// mock eventbus
347+
cli.EXPECT().Eventbus(gomock.Any(), gomock.Any()).Times(2).Return(eb)
348+
eb.EXPECT().ListLog(gomock.Any()).Times(1).Return([]api.Eventlog{nil}, nil)
349+
rd := api.NewMockBusReader(ctrl)
350+
eb.EXPECT().Reader(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(rd)
351+
rd.EXPECT().Read(gomock.Any()).Times(1).Return([]*v2.Event{&e}, int64(0), uint64(0), nil)
352+
353+
// mock subscription
354+
pb := &metapb.Subscription{
355+
Filters: []*metapb.Filter{
356+
{
357+
Exact: map[string]string{
358+
"source": "prometheus",
359+
},
360+
},
361+
},
362+
Transformer: s.Transformer,
363+
}
364+
mockTriggerCtrl.EXPECT().GetSubscription(ctx, gomock.Any()).Times(1).Return(pb, nil)
365+
366+
res, err := cp.ValidateSubscription(ctx, &proxypb.ValidateSubscriptionRequest{
367+
SubscriptionId: vanus.NewTestID().Uint64(),
368+
Eventbus: "test",
369+
Eventlog: vanus.NewTestID().Uint64(),
370+
Offset: 123,
371+
})
372+
373+
So(err, ShouldBeNil)
374+
So(res.FilterResult, ShouldBeTrue)
375+
result := gjson.ParseBytes(res.TransformerResult)
376+
urls := "https://open.feishu.cn/open-apis/bot/v2/hook/xxxxx,https://open" +
377+
".feishu.cn/open-apis/bot/v2/hook/yyyyy,https://open.feishu.cn/open-apis/bot/v2/hook/zzzzz"
378+
So(result.Get("xvfeishumsgtype").String(), ShouldEqual, "interactive")
379+
So(result.Get("xvfeishuboturls").String(), ShouldEqual, urls)
380+
So(result.Get("xvfeishubotsigns").String(), ShouldEqual, "yyyy,,zzzz")
381+
So(result.Get("xvfeishuservice").String(), ShouldEqual, "bot")
382+
})
383+
})
384+
}

0 commit comments

Comments
 (0)