Skip to content

Commit 64aadc7

Browse files
committed
chore: checkin
Signed-off-by: Lucas Fontes <[email protected]>
1 parent 727bbdf commit 64aadc7

File tree

19 files changed

+1435
-103
lines changed

19 files changed

+1435
-103
lines changed

x/wasmbus/bus.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import (
55
"errors"
66
"net/http"
77
"strings"
8+
9+
"github.com/nats-io/nats.go"
810
)
911

1012
const (
@@ -113,3 +115,7 @@ func NewMessage(subject string) *Message {
113115
Subject: subject,
114116
}
115117
}
118+
119+
func NewInbox() string {
120+
return nats.NewInbox()
121+
}

x/wasmbus/client.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import (
66
"errors"
77
"fmt"
88

9-
"gopkg.in/yaml.v2"
9+
yaml "github.com/goccy/go-yaml"
1010
)
1111

1212
// LatticeRequest encodes the Roundtrip logic for a Bus Request
@@ -74,18 +74,22 @@ func (l *LatticeRequest[T, Y]) Execute(ctx context.Context) (*Y, error) {
7474
// Encode marshals the payload into a Message.
7575
// The payload is encoded as json.
7676
func Encode(subject string, payload any) (*Message, error) {
77+
var err error
7778
wasmbusMsg := NewMessage(subject)
7879
wasmbusMsg.Header.Set("Content-Type", "application/json")
80+
wasmbusMsg.Data, err = EncodeMimetype(payload, "application/json")
81+
return wasmbusMsg, err
82+
}
7983

80-
if payload != nil {
81-
var err error
82-
wasmbusMsg.Data, err = json.Marshal(payload)
83-
if err != nil {
84-
return nil, fmt.Errorf("%w: %s", ErrInternal, err)
85-
}
84+
func EncodeMimetype(payload any, mimeType string) ([]byte, error) {
85+
switch mimeType {
86+
case "application/json", "":
87+
return json.Marshal(payload)
88+
case "application/yaml":
89+
return yaml.Marshal(payload)
90+
default:
91+
return nil, errors.New("unsupported content type")
8692
}
87-
88-
return wasmbusMsg, nil
8993
}
9094

9195
// Decode unmarshals the raw response data into the provided struct.

x/wasmbus/cmd/wasmcloud-provider-shim/main.go

Lines changed: 0 additions & 7 deletions
This file was deleted.
Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"os"
7+
8+
"github.com/joho/godotenv"
9+
"github.com/urfave/cli/v3"
10+
"go.wasmcloud.dev/wasmbus/control"
11+
)
12+
13+
func configCommand() *cli.Command {
14+
var targetName string
15+
var nameArg = &cli.StringArg{
16+
Name: "name",
17+
Destination: &targetName,
18+
Max: 1,
19+
}
20+
return &cli.Command{
21+
Name: "config",
22+
Usage: "Interact with Lattice Config",
23+
Flags: []cli.Flag{},
24+
Commands: []*cli.Command{
25+
{
26+
Name: "get",
27+
Usage: "Get a config",
28+
Action: wrapNamedAction(getConfigCommand, &targetName),
29+
Arguments: []cli.Argument{
30+
nameArg,
31+
},
32+
},
33+
{
34+
Name: "delete",
35+
Usage: "Delete a config",
36+
Action: wrapNamedAction(deleteConfigCommand, &targetName),
37+
Arguments: []cli.Argument{
38+
nameArg,
39+
},
40+
},
41+
{
42+
Name: "put",
43+
Usage: "Put a config",
44+
Action: putConfigCommand,
45+
Flags: []cli.Flag{
46+
&cli.StringFlag{
47+
Name: "name",
48+
Usage: "Name of the config to store",
49+
Required: true,
50+
},
51+
&cli.StringFlag{
52+
Name: "file",
53+
Aliases: []string{"f"},
54+
Usage: "File to read config from, in dotenv format (one KEY=VALUE per line)",
55+
Required: true,
56+
},
57+
},
58+
Arguments: []cli.Argument{
59+
nameArg,
60+
},
61+
},
62+
},
63+
}
64+
}
65+
66+
func deleteConfigCommand(ctx context.Context, cmd *cli.Command, name string) error {
67+
client, err := controlClientFromCommand(cmd)
68+
if err != nil {
69+
return err
70+
}
71+
72+
resp, err := client.ConfigDelete(ctx, &control.ConfigDeleteRequest{
73+
Name: name,
74+
})
75+
if err != nil {
76+
return err
77+
}
78+
if !resp.Success {
79+
return fmt.Errorf("received error response: %s", resp.Message)
80+
}
81+
82+
fmt.Println(titleStyle.Render("⁜", name), "deleted")
83+
84+
return nil
85+
}
86+
87+
func getConfigCommand(ctx context.Context, cmd *cli.Command, name string) error {
88+
client, err := controlClientFromCommand(cmd)
89+
if err != nil {
90+
return err
91+
}
92+
93+
resp, err := client.ConfigGet(ctx, &control.ConfigGetRequest{
94+
Name: name,
95+
})
96+
if err != nil {
97+
return err
98+
}
99+
if !resp.Success {
100+
return fmt.Errorf("received error response: %s", resp.Message)
101+
}
102+
103+
// NOTE(lxf): This feels like a bug in hosts' crates/host/src/wasmbus/mod.rs
104+
if resp.Message == "Configuration not found" {
105+
return fmt.Errorf("configuration not found")
106+
}
107+
108+
config, err := godotenv.Marshal(resp.Response)
109+
if err != nil {
110+
return err
111+
}
112+
fmt.Println(config)
113+
114+
return nil
115+
}
116+
117+
func putConfigCommand(ctx context.Context, cmd *cli.Command) error {
118+
name := cmd.String("name")
119+
client, err := controlClientFromCommand(cmd)
120+
if err != nil {
121+
return err
122+
}
123+
124+
f, err := os.ReadFile(cmd.String("file"))
125+
if err != nil {
126+
return err
127+
}
128+
129+
var values map[string]string
130+
if values, err = godotenv.UnmarshalBytes(f); err != nil {
131+
return err
132+
}
133+
134+
resp, err := client.ConfigPut(ctx, &control.ConfigPutRequest{
135+
Name: name,
136+
Values: values,
137+
})
138+
if err != nil {
139+
return err
140+
}
141+
if !resp.Success {
142+
return fmt.Errorf("received error response: %s", resp.Message)
143+
}
144+
145+
fmt.Println(titleStyle.Render("⁜", name), "stored")
146+
147+
return nil
148+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package main
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"log"
8+
"time"
9+
10+
"github.com/urfave/cli/v3"
11+
"go.wasmcloud.dev/wasmbus"
12+
"go.wasmcloud.dev/wasmbus/events"
13+
)
14+
15+
func eventCommand() *cli.Command {
16+
return &cli.Command{
17+
Name: "events",
18+
Usage: "listen for lattice events",
19+
Flags: []cli.Flag{
20+
&cli.StringFlag{
21+
Name: "pattern",
22+
Value: wasmbus.PatternAll,
23+
Usage: "The event pattern to subscribe to",
24+
},
25+
&cli.IntFlag{
26+
Name: "backlog",
27+
Value: wasmbus.NoBackLog,
28+
Usage: "Bus backlog size. Default is no backlog",
29+
},
30+
},
31+
Action: func(ctx context.Context, cmd *cli.Command) error {
32+
bus, err := busFromCommand(cmd)
33+
if err != nil {
34+
return err
35+
}
36+
37+
lattice := cmd.String("lattice")
38+
pattern := cmd.String("pattern")
39+
backlog := cmd.Int("backlog")
40+
41+
callback := func(_ context.Context, ev events.Event) {
42+
jsonEv, err := json.MarshalIndent(ev.BusEvent, "", " ")
43+
if err != nil {
44+
log.Printf("Error marshalling event: %s", err)
45+
return
46+
}
47+
48+
fmt.Println(titleStyle.Render("⁜", ev.CloudEvent.Type()))
49+
renderKv("Time", ev.CloudEvent.Time().Format(time.RFC3339))
50+
renderKv("Source", ev.CloudEvent.Source())
51+
renderKv("Id", ev.CloudEvent.ID())
52+
fmt.Println(string(jsonEv))
53+
}
54+
subscription, err := events.Subscribe(bus, lattice, pattern, int(backlog), events.DiscardErrorsHandler(callback))
55+
if err != nil {
56+
return err
57+
}
58+
defer subscription.Drain()
59+
60+
log.Printf("Listening for events on lattice '%s' with pattern '%s'", lattice, pattern)
61+
<-ctx.Done()
62+
log.Printf("Shutting down event listener")
63+
64+
return nil
65+
},
66+
}
67+
}

0 commit comments

Comments
 (0)