Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pkg/workflows/artifacts/artifacts.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ func (a *Artifacts) GetWorkflowID() string {
return a.workflowID
}

// GetBinaryPath reports the binary path stored on the artifacts' input
// (a.input.BinaryPath).
// NOTE(review): the previous comment stated this value is empty until
// Prepare() is called; Prepare is not visible from here, so confirm that
// contract before relying on it.
func (a *Artifacts) GetBinaryPath() string {
	binaryPath := a.input.BinaryPath
	return binaryPath
}

// Returns the binary data after preparing the artifacts
// This value is empty until Prepare() is called
func (a *Artifacts) GetBinaryData() []byte {
Expand Down
170 changes: 170 additions & 0 deletions pkg/workflows/artifacts/upload.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package artifacts

import (
"bytes"
"context"
"crypto/md5"
"encoding/base64"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"strings"
"time"
)

// Field is one key/value pair of a presigned upload form. It is encoded to
// and decoded from JSON with the lowercase keys "key" and "value".
type Field struct {
	// Key is the form field name.
	Key string `json:"key"`
	// Value is the form field value.
	Value string `json:"value"`
}

// UploadInput describes one artifact upload to the storage service through a
// presigned URL.
//
// PresignedURL and PresignedFields are part of the JSON representation
// ("presignedUrl" / "presignedFields"). Filepath and Timeout carry the "-" tag
// and are therefore never read from or written to JSON; the caller has to set
// them explicitly.
type UploadInput struct {
	// PresignedURL is the endpoint the multipart form is POSTed to.
	PresignedURL string `json:"presignedUrl"`
	// PresignedFields are form fields sent ahead of the file content.
	PresignedFields []Field `json:"presignedFields"`
	// Filepath is the local path of the artifact to upload.
	Filepath string `json:"-"`
	// Timeout bounds a single upload attempt.
	Timeout time.Duration `json:"-"`
}

// ArtifactType labels the kind of artifact being uploaded.
type ArtifactType string

// Known artifact types.
const (
	// ArtifactTypeBinary marks a binary artifact.
	ArtifactTypeBinary = ArtifactType("BINARY")
	// ArtifactTypeConfig marks a configuration artifact.
	ArtifactTypeConfig = ArtifactType("CONFIG")
)

// ArtifactUpload holds an artifact that has been read into memory together
// with the metadata sent alongside it on upload.
type ArtifactUpload struct {
	// Content is the raw artifact data.
	Content []byte
	// ContentType classifies the artifact (binary or config).
	ContentType ArtifactType
	// ContentHash is the hash string computed over Content.
	ContentHash string
}

// NewArtifactUpload reads the file at filepath into memory and returns it as
// an ArtifactUpload with its content hash already computed.
//
// Files whose name ends in ".yaml" or ".yml" are classified as
// ArtifactTypeConfig; every other file is classified as ArtifactTypeBinary.
// A read failure is returned wrapped as "failed to read file: ...".
func NewArtifactUpload(filepath string) (*ArtifactUpload, error) {
	data, readErr := os.ReadFile(filepath)
	if readErr != nil {
		return nil, fmt.Errorf("failed to read file: %w", readErr)
	}

	upload := &ArtifactUpload{
		Content:     data,
		ContentType: ArtifactTypeBinary,
		ContentHash: CalculateContentHash(data),
	}
	switch {
	case strings.HasSuffix(filepath, ".yaml"), strings.HasSuffix(filepath, ".yml"):
		upload.ContentType = ArtifactTypeConfig
	}
	return upload, nil
}

// CalculateContentHash returns the MD5 digest of content encoded with
// standard (padded) base64. The upload code sends this value as the
// "Content-MD5" form field.
func CalculateContentHash(content []byte) string {
	digest := md5.Sum(content) //nolint:gosec
	return base64.StdEncoding.EncodeToString(digest[:])
}

// upload performs a single attempt at POSTing the artifact stored at
// uploadInput.Filepath to uploadInput.PresignedURL as a multipart form.
//
// The form contains, in this order: every entry of
// uploadInput.PresignedFields, a "Content-Type" field carrying the artifact
// type, a "Content-MD5" field carrying the artifact's content hash, and
// finally the artifact itself under the "file" field.
//
// It returns nil only when the server replies with 204 No Content or
// 201 Created. A nil input, a non-positive timeout, an unreadable file, a
// transport failure or any other status code is reported as an error.
func (a *Artifacts) upload(uploadInput *UploadInput) error {
	// Upper bound on how much of a response body is read or drained, so a
	// misbehaving origin cannot blow up memory, logs or error strings.
	const maxResponseBodyBytes = 64 << 10

	if uploadInput == nil {
		return fmt.Errorf("upload input is nil")
	}
	// A non-positive timeout would produce an already-expired context below,
	// making every attempt fail with an opaque "context deadline exceeded".
	// Reject it up front with an actionable message instead.
	if uploadInput.Timeout <= 0 {
		return fmt.Errorf("upload timeout must be positive, got %s", uploadInput.Timeout)
	}

	artifactUpload, err := NewArtifactUpload(uploadInput.Filepath)
	if err != nil {
		return err
	}

	a.log.Debug("Uploading artifact",
		"filepath", uploadInput.Filepath,
		"content type", artifactUpload.ContentType,
		"content hash", artifactUpload.ContentHash)

	var b bytes.Buffer
	w := multipart.NewWriter(&b)

	// Presigned form fields go first, exactly as supplied by the caller.
	for _, field := range uploadInput.PresignedFields {
		if err := w.WriteField(field.Key, field.Value); err != nil {
			a.log.Error("Failed to write presigned field", "error", err, "field", field.Key)
			return err
		}
	}

	// Artifact metadata travels as form fields (not HTTP headers).
	if err := w.WriteField("Content-Type", string(artifactUpload.ContentType)); err != nil {
		return err
	}
	if err := w.WriteField("Content-MD5", artifactUpload.ContentHash); err != nil {
		return err
	}

	// The file must be the last part of the form.
	fileWriter, err := w.CreateFormFile("file", "artifact")
	if err != nil {
		a.log.Error("Failed to create form file field", "error", err)
		return err
	}
	if _, err := fileWriter.Write(artifactUpload.Content); err != nil {
		a.log.Error("Failed to write file content to form", "error", err)
		return err
	}
	// Close writes the trailing boundary; the body is incomplete without it.
	if err := w.Close(); err != nil {
		a.log.Error("Failed to close multipart writer", "error", err)
		return err
	}

	ctx, cancel := context.WithTimeout(context.Background(), uploadInput.Timeout)
	defer cancel()

	httpReq, err := http.NewRequestWithContext(ctx, http.MethodPost, uploadInput.PresignedURL, &b)
	if err != nil {
		a.log.Error("Failed to create HTTP request", "error", err)
		return err
	}
	httpReq.Header.Set("Content-Type", w.FormDataContentType())

	// The client timeout is a backstop slightly above the context deadline,
	// so the context error is the one normally surfaced.
	httpClient := &http.Client{Timeout: uploadInput.Timeout + 2*time.Second}
	httpResp, err := httpClient.Do(httpReq)
	if err != nil {
		a.log.Error("HTTP request to origin failed", "error", err)
		return err
	}
	defer func() {
		// Drain a bounded remainder so the transport can reuse the connection.
		_, _ = io.Copy(io.Discard, io.LimitReader(httpResp.Body, maxResponseBodyBytes))
		if cerr := httpResp.Body.Close(); cerr != nil {
			a.log.Warn("Failed to close origin response body", "error", cerr)
		}
	}()

	// Accept 204 No Content or 201 Created as success.
	if httpResp.StatusCode != http.StatusNoContent && httpResp.StatusCode != http.StatusCreated {
		// Best effort: the body only enriches the message, so a read error is
		// deliberately ignored and the status code alone is reported.
		body, _ := io.ReadAll(io.LimitReader(httpResp.Body, maxResponseBodyBytes))
		a.log.Error("Artifact upload failed", "status", httpResp.StatusCode, "body", string(body))
		return fmt.Errorf("expected status 204 or 201, got %d: %s", httpResp.StatusCode, string(body))
	}

	a.log.Info("Artifact uploaded successfully", "status", httpResp.StatusCode)
	return nil
}

// backOffSleep is the base delay used between upload attempts. It is a
// variable rather than a constant so that it can be overridden (the package
// tests shorten it).
var backOffSleep = time.Second

// DurableUpload uploads an artifact, making at most three attempts.
//
// Before the second attempt it sleeps backOffSleep, before the third
// 2*backOffSleep. Every failure is retried regardless of its cause. It returns
// nil as soon as one attempt succeeds; otherwise it returns the error of the
// last attempt wrapped as "upload failed after 3 attempts: ...".
func (a *Artifacts) DurableUpload(uploadInput *UploadInput) error {
	const maxUploadAttempts = 3

	var lastErr error
	for n := 1; n <= maxUploadAttempts; n++ {
		if n > 1 {
			// Delay doubles with each retry: base, 2*base, ...
			wait := backOffSleep * time.Duration(1<<(n-2))
			a.log.Debug("Retrying upload after backoff", "attempt", n, "backoff", wait)
			time.Sleep(wait)
		}

		err := a.upload(uploadInput)
		if err == nil {
			return nil
		}
		lastErr = err
		a.log.Warn("Upload attempt failed", "attempt", n, "error", lastErr)
	}
	return fmt.Errorf("upload failed after %d attempts: %w", maxUploadAttempts, lastErr)
}
197 changes: 197 additions & 0 deletions pkg/workflows/artifacts/upload_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
package artifacts

import (
"context"
"fmt"
"io"
"log/slog"
"net/http"
"os"
"strings"
"sync"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

// testServerPort is the fixed localhost port the test artifact server
// listens on.
const testServerPort = "45001"

// UploadTestSuite exercises artifact uploads against a local HTTP server that
// stores whatever it receives in memory.
type UploadTestSuite struct {
	suite.Suite

	// lggr is the logger handed to the artifacts under test.
	lggr *slog.Logger
	// server is the local HTTP server standing in for the storage service.
	server *http.Server
	// storage records the uploads received by server.
	storage *testArtifactStorage
}

// SetupSuite creates the logger, the in-memory storage and the test HTTP
// server, then starts the server in a background goroutine.
//
// ListenAndServe runs asynchronously, so the server may not yet be accepting
// connections when this method returns. A failure to start (for example the
// fixed port already being in use) is logged instead of being silently
// dropped, so a later upload failure can be traced back to it.
func (s *UploadTestSuite) SetupSuite() {
	s.lggr = slog.New(slog.NewTextHandler(os.Stdout, nil))
	s.storage = newTestArtifactStorage()
	s.server = newTestServer(testServerPort, s.storage)

	// Hand the goroutine its own references so it never reads suite fields
	// concurrently with the tests.
	srv, lggr := s.server, s.lggr
	go func() {
		// ListenAndServe always returns a non-nil error; ErrServerClosed is
		// the expected one after Shutdown in TearDownSuite.
		if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			lggr.Error("Test artifact server stopped unexpectedly", "addr", srv.Addr, "error", err)
		}
	}()
}

// TearDownSuite shuts the test HTTP server down, waiting at most two seconds.
// It does nothing when no server was created.
func (s *UploadTestSuite) TearDownSuite() {
	if s.server == nil {
		return
	}
	shutdownCtx, stop := context.WithTimeout(context.Background(), 2*time.Second)
	defer stop()
	// Best effort: a shutdown error at the end of the suite is not actionable.
	_ = s.server.Shutdown(shutdownCtx)
}

// TestUpload covers DurableUpload end to end against the local test server:
//   - happy path: the uploaded binary ends up in the server's storage;
//   - sad path: an unreadable file path fails after three attempts;
//   - sad path: a URL the server does not handle fails with status 404 after
//     three attempts.
//
// Preconditions (compile, prepare, happy-path upload) are asserted with
// Require so the test stops instead of continuing on invalid state.
func (s *UploadTestSuite) TestUpload() {
	artifacts := NewWorkflowArtifacts(&Input{
		WorkflowOwner: "0x97f8a56d48290f35A23A074e7c73615E93f21885",
		WorkflowName:  "wf-test-1",
		WorkflowPath:  "./testdata/main.go",
		ConfigPath:    "./testdata/config.yaml",
		BinaryPath:    "testdata/binary",
	}, s.lggr)

	err := artifacts.Compile()
	s.Require().NoError(err, "failed to compile workflow")

	err = artifacts.Prepare()
	s.Require().NoError(err, "failed to prepare artifacts")

	uploadInput := &UploadInput{
		PresignedURL: fmt.Sprintf("http://localhost:%s/artifacts/%s/binary.wasm", testServerPort, artifacts.GetWorkflowID()),
		PresignedFields: []Field{
			{Key: "key1", Value: "value1"},
		},
		Filepath: artifacts.GetBinaryPath(),
		Timeout:  10 * time.Second,
	}
	err = artifacts.DurableUpload(uploadInput)
	s.Require().NoError(err, "failed to upload artifact")

	expected := artifacts.GetBinaryData()
	actual := s.storage.getBinary(artifacts.GetWorkflowID())
	expLen, actLen := len(expected), len(actual)
	s.Equal(expLen, actLen, "binary data length do not match")
	n := 100
	if expLen > n && actLen > n {
		// Compare only the first 100 bytes and the last 100 bytes of the artifact binary
		// Otherwise, the entire binary is printed to console
		s.Equal(expected[:n], actual[:n], "first 100 bytes do not match")
		s.Equal(expected[expLen-n:], actual[actLen-n:], "last 100 bytes do not match")
	} else {
		// If the binary is smaller than 100 bytes, compare the whole thing
		s.Equal(expected, actual, "binary data do not match")
	}

	// The sad paths exhaust all retries, so shorten the backoff. Restore the
	// package-level value afterwards so other tests see the real default.
	originalBackOff := backOffSleep
	defer func() { backOffSleep = originalBackOff }()
	backOffSleep = 1 * time.Millisecond

	// SadPath: Bad filepath
	// NOTE(review): this is the same string as Input.BinaryPath above; the
	// case relies on no file existing at exactly this path — confirm.
	badFilepathUploadInput := &UploadInput{
		PresignedURL: fmt.Sprintf("http://localhost:%s/artifacts/%s/binary.wasm", testServerPort, artifacts.GetWorkflowID()),
		PresignedFields: []Field{
			{Key: "key1", Value: "value1"},
		},
		Filepath: "testdata/binary",
		Timeout:  10 * time.Second,
	}
	err = artifacts.DurableUpload(badFilepathUploadInput)
	s.ErrorContains(err, "upload failed after 3 attempts: failed to read file: open",
		"failed to upload artifact")

	// SadPath: Bad presigned URL
	badPresignedURLUploadInput := &UploadInput{
		PresignedURL: fmt.Sprintf("http://localhost:%s/artifacts2/%s/binary.wasm", testServerPort, artifacts.GetWorkflowID()),
		PresignedFields: []Field{
			{Key: "key1", Value: "value1"},
		},
		Filepath: artifacts.GetBinaryPath(),
		Timeout:  10 * time.Second,
	}
	err = artifacts.DurableUpload(badPresignedURLUploadInput)
	s.ErrorContains(err, "upload failed after 3 attempts: expected status 204 or 201, got 404",
		"failed to upload artifact")
}

func TestUploadTestSuite(t *testing.T) {
suite.Run(t, new(UploadTestSuite))
}

// testArtifactStorage keeps, in memory, the file payloads the test server
// received, keyed by workflow ID. mu guards both maps.
type testArtifactStorage struct {
	mu sync.RWMutex
	// binaries maps workflow ID to the uploaded binary payload.
	binaries map[string][]byte
	// configs maps workflow ID to the uploaded config payload.
	configs map[string][]byte
}

// newTestArtifactStorage returns a storage whose maps are initialized and
// empty, ready to be written to.
func newTestArtifactStorage() *testArtifactStorage {
	storage := new(testArtifactStorage)
	storage.binaries = map[string][]byte{}
	storage.configs = map[string][]byte{}
	return storage
}

// getBinary returns the binary payload stored for workflowID, or nil when
// none has been stored. The lookup happens under the read lock.
func (t *testArtifactStorage) getBinary(workflowID string) []byte {
	t.mu.RLock()
	stored := t.binaries[workflowID]
	t.mu.RUnlock()
	return stored
}

// getConfig returns the config payload stored for workflowID, or nil when
// none has been stored. The lookup happens under the read lock.
func (t *testArtifactStorage) getConfig(workflowID string) []byte {
	t.mu.RLock()
	stored := t.configs[workflowID]
	t.mu.RUnlock()
	return stored
}

// newTestServer builds (but does not start) an HTTP server bound to
// localhost:<port> that accepts multipart POST requests on:
//   - /artifacts/<workflow-id>/binary.wasm
//   - /artifacts/<workflow-id>/configs
//
// The content of the "file" form part is stored in storage under the workflow
// ID and the request is answered with 204 No Content. Non-POST requests get
// 405; malformed paths, unparsable forms, a missing "file" part and unknown
// artifact suffixes get 400; a failure reading the file part gets 500.
func newTestServer(port string, storage *testArtifactStorage) *http.Server {
	handleUpload := func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}

		// Expect "<workflow-id>/<suffix>" after the route prefix.
		rest := strings.TrimPrefix(r.URL.Path, "/artifacts/")
		workflowID, suffix, hasSuffix := strings.Cut(rest, "/")
		if !hasSuffix {
			http.Error(w, "bad path", http.StatusBadRequest)
			return
		}
		if workflowID == "" {
			http.Error(w, "missing workflow id", http.StatusBadRequest)
			return
		}

		// Keep up to 32 MiB of the form in memory.
		if parseErr := r.ParseMultipartForm(32 << 20); parseErr != nil {
			http.Error(w, parseErr.Error(), http.StatusBadRequest)
			return
		}
		part, _, partErr := r.FormFile("file")
		if partErr != nil {
			http.Error(w, partErr.Error(), http.StatusBadRequest)
			return
		}
		defer part.Close()

		payload, readErr := io.ReadAll(part)
		if readErr != nil {
			http.Error(w, readErr.Error(), http.StatusInternalServerError)
			return
		}

		isBinary := suffix == "binary.wasm"
		isConfig := suffix == "configs"
		if !isBinary && !isConfig {
			http.Error(w, "unknown artifact type: "+suffix, http.StatusBadRequest)
			return
		}

		storage.mu.Lock()
		if isBinary {
			storage.binaries[workflowID] = payload
		} else {
			storage.configs[workflowID] = payload
		}
		storage.mu.Unlock()

		w.WriteHeader(http.StatusNoContent)
	}

	router := http.NewServeMux()
	router.HandleFunc("/artifacts/", handleUpload)

	srv := &http.Server{
		Addr:    "localhost:" + port,
		Handler: router,
	}
	return srv
}
Loading