1515package resource
1616
1717import (
18+ "bufio"
1819 "context"
1920 "encoding/json"
21+ "errors"
2022 "fmt"
2123 "io"
2224 "net"
@@ -38,7 +40,7 @@ import (
3840 exec "github.com/kubeflow/pipelines/backend/src/common"
3941 "github.com/kubeflow/pipelines/backend/src/common/util"
4042 scheduledworkflowclient "github.com/kubeflow/pipelines/backend/src/crd/pkg/client/clientset/versioned/typed/scheduledworkflow/v1beta1"
41- "github.com/pkg/errors"
43+ pkgerrors "github.com/pkg/errors"
4244 "github.com/prometheus/client_golang/prometheus"
4345 "github.com/prometheus/client_golang/prometheus/promauto"
4446 "google.golang.org/grpc/codes"
@@ -129,6 +131,7 @@ type ResourceManager struct {
129131}
130132
131133func NewResourceManager (clientManager ClientManagerInterface , options * ResourceManagerOptions ) * ResourceManager {
134+ k8sCoreClient := clientManager .KubernetesCoreClient ()
132135 return & ResourceManager {
133136 experimentStore : clientManager .ExperimentStore (),
134137 pipelineStore : clientManager .PipelineStore (),
@@ -141,7 +144,7 @@ func NewResourceManager(clientManager ClientManagerInterface, options *ResourceM
141144 objectStore : clientManager .ObjectStore (),
142145 execClient : clientManager .ExecClient (),
143146 swfClient : clientManager .SwfClient (),
144- k8sCoreClient : clientManager . KubernetesCoreClient () ,
147+ k8sCoreClient : k8sCoreClient ,
145148 subjectAccessReviewClient : clientManager .SubjectAccessReviewClient (),
146149 tokenReviewClient : clientManager .TokenReviewClient (),
147150 logArchive : clientManager .LogArchive (),
@@ -666,7 +669,7 @@ func (r *ResourceManager) ReconcileSwfCrs(ctx context.Context) error {
666669 if err != nil {
667670 if apierrors .IsConflict (errors .Unwrap (err )) {
668671 continue
669- } else if util .IsNotFound (errors .Cause (err )) {
672+ } else if util .IsNotFound (pkgerrors .Cause (err )) {
670673 break
671674 }
672675 return failedToReconcileSwfCrsError (err )
@@ -1543,13 +1546,13 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
15431546 } else {
15441547 // Try reading object store from pipeline_spec_uri
15451548 // nolint:staticcheck // [ST1003] Field name matches upstream legacy naming
1546- template , errUri := r .objectStore . GetFile (context .TODO (), string (pipelineVersion .PipelineSpecURI ))
1549+ template , errUri := r .readFileStreaming (context .TODO (), string (pipelineVersion .PipelineSpecURI ))
15471550 if errUri != nil {
15481551 // Try reading object store from pipeline_version_id
1549- template , errUUID := r .objectStore . GetFile (context .TODO (), r .objectStore .GetPipelineKey (fmt .Sprint (pipelineVersion .UUID )))
1552+ template , errUUID := r .readFileStreaming (context .TODO (), r .objectStore .GetPipelineKey (fmt .Sprint (pipelineVersion .UUID )))
15501553 if errUUID != nil {
15511554 // Try reading object store from pipeline_id
1552- template , errPipelineId := r .objectStore . GetFile (context .TODO (), r .objectStore .GetPipelineKey (fmt .Sprint (pipelineVersion .PipelineId )))
1555+ template , errPipelineId := r .readFileStreaming (context .TODO (), r .objectStore .GetPipelineKey (fmt .Sprint (pipelineVersion .PipelineId )))
15531556 if errPipelineId != nil {
15541557 return nil , "" , util .Wrap (
15551558 util .Wrap (
@@ -1567,6 +1570,36 @@ func (r *ResourceManager) fetchTemplateFromPipelineVersion(pipelineVersion *mode
15671570 }
15681571}
15691572
1573+ func (r * ResourceManager ) readFileStreaming (ctx context.Context , filePath string ) ([]byte , error ) {
1574+ reader , err := r .objectStore .GetFileReader (ctx , filePath )
1575+ if err != nil {
1576+ return nil , err
1577+ }
1578+ defer reader .Close ()
1579+
1580+ return r .loadFileWithSizeLimit (reader , common .MaxFileLength )
1581+ }
1582+
1583+ func (r * ResourceManager ) loadFileWithSizeLimit (fileReader io.Reader , maxFileLength int ) ([]byte , error ) {
1584+ reader := bufio .NewReaderSize (fileReader , maxFileLength )
1585+ var fileContent []byte
1586+ for {
1587+ currentRead := make ([]byte , bufio .MaxScanTokenSize )
1588+ size , err := reader .Read (currentRead )
1589+ fileContent = append (fileContent , currentRead [:size ]... )
1590+ if err == io .EOF {
1591+ break
1592+ }
1593+ if err != nil {
1594+ return nil , util .NewInternalServerError (err , "Error reading file from object store" )
1595+ }
1596+ }
1597+ if len (fileContent ) > maxFileLength {
1598+ return nil , util .NewInternalServerError (nil , "File size too large. Maximum supported size: %v" , maxFileLength )
1599+ }
1600+ return fileContent , nil
1601+ }
1602+
15701603// Creates the default experiment entry.
15711604func (r * ResourceManager ) CreateDefaultExperiment (namespace string ) (string , error ) {
15721605 // First check that we don't already have a default experiment ID in the DB.
@@ -1617,28 +1650,8 @@ func (r *ResourceManager) ReportMetric(metric *model.RunMetric) error {
16171650 return nil
16181651}
16191652
1620- // ReadArtifact parses run's workflow to find artifact file path and reads the content of the file
1621- // from object store.
1622- func (r * ResourceManager ) ReadArtifact (runID string , nodeID string , artifactName string ) ([]byte , error ) {
1623- run , err := r .runStore .GetRun (runID )
1624- if err != nil {
1625- return nil , err
1626- }
1627- if run .WorkflowRuntimeManifest == "" {
1628- return nil , util .NewInvalidInputError ("read artifact from run with v2 IR spec is not supported" )
1629- }
1630- execSpec , err := util .NewExecutionSpecJSON (util .ArgoWorkflow , []byte (run .WorkflowRuntimeManifest ))
1631- if err != nil {
1632- // This should never happen.
1633- return nil , util .NewInternalServerError (
1634- err , "failed to unmarshal workflow '%s'" , run .WorkflowRuntimeManifest )
1635- }
1636- artifactPath := execSpec .ExecutionStatus ().FindObjectStoreArtifactKeyOrEmpty (nodeID , artifactName )
1637- if artifactPath == "" {
1638- return nil , util .NewResourceNotFoundError (
1639- "artifact" , common .CreateArtifactPath (runID , nodeID , artifactName ))
1640- }
1641- return r .objectStore .GetFile (context .TODO (), artifactPath )
1653+ func (r * ResourceManager ) GetObjectStore () storage.ObjectStoreInterface {
1654+ return r .objectStore
16421655}
16431656
16441657// Fetches the default experiment id.
0 commit comments