Skip to content

Commit 200f4d1

Browse files
jiachengxuagilgur5
authored andcommitted
fix: don't load entire archived workflow into memory in list APIs (argoproj#12912)
Signed-off-by: Jiacheng Xu <[email protected]> (cherry picked from commit f80b9e8)
1 parent fe5c612 commit 200f4d1

File tree

1 file changed

+52
-13
lines changed

1 file changed

+52
-13
lines changed

persist/sqldb/workflow_archive.go

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,9 @@ import (
88
log "github.com/sirupsen/logrus"
99
"github.com/upper/db/v4"
1010
"google.golang.org/grpc/codes"
11+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1112
"k8s.io/apimachinery/pkg/labels"
13+
"k8s.io/apimachinery/pkg/types"
1214

1315
wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
1416
sutils "github.com/argoproj/argo-workflows/v3/server/utils"
@@ -30,6 +32,9 @@ type archivedWorkflowMetadata struct {
3032
Phase wfv1.WorkflowPhase `db:"phase"`
3133
StartedAt time.Time `db:"startedat"`
3234
FinishedAt time.Time `db:"finishedat"`
35+
Labels string `db:"labels,omitempty"`
36+
Annotations string `db:"annotations,omitempty"`
37+
Progress string `db:"progress,omitempty"`
3338
}
3439

3540
type archivedWorkflowRecord struct {
@@ -142,7 +147,7 @@ func (r *workflowArchive) ArchiveWorkflow(wf *wfv1.Workflow) error {
142147
}
143148

144149
func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefix string, minStartedAt, maxStartedAt time.Time, labelRequirements labels.Requirements, limit int, offset int) (wfv1.Workflows, error) {
145-
var archivedWfs []archivedWorkflowRecord
150+
var archivedWfs []archivedWorkflowMetadata
146151

147152
// If we were passed 0 as the limit, then we should load all available archived workflows
148153
// to match the behavior of the `List` operations in the Kubernetes API
@@ -151,8 +156,13 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
151156
offset = -1
152157
}
153158

159+
selectQuery, err := selectArchivedWorkflowQuery(r.dbType)
160+
if err != nil {
161+
return nil, err
162+
}
163+
154164
selector := r.session.SQL().
155-
Select("workflow").
165+
Select(selectQuery).
156166
From(archiveTableName).
157167
Where(r.clusterManagedNamespaceAndInstanceID()).
158168
And(namespaceEqual(namespace)).
@@ -161,7 +171,7 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
161171
And(startedAtFromClause(minStartedAt)).
162172
And(startedAtToClause(maxStartedAt))
163173

164-
selector, err := labelsClause(selector, r.dbType, labelRequirements)
174+
selector, err = labelsClause(selector, r.dbType, labelRequirements)
165175
if err != nil {
166176
return nil, err
167177
}
@@ -174,16 +184,35 @@ func (r *workflowArchive) ListWorkflows(namespace string, name string, namePrefi
174184
return nil, err
175185
}
176186

177-
wfs := make(wfv1.Workflows, 0)
178-
for _, archivedWf := range archivedWfs {
179-
wf := wfv1.Workflow{}
180-
err = json.Unmarshal([]byte(archivedWf.Workflow), &wf)
181-
if err != nil {
182-
log.WithFields(log.Fields{"workflowUID": archivedWf.UID, "workflowName": archivedWf.Name}).Errorln("unable to unmarshal workflow from database")
183-
} else {
184-
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
185-
wf.ObjectMeta.Labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"
186-
wfs = append(wfs, wf)
187+
wfs := make(wfv1.Workflows, len(archivedWfs))
188+
for i, md := range archivedWfs {
189+
labels := make(map[string]string)
190+
if err := json.Unmarshal([]byte(md.Labels), &labels); err != nil {
191+
return nil, err
192+
}
193+
// For backward compatibility, we should label workflow retrieved from DB as Persisted.
194+
labels[common.LabelKeyWorkflowArchivingStatus] = "Persisted"
195+
196+
annotations := make(map[string]string)
197+
if err := json.Unmarshal([]byte(md.Annotations), &annotations); err != nil {
198+
return nil, err
199+
}
200+
201+
wfs[i] = wfv1.Workflow{
202+
ObjectMeta: v1.ObjectMeta{
203+
Name: md.Name,
204+
Namespace: md.Namespace,
205+
UID: types.UID(md.UID),
206+
CreationTimestamp: v1.Time{Time: md.StartedAt},
207+
Labels: labels,
208+
Annotations: annotations,
209+
},
210+
Status: wfv1.WorkflowStatus{
211+
Phase: md.Phase,
212+
StartedAt: v1.Time{Time: md.StartedAt},
213+
FinishedAt: v1.Time{Time: md.FinishedAt},
214+
Progress: wfv1.Progress(md.Progress),
215+
},
187216
}
188217
}
189218
return wfs, nil
@@ -347,3 +376,13 @@ func (r *workflowArchive) DeleteExpiredWorkflows(ttl time.Duration) error {
347376
log.WithFields(log.Fields{"rowsAffected": rowsAffected}).Info("Deleted archived workflows")
348377
return nil
349378
}
379+
380+
func selectArchivedWorkflowQuery(t dbType) (*db.RawExpr, error) {
381+
switch t {
382+
case MySQL:
383+
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce(workflow->>'$.metadata.labels', '{}') as labels,coalesce(workflow->>'$.metadata.annotations', '{}') as annotations, coalesce(workflow->>'$.status.progress', '') as progress"), nil
384+
case Postgres:
385+
return db.Raw("name, namespace, uid, phase, startedat, finishedat, coalesce((workflow::json)->'metadata'->>'labels', '{}') as labels, coalesce((workflow::json)->'metadata'->>'annotations', '{}') as annotations, coalesce((workflow::json)->'status'->>'progress', '') as progress"), nil
386+
}
387+
return nil, fmt.Errorf("unsupported db type %s", t)
388+
}

0 commit comments

Comments
 (0)