Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/workflows/artifacts/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func (a *Artifacts) GetWorkflowID() string {
return a.workflowID
}

// Returns the binary path after preparing the artifacts
// This value is empty until Prepare() is called
func (a *Artifacts) GetBinaryPath() string {
return a.input.BinaryPath
}

// Returns the binary data after preparing the artifacts
// This value is empty until Prepare() is called
func (a *Artifacts) GetBinaryData() []byte {
Expand Down
166 changes: 166 additions & 0 deletions pkg/workflows/artifacts/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package artifacts

import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"time"
)

type Field struct {
Key string `json:"key"`
Value string `json:"value"`
}

// Input for uploading artifacts to storage service using presigned URLs
type UploadInput struct {
PresignedURL string `json:"presignedUrl"`
PresignedFields []Field `json:"presignedFields"`
ContentType ArtifactType `json:"contentType"`
Filepath string `json:"-"`
Timeout time.Duration `json:"-"`
}

type ArtifactType string

const (
ArtifactTypeBinary ArtifactType = "BINARY"
ArtifactTypeConfig ArtifactType = "CONFIG"
)

// Read in an artifact file from a given filepath and calculate the content hash
type ArtifactUpload struct {
Content []byte
ContentType ArtifactType
ContentHash string
}

// Constructor for ArtifactUpload
func NewArtifactUpload(filepath string, contentType ArtifactType) (*ArtifactUpload, error) {
content, err := os.ReadFile(filepath)
if err != nil {
return nil, fmt.Errorf("failed to read file: %w", err)
}
return &ArtifactUpload{
Content: content,
ContentType: contentType,
ContentHash: CalculateContentHash(content),
}, nil
}

// Calculate the content hash of the artifact to generate the presigned URL
// for the artifact in the storage service
func CalculateContentHash(content []byte) string {
hash := md5.Sum(content) //nolint:gosec
contentHash := base64.StdEncoding.EncodeToString(hash[:]) // Convert to base64 string
return contentHash
}

// Upload artifacts to storage service using presigned URLs
func (a *Artifacts) upload(uploadInput *UploadInput) error {
artifactUpload, err := NewArtifactUpload(uploadInput.Filepath, uploadInput.ContentType)
if err != nil {
return err
}

a.log.Debug("Uploading artifact",
"filepath", uploadInput.Filepath,
"content type", artifactUpload.ContentType,
"content hash", artifactUpload.ContentHash)

var b bytes.Buffer
w := multipart.NewWriter(&b)

// Add the presigned form fields to the request (do not add extra fields).
for _, field := range uploadInput.PresignedFields {
if err := w.WriteField(field.Key, field.Value); err != nil {
a.log.Error("Failed to write presigned field", "error", err, "field", field.Key)
return err
}
}

// Add the Content-Type header to the request.
err = w.WriteField("Content-Type", string(artifactUpload.ContentType))
if err != nil {
return err
}
// Add the Content-MD5 header to the request.
err = w.WriteField("Content-MD5", artifactUpload.ContentHash)
if err != nil {
return err
}

// Add the file to the request as the last field.
fileWriter, err := w.CreateFormFile("file", "artifact")
if err != nil {
a.log.Error("Failed to create form file field", "error", err)
return err
}
if _, err := fileWriter.Write(artifactUpload.Content); err != nil {
a.log.Error("Failed to write file content to form", "error", err)
return err
}
if err := w.Close(); err != nil {
a.log.Error("Failed to close multipart writer", "error", err)
return err
}

ctx, cancel := context.WithTimeout(context.Background(), uploadInput.Timeout)
defer cancel()

httpReq, err := http.NewRequestWithContext(ctx, "POST", uploadInput.PresignedURL, &b)
if err != nil {
a.log.Error("Failed to create HTTP request", "error", err)
return err
}
httpReq.Header.Set("Content-Type", w.FormDataContentType())

httpClient := &http.Client{Timeout: uploadInput.Timeout + 2*time.Second}
httpResp, err := httpClient.Do(httpReq)
if err != nil {
a.log.Error("HTTP request to origin failed", "error", err)
return err
}
defer func() {
if cerr := httpResp.Body.Close(); cerr != nil {
a.log.Warn("Failed to close origin response body", "error", cerr)
}
}()

// Accept 204 No Content or 201 Created as success.
if httpResp.StatusCode != http.StatusNoContent && httpResp.StatusCode != http.StatusCreated {
body, _ := io.ReadAll(httpResp.Body)
a.log.Error("Artifact upload failed", "status", httpResp.StatusCode, "body", string(body))
return fmt.Errorf("expected status 204 or 201, got %d: %s", httpResp.StatusCode, string(body))
}

a.log.Info("Artifact uploaded successfully", "status", httpResp.StatusCode)
return nil
}

