Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions backend/api/v1beta1/run.proto
Original file line number Diff line number Diff line change
Expand Up @@ -116,12 +116,6 @@ service RunService {
};
}

// Finds a run's artifact data.
rpc ReadArtifactV1(ReadArtifactRequest) returns (ReadArtifactResponse) {
option (google.api.http) = {
get: "/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read"
};
}

// Terminates an active run.
rpc TerminateRunV1(TerminateRunRequest) returns (google.protobuf.Empty) {
Expand Down
51 changes: 51 additions & 0 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,57 @@ func startHttpProxy(resourceManager *resource.ResourceManager, usePipelinesKuber
runLogServer := server.NewRunLogServer(resourceManager)
topMux.HandleFunc("/apis/v1alpha1/runs/{run_id}/nodes/{node_id}/log", runLogServer.ReadRunLogV1)

topMux.HandleFunc("/apis/v1beta1/runs/{run_id}/nodes/{node_id}/artifacts/{artifact_name}:read", func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)

runId := vars["run_id"]
nodeId := vars["node_id"]
artifactName := vars["artifact_name"]

if runId == "" || nodeId == "" || artifactName == "" {
http.Error(w, "Missing required parameters", http.StatusBadRequest)
return
}

run, err := resourceManager.GetRun(runId)
if err != nil {
http.Error(w, "Run not found", http.StatusNotFound)
return
}

if run.WorkflowRuntimeManifest == "" {
http.Error(w, "V2 IR spec not supported", http.StatusBadRequest)
return
}

execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.WorkflowRuntimeManifest))
if err != nil {
http.Error(w, "Failed to parse workflow", http.StatusInternalServerError)
return
}

artifactPath := execSpec.ExecutionStatus().FindObjectStoreArtifactKeyOrEmpty(nodeId, artifactName)
if artifactPath == "" {
http.Error(w, "Artifact not found", http.StatusNotFound)
return
}

reader, err := resourceManager.GetObjectStore().GetFileReader(r.Context(), artifactPath)
if err != nil {
http.Error(w, "File not found", http.StatusNotFound)
return
}
defer reader.Close()

w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Cache-Control", "no-cache, private")

if _, err := io.Copy(w, reader); err != nil {
glog.Errorf("Failed to stream artifact: %v", err)
http.Error(w, "Internal server error", http.StatusInternalServerError)
}
})

topMux.PathPrefix("/apis/").Handler(runtimeMux)

// Register a handler for Prometheus to poll.
Expand Down
69 changes: 41 additions & 28 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package resource

