Skip to content

Commit 6b06b50

Browse files
authored
[tau-cli] run functions with no dream (#421)
1 parent cd74610 commit 6b06b50

File tree

45 files changed

+1738
-24
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1738
-24
lines changed

.gitignore

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ develop-eggs/
9292
downloads/
9393
eggs/
9494
.eggs/
95-
lib/
96-
lib64/
95+
/lib/
96+
/lib64/
9797
parts/
9898
sdist/
9999
var/

core/vm/config.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,4 +11,5 @@ type OutputType uint32
1111
const (
1212
Pipe OutputType = iota
1313
Buffer
14+
Stdio
1415
)

pkg/builder/methods.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -100,14 +100,13 @@ func (b *builder) run(output *output, image *ci.DockerImage, environment specs.E
100100
return b.Errorf("instantiating container failed with: %w", err)
101101
}
102102

103-
log, err := container.Run(b.context)
103+
log, runErr := container.Run(b.context)
104104
if log != nil {
105-
_, err = io.Copy(b.output, log.Combined())
106-
if err != nil {
107-
return b.Errorf("writting container output failed with: %w", err)
105+
if _, copyErr := io.Copy(b.output, log.Combined()); copyErr != nil {
106+
return b.Errorf("writing container output failed with: %w", copyErr)
108107
}
109108
}
110-
if err != nil {
109+
if runErr != nil {
111110
json.NewEncoder(b.output).Encode(struct {
112111
Step string `json:"step"`
113112
Timestamp int64 `json:"timestamp"`
@@ -117,9 +116,9 @@ func (b *builder) run(output *output, image *ci.DockerImage, environment specs.E
117116
Step: script,
118117
Timestamp: time.Now().UnixNano(),
119118
Status: "error",
120-
Error: err.Error(),
119+
Error: runErr.Error(),
121120
})
122-
return b.Errorf("running container failed with: %w", err)
121+
return b.Errorf("running container failed with: %w", runErr)
123122
}
124123
json.NewEncoder(b.output).Encode(struct {
125124
Step string `json:"step"`

pkg/builder/new.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"io"
77
"os"
88

9+
"github.com/jedib0t/go-pretty/v6/table"
910
iface "github.com/taubyte/tau/core/builders"
1011
ci "github.com/taubyte/tau/pkg/containers"
1112
"github.com/taubyte/tau/pkg/specs/builders"
@@ -58,8 +59,17 @@ func New(ctx context.Context, output io.Writer, workDir string) (iface.Builder,
5859
}
5960

6061
env := b.config.HandleDepreciatedEnvironment()
61-
fmt.Fprintf(b.output, "tau build: workDir=%s config=%s image=%s workflow=%v\n",
62-
b.wd.String(), b.wd.ConfigFile(), env.Image, b.config.Workflow)
62+
t := table.NewWriter()
63+
t.SetOutputMirror(b.output)
64+
t.SetStyle(table.StyleLight)
65+
t.AppendRows([]table.Row{
66+
{"Working Directory", b.wd.String()},
67+
{"Config File", b.wd.ConfigFile()},
68+
{"Image", env.Image},
69+
{"Workflow", fmt.Sprint(b.config.Workflow)},
70+
})
71+
t.Render()
72+
fmt.Fprintln(b.output)
6373

6474
// set tarball if any
6575
return b, b.setTarball()

pkg/containers/container.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
)
77

88
// Run starts the container and waits for the container to exit before returning the container logs.
9+
// Logs are returned even when the container exits with a non-zero status so callers
10+
// can inspect build output (e.g. compiler errors).
911
func (c *Container) Run(ctx context.Context) (*MuxedReadCloser, error) {
1012
imageName := ""
1113
if c.image != nil {
@@ -16,28 +18,31 @@ func (c *Container) Run(ctx context.Context) (*MuxedReadCloser, error) {
1618
return nil, errorContainerStart(c.id, imageName, err)
1719
}
1820

19-
if err := c.Wait(ctx); err != nil {
20-
return nil, err
21-
}
21+
waitErr := c.Wait(ctx)
2222

2323
info, err := c.backend.Inspect(ctx, c.id)
2424
if err != nil {
25+
if waitErr != nil {
26+
return nil, waitErr
27+
}
2528
return nil, errorContainerInspect(c.id, imageName, err)
2629
}
2730

28-
var RetCodeErr error
29-
if info.ExitCode != 0 {
30-
RetCodeErr = errorContainerExitCode(c.id, imageName, info.ExitCode)
31+
if waitErr == nil && info.ExitCode != 0 {
32+
waitErr = errorContainerExitCode(c.id, imageName, info.ExitCode)
3133
}
3234

3335
muxed, err := c.backend.Logs(ctx, c.id)
3436
if err != nil {
37+
if waitErr != nil {
38+
return nil, waitErr
39+
}
3540
return nil, errorContainerLogs(c.id, imageName, err)
3641
}
3742

3843
c.Cleanup(ctx)
3944

40-
return &MuxedReadCloser{reader: muxed}, RetCodeErr
45+
return &MuxedReadCloser{reader: muxed}, waitErr
4146
}
4247

4348
// Wait calls the ContainerWait method for the container, and returns once a response has been received.

pkg/vm/backend/file/backend.go

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,24 @@
33
package file
44

55
import (
6+
"archive/zip"
7+
"compress/gzip"
8+
"fmt"
69
"io"
710
"os"
811

12+
"github.com/h2non/filetype"
13+
"github.com/h2non/filetype/matchers"
914
"github.com/taubyte/tau/core/vm"
15+
"github.com/taubyte/tau/pkg/specs/builders/wasm"
1016
"github.com/taubyte/tau/pkg/vm/backend/errors"
1117

1218
ma "github.com/multiformats/go-multiaddr"
1319
resolv "github.com/taubyte/tau/pkg/vm/resolvers/taubyte"
1420
)
1521

22+
const headerSize = 512
23+
1624
type backend struct{}
1725

1826
func New() vm.Backend {
@@ -27,6 +35,28 @@ func (b *backend) Scheme() string {
2735
return resolv.FILE_PROTOCOL_NAME
2836
}
2937

38+
// zipEntryReadCloser closes both the zip entry reader and the underlying file.
39+
type zipEntryReadCloser struct {
40+
io.ReadCloser
41+
file *os.File
42+
}
43+
44+
func (z *zipEntryReadCloser) Close() error {
45+
z.ReadCloser.Close()
46+
return z.file.Close()
47+
}
48+
49+
// gzipFileReadCloser closes both the gzip reader and the underlying file.
50+
type gzipFileReadCloser struct {
51+
*gzip.Reader
52+
file *os.File
53+
}
54+
55+
func (g *gzipFileReadCloser) Close() error {
56+
g.Reader.Close()
57+
return g.file.Close()
58+
}
59+
3060
func (b *backend) Get(multiAddr ma.Multiaddr) (io.ReadCloser, error) {
3161
protocols := multiAddr.Protocols()
3262
if protocols[0].Code != resolv.P_FILE {
@@ -38,13 +68,61 @@ func (b *backend) Get(multiAddr ma.Multiaddr) (io.ReadCloser, error) {
3868
return nil, errors.ParseProtocol(resolv.FILE_PROTOCOL_NAME, err)
3969
}
4070

41-
// remove extra slash
4271
path = path[1:]
4372

4473
file, err := os.Open(path)
4574
if err != nil {
4675
return nil, errors.RetrieveError(path, err, b)
4776
}
4877

49-
return file, nil
78+
header := make([]byte, headerSize)
79+
n, err := file.Read(header)
80+
if err != nil && err != io.EOF {
81+
file.Close()
82+
return nil, fmt.Errorf("reading file header: %w", err)
83+
}
84+
header = header[:n]
85+
86+
if _, err := file.Seek(0, io.SeekStart); err != nil {
87+
file.Close()
88+
return nil, fmt.Errorf("seeking to start: %w", err)
89+
}
90+
91+
kind, err := filetype.Match(header)
92+
if err != nil {
93+
file.Close()
94+
return nil, fmt.Errorf("matching file type: %w", err)
95+
}
96+
97+
switch kind {
98+
case matchers.TypeZip:
99+
info, err := file.Stat()
100+
if err != nil {
101+
file.Close()
102+
return nil, fmt.Errorf("stating file: %w", err)
103+
}
104+
zr, err := zip.NewReader(file, info.Size())
105+
if err != nil {
106+
file.Close()
107+
return nil, fmt.Errorf("opening zip: %w", err)
108+
}
109+
entryReader, err := zr.Open(wasm.WasmFile)
110+
if err != nil {
111+
entryReader, err = zr.Open(wasm.DeprecatedWasmFile)
112+
if err != nil {
113+
file.Close()
114+
return nil, fmt.Errorf("zip has no %q or %q: %w", wasm.WasmFile, wasm.DeprecatedWasmFile, err)
115+
}
116+
}
117+
return &zipEntryReadCloser{ReadCloser: entryReader, file: file}, nil
118+
case matchers.TypeGz:
119+
gzReader, err := gzip.NewReader(file)
120+
if err != nil {
121+
file.Close()
122+
return nil, fmt.Errorf("opening gzip: %w", err)
123+
}
124+
return &gzipFileReadCloser{Reader: gzReader, file: file}, nil
125+
default:
126+
return file, nil
127+
}
50128
}

pkg/vm/resolvers/file/resolver.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
package file
2+
3+
import (
4+
"path/filepath"
5+
6+
ma "github.com/multiformats/go-multiaddr"
7+
8+
"github.com/taubyte/tau/core/vm"
9+
_ "github.com/taubyte/tau/pkg/vm/resolvers/taubyte"
10+
)
11+
12+
type resolver struct {
13+
wasmPath string
14+
}
15+
16+
// New returns a vm.Resolver that maps any module name to a file multiaddr for the given WASM path.
17+
// wasmPath is resolved to an absolute path so it works regardless of working directory.
18+
func New(wasmPath string) vm.Resolver {
19+
abs, err := filepath.Abs(wasmPath)
20+
if err != nil {
21+
abs = wasmPath
22+
}
23+
return &resolver{wasmPath: abs}
24+
}
25+
26+
func (r *resolver) Lookup(ctx vm.Context, name string) (ma.Multiaddr, error) {
27+
return ma.NewMultiaddr("/file/" + r.wasmPath)
28+
}

pkg/vm/service/wazero/output.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ func (b *buffer) Read(p []byte) (n int, err error) {
2727

2828
func (b *buffer) Write(p []byte) (n int, err error) {
2929
if b.buffer != nil {
30-
b.buffer.Write(p)
30+
return b.buffer.Write(p)
3131
}
3232

3333
return 0, errors.New("buffer is closed")
@@ -96,3 +96,25 @@ func (p *pipe) Close() error {
9696
}
9797

9898
var MaxOutputCapacity = 10 * 1024
99+
100+
// stdioWriter forwards writes to a standard stream (os.Stdout or os.Stderr).
101+
// Read returns nothing; Close is a no-op so the process stdio is not closed.
102+
type stdioWriter struct {
103+
*os.File
104+
}
105+
106+
func (s *stdioWriter) Read(p []byte) (n int, err error) {
107+
return 0, io.EOF
108+
}
109+
110+
func (s *stdioWriter) Close() error {
111+
return nil
112+
}
113+
114+
func newStdout() io.ReadWriteCloser {
115+
return &stdioWriter{File: os.Stdout}
116+
}
117+
118+
func newStderr() io.ReadWriteCloser {
119+
return &stdioWriter{File: os.Stderr}
120+
}

pkg/vm/service/wazero/runtime.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,14 @@ func (r *runtime) instantiate(name string, compiled wazero.CompiledModule, hasRe
154154
return nil, fmt.Errorf("instantiating compiled module `%s` failed with: %s", name, err)
155155
}
156156

157-
if _start := m.ExportedFunction("_start"); _start != nil {
157+
// reactor modules export _initialize to set up globals (e.g. os.Stdout).
158+
// Must be called before any other exported function.
159+
if _initialize := m.ExportedFunction("_initialize"); _initialize != nil {
160+
if _, err := _initialize.Call(ctx); err != nil {
161+
return nil, fmt.Errorf("calling _initialize for module `%s`: %w", name, err)
162+
}
163+
// TODO: this should be deleted later as we should only support reactor modules
164+
} else if _start := m.ExportedFunction("_start"); _start != nil {
158165
if hasReady {
159166
go func() {
160167
_start.Call(ctx)

pkg/vm/service/wazero/service.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ func (s *service) New(ctx vm.Context, config vm.Config) (vm.Instance, error) {
1818
case vm.Buffer:
1919
r.output = newBuffer()
2020
r.outputErr = newBuffer()
21+
case vm.Stdio:
22+
r.output = newStdout()
23+
r.outputErr = newStderr()
2124
default:
2225
var err error
2326
if r.output, err = newPipe(); err != nil {

0 commit comments

Comments
 (0)