var backOffSleep time.Duration = 1 * time.Second

// DurableUpload uploads an artifact with up to 3 attempts and exponential backoff.
func (a *Artifacts) DurableUpload(uploadInput *UploadInput) error {
var lastErr error
const maxUploadAttempts = 3
for attempt := range maxUploadAttempts {
if attempt > 0 {
backoff := backOffSleep * time.Duration(1<<(attempt-1))
a.log.Debug("Retrying upload after backoff", "attempt", attempt+1, "backoff", backoff)
time.Sleep(backoff)
}
lastErr = a.upload(uploadInput)
if lastErr == nil {
return nil
}
a.log.Warn("Upload attempt failed", "attempt", attempt+1, "error", lastErr)
}
return fmt.Errorf("upload failed after %d attempts: %w", maxUploadAttempts, lastErr)
}
200 changes: 200 additions & 0 deletions pkg/workflows/artifacts/upload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package artifacts

import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

const testServerPort = "45001"

type UploadTestSuite struct {
suite.Suite

lggr *slog.Logger
server *http.Server
storage *testArtifactStorage
}

func (s *UploadTestSuite) SetupSuite() {
s.lggr = slog.New(slog.NewTextHandler(os.Stdout, nil))
s.storage = newTestArtifactStorage()
s.server = newTestServer(testServerPort, s.storage)
go func() {
_ = s.server.ListenAndServe()
}()
}

func (s *UploadTestSuite) TearDownSuite() {
if s.server != nil {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_ = s.server.Shutdown(ctx)
}
}