import (
"bufio"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net"
Expand All @@ -38,7 +40,7 @@ import (
exec "github.com/kubeflow/pipelines/backend/src/common"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
"github.com/pkg/errors"
pkgerrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -129,6 +131,7 @@ type ResourceManager struct {
}

func NewResourceManager(clientManager ClientManagerInterface, options *ResourceManagerOptions) *ResourceManager {
k8sCoreClient := clientManager.KubernetesCoreClient()
return &ResourceManager{
experimentStore: clientManager.ExperimentStore(),
pipelineStore: clientManager.PipelineStore(),
Expand All @@ -141,7 +144,7 @@ func NewResourceManager(clientManager ClientManagerInterface, options *ResourceM
objectStore: clientManager.ObjectStore(),
execClient: clientManager.ExecClient(),
swfClient: clientManager.SwfClient(),
k8sCoreClient: clientManager.KubernetesCoreClient(),
k8sCoreClient: k8sCoreClient,
subjectAccessReviewClient: clientManager.SubjectAccessReviewClient(),
tokenReviewClient: clientManager.TokenReviewClient(),
logArchive: clientManager.LogArchive(),
Expand Down Expand Up @@ -666,7 +669,7 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
if err != nil {
if apierrors.IsConflict(errors.Unwrap(err)) {
continue
} else if util.IsNotFound(errors.Cause(err)) {
} else if util.IsNotFound(pkgerrors.Cause(err)) {
break
}
return failedToReconcileSwfCrsError(err)
Expand Down Expand Up @@ -1543,13 +1546,13 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
} else {
// Try reading object store from pipeline_spec_uri
// nolint:staticcheck // [ST1003] Field name matches upstream legacy naming
template, errUri := r.objectStore.GetFile(context.TODO(), string(pipelineVersion.PipelineSpecURI))
template, errUri := r.readFileStreaming(context.TODO(), string(pipelineVersion.PipelineSpecURI))
if errUri != nil {
// Try reading object store from pipeline_version_id
template, errUUID := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
template, errUUID := r.readFileStreaming(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.UUID)))
if errUUID != nil {
// Try reading object store from pipeline_id
template, errPipelineId := r.objectStore.GetFile(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
template, errPipelineId := r.readFileStreaming(context.TODO(), r.objectStore.GetPipelineKey(fmt.Sprint(pipelineVersion.PipelineId)))
if errPipelineId != nil {
return nil, "", util.Wrap(
util.Wrap(
Expand All @@ -1567,6 +1570,36 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
}
}

func (r *ResourceManager) readFileStreaming(ctx context.Context, filePath string) ([]byte, error) {
reader, err := r.objectStore.GetFileReader(ctx, filePath)
if err != nil {
return nil, err
}
defer reader.Close()

return r.loadFileWithSizeLimit(reader, common.MaxFileLength)
}

func (r *ResourceManager) loadFileWithSizeLimit(fileReader io.Reader, maxFileLength int) ([]byte, error) {
reader := bufio.NewReaderSize(fileReader, maxFileLength)
var fileContent []byte
for {
currentRead := make([]byte, bufio.MaxScanTokenSize)
size, err := reader.Read(currentRead)
fileContent = append(fileContent, currentRead[:size]...)
if err == io.EOF {
break
}
if err != nil {
return nil, util.NewInternalServerError(err, "Error reading file from object store")
}
}
if len(fileContent) > maxFileLength {
return nil, util.NewInternalServerError(nil, "File size too large. Maximum supported size: %v", maxFileLength)
}
return fileContent, nil
}

// Creates the default experiment entry.
func (r *ResourceManager) CreateDefaultExperiment(namespace string) (string, error) {
// First check that we don't already have a default experiment ID in the DB.
Expand Down Expand Up @@ -1617,28 +1650,8 @@ func (r *ResourceManager) ReportMetric(metric *model.RunMetric) error {
return nil
}

// ReadArtifact parses run's workflow to find artifact file path and reads the content of the file
// from object store.
func (r *ResourceManager) ReadArtifact(runID string, nodeID string, artifactName string) ([]byte, error) {
run, err := r.runStore.GetRun(runID)
if err != nil {
return nil, err
}
if run.WorkflowRuntimeManifest == "" {
return nil, util.NewInvalidInputError("read artifact from run with v2 IR spec is not supported")
}
execSpec, err := util.NewExecutionSpecJSON(util.ArgoWorkflow, []byte(run.WorkflowRuntimeManifest))
if err != nil {
// This should never happen.
return nil, util.NewInternalServerError(
err, "failed to unmarshal workflow '%s'", run.WorkflowRuntimeManifest)
}
artifactPath := execSpec.ExecutionStatus().FindObjectStoreArtifactKeyOrEmpty(nodeID, artifactName)
if artifactPath == "" {
return nil, util.NewResourceNotFoundError(
"artifact", common.CreateArtifactPath(runID, nodeID, artifactName))
}
return r.objectStore.GetFile(context.TODO(), artifactPath)
func (r *ResourceManager) GetObjectStore() storage.ObjectStoreInterface {
return r.objectStore
}

// Fetches the default experiment id.
Expand Down
103 changes: 5 additions & 98 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"strings"
"testing"
"time"
Expand Down Expand Up @@ -67,6 +68,10 @@ func (m *FakeBadObjectStore) GetFile(ctx context.Context, filePath string) ([]by
return []byte(""), nil
}

func (m *FakeBadObjectStore) GetFileReader(ctx context.Context, filePath string) (io.ReadCloser, error) {
return nil, util.NewInternalServerError(errors.New("Error"), "bad object store")
}

func (m *FakeBadObjectStore) AddAsYamlFile(ctx context.Context, o interface{}, filePath string) error {
return util.NewInternalServerError(errors.New("Error"), "bad object store")
}
Expand Down Expand Up @@ -3301,104 +3306,6 @@ func TestReportScheduledWorkflowResource_Error(t *testing.T) {
assert.Contains(t, err.(*util.UserError).String(), "database is closed")
}

func TestReadArtifact_Succeed(t *testing.T) {
store, manager, job := initWithJob(t)
defer store.Close()

expectedContent := "test"
filePath := "test/file.txt"
store.ObjectStore().AddFile(context.TODO(), []byte(expectedContent), filePath)

// Create a scheduled run
// job, _ := manager.CreateJob(model.Job{
// Name: "pp1",
// PipelineId: p.UUID,
// Enabled: true,
// })
workflow := util.NewWorkflow(&v1alpha1.Workflow{
TypeMeta: v1.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Kind: "Workflow",
},
ObjectMeta: v1.ObjectMeta{
Name: "MY_NAME",
Namespace: "MY_NAMESPACE",
UID: "run-1",
Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"},
CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
OwnerReferences: []v1.OwnerReference{{
APIVersion: "kubeflow.org/v1beta1",
Kind: "ScheduledWorkflow",
Name: "SCHEDULE_NAME",
UID: types.UID(job.UUID),
}},
},
Status: v1alpha1.WorkflowStatus{
Nodes: map[string]v1alpha1.NodeStatus{
"node-1": {
Outputs: &v1alpha1.Outputs{
Artifacts: []v1alpha1.Artifact{
{
Name: "artifact-1",
ArtifactLocation: v1alpha1.ArtifactLocation{
S3: &v1alpha1.S3Artifact{
Key: filePath,
},
},
},
},
},
},
},
},
})
_, err := manager.ReportWorkflowResource(context.Background(), workflow)
assert.Nil(t, err)

artifactContent, err := manager.ReadArtifact("run-1", "node-1", "artifact-1")
assert.Nil(t, err)
assert.Equal(t, expectedContent, string(artifactContent))
}

func TestReadArtifact_WorkflowNoStatus_NotFound(t *testing.T) {
store, manager, job := initWithJob(t)
defer store.Close()
// report workflow
workflow := util.NewWorkflow(&v1alpha1.Workflow{
TypeMeta: v1.TypeMeta{
APIVersion: "argoproj.io/v1alpha1",
Kind: "Workflow",
},
ObjectMeta: v1.ObjectMeta{
Name: "MY_NAME",
Namespace: "MY_NAMESPACE",
UID: "run-1",
Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"},
CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()),
OwnerReferences: []v1.OwnerReference{{
APIVersion: "kubeflow.org/v1beta1",
Kind: "ScheduledWorkflow",
Name: "SCHEDULE_NAME",
UID: types.UID(job.UUID),
}},
},
})
_, err := manager.ReportWorkflowResource(context.Background(), workflow)
assert.Nil(t, err)

_, err = manager.ReadArtifact("run-1", "node-1", "artifact-1")
assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
}

func TestReadArtifact_NoRun_NotFound(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
manager := NewResourceManager(store, &ResourceManagerOptions{CollectMetrics: false})

_, err := manager.ReadArtifact("run-1", "node-1", "artifact-1")
assert.True(t, util.IsUserErrorCodeMatch(err, codes.NotFound))
}

const (
v2compatPipeline = `
apiVersion: argoproj.io/v1alpha1
Expand Down
Loading
Loading