Skip to content

Commit 422fbb2

Browse files
committed
chore: moving directory
Signed-off-by: Lucas Fontes <[email protected]>
1 parent 64aadc7 commit 422fbb2

File tree

99 files changed

+8147
-0
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

99 files changed

+8147
-0
lines changed

wasmbus/bus.go

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
package wasmbus
2+
3+
import (
4+
"context"
5+
"errors"
6+
"net/http"
7+
"strings"
8+
9+
"github.com/nats-io/nats.go"
10+
)
11+
12+
const (
13+
// PatternAll is a wildcard pattern that matches all subjects.
14+
PatternAll = "*"
15+
16+
// NoBackLog is used to indicate that the subscription should not have a backlog.
17+
NoBackLog = 0
18+
19+
// PrefixWadm is the prefix for WasmCloud Admin API.
20+
PrefixWadm = "wadm.api"
21+
// PrefixEvents is the prefix for Lattice Events.
22+
PrefixEvents = "wasmbus.evt"
23+
// PrefixControl is the prefix for Lattice RPC.
24+
PrefixCtlV1 = "wasmbus.ctl.v1"
25+
)
26+
27+
var (
28+
// ErrEncode is returned when encoding a message fails.
29+
ErrEncode = errors.New("encode error")
30+
// ErrInternal is returned when an internal error occurs.
31+
ErrInternal = errors.New("internal error")
32+
// ErrDecode is returned when decoding a message fails.
33+
ErrDecode = errors.New("decode error")
34+
// ErrTransport is returned when a transport error occurs.
35+
ErrTransport = errors.New("transport error")
36+
// ErrOperation is returned when an operation error occurs.
37+
ErrOperation = errors.New("operation error")
38+
// ErrValidation is returned when a validation error occurs.
39+
ErrValidation = errors.New("validation error")
40+
)
41+
42+
// Bus is the interface for the message bus.
43+
// It provides methods for subscribing to messages and sending messages.
44+
// It doesn't hold any state and is safe for concurrent use. See `Subscription` for stateful operations.
45+
// Modeled after the NATS API.
46+
type Bus interface {
47+
// Subscribe creates a subscription for the given subject.
48+
// The backlog parameter is the maximum number of messages that can be buffered in memory.
49+
Subscribe(subject string, backlog int) (Subscription, error)
50+
// QueueSubscribe creates a subscription for the given subject and queue group.
51+
// The backlog parameter is the maximum number of messages that can be buffered in memory.
52+
QueueSubscribe(subject string, queue string, backlog int) (Subscription, error)
53+
// Request sends a request message and waits for a reply.
54+
// The context is used for the request timeout.
55+
Request(ctx context.Context, msg *Message) (*Message, error)
56+
// Publish sends a message to `msg.Subject`.
57+
Publish(msg *Message) error
58+
}
59+
60+
// Subscription is the interface for a message subscription.
61+
// It provides methods for handling messages and draining the subscription.
62+
// Subscriptions run in their own goroutine.
63+
type Subscription interface {
64+
// Handle starts the subscription and calls the callback for each message.
65+
// Does not block.
66+
Handle(callback SubscriptionCallback)
67+
// Drain stops the subscription and closes the channel.
68+
// Blocks until all messages are processed, releasing the Subscription Thread.
69+
Drain() error
70+
}
71+
72+
// SubscriptionCallback is the callback type for message subscriptions.
73+
// Subscriptions are single threaded and the callback is called sequentially for each message.
74+
// Callers should handle concurrency and synchronization themselves.
75+
type SubscriptionCallback func(msg *Message)
76+
77+
// Header is the Message header type.
78+
type Header = http.Header
79+
80+
// Message is the message type for the message bus.
81+
// Modeled after the NATS message type.
82+
type Message struct {
83+
Subject string
84+
Reply string
85+
Header Header
86+
Data []byte
87+
88+
// For replies
89+
bus Bus
90+
}
91+
92+
// Bus returns the bus that the message was received on or sent to
93+
// Might be null.
94+
func (m *Message) Bus() Bus {
95+
return m.bus
96+
}
97+
98+
// LastSubjectPart returns the last part of the subject.
99+
// Example: "a.b.c" -> "c"
100+
func (m *Message) LastSubjectPart() string {
101+
parts := m.SubjectParts()
102+
return parts[len(parts)-1]
103+
}
104+
105+
// SubjectParts returns the parts of the subject.
106+
// Example: "a.b.c" -> ["a", "b", "c"]
107+
func (m *Message) SubjectParts() []string {
108+
return strings.Split(m.Subject, ".")
109+
}
110+
111+
// NewMessage creates a new message with the given subject.
112+
func NewMessage(subject string) *Message {
113+
return &Message{
114+
Header: make(Header),
115+
Subject: subject,
116+
}
117+
}
118+
119+
func NewInbox() string {
120+
return nats.NewInbox()
121+
}

