Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions PrefectWorkPool.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,14 @@ PrefectWorkPoolSpec defines the desired state of PrefectWorkPool
Server defines which Prefect Server to connect to<br/>
</td>
<td>false</td>
</tr><tr>
<td><b>serviceAccountName</b></td>
<td>string</td>
<td>
ServiceAccountName defines the ServiceAccount to use for worker pods.
If not specified, the default ServiceAccount for the namespace will be used.<br/>
</td>
<td>false</td>
</tr><tr>
<td><b><a href="#prefectworkpoolspecsettingsindex">settings</a></b></td>
<td>[]object</td>
Expand Down
3 changes: 2 additions & 1 deletion api/v1/prefectserver_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ func (s *PrefectServer) EntrypointArguments() []string {
host = *s.Spec.Host
}

command := []string{"prefect", "server", "start", "--host", host}
command := make([]string, 0, 5+len(s.Spec.ExtraArgs))
command = append(command, "prefect", "server", "start", "--host", host)
command = append(command, s.Spec.ExtraArgs...)

return command
Expand Down
5 changes: 5 additions & 0 deletions api/v1/prefectworkpool_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ type PrefectWorkPoolSpec struct {
// DeploymentLabels defines additional labels to add to the Prefect Server Deployment
DeploymentLabels map[string]string `json:"deploymentLabels,omitempty"`

// ServiceAccountName defines the ServiceAccount to use for worker pods.
// If not specified, the default ServiceAccount for the namespace will be used.
// +optional
ServiceAccountName *string `json:"serviceAccountName,omitempty"`

// Base job template for flow runs in the Work Pool
BaseJobTemplate *RawValueSource `json:"baseJobTemplate,omitempty"`
}
Expand Down
5 changes: 5 additions & 0 deletions api/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.19.0
controller-gen.kubebuilder.io/version: v0.20.0
name: prefectdeployments.prefect.io
spec:
group: prefect.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.19.0
controller-gen.kubebuilder.io/version: v0.20.0
name: prefectservers.prefect.io
spec:
group: prefect.io
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.19.0
controller-gen.kubebuilder.io/version: v0.20.0
name: prefectworkpools.prefect.io
spec:
group: prefect.io
Expand Down Expand Up @@ -1875,6 +1875,11 @@ spec:
connect to Prefect Cloud
type: string
type: object
serviceAccountName:
description: |-
ServiceAccountName defines the ServiceAccount to use for worker pods.
If not specified, the default ServiceAccount for the namespace will be used.
type: string
settings:
description: A list of environment variables to set on the Prefect
Worker
Expand Down
1 change: 1 addition & 0 deletions deploy/samples/v1_prefectworkpool_kubernetes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,4 @@ spec:
server:
name: prefect-ephemeral
workers: 3
# serviceAccountName: prefect-worker # Optional: custom ServiceAccount for worker pods
10 changes: 10 additions & 0 deletions internal/controller/prefectworkpool_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ func (r *PrefectWorkPoolReconciler) Reconcile(ctx context.Context, req ctrl.Requ
Labels: workPool.WorkerLabels(),
},
Spec: corev1.PodSpec{
ServiceAccountName: getServiceAccountName(&workPool),
Volumes: []corev1.Volume{
{
Name: "prefect-data",
Expand Down Expand Up @@ -459,3 +460,12 @@ func (r *PrefectWorkPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
Watches(&corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(r.mapConfigMapToWorkPools)).
Complete(r)
}

// getServiceAccountName returns the ServiceAccount name to use for worker pods.
// If not specified in the spec, returns empty string to use the default ServiceAccount.
func getServiceAccountName(workPool *prefectiov1.PrefectWorkPool) string {
if workPool.Spec.ServiceAccountName != nil && *workPool.Spec.ServiceAccountName != "" {
return *workPool.Spec.ServiceAccountName
}
return ""
}
103 changes: 103 additions & 0 deletions internal/controller/prefectworkpool_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1045,6 +1045,109 @@ var _ = Describe("PrefectWorkPool Controller", func() {
})
})

Context("When specifying a custom ServiceAccount", func() {
It("should use custom ServiceAccount when specified", func() {
prefectWorkPool = &prefectiov1.PrefectWorkPool{
ObjectMeta: metav1.ObjectMeta{
Name: name.Name,
Namespace: namespaceName,
},
Spec: prefectiov1.PrefectWorkPoolSpec{
Type: "kubernetes",
Workers: 1,
ServiceAccountName: ptr.To("prefect-worker"),
Server: prefectiov1.PrefectServerReference{
Name: "test-server",
},
},
}

Expect(k8sClient.Create(ctx, prefectWorkPool)).To(Succeed())

By("First reconciliation - adding finalizer")
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
Expect(err).NotTo(HaveOccurred())

By("Second reconciliation - creating deployment")
_, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
Expect(err).NotTo(HaveOccurred())

deployment := &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, name, deployment)
}).Should(Succeed())

Expect(deployment.Spec.Template.Spec.ServiceAccountName).To(Equal("prefect-worker"))
})

It("should use default ServiceAccount when not specified", func() {
prefectWorkPool = &prefectiov1.PrefectWorkPool{
ObjectMeta: metav1.ObjectMeta{
Name: name.Name,
Namespace: namespaceName,
},
Spec: prefectiov1.PrefectWorkPoolSpec{
Type: "kubernetes",
Workers: 1,
Server: prefectiov1.PrefectServerReference{
Name: "test-server",
},
},
}

Expect(k8sClient.Create(ctx, prefectWorkPool)).To(Succeed())

By("First reconciliation - adding finalizer")
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
Expect(err).NotTo(HaveOccurred())

By("Second reconciliation - creating deployment")
_, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
Expect(err).NotTo(HaveOccurred())

deployment := &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, name, deployment)
}).Should(Succeed())

Expect(deployment.Spec.Template.Spec.ServiceAccountName).To(Equal(""))
})

It("should use default ServiceAccount when empty string is specified", func() {
prefectWorkPool = &prefectiov1.PrefectWorkPool{
ObjectMeta: metav1.ObjectMeta{
Name: name.Name,
Namespace: namespaceName,
},
Spec: prefectiov1.PrefectWorkPoolSpec{
Type: "kubernetes",
Workers: 1,
ServiceAccountName: ptr.To(""),
Server: prefectiov1.PrefectServerReference{
Name: "test-server",
},
},
}

Expect(k8sClient.Create(ctx, prefectWorkPool)).To(Succeed())

By("First reconciliation - adding finalizer")
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
Expect(err).NotTo(HaveOccurred())

By("Second reconciliation - creating deployment")
_, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: name})
Expect(err).NotTo(HaveOccurred())

deployment := &appsv1.Deployment{}
Eventually(func() error {
return k8sClient.Get(ctx, name, deployment)
}).Should(Succeed())

Expect(deployment.Spec.Template.Spec.ServiceAccountName).To(Equal(""))
})
})

Context("When testing error scenarios", func() {
BeforeEach(func() {
prefectWorkPool = &prefectiov1.PrefectWorkPool{
Expand Down
Loading