4
4
"context"
5
5
"fmt"
6
6
7
- "github.com/pkg/errors"
8
-
9
7
"github.com/kris-nova/logger"
10
8
11
9
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -25,7 +23,7 @@ import (
25
23
// NewTasksToCreateCluster defines all tasks required to create a cluster along
26
24
// with some nodegroups; see CreateAllNodeGroups for how onlyNodeGroupSubset works.
27
25
func (c * StackCollection ) NewTasksToCreateCluster (ctx context.Context , nodeGroups []* api.NodeGroup ,
28
- managedNodeGroups []* api.ManagedNodeGroup , accessConfig * api.AccessConfig , accessEntryCreator accessentry.CreatorInterface , postClusterCreationTasks ... tasks.Task ) * tasks.TaskTree {
26
+ managedNodeGroups []* api.ManagedNodeGroup , accessConfig * api.AccessConfig , accessEntryCreator accessentry.CreatorInterface , nodeGroupParallelism int , postClusterCreationTasks ... tasks.Task ) * tasks.TaskTree {
29
27
taskTree := tasks.TaskTree {Parallel : false }
30
28
31
29
taskTree .Append (& createClusterTask {
@@ -46,11 +44,11 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup
46
44
IsSubTask : true ,
47
45
}
48
46
disableAccessEntryCreation := accessConfig .AuthenticationMode == ekstypes .AuthenticationModeConfigMap
49
- if unmanagedNodeGroupTasks := c .NewUnmanagedNodeGroupTask (ctx , nodeGroups , false , false , disableAccessEntryCreation , vpcImporter ); unmanagedNodeGroupTasks .Len () > 0 {
47
+ if unmanagedNodeGroupTasks := c .NewUnmanagedNodeGroupTask (ctx , nodeGroups , false , false , disableAccessEntryCreation , vpcImporter , nodeGroupParallelism ); unmanagedNodeGroupTasks .Len () > 0 {
50
48
unmanagedNodeGroupTasks .IsSubTask = true
51
49
nodeGroupTasks .Append (unmanagedNodeGroupTasks )
52
50
}
53
- if managedNodeGroupTasks := c .NewManagedNodeGroupTask (ctx , managedNodeGroups , false , vpcImporter ); managedNodeGroupTasks .Len () > 0 {
51
+ if managedNodeGroupTasks := c .NewManagedNodeGroupTask (ctx , managedNodeGroups , false , vpcImporter , nodeGroupParallelism ); managedNodeGroupTasks .Len () > 0 {
54
52
managedNodeGroupTasks .IsSubTask = true
55
53
nodeGroupTasks .Append (managedNodeGroupTasks )
56
54
}
@@ -75,7 +73,7 @@ func (c *StackCollection) NewTasksToCreateCluster(ctx context.Context, nodeGroup
75
73
}
76
74
77
75
// NewUnmanagedNodeGroupTask returns tasks for creating self-managed nodegroups.
78
- func (c * StackCollection ) NewUnmanagedNodeGroupTask (ctx context.Context , nodeGroups []* api.NodeGroup , forceAddCNIPolicy , skipEgressRules , disableAccessEntryCreation bool , vpcImporter vpc.Importer ) * tasks.TaskTree {
76
+ func (c * StackCollection ) NewUnmanagedNodeGroupTask (ctx context.Context , nodeGroups []* api.NodeGroup , forceAddCNIPolicy , skipEgressRules , disableAccessEntryCreation bool , vpcImporter vpc.Importer , parallelism int ) * tasks.TaskTree {
79
77
task := & UnmanagedNodeGroupTask {
80
78
ClusterConfig : c .spec ,
81
79
NodeGroups : nodeGroups ,
@@ -93,12 +91,13 @@ func (c *StackCollection) NewUnmanagedNodeGroupTask(ctx context.Context, nodeGro
93
91
SkipEgressRules : skipEgressRules ,
94
92
DisableAccessEntryCreation : disableAccessEntryCreation ,
95
93
VPCImporter : vpcImporter ,
94
+ Parallelism : parallelism ,
96
95
})
97
96
}
98
97
99
98
// NewManagedNodeGroupTask defines tasks required to create managed nodegroups
100
- func (c * StackCollection ) NewManagedNodeGroupTask (ctx context.Context , nodeGroups []* api.ManagedNodeGroup , forceAddCNIPolicy bool , vpcImporter vpc.Importer ) * tasks.TaskTree {
101
- taskTree := & tasks.TaskTree {Parallel : true }
99
+ func (c * StackCollection ) NewManagedNodeGroupTask (ctx context.Context , nodeGroups []* api.ManagedNodeGroup , forceAddCNIPolicy bool , vpcImporter vpc.Importer , nodeGroupParallelism int ) * tasks.TaskTree {
100
+ taskTree := & tasks.TaskTree {Parallel : true , Limit : nodeGroupParallelism }
102
101
for _ , ng := range nodeGroups {
103
102
// Disable parallelisation if any tags propagation is done
104
103
// since nodegroup must be created to propagate tags to its ASGs.
@@ -162,7 +161,7 @@ func (c *StackCollection) NewTasksToCreateIAMServiceAccounts(serviceAccounts []*
162
161
objectMeta .SetAnnotations (sa .AsObjectMeta ().Annotations )
163
162
objectMeta .SetLabels (sa .AsObjectMeta ().Labels )
164
163
if err := kubernetes .MaybeCreateServiceAccountOrUpdateMetadata (clientSet , objectMeta ); err != nil {
165
- return errors . Wrapf ( err , "failed to create service account %s/%s" , objectMeta .GetNamespace (), objectMeta .GetName ())
164
+ return fmt . Errorf ( "failed to create service account %s/%s: %w " , objectMeta .GetNamespace (), objectMeta .GetName (), err )
166
165
}
167
166
return nil
168
167
},
0 commit comments