wasmbus/client.go

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
package wasmbus
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
9+
yaml "github.com/goccy/go-yaml"
10+
)
11+
12+
// LatticeRequest encodes the Roundtrip logic for a Bus Request
13+
// This is a generic implementation that can be used with any Bus
14+
// and any Request/Response pair.
15+
// Requests are encoded in json and Responses can be either json or yaml.
16+
// Use Pre & Post Request hooks to modify the request/response before/after.
17+
// The `T` and `Y` types are used to define the Request and Response types.
18+
// `T` should be passed by reference (pointer) and `Y` by value (struct).
19+
type LatticeRequest[T any, Y any] struct {
20+
// Request should be a reference to the request struct
21+
Request T
22+
// Response should be a struct that the response will be unmarshaled into
23+
// and should be passed by value
24+
Response Y
25+
Subject string
26+
Bus Bus
27+
PreRequest func(context.Context, T, *Message) error
28+
PostRequest func(context.Context, *Y, *Message) error
29+
}
30+
31+
// NewLatticeRequest returns a `LatticeRequest` for a given type.
32+
// The `T` and `Y` types are used to define the Request and Response types.
33+
// `T` should be passed by reference (pointer) and `Y` by value (struct).
34+
func NewLatticeRequest[T any, Y any](bus Bus, subject string, in T, out Y) *LatticeRequest[T, Y] {
35+
return &LatticeRequest[T, Y]{
36+
Request: in,
37+
Response: out,
38+
Bus: bus,
39+
Subject: subject,
40+
}
41+
}
42+
43+
// Execute sends the request to the bus and returns the response.
44+
func (l *LatticeRequest[T, Y]) Execute(ctx context.Context) (*Y, error) {
45+
req, err := Encode(l.Subject, l.Request)
46+
if err != nil {
47+
return nil, fmt.Errorf("%w: %s", ErrEncode, err)
48+
}
49+
50+
if l.PreRequest != nil {
51+
if err := l.PreRequest(ctx, l.Request, req); err != nil {
52+
return nil, fmt.Errorf("%w: %s", ErrOperation, err)
53+
}
54+
}
55+
56+
rawResp, err := l.Bus.Request(ctx, req)
57+
if err != nil {
58+
return nil, fmt.Errorf("%w: %s", ErrTransport, err)
59+
}
60+
61+
if err := Decode(rawResp, &l.Response); err != nil {
62+
return nil, fmt.Errorf("%w: %s", ErrDecode, err)
63+
}
64+
65+
if l.PostRequest != nil {
66+
if err := l.PostRequest(ctx, &l.Response, rawResp); err != nil {
67+
return nil, fmt.Errorf("%w: %s", ErrOperation, err)
68+
}
69+
}
70+
71+
return &l.Response, nil
72+
}
73+
74+
// Encode marshals the payload into a Message.
75+
// The payload is encoded as json.
76+
func Encode(subject string, payload any) (*Message, error) {
77+
var err error
78+
wasmbusMsg := NewMessage(subject)
79+
wasmbusMsg.Header.Set("Content-Type", "application/json")
80+
wasmbusMsg.Data, err = EncodeMimetype(payload, "application/json")
81+
return wasmbusMsg, err
82+
}
83+
84+
func EncodeMimetype(payload any, mimeType string) ([]byte, error) {
85+
switch mimeType {
86+
case "application/json", "":
87+
return json.Marshal(payload)
88+
case "application/yaml":
89+
return yaml.Marshal(payload)
90+
default:
91+
return nil, errors.New("unsupported content type")
92+
}
93+
}
94+
95+
// Decode unmarshals the raw response data into the provided struct.
96+
// The content type is used to determine the unmarshaling format.
97+
// If the content type is not supported, an error is returned.
98+
// Acceptable content types are "application/json" and "application/yaml".
99+
func Decode(rawResp *Message, into any) error {
100+
if len(rawResp.Data) == 0 {
101+
return nil
102+
}
103+
104+
contentType := rawResp.Header.Get("Content-Type")
105+
switch contentType {
106+
case "application/json", "":
107+
return json.Unmarshal(rawResp.Data, into)
108+
case "application/yaml":
109+
return yaml.Unmarshal(rawResp.Data, into)
110+
default:
111+
return errors.New("unsupported content type")
112+
}
113+
}

0 commit comments

Comments
 (0)