Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Commit bc11ffc

Browse files
committed
merged master
Signed-off-by: Kevin Su <pingsutw@apache.org>
2 parents 4342e5f + 5981c35 commit bc11ffc

40 files changed

+1082
-751
lines changed

.github/workflows/master.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ jobs:
3434
- name: Set up Go
3535
uses: actions/setup-go@v2
3636
with:
37-
go-version: '1.18'
37+
go-version: '1.19'
3838
- name: Run GoReleaser
3939
uses: goreleaser/goreleaser-action@v2
4040
with:
@@ -52,7 +52,7 @@ jobs:
5252
- name: Set up Go
5353
uses: actions/setup-go@v2
5454
with:
55-
go-version: '1.18'
55+
go-version: '1.19'
5656
- name: Unit Tests
5757
run: make install && make test_unit_codecov
5858
- name: Push CodeCov

.github/workflows/pull_request.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ jobs:
1212
- name: Set up Go
1313
uses: actions/setup-go@v2
1414
with:
15-
go-version: '1.18'
15+
go-version: '1.19'
1616
- name: Unit Tests
1717
run: make install && make test_unit_codecov
1818
- name: Push CodeCov
@@ -30,6 +30,6 @@ jobs:
3030
- uses: actions/checkout@v1
3131
- uses: actions/setup-go@v2
3232
with:
33-
go-version: '1.18'
33+
go-version: '1.19'
3434
- name: Go generate and diff
3535
run: DELTA_CHECK=true make generate

go.mod

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
module github.com/flyteorg/flyteplugins
22

3-
go 1.18
3+
go 1.19
44