func (s *UploadTestSuite) TestUpload() {
artifacts := NewWorkflowArtifacts(&Input{
WorkflowOwner: "0x97f8a56d48290f35A23A074e7c73615E93f21885",
WorkflowName: "wf-test-1",
WorkflowPath: "./testdata/main.go",
ConfigPath: "./testdata/config.yaml",
BinaryPath: "testdata/binary",
}, s.lggr)

err := artifacts.Compile()
s.NoError(err, "failed to compile workflow")

err = artifacts.Prepare()
s.NoError(err, "failed to prepare artifacts")

uploadInput := &UploadInput{
PresignedURL: fmt.Sprintf("http://localhost:%s/artifacts/%s/binary.wasm", testServerPort, artifacts.GetWorkflowID()),
PresignedFields: []Field{
{Key: "key1", Value: "value1"},
},
ContentType: ArtifactTypeBinary,
Filepath: artifacts.GetBinaryPath(),
Timeout: 10 * time.Second,
}
err = artifacts.DurableUpload(uploadInput)
s.NoError(err, "failed to upload artifact")
expected := artifacts.GetBinaryData()
actual := s.storage.getBinary(artifacts.GetWorkflowID())
expLen, actLen := len(expected), len(actual)
s.Equal(expLen, actLen, "binary data length do not match")
n := 100
if expLen > n && actLen > n {
// Compare only the first 100 bytes and the last 100 bytes of the artifact binary
// Otherwise, the entire binary is printed to console
s.Equal(expected[:n], actual[:n], "first 100 bytes do not match")
s.Equal(expected[expLen-n:], actual[actLen-n:], "last 100 bytes do not match")
} else {
// If the binary is smaller than 100 bytes, compare the whole thing
s.Equal(expected, actual, "binary data do not match")
}

// SadPath: Bad filepath
backOffSleep = 1 * time.Millisecond
badFilepathUploadInput := &UploadInput{
PresignedURL: fmt.Sprintf("http://localhost:%s/artifacts/%s/binary.wasm", testServerPort, artifacts.GetWorkflowID()),
PresignedFields: []Field{
{Key: "key1", Value: "value1"},
},
ContentType: ArtifactTypeBinary,
Filepath: "testdata/binary",
Timeout: 10 * time.Second,
}
err = artifacts.DurableUpload(badFilepathUploadInput)
s.ErrorContains(err, "upload failed after 3 attempts: failed to read file: open",
"failed to upload artifact")

// SadPath: Bad presigned URL
badPresignedURLUploadInput := &UploadInput{
PresignedURL: fmt.Sprintf("http://localhost:%s/artifacts2/%s/binary.wasm", testServerPort, artifacts.GetWorkflowID()),
PresignedFields: []Field{
{Key: "key1", Value: "value1"},
},
ContentType: ArtifactTypeBinary,
Filepath: artifacts.GetBinaryPath(),
Timeout: 10 * time.Second,
}
err = artifacts.DurableUpload(badPresignedURLUploadInput)
s.ErrorContains(err, "upload failed after 3 attempts: expected status 204 or 201, got 404",
"failed to upload artifact")
}

func TestUploadTestSuite(t *testing.T) {
suite.Run(t, new(UploadTestSuite))
}

// testArtifactStorage holds multipart form data received by the test server.
type testArtifactStorage struct {
mu sync.RWMutex
binaries map[string][]byte // workflowID -> body
configs map[string][]byte // workflowID -> body
}

func newTestArtifactStorage() *testArtifactStorage {
return &testArtifactStorage{
binaries: make(map[string][]byte),
configs: make(map[string][]byte),
}
}

func (t *testArtifactStorage) getBinary(workflowID string) []byte {
t.mu.RLock()
defer t.mu.RUnlock()
return t.binaries[workflowID]
}

func (t *testArtifactStorage) getConfig(workflowID string) []byte {
t.mu.RLock()
defer t.mu.RUnlock()
return t.configs[workflowID]
}

// testServer runs on the given port and accepts POST multipart to:
// - /artifacts/<workflow-id>/binary.wasm
// - /artifacts/<workflow-id>/configs
func newTestServer(port string, storage *testArtifactStorage) *http.Server {
mux := http.NewServeMux()
mux.HandleFunc("/artifacts/", func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
trimmed := strings.TrimPrefix(r.URL.Path, "/artifacts/")
parts := strings.SplitN(trimmed, "/", 2)
if len(parts) != 2 {
http.Error(w, "bad path", http.StatusBadRequest)
return
}
workflowID, suffix := parts[0], parts[1]
if workflowID == "" {
http.Error(w, "missing workflow id", http.StatusBadRequest)
return
}
if err := r.ParseMultipartForm(32 << 20); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
file, _, err := r.FormFile("file")
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
defer file.Close()
body, err := io.ReadAll(file)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
storage.mu.Lock()
switch suffix {
case "binary.wasm":
storage.binaries[workflowID] = body
case "configs":
storage.configs[workflowID] = body
default:
storage.mu.Unlock()
http.Error(w, "unknown artifact type: "+suffix, http.StatusBadRequest)
return
}
storage.mu.Unlock()
w.WriteHeader(http.StatusNoContent)
})
return &http.Server{
Addr: "localhost:" + port,
Handler: mux,
}
}
Loading