Skip to content

Commit 9ee1883

Browse files
committed
feat: wasmbus Policy & Config Service
Signed-off-by: Lucas Fontes <[email protected]>
1 parent 89a99bf commit 9ee1883

File tree

11 files changed

+499
-16
lines changed

11 files changed

+499
-16
lines changed

.github/workflows/wasmbus-go.yaml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,14 @@ jobs:
7878
working-directory: x/wasmbus/events
7979
run: go test -cover -v -wash-output
8080

81+
- name: wasmbus/policy
82+
working-directory: x/wasmbus/policy
83+
run: go test -cover -v -wash-output
84+
85+
- name: wasmbus/config
86+
working-directory: x/wasmbus/config
87+
run: go test -cover -v -wash-output
88+
8189
examples:
8290
# Context: https://github.com/golangci/golangci-lint-action/blob/v6.1.1/README.md#annotations
8391
permissions:

x/wasmbus/bus.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ const (
2222
PrefixEvents = "wasmbus.evt"
2323
// PrefixControl is the prefix for Lattice RPC.
2424
PrefixCtlV1 = "wasmbus.ctl.v1"
25+
26+
PrefixConfig = "wasmbus.cfg"
2527
)
2628

2729
var (

x/wasmbus/config/api.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package config
2+
3+
import (
4+
"context"
5+
"fmt"
6+
)
7+
8+
var (
9+
ErrProtocol = fmt.Errorf("encoding error")
10+
ErrInternal = fmt.Errorf("internal error")
11+
)
12+
13+
type API interface {
14+
// Host is currently the only method exposed by the API.
15+
Host(ctx context.Context, req *HostRequest) (*HostResponse, error)
16+
}
17+
18+
var _ API = (*APIMock)(nil)
19+
20+
type APIMock struct {
21+
HostFunc func(ctx context.Context, req *HostRequest) (*HostResponse, error)
22+
}
23+
24+
func (m *APIMock) Host(ctx context.Context, req *HostRequest) (*HostResponse, error) {
25+
return m.HostFunc(ctx, req)
26+
}
27+
28+
type HostRequest struct {
29+
Labels map[string]string `json:"labels"`
30+
}
31+
32+
type HostResponse struct {
33+
RegistryCredentials map[string]RegistryCredential `json:"registryCredentials,omitempty"`
34+
}
35+
36+
type RegistryCredential struct {
37+
Username string `json:"username"`
38+
Password string `json:"password"`
39+
}

x/wasmbus/config/server.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
package config
2+
3+
import (
4+
"fmt"
5+
6+
"go.wasmcloud.dev/x/wasmbus"
7+
)
8+
9+
type Server struct {
10+
*wasmbus.Server
11+
Lattice string
12+
api API
13+
}
14+
15+
func NewServer(bus wasmbus.Bus, lattice string, api API) *Server {
16+
return &Server{
17+
Server: wasmbus.NewServer(bus),
18+
Lattice: lattice,
19+
api: api,
20+
}
21+
}
22+
23+
func (s *Server) Serve() error {
24+
subject := fmt.Sprintf("%s.%s.req", wasmbus.PrefixConfig, s.Lattice)
25+
handler := wasmbus.NewRequestHandler(HostRequest{}, HostResponse{}, s.api.Host)
26+
return s.RegisterHandler(subject, handler)
27+
}

x/wasmbus/config/server_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package config
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"testing"
7+
"time"
8+
9+
"github.com/nats-io/nats.go"
10+
"go.wasmcloud.dev/x/wasmbus"
11+
"go.wasmcloud.dev/x/wasmbus/wasmbustest"
12+
)
13+
14+
func TestServer(t *testing.T) {
15+
defer wasmbustest.MustStartNats(t)()
16+
17+
nc, err := nats.Connect(nats.DefaultURL)
18+
if err != nil {
19+
t.Fatalf("failed to connect to nats: %v", err)
20+
}
21+
bus := wasmbus.NewNatsBus(nc)
22+
s := NewServer(bus, "test", &APIMock{
23+
HostFunc: func(ctx context.Context, req *HostRequest) (*HostResponse, error) {
24+
return &HostResponse{
25+
RegistryCredentials: map[string]RegistryCredential{
26+
"docker.io": {
27+
Username: "my-username",
28+
Password: "hunter2",
29+
},
30+
},
31+
}, nil
32+
},
33+
})
34+
if err := s.Serve(); err != nil {
35+
t.Fatalf("failed to start server: %v", err)
36+
}
37+
38+
req := wasmbus.NewMessage(fmt.Sprintf("%s.%s.req", wasmbus.PrefixConfig, "test"))
39+
req.Data = []byte(`{"labels":{"hostcore.arch":"aarch64","hostcore.os":"linux","hostcore.osfamily":"unix","kubernetes":"true","kubernetes.hostgroup":"default"}}`)
40+
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
41+
defer cancel()
42+
43+
rawResp, err := bus.Request(ctx, req)
44+
if err != nil {
45+
t.Fatal(err)
46+
}
47+
48+
var resp HostResponse
49+
if err := wasmbus.Decode(rawResp, &resp); err != nil {
50+
t.Fatal(err)
51+
}
52+
53+
docker, ok := resp.RegistryCredentials["docker.io"]
54+
if !ok {
55+
t.Fatalf("expected docker.io registry credentials")
56+
}
57+
if want, got := "my-username", docker.Username; want != got {
58+
t.Fatalf("expected username %q, got %q", want, got)
59+
}
60+
61+
if want, got := "hunter2", docker.Password; want != got {
62+
t.Fatalf("expected password %q, got %q", want, got)
63+
}
64+
65+
if err := s.Drain(); err != nil {
66+
t.Fatalf("failed to drain server: %v", err)
67+
}
68+
}

x/wasmbus/policy/api.go

Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package policy
2+
3+
import (
4+
"context"
5+
"fmt"
6+
)
7+
8+
var (
9+
ErrProtocol = fmt.Errorf("encoding error")
10+
ErrInternal = fmt.Errorf("internal error")
11+
)
12+
13+
type API interface {
14+
// PerformInvocation is called when a component is invoked
15+
PerformInvocation(ctx context.Context, req *PerformInvocationRequest) (*Response, error)
16+
// StartComponent is called when a component is started
17+
StartComponent(ctx context.Context, req *StartComponentRequest) (*Response, error)
18+
// StartProvider is called when a provider is started
19+
StartProvider(ctx context.Context, req *StartProviderRequest) (*Response, error)
20+
}
21+
22+
var _ API = (*APIMock)(nil)
23+
24+
type APIMock struct {
25+
PerformInvocationFunc func(ctx context.Context, req *PerformInvocationRequest) (*Response, error)
26+
StartComponentFunc func(ctx context.Context, req *StartComponentRequest) (*Response, error)
27+
StartProviderFunc func(ctx context.Context, req *StartProviderRequest) (*Response, error)
28+
}
29+
30+
func (m *APIMock) PerformInvocation(ctx context.Context, req *PerformInvocationRequest) (*Response, error) {
31+
return m.PerformInvocationFunc(ctx, req)
32+
}
33+
34+
func (m *APIMock) StartComponent(ctx context.Context, req *StartComponentRequest) (*Response, error) {
35+
return m.StartComponentFunc(ctx, req)
36+
}
37+
38+
func (m *APIMock) StartProvider(ctx context.Context, req *StartProviderRequest) (*Response, error) {
39+
return m.StartProviderFunc(ctx, req)
40+
}
41+
42+
// Request is the structure of the request sent to the policy engine
43+
type BaseRequest[T any] struct {
44+
Id string `json:"requestId"`
45+
Kind string `json:"kind"`
46+
Version string `json:"version"`
47+
Host Host `json:"host"`
48+
Request T `json:"request"`
49+
}
50+
51+
// Decision is a helper function to create a response
52+
func (r BaseRequest[T]) Decision(allowed bool, msg string) *Response {
53+
return &Response{
54+
Id: r.Id,
55+
Permitted: allowed,
56+
Message: msg,
57+
}
58+
}
59+
60+
// Deny is a helper function to create a response with a deny decision
61+
func (r BaseRequest[T]) Deny(msg string) *Response {
62+
return r.Decision(false, msg)
63+
}
64+
65+
// Allow is a helper function to create a response with an allow decision
66+
func (r BaseRequest[T]) Allow(msg string) *Response {
67+
return r.Decision(true, msg)
68+
}
69+
70+
// Response is the structure of the response sent by the policy engine
71+
type Response struct {
72+
Id string `json:"requestId"`
73+
Permitted bool `json:"permitted"`
74+
Message string `json:"message,omitempty"`
75+
}
76+
77+
type Claims struct {
78+
PublicKey string `json:"publicKey"`
79+
Issuer string `json:"issuer"`
80+
IssuedAt int `json:"issuedAt"`
81+
ExpiresAt int `json:"expiresAt"`
82+
Expired bool `json:"expired"`
83+
}
84+
85+
type StartComponentPayload struct {
86+
ComponentId string `json:"componentId"`
87+
ImageRef string `json:"imageRef"`
88+
MaxInstances int `json:"maxInstances"`
89+
Annotations map[string]string `json:"annotations"`
90+
}
91+
92+
type StartComponentRequest = BaseRequest[StartComponentPayload]
93+
94+
type StartProviderPayload struct {
95+
ProviderId string `json:"providerId"`
96+
ImageRef string `json:"imageRef"`
97+
Annotations map[string]string `json:"annotations"`
98+
}
99+
100+
type StartProviderRequest = BaseRequest[StartProviderPayload]
101+
102+
type PerformInvocationPayload struct {
103+
Interface string `json:"interface"`
104+
Function string `json:"function"`
105+
// NOTE(lxf): this covers components but not providers. wut?!?
106+
Target InvocationTarget `json:"target"`
107+
}
108+
109+
type PerformInvocationRequest = BaseRequest[PerformInvocationPayload]
110+
111+
type InvocationTarget struct {
112+
ComponentId string `json:"componentId"`
113+
ImageRef string `json:"imageRef"`
114+
MaxInstances int `json:"maxInstances"`
115+
Annotations map[string]string `json:"annotations"`
116+
}
117+
118+
type Host struct {
119+
PublicKey string `json:"publicKey"`
120+
Lattice string `json:"lattice"`
121+
Labels map[string]string `json:"labels"`
122+
}

x/wasmbus/policy/server.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package policy
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
8+
"go.wasmcloud.dev/x/wasmbus"
9+
)
10+
11+
type Server struct {
12+
*wasmbus.Server
13+
subject string
14+
api API
15+
handlers map[string]wasmbus.AnyServerHandler
16+
}
17+
18+
func NewServer(bus wasmbus.Bus, subject string, api API) *Server {
19+
return &Server{
20+
Server: wasmbus.NewServer(bus),
21+
subject: subject,
22+
api: api,
23+
}
24+
}
25+
26+
func (s *Server) Serve() error {
27+
handler := wasmbus.NewTypedHandler(extractType)
28+
29+
startComponent := wasmbus.NewRequestHandler(StartComponentRequest{}, Response{}, s.api.StartComponent)
30+
if err := handler.RegisterType("startComponent", startComponent); err != nil {
31+
return err
32+
}
33+
34+
startProvider := wasmbus.NewRequestHandler(StartProviderRequest{}, Response{}, s.api.StartProvider)
35+
if err := handler.RegisterType("startProvider", startProvider); err != nil {
36+
return err
37+
}
38+
39+
performInvocation := wasmbus.NewRequestHandler(PerformInvocationRequest{}, Response{}, s.api.PerformInvocation)
40+
if err := handler.RegisterType("performInvocation", performInvocation); err != nil {
41+
return err
42+
}
43+
44+
return s.RegisterHandler(s.subject, handler)
45+
}
46+
47+
func extractType(ctx context.Context, msg *wasmbus.Message) (string, error) {
48+
var baseReq BaseRequest[json.RawMessage]
49+
50+
if err := wasmbus.Decode(msg, &baseReq); err != nil {
51+
return "", err
52+
}
53+
54+
switch baseReq.Kind {
55+
case "startComponent", "startProvider", "performInvocation":
56+
return baseReq.Kind, nil
57+
default:
58+
return "", fmt.Errorf("unknown request kind: %s", baseReq.Kind)
59+
}
60+
}

0 commit comments

Comments
 (0)