Skip to content

Commit 727bbdf

Browse files
committed
feat: wasmbus
Signed-off-by: Lucas Fontes <[email protected]>
1 parent bf00a1d commit 727bbdf

File tree

94 files changed

+6815
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

94 files changed

+6815
-0
lines changed

x/wasmbus/bus.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package wasmbus
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"strings"
8+
)
9+
10+
const (
11+
// PatternAll is a wildcard pattern that matches all subjects.
12+
PatternAll = "*"
13+
14+
// NoBackLog is used to indicate that the subscription should not have a backlog.
15+
NoBackLog = 0
16+
17+
// PrefixWadm is the prefix for WasmCloud Admin API.
18+
PrefixWadm = "wadm.api"
19+
// PrefixEvents is the prefix for Lattice Events.
20+
PrefixEvents = "wasmbus.evt"
21+
// PrefixControl is the prefix for Lattice RPC.
22+
PrefixCtlV1 = "wasmbus.ctl.v1"
23+
)
24+
25+
var (
26+
// ErrEncode is returned when encoding a message fails.
27+
ErrEncode = errors.New("encode error")
28+
// ErrInternal is returned when an internal error occurs.
29+
ErrInternal = errors.New("internal error")
30+
// ErrDecode is returned when decoding a message fails.
31+
ErrDecode = errors.New("decode error")
32+
// ErrTransport is returned when a transport error occurs.
33+
ErrTransport = errors.New("transport error")
34+
// ErrOperation is returned when an operation error occurs.
35+
ErrOperation = errors.New("operation error")
36+
// ErrValidation is returned when a validation error occurs.
37+
ErrValidation = errors.New("validation error")
38+
)
39+
40+
// Bus is the interface for the message bus.
41+
// It provides methods for subscribing to messages and sending messages.
42+
// It doesn't hold any state and is safe for concurrent use. See `Subscription` for stateful operations.
43+
// Modeled after the NATS API.
44+
type Bus interface {
45+
// Subscribe creates a subscription for the given subject.
46+
// The backlog parameter is the maximum number of messages that can be buffered in memory.
47+
Subscribe(subject string, backlog int) (Subscription, error)
48+
// QueueSubscribe creates a subscription for the given subject and queue group.
49+
// The backlog parameter is the maximum number of messages that can be buffered in memory.
50+
QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
51+
// Request sends a request message and waits for a reply.
52+
// The context is used for the request timeout.
53+
Request(ctx context.Context, msg *Message) (*Message, error)
54+
// Publish sends a message to `msg.Subject`.
55+
Publish(msg *Message) error
56+
}
57+
58+
// Subscription is the interface for a message subscription.
59+
// It provides methods for handling messages and draining the subscription.
60+
// Subscriptions run in their own goroutine.
61+
type Subscription interface {
62+
// Handle starts the subscription and calls the callback for each message.
63+
// Does not block.
64+
Handle(callback SubscriptionCallback)
65+
// Drain stops the subscription and closes the channel.
66+
// Blocks until all messages are processed, releasing the Subscription Thread.
67+
Drain() error
68+
}
69+
70+
// SubscriptionCallback is the callback type for message subscriptions.
71+
// Subscriptions are single threaded and the callback is called sequentially for each message.
72+
// Callers should handle concurrency and synchronization themselves.
73+
type SubscriptionCallback func(msg *Message)
74+
75+
// Header is the Message header type.
76+
type Header = http.Header
77+
78+
// Message is the message type for the message bus.
79+
// Modeled after the NATS message type.
80+
type Message struct {
81+
Subject string
82+
Reply string
83+
Header Header
84+
Data []byte
85+
86+
// For replies
87+
bus Bus
88+
}
89+
90+
// Bus returns the bus that the message was received on or sent to
91+
// Might be null.
92+
func (m *Message) Bus() Bus {
93+
return m.bus
94+
}
95+
96+
// LastSubjectPart returns the last part of the subject.
97+
// Example: "a.b.c" -> "c"
98+
func (m *Message) LastSubjectPart() string {
99+
parts := m.SubjectParts()
100+
return parts[len(parts)-1]
101+
}
102+
103+
// SubjectParts returns the parts of the subject.
104+
// Example: "a.b.c" -> ["a", "b", "c"]
105+
func (m *Message) SubjectParts() []string {
106+
return strings.Split(m.Subject, ".")
107+
}
108+
109+
// NewMessage creates a new message with the given subject.
110+
func NewMessage(subject string) *Message {
111+
return &Message{
112+
Header: make(Header),
113+
Subject: subject,
114+
}
115+
}