55
require (
66
github.com/GoogleCloudPlatform/spark-on-k8s-operator v0.0.0-20200723154620-6f35a1152625
@@ -12,8 +12,8 @@ require (
1212
github.com/aws/aws-sdk-go-v2/service/athena v1.0.0
1313
github.com/bstadlbauer/dask-k8s-operator-go-client v0.1.0
1414
github.com/coocood/freecache v1.1.1
15-
github.com/flyteorg/flyteidl v1.3.2
16-
github.com/flyteorg/flytestdlib v1.0.11
15+
github.com/flyteorg/flyteidl v1.3.6
16+
github.com/flyteorg/flytestdlib v1.0.15
1717
github.com/go-test/deep v1.0.7
1818
github.com/golang/protobuf v1.5.2
1919
github.com/hashicorp/golang-lru v0.5.4
@@ -136,6 +136,4 @@ require (
136136

137137
replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d
138138

139-
replace github.com/flyteorg/flyteplugins => github.com/flyteorg/flyteplugins v1.0.29-0.20230224022650-a2881f35dd25
140-
141139
replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.3.9-0.20230224194627-a1df35060476

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,8 @@ github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w=
234234
github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk=
235235
github.com/flyteorg/flyteidl v1.3.9-0.20230224194627-a1df35060476 h1:mA3Ry5YjNu5BqjnCTbA+lFRTRFjGKEMDALRhLTtBuuU=
236236
github.com/flyteorg/flyteidl v1.3.9-0.20230224194627-a1df35060476/go.mod h1:Pkt2skI1LiHs/2ZoekBnyPhuGOFMiuul6HHcKGZBsbM=
237-
github.com/flyteorg/flytestdlib v1.0.11 h1:f7B8x2/zMuimEVi4Jx0zqzvNhdi7aq7+ZWoqHsbp4F4=
238-
github.com/flyteorg/flytestdlib v1.0.11/go.mod h1:nIBmBHtjTJvhZEn3e/EwVC/iMkR2tUX8hEiXjRBpH/s=
237+
github.com/flyteorg/flytestdlib v1.0.15 h1:kv9jDQmytbE84caY+pkZN8trJU2ouSAmESzpTEhfTt0=
238+
github.com/flyteorg/flytestdlib v1.0.15/go.mod h1:ghw/cjY0sEWIIbyCtcJnL/Gt7ZS7gf9SUi0CCPhbz3s=
239239
github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk=
240240
github.com/flyteorg/stow v0.3.6/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo=
241241
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=

go/tasks/pluginmachinery/core/resource_manager.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,18 +37,20 @@ type ResourceRegistrar interface {
3737

3838
// ResourceManager Interface
3939
// 1. Terms and definitions
40+
//
4041
// - Resource: resource is an abstraction of anything that has a limited quota of units and can be claimed in a
4142
// single unit or multiple units at once. At Flyte's current state, a resource means a logical
4243
// separation (e.g., a cluster) of an external service that allows a limited number of outstanding
4344
// requests to be sent to.
45+
//
4446
// - Token: Flyte uses a token to serve as the placeholder to represent a unit of resource. Flyte resource manager
4547
// manages resources by managing the tokens of the resources.
4648
//
47-
// 2. Description
49+
// 2. Description
4850
// ResourceManager provides a task-type-specific pooling system for Flyte Tasks. Plugin writers can optionally
4951
// request for resources in their tasks, in single quantity.
5052
//
51-
// 3. Usage
53+
// 3. Usage
5254
// A Flyte plugin registers the resources and the desired quota of each resource with ResourceRegistrar at the
5355
// setup time of Flyte Propeller. At the end of the setup time, Flyte Propeller builds a ResourceManager based on
5456
// these registration requests.
@@ -63,7 +65,7 @@ type ResourceRegistrar interface {
6365
// the resource manager to release the token by calling ResourceManager's ReleaseResource(), and the token will be
6466
// erased from the corresponding pool.
6567
//
66-
// 4. Example
68+
// 4. Example
6769
// Flyte has a built-on Qubole plugin that allows Flyte tasks to send out Hive commands to Qubole.
6870
// In the plugin, a single Qubole cluster is a resource, and sending out a single Hive command to a Qubole cluster consumes
6971
// a token of the corresponding resource. The resource allocation is achieved by the Qubole plugin calling

go/tasks/pluginmachinery/flytek8s/container_helper.go

Lines changed: 50 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,19 @@ package flytek8s
33
import (
44
"context"
55

6-
"github.com/flyteorg/flytestdlib/logger"
6+
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
77

8+
"github.com/flyteorg/flyteplugins/go/tasks/errors"
9+
pluginscore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core"
810
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/template"
9-
"k8s.io/apimachinery/pkg/util/validation"
11+
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
12+
13+
"github.com/flyteorg/flytestdlib/logger"
1014

11-
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
1215
v1 "k8s.io/api/core/v1"
1316
"k8s.io/apimachinery/pkg/api/resource"
1417
"k8s.io/apimachinery/pkg/util/rand"
15-
16-
"github.com/flyteorg/flyteplugins/go/tasks/errors"
17-
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config"
18+
"k8s.io/apimachinery/pkg/util/validation"
1819
)
1920

2021
const resourceGPU = "gpu"
@@ -193,22 +194,16 @@ func ApplyResourceOverrides(resources, platformResources v1.ResourceRequirements
193194
return resources
194195
}
195196

196-
// ToK8sContainer transforms a task template target of type core.Container into a bare-bones kubernetes container, which
197-
// can be further modified with flyte-specific customizations specified by various static and run-time attributes.
198-
func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *core.TypedInterface, parameters template.Parameters) (*v1.Container, error) {
199-
// Perform preliminary validations
200-
if parameters.TaskExecMetadata.GetOverrides() == nil {
201-
return nil, errors.Errorf(errors.BadTaskSpecification, "platform/compiler error, overrides not set for task")
202-
}
203-
if parameters.TaskExecMetadata.GetOverrides() == nil || parameters.TaskExecMetadata.GetOverrides().GetResources() == nil {
204-
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
205-
}
197+
// BuildRawContainer constructs a Container based on the definition passed by the taskContainer and
198+
// TaskExecutionMetadata.
199+
func BuildRawContainer(ctx context.Context, taskContainer *core.Container, taskExecMetadata pluginscore.TaskExecutionMetadata) (*v1.Container, error) {
206200
// Make the container name the same as the pod name, unless it violates K8s naming conventions
207201
// Container names are subject to the DNS-1123 standard
208-
containerName := parameters.TaskExecMetadata.GetTaskExecutionID().GetGeneratedName()
202+
containerName := taskExecMetadata.GetTaskExecutionID().GetGeneratedName()
209203
if errs := validation.IsDNS1123Label(containerName); len(errs) > 0 {
210204
containerName = rand.String(4)
211205
}
206+
212207
container := &v1.Container{
213208
Name: containerName,
214209
Image: taskContainer.GetImage(),
@@ -217,12 +212,49 @@ func ToK8sContainer(ctx context.Context, taskContainer *core.Container, iFace *c
217212
Env: ToK8sEnvVar(taskContainer.GetEnv()),
218213
TerminationMessagePolicy: v1.TerminationMessageFallbackToLogsOnError,
219214
}
220-
if err := AddCoPilotToContainer(ctx, config.GetK8sPluginConfig().CoPilot, container, iFace, taskContainer.DataConfig); err != nil {
215+
216+
return container, nil
217+
}
218+
219+
// ToK8sContainer builds a Container based on the definition passed by the TaskExecutionContext. This involves applying
220+
// all Flyte configuration including k8s plugins and resource requests.
221+
func ToK8sContainer(ctx context.Context, tCtx pluginscore.TaskExecutionContext) (*v1.Container, error) {
222+
taskTemplate, err := tCtx.TaskReader().Read(ctx)
223+
if err != nil {
224+
logger.Warnf(ctx, "failed to read task information when trying to construct container, err: %s", err.Error())
225+
return nil, err
226+
}
227+
228+
// validate arguments
229+
if taskTemplate.GetContainer() == nil {
230+
return nil, errors.Errorf(errors.BadTaskSpecification, "unable to create container with no definition in TaskTemplate")
231+
}
232+
if tCtx.TaskExecutionMetadata().GetOverrides() == nil || tCtx.TaskExecutionMetadata().GetOverrides().GetResources() == nil {
233+
return nil, errors.Errorf(errors.BadTaskSpecification, "resource requirements not found for container task, required!")
234+
}
235+
236+
// build raw container
237+
container, err := BuildRawContainer(ctx, taskTemplate.GetContainer(), tCtx.TaskExecutionMetadata())
238+
if err != nil {
221239
return nil, err
222240
}
241+
223242
if container.SecurityContext == nil && config.GetK8sPluginConfig().DefaultSecurityContext != nil {
224243
container.SecurityContext = config.GetK8sPluginConfig().DefaultSecurityContext.DeepCopy()
225244
}
245+
246+
// add flyte resource customizations to the container
247+
templateParameters := template.Parameters{
248+
TaskExecMetadata: tCtx.TaskExecutionMetadata(),
249+
Inputs: tCtx.InputReader(),
250+
OutputPath: tCtx.OutputWriter(),
251+
Task: tCtx.TaskReader(),
252+
}
253+
254+
if err := AddFlyteCustomizationsToContainer(ctx, templateParameters, ResourceCustomizationModeAssignResources, container); err != nil {
255+
return nil, err
256+
}
257+
226258
return container, nil
227259
}
228260

go/tasks/pluginmachinery/flytek8s/container_helper_test.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -335,26 +335,45 @@ func TestMergeResources_PartialResourceKeys(t *testing.T) {
335335
}
336336

337337
func TestToK8sContainer(t *testing.T) {
338-
taskContainer := &core.Container{
339-
Image: "myimage",
340-
Args: []string{
341-
"arg1",
342-
"arg2",
343-
"arg3",
344-
},
345-
Command: []string{
346-
"com1",
347-
"com2",
348-
"com3",
349-
},
350-
Env: []*core.KeyValuePair{
351-
{
352-
Key: "k",
353-
Value: "v",
338+
taskTemplate := &core.TaskTemplate{
339+
Type: "test",
340+
Target: &core.TaskTemplate_Container{
341+
Container: &core.Container{
342+
Image: "myimage",
343+
Args: []string{
344+
"arg1",
345+
"arg2",
346+
"arg3",
347+
},
348+
Command: []string{
349+
"com1",
350+
"com2",
351+
"com3",
352+
},
353+
Env: []*core.KeyValuePair{
354+
{
355+
Key: "k",
356+
Value: "v",
357+
},
358+
},
354359
},
355360
},
356361
}
357362

363+
taskReader := &mocks.TaskReader{}
364+
taskReader.On("Read", mock.Anything).Return(taskTemplate, nil)
365+
366+
inputReader := &mocks2.InputReader{}
367+
inputReader.OnGetInputPath().Return(storage.DataReference("test-data-reference"))
368+
inputReader.OnGetInputPrefixPath().Return(storage.DataReference("test-data-reference-prefix"))
369+
inputReader.OnGetMatch(mock.Anything).Return(&core.LiteralMap{}, nil)
370+
371+
outputWriter := &mocks2.OutputWriter{}
372+
outputWriter.OnGetOutputPrefixPath().Return("")
373+
outputWriter.OnGetRawOutputPrefix().Return("")
374+
outputWriter.OnGetCheckpointPrefix().Return("/checkpoint")
375+
outputWriter.OnGetPreviousCheckpointsPrefix().Return("/prev")
376+
358377
mockTaskExecMetadata := mocks.TaskExecutionMetadata{}
359378
mockTaskOverrides := mocks.TaskOverrides{}
360379
mockTaskOverrides.OnGetResources().Return(&v1.ResourceRequirements{
@@ -364,12 +383,16 @@ func TestToK8sContainer(t *testing.T) {
364383
})
365384
mockTaskExecMetadata.OnGetOverrides().Return(&mockTaskOverrides)
366385
mockTaskExecutionID := mocks.TaskExecutionID{}
386+
mockTaskExecutionID.OnGetID().Return(core.TaskExecutionIdentifier{})
367387
mockTaskExecutionID.OnGetGeneratedName().Return("gen_name")
368388
mockTaskExecMetadata.OnGetTaskExecutionID().Return(&mockTaskExecutionID)
389+
mockTaskExecMetadata.OnGetPlatformResources().Return(&v1.ResourceRequirements{})
369390

370-
templateParameters := template.Parameters{
371-
TaskExecMetadata: &mockTaskExecMetadata,
372-
}
391+
tCtx := &mocks.TaskExecutionContext{}
392+
tCtx.OnTaskExecutionMetadata().Return(&mockTaskExecMetadata)
393+
tCtx.OnInputReader().Return(inputReader)
394+
tCtx.OnTaskReader().Return(taskReader)
395+
tCtx.OnOutputWriter().Return(outputWriter)
373396

374397
cfg := config.GetK8sPluginConfig()
375398
allow := false
@@ -378,7 +401,7 @@ func TestToK8sContainer(t *testing.T) {
378401
}
379402
assert.NoError(t, config.SetK8sPluginConfig(cfg))
380403

381-
container, err := ToK8sContainer(context.TODO(), taskContainer, nil, templateParameters)
404+
container, err := ToK8sContainer(context.TODO(), tCtx)
382405
assert.NoError(t, err)
383406
assert.Equal(t, container.Image, "myimage")
384407
assert.EqualValues(t, []string{

go/tasks/pluginmachinery/flytek8s/copilot.go

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c
175175
c.SecurityContext.Capabilities.Add = append(c.SecurityContext.Capabilities.Add, pTraceCapability)
176176

177177
if iFace != nil {
178-
if iFace.Inputs != nil {
178+
if iFace.Inputs != nil && len(iFace.Inputs.Variables) > 0 {
179179
inPath := cfg.DefaultInputDataPath
180180
if pilot.GetInputPath() != "" {
181181
inPath = pilot.GetInputPath()
@@ -187,7 +187,7 @@ func AddCoPilotToContainer(ctx context.Context, cfg config.FlyteCoPilotConfig, c
187187
})
188188
}
189189

190-
if iFace.Outputs != nil {
190+
if iFace.Outputs != nil && len(iFace.Outputs.Variables) > 0 {
191191
outPath := cfg.DefaultOutputPath
192192
if pilot.GetOutputPath() != "" {
193193
outPath = pilot.GetOutputPath()
@@ -210,7 +210,7 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
210210
shareProcessNamespaceEnabled := true
211211
coPilotPod.ShareProcessNamespace = &shareProcessNamespaceEnabled
212212
if iFace != nil {
213-
if iFace.Inputs != nil {
213+
if iFace.Inputs != nil && len(iFace.Inputs.Variables) > 0 {
214214
inPath := cfg.DefaultInputDataPath
215215
if pilot.GetInputPath() != "" {
216216
inPath = pilot.GetInputPath()
@@ -240,7 +240,7 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
240240
coPilotPod.InitContainers = append(coPilotPod.InitContainers, downloader)
241241
}
242242

243-
if iFace.Outputs != nil {
243+
if iFace.Outputs != nil && len(iFace.Outputs.Variables) > 0 {
244244
outPath := cfg.DefaultOutputPath
245245
if pilot.GetOutputPath() != "" {
246246
outPath = pilot.GetOutputPath()
@@ -268,7 +268,6 @@ func AddCoPilotToPod(ctx context.Context, cfg config.FlyteCoPilotConfig, coPilot
268268
}
269269
coPilotPod.Containers = append(coPilotPod.Containers, sidecar)
270270
}
271-
272271
}
273272

274273
return nil

0 commit comments

Comments
 (0)