From ae6210e6c847a84db45bc8f198af3be2144878fe Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 11 Oct 2022 14:47:04 -0700 Subject: [PATCH 1/3] Add GetOutputMetadata to output reader Signed-off-by: Kevin Su --- go/tasks/pluginmachinery/io/iface.go | 2 ++ go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go | 4 ++++ go/tasks/pluginmachinery/ioutils/paths.go | 2 ++ go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go | 4 ++++ 4 files changed, 12 insertions(+) diff --git a/go/tasks/pluginmachinery/io/iface.go b/go/tasks/pluginmachinery/io/iface.go index 15d5adefc..9f623350d 100644 --- a/go/tasks/pluginmachinery/io/iface.go +++ b/go/tasks/pluginmachinery/io/iface.go @@ -43,6 +43,8 @@ type OutputReader interface { Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error) // DeckExists checks if the deck file has been generated. DeckExists(ctx context.Context) (bool, error) + // GetOutputMetadata get the metadata from task's output, like deck uri + GetOutputMetadata(ctx context.Context) map[string]string } // CheckpointPaths provides the paths / keys to input Checkpoints directory and an output checkpoints directory. diff --git a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go index 7dd10698d..bb1beb06e 100644 --- a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go +++ b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go @@ -47,6 +47,10 @@ func (r InMemoryOutputReader) DeckExists(_ context.Context) (bool, error) { return r.DeckPath != nil, nil } +func (r InMemoryOutputReader) GetOutputMetadata(_ context.Context) map[string]string { + return map[string]string{deckURIKey: r.DeckPath.String()} +} + func NewInMemoryOutputReader(literals *core.LiteralMap, DeckPath *storage.DataReference, err *io.ExecutionError) InMemoryOutputReader { return InMemoryOutputReader{ literals: literals, diff --git a/go/tasks/pluginmachinery/ioutils/paths.go b/go/tasks/pluginmachinery/ioutils/paths.go index e50aa5484..5a62d43a6 100644 --- a/go/tasks/pluginmachinery/ioutils/paths.go +++ b/go/tasks/pluginmachinery/ioutils/paths.go @@ -30,6 +30,8 @@ const ( // CheckpointPrefix specifies the common prefix that can be used as a directory where all the checkpoint information // will be stored. This directory is under the raw output-prefix path CheckpointPrefix = "_flytecheckpoints" + + deckURIKey = "deck-uri" ) // ConstructCheckpointPath returns a checkpoint path under the given `base` / rawOutputPrefix, following the conventions of diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go index ac31bd24e..39051a704 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader.go @@ -123,6 +123,10 @@ func (r RemoteFileOutputReader) DeckExists(ctx context.Context) (bool, error) { return md.Exists(), nil } +func (r RemoteFileOutputReader) GetOutputMetadata(_ context.Context) map[string]string { + return map[string]string{deckURIKey: r.outPath.GetDeckPath().String()} +} + func NewRemoteFileOutputReader(_ context.Context, store storage.ComposedProtobufStore, outPaths io.OutputFilePaths, maxDatasetSize int64) RemoteFileOutputReader { return RemoteFileOutputReader{ outPath: outPaths, From 4654389800feaca8c75c015fa16f00078a2fe9f8 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 11 Oct 2022 15:06:18 -0700 Subject: [PATCH 2/3] Make generate Signed-off-by: Kevin Su --- .../core/allocationstatus_enumer.go | 1 - go/tasks/pluginmachinery/core/phase_enumer.go | 1 - .../core/transitiontype_enumer.go | 1 - .../resourcecustomizationmode_enumer.go | 1 - .../internal/webapi/phase_enumer.go | 1 - .../pluginmachinery/io/mocks/output_reader.go | 34 +++++++++++++++++++ .../workqueue/workstatus_enumer.go | 1 - go/tasks/plugins/array/core/phase_enumer.go | 1 - .../presto/client/prestostatus_enumer.go | 1 - 9 files changed, 34 insertions(+), 8 deletions(-) diff --git a/go/tasks/pluginmachinery/core/allocationstatus_enumer.go b/go/tasks/pluginmachinery/core/allocationstatus_enumer.go index acdffa19e..dbbb536ca 100644 --- a/go/tasks/pluginmachinery/core/allocationstatus_enumer.go +++ b/go/tasks/pluginmachinery/core/allocationstatus_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=AllocationStatus -trimprefix=AllocationStatus"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/pluginmachinery/core/phase_enumer.go b/go/tasks/pluginmachinery/core/phase_enumer.go index 8101f2f3d..bd26c4d56 100644 --- a/go/tasks/pluginmachinery/core/phase_enumer.go +++ b/go/tasks/pluginmachinery/core/phase_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=Phase"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/pluginmachinery/core/transitiontype_enumer.go b/go/tasks/pluginmachinery/core/transitiontype_enumer.go index 9b4b615ac..41608fb36 100644 --- a/go/tasks/pluginmachinery/core/transitiontype_enumer.go +++ b/go/tasks/pluginmachinery/core/transitiontype_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer --type=TransitionType"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go b/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go index c01befae4..bf25f1bf0 100644 --- a/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go +++ b/go/tasks/pluginmachinery/flytek8s/resourcecustomizationmode_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=ResourceCustomizationMode -trimprefix=ResourceCustomizationMode"; DO NOT EDIT. -// package flytek8s import ( diff --git a/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go b/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go index b643fd135..9eff931df 100644 --- a/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go +++ b/go/tasks/pluginmachinery/internal/webapi/phase_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=Phase -trimprefix=Phase"; DO NOT EDIT. -// package webapi import ( diff --git a/go/tasks/pluginmachinery/io/mocks/output_reader.go b/go/tasks/pluginmachinery/io/mocks/output_reader.go index ed7e671e2..c1bdfc646 100644 --- a/go/tasks/pluginmachinery/io/mocks/output_reader.go +++ b/go/tasks/pluginmachinery/io/mocks/output_reader.go @@ -94,6 +94,40 @@ func (_m *OutputReader) Exists(ctx context.Context) (bool, error) { return r0, r1 } +type OutputReader_GetOutputMetadata struct { + *mock.Call +} + +func (_m OutputReader_GetOutputMetadata) Return(_a0 map[string]string) *OutputReader_GetOutputMetadata { + return &OutputReader_GetOutputMetadata{Call: _m.Call.Return(_a0)} +} + +func (_m *OutputReader) OnGetOutputMetadata(ctx context.Context) *OutputReader_GetOutputMetadata { + c_call := _m.On("GetOutputMetadata", ctx) + return &OutputReader_GetOutputMetadata{Call: c_call} +} + +func (_m *OutputReader) OnGetOutputMetadataMatch(matchers ...interface{}) *OutputReader_GetOutputMetadata { + c_call := _m.On("GetOutputMetadata", matchers...) + return &OutputReader_GetOutputMetadata{Call: c_call} +} + +// GetOutputMetadata provides a mock function with given fields: ctx +func (_m *OutputReader) GetOutputMetadata(ctx context.Context) map[string]string { + ret := _m.Called(ctx) + + var r0 map[string]string + if rf, ok := ret.Get(0).(func(context.Context) map[string]string); ok { + r0 = rf(ctx) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]string) + } + } + + return r0 +} + type OutputReader_IsError struct { *mock.Call } diff --git a/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go b/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go index 9a330d3e8..d0c51c798 100644 --- a/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go +++ b/go/tasks/pluginmachinery/workqueue/workstatus_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer --type=WorkStatus"; DO NOT EDIT. -// package workqueue import ( diff --git a/go/tasks/plugins/array/core/phase_enumer.go b/go/tasks/plugins/array/core/phase_enumer.go index b582c6fff..c659c7bcf 100644 --- a/go/tasks/plugins/array/core/phase_enumer.go +++ b/go/tasks/plugins/array/core/phase_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer -type=Phase"; DO NOT EDIT. -// package core import ( diff --git a/go/tasks/plugins/presto/client/prestostatus_enumer.go b/go/tasks/plugins/presto/client/prestostatus_enumer.go index c5d206c76..33041cac3 100644 --- a/go/tasks/plugins/presto/client/prestostatus_enumer.go +++ b/go/tasks/plugins/presto/client/prestostatus_enumer.go @@ -1,6 +1,5 @@ // Code generated by "enumer --type=PrestoStatus"; DO NOT EDIT. -// package client import ( From 4fa606734c764955304a481a45bd795bf99ae0c3 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Wed, 12 Oct 2022 07:26:42 -0700 Subject: [PATCH 3/3] Add tests Signed-off-by: Kevin Su --- go/tasks/pluginmachinery/io/iface.go | 2 +- go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go | 3 +++ .../pluginmachinery/ioutils/in_memory_output_reader_test.go | 1 + .../pluginmachinery/ioutils/remote_file_output_reader_test.go | 1 + 4 files changed, 6 insertions(+), 1 deletion(-) diff --git a/go/tasks/pluginmachinery/io/iface.go b/go/tasks/pluginmachinery/io/iface.go index 9f623350d..e2a3f5875 100644 --- a/go/tasks/pluginmachinery/io/iface.go +++ b/go/tasks/pluginmachinery/io/iface.go @@ -43,7 +43,7 @@ type OutputReader interface { Read(ctx context.Context) (*core.LiteralMap, *ExecutionError, error) // DeckExists checks if the deck file has been generated. DeckExists(ctx context.Context) (bool, error) - // GetOutputMetadata get the metadata from task's output, like deck uri + // GetOutputMetadata get the metadata from the output of tasks, such as deck URI. GetOutputMetadata(ctx context.Context) map[string]string } diff --git a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go index bb1beb06e..d22a5b621 100644 --- a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go +++ b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader.go @@ -48,6 +48,9 @@ func (r InMemoryOutputReader) DeckExists(_ context.Context) (bool, error) { } func (r InMemoryOutputReader) GetOutputMetadata(_ context.Context) map[string]string { + if r.DeckPath == nil { + return map[string]string{} + } return map[string]string{deckURIKey: r.DeckPath.String()} } diff --git a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go index 3cae110fd..a87113477 100644 --- a/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go +++ b/go/tasks/pluginmachinery/ioutils/in_memory_output_reader_test.go @@ -25,6 +25,7 @@ func TestInMemoryOutputReader(t *testing.T) { or := NewInMemoryOutputReader(&flyteIdlCore.LiteralMap{Literals: lt}, &deckPath, nil) assert.Equal(t, &deckPath, or.DeckPath) + assert.Equal(t, deckPath.String(), or.GetOutputMetadata(context.Background())[deckURIKey]) ctx := context.TODO() ok, err := or.IsError(ctx) diff --git a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go index ee10638b4..db213f36e 100644 --- a/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go +++ b/go/tasks/pluginmachinery/ioutils/remote_file_output_reader_test.go @@ -68,6 +68,7 @@ func TestReadOrigin(t *testing.T) { exists, err := r.DeckExists(ctx) assert.NoError(t, err) assert.True(t, exists) + assert.Equal(t, "deck.html", r.GetOutputMetadata(ctx)[deckURIKey]) }) t.Run("system", func(t *testing.T) {