x/wasmbus/client.go

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
package wasmbus
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
9+
"gopkg.in/yaml.v2"
10+
)
11+
12+
// LatticeRequest encodes the Roundtrip logic for a Bus Request
13+
// This is a generic implementation that can be used with any Bus
14+
// and any Request/Response pair.
15+
// Requests are encoded in json and Responses can be either json or yaml.
16+
// Use Pre & Post Request hooks to modify the request/response before/after.
17+
// The `T` and `Y` types are used to define the Request and Response types.
18+
// `T` should be passed by reference (pointer) and `Y` by value (struct).
19+
type LatticeRequest[T any, Y any] struct {
20+
// Request should be a reference to the request struct
21+
Request T
22+
// Response should be a struct that the response will be unmarshaled into
23+
// and should be passed by value
24+
Response Y
25+
Subject string
26+
Bus Bus
27+
PreRequest func(context.Context, T, *Message) error
28+
PostRequest func(context.Context, *Y, *Message) error
29+
}
30+
31+
// NewLatticeRequest returns a `LatticeRequest` for a given type.
32+
// The `T` and `Y` types are used to define the Request and Response types.
33+
// `T` should be passed by reference (pointer) and `Y` by value (struct).
34+
func NewLatticeRequest[T any, Y any](bus Bus, subject string, in T, out Y) *LatticeRequest[T, Y] {
35+
return &LatticeRequest[T, Y]{
36+
Request: in,
37+
Response: out,
38+
Bus: bus,
39+
Subject: subject,
40+
}
41+
}
42+
43+
// Execute sends the request to the bus and returns the response.
44+
func (l *LatticeRequest[T, Y]) Execute(ctx context.Context) (*Y, error) {
45+
req, err := Encode(l.Subject, l.Request)
46+
if err != nil {
47+
return nil, fmt.Errorf("%w: %s", ErrEncode, err)
48+
}
49+
50+
if l.PreRequest != nil {
51+
if err := l.PreRequest(ctx, l.Request, req); err != nil {
52+
return nil, fmt.Errorf("%w: %s", ErrOperation, err)
53+
}
54+
}
55+
56+
rawResp, err := l.Bus.Request(ctx, req)
57+
if err != nil {
58+
return nil, fmt.Errorf("%w: %s", ErrTransport, err)
59+
}
60+
61+
if err := Decode(rawResp, &l.Response); err != nil {
62+
return nil, fmt.Errorf("%w: %s", ErrDecode, err)
63+
}
64+
65+
if l.PostRequest != nil {
66+
if err := l.PostRequest(ctx, &l.Response, rawResp); err != nil {
67+
return nil, fmt.Errorf("%w: %s", ErrOperation, err)
68+
}
69+
}
70+
71+
return &l.Response, nil
72+
}
73+
74+
// Encode marshals the payload into a Message.
75+
// The payload is encoded as json.
76+
func Encode(subject string, payload any) (*Message, error) {
77+
wasmbusMsg := NewMessage(subject)
78+
wasmbusMsg.Header.Set("Content-Type", "application/json")
79+
80+
if payload != nil {
81+
var err error
82+
wasmbusMsg.Data, err = json.Marshal(payload)
83+
if err != nil {
84+
return nil, fmt.Errorf("%w: %s", ErrInternal, err)
85+
}
86+
}
87+
88+
return wasmbusMsg, nil
89+
}
90+
91+
// Decode unmarshals the raw response data into the provided struct.
92+
// The content type is used to determine the unmarshaling format.
93+
// If the content type is not supported, an error is returned.
94+
// Acceptable content types are "application/json" and "application/yaml".
95+
func Decode(rawResp *Message, into any) error {
96+
if len(rawResp.Data) == 0 {
97+
return nil
98+
}
99+
100+
contentType := rawResp.Header.Get("Content-Type")
101+
switch contentType {
102+
case "application/json", "":
103+
return json.Unmarshal(rawResp.Data, into)
104+
case "application/yaml":
105+
return yaml.Unmarshal(rawResp.Data, into)
106+
default:
107+
return errors.New("unsupported content type")
108+
}
109+
}

0 commit comments

Comments
 (0)