@@ -29,6 +29,7 @@ import (
29
29
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
30
30
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
31
31
"github.com/cockroachdb/errors"
32
+ "github.com/marusama/semaphore"
32
33
"github.com/spf13/pflag"
33
34
"golang.org/x/exp/maps"
34
35
"golang.org/x/sync/errgroup"
@@ -44,7 +45,12 @@ const (
44
45
defaultImageProject = "ubuntu-os-cloud"
45
46
FIPSImageProject = "ubuntu-os-pro-cloud"
46
47
ManagedLabel = "managed"
47
- MaxConcurrentVMOps = 16
48
+
49
+ // These values limit concurrent `gcloud` CLI operations, and command
50
+ // length, to avoid overwhelming the API when managing large clusters. The
51
+ // limits were determined through empirical testing.
52
+ MaxConcurrentCommands = 100
53
+ MaxConcurrentHosts = 100
48
54
)
49
55
50
56
type VolumeType string
@@ -1213,6 +1219,14 @@ func (p *Provider) ConfigureProviderFlags(flags *pflag.FlagSet, opt vm.MultipleP
1213
1219
)
1214
1220
}
1215
1221
1222
+ // newLimitedErrorGroup returns a new `errgroup.Group` capped, via SetLimit, at
1223
+ // MaxConcurrentCommands, the cloud provider's default limit on concurrent operations.
1224
+ func newLimitedErrorGroup () * errgroup.Group {
1225
+ g := & errgroup.Group {}
1226
+ g .SetLimit (MaxConcurrentCommands ) // bound how many goroutines of this group run at once
1227
+ return g
1228
+ }
1229
+
1216
1230
// useArmAMI returns true if the machine type is an arm64 machine type.
1217
1231
func (o * ProviderOpts ) useArmAMI () bool {
1218
1232
return strings .HasPrefix (strings .ToLower (o .MachineType ), "t2a-" )
@@ -1265,8 +1279,7 @@ func (p *Provider) editLabels(
1265
1279
tagArgsString := strings .Join (tagArgs , "," )
1266
1280
commonArgs := []string {"--project" , p .GetProject (), fmt .Sprintf ("--labels=%s" , tagArgsString )}
1267
1281
1268
- var g errgroup.Group
1269
- g .SetLimit (MaxConcurrentVMOps )
1282
+ g := newLimitedErrorGroup ()
1270
1283
for _ , v := range vms {
1271
1284
vmArgs := make ([]string , len (cmdArgs ))
1272
1285
copy (vmArgs , cmdArgs )
@@ -1770,30 +1783,41 @@ func (p *Provider) Create(
1770
1783
}
1771
1784
1772
1785
default :
1773
- var g errgroup. Group
1786
+ g := newLimitedErrorGroup ()
1774
1787
createArgs := []string {"compute" , "instances" , "create" , "--subnet" , "default" , "--format" , "json" }
1775
1788
createArgs = append (createArgs , "--labels" , labels )
1776
1789
createArgs = append (createArgs , instanceArgs ... )
1777
1790
1791
+ sem := semaphore .New (MaxConcurrentHosts )
1778
1792
l .Printf ("Creating %d instances, distributed across [%s]" , len (names ), strings .Join (usedZones , ", " ))
1779
1793
for zone , zoneHosts := range zoneToHostNames {
1780
- argsWithZone := append (createArgs , "--zone" , zone )
1781
- argsWithZone = append (argsWithZone , zoneHosts ... )
1782
- g .Go (func () error {
1783
- var instances []jsonVM
1784
- err := runJSONCommand (argsWithZone , & instances )
1785
- if err != nil {
1786
- return errors .Wrapf (err , "Command: gcloud %s" , argsWithZone )
1787
- }
1788
- vmListMutex .Lock ()
1789
- defer vmListMutex .Unlock ()
1790
- for _ , i := range instances {
1791
- v := i .toVM (project , p .publicDomain )
1792
- vmList = append (vmList , * v )
1793
- }
1794
- return nil
1795
- })
1794
+ groupSize := MaxConcurrentHosts / 4
1795
+ for i := 0 ; i < len (zoneHosts ); i += groupSize {
1796
+ hostGroup := zoneHosts [i :min (i + groupSize , len (zoneHosts ))]
1797
+ argsWithZone := append (createArgs , "--zone" , zone )
1798
+ argsWithZone = append (argsWithZone , hostGroup ... )
1796
1799
1800
+ g .Go (func () error {
1801
+ err := sem .Acquire (context .Background (), len (hostGroup ))
1802
+ if err != nil {
1803
+ return errors .Wrapf (err , "Failed to acquire semaphore" )
1804
+ }
1805
+ defer sem .Release (len (hostGroup ))
1806
+
1807
+ var instances []jsonVM
1808
+ err = runJSONCommand (argsWithZone , & instances )
1809
+ if err != nil {
1810
+ return errors .Wrapf (err , "Command: gcloud %s" , argsWithZone )
1811
+ }
1812
+ vmListMutex .Lock ()
1813
+ defer vmListMutex .Unlock ()
1814
+ for _ , i := range instances {
1815
+ v := i .toVM (project , p .publicDomain )
1816
+ vmList = append (vmList , * v )
1817
+ }
1818
+ return nil
1819
+ })
1820
+ }
1797
1821
}
1798
1822
err = g .Wait ()
1799
1823
if err != nil {
@@ -1867,7 +1891,7 @@ func (p *Provider) Shrink(l *logger.Logger, vmsToDelete vm.List, clusterName str
1867
1891
vmZones [cVM .Zone ] = append (vmZones [cVM .Zone ], cVM )
1868
1892
}
1869
1893
1870
- g := errgroup. Group {}
1894
+ g := newLimitedErrorGroup ()
1871
1895
for zone , vms := range vmZones {
1872
1896
instances := vms .Names ()
1873
1897
args := []string {"compute" , "instance-groups" , "managed" , "delete-instances" ,
@@ -1911,7 +1935,7 @@ func (p *Provider) Grow(
1911
1935
zoneToHostNames := computeHostNamesPerZone (groups , names , newNodeCount )
1912
1936
1913
1937
addedVms := make (map [string ]bool )
1914
- var g errgroup. Group
1938
+ g := newLimitedErrorGroup ()
1915
1939
for _ , group := range groups {
1916
1940
createArgs := []string {"compute" , "instance-groups" , "managed" , "create-instance" , "--zone" , group .Zone , groupName ,
1917
1941
"--project" , project }
@@ -2051,7 +2075,7 @@ func listHealthChecks(project string) ([]jsonHealthCheck, error) {
2051
2075
// all of them. Health checks associated with the cluster are also deleted.
2052
2076
func deleteLoadBalancerResources (project , clusterName , portFilter string ) error {
2053
2077
// List all the components of the load balancer resources tied to the project.
2054
- var g errgroup. Group
2078
+ g := newLimitedErrorGroup ()
2055
2079
var services []jsonBackendService
2056
2080
var proxies []jsonTargetTCPProxy
2057
2081
var rules []jsonForwardingRule
@@ -2120,7 +2144,7 @@ func deleteLoadBalancerResources(project, clusterName, portFilter string) error
2120
2144
2121
2145
// Delete all the components of the load balancer. Resources must be deleted
2122
2146
// in the correct order to avoid dependency errors.
2123
- g = errgroup. Group {}
2147
+ g = newLimitedErrorGroup ()
2124
2148
for _ , rule := range filteredForwardingRules {
2125
2149
args := []string {"compute" , "forwarding-rules" , "delete" ,
2126
2150
rule .Name ,
@@ -2140,7 +2164,7 @@ func deleteLoadBalancerResources(project, clusterName, portFilter string) error
2140
2164
if err := g .Wait (); err != nil {
2141
2165
return err
2142
2166
}
2143
- g = errgroup. Group {}
2167
+ g = newLimitedErrorGroup ()
2144
2168
for _ , proxy := range filteredProxies {
2145
2169
args := []string {"compute" , "target-tcp-proxies" , "delete" ,
2146
2170
proxy .Name ,
@@ -2159,7 +2183,7 @@ func deleteLoadBalancerResources(project, clusterName, portFilter string) error
2159
2183
if err := g .Wait (); err != nil {
2160
2184
return err
2161
2185
}
2162
- g = errgroup. Group {}
2186
+ g = newLimitedErrorGroup ()
2163
2187
for _ , service := range filteredServices {
2164
2188
args := []string {"compute" , "backend-services" , "delete" ,
2165
2189
service .Name ,
@@ -2179,7 +2203,7 @@ func deleteLoadBalancerResources(project, clusterName, portFilter string) error
2179
2203
if err := g .Wait (); err != nil {
2180
2204
return err
2181
2205
}
2182
- g = errgroup. Group {}
2206
+ g = newLimitedErrorGroup ()
2183
2207
for _ , healthCheck := range filteredHealthChecks {
2184
2208
args := []string {"compute" , "health-checks" , "delete" ,
2185
2209
healthCheck .Name ,
@@ -2462,7 +2486,7 @@ func propagateDiskLabels(
2462
2486
useLocalSSD bool ,
2463
2487
pdVolumeCount int ,
2464
2488
) error {
2465
- var g errgroup. Group
2489
+ g := newLimitedErrorGroup ()
2466
2490
2467
2491
l .Printf ("Propagating labels across all disks" )
2468
2492
argsPrefix := []string {"compute" , "disks" , "update" }
@@ -2849,7 +2873,7 @@ func (p *Provider) deleteManaged(l *logger.Logger, vms vm.List) error {
2849
2873
clusterProjectMap [clusterName ] = v .Project
2850
2874
}
2851
2875
2852
- var g errgroup. Group
2876
+ g := newLimitedErrorGroup ()
2853
2877
for cluster , project := range clusterProjectMap {
2854
2878
// Delete any load balancer resources associated with the cluster. Trying to
2855
2879
// delete the instance group before the load balancer resources will result
@@ -2885,7 +2909,7 @@ func (p *Provider) deleteManaged(l *logger.Logger, vms vm.List) error {
2885
2909
2886
2910
// All instance groups have to be deleted before the instance templates can be
2887
2911
// deleted.
2888
- g = errgroup. Group {}
2912
+ g = newLimitedErrorGroup ()
2889
2913
for cluster , project := range clusterProjectMap {
2890
2914
templates , err := listInstanceTemplates (project , cluster )
2891
2915
if err != nil {
@@ -2912,29 +2936,42 @@ func (p *Provider) deleteUnmanaged(l *logger.Logger, vms vm.List) error {
2912
2936
projectZoneMap [v.Project ][v.Zone ] = append (projectZoneMap [v.Project ][v.Zone ], v .Name )
2913
2937
}
2914
2938
2915
- var g errgroup. Group
2939
+ g := newLimitedErrorGroup ()
2916
2940
ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Minute )
2917
2941
defer cancel ()
2942
+
2943
+ sem := semaphore .New (MaxConcurrentHosts )
2918
2944
for project , zoneMap := range projectZoneMap {
2919
2945
for zone , names := range zoneMap {
2920
- args := []string {
2921
- "compute" , "instances" , "delete" ,
2922
- "--delete-disks" , "all" ,
2923
- }
2946
+ groupSize := MaxConcurrentHosts / 4
2947
+ for i := 0 ; i < len (names ); i += groupSize {
2948
+ nameGroup := names [i :min (i + groupSize , len (names ))]
2924
2949
2925
- args = append (args , "--project" , project )
2926
- args = append (args , "--zone" , zone )
2927
- args = append (args , names ... )
2950
+ args := []string {
2951
+ "compute" , "instances" , "delete" ,
2952
+ "--delete-disks" , "all" ,
2953
+ }
2928
2954
2929
- g .Go (func () error {
2930
- cmd := exec .CommandContext (ctx , "gcloud" , args ... )
2955
+ args = append (args , "--project" , project )
2956
+ args = append (args , "--zone" , zone )
2957
+ args = append (args , nameGroup ... )
2931
2958
2932
- output , err := cmd .CombinedOutput ()
2933
- if err != nil {
2934
- return errors .Wrapf (err , "Command: gcloud %s\n Output: %s" , args , output )
2935
- }
2936
- return nil
2937
- })
2959
+ g .Go (func () error {
2960
+ err := sem .Acquire (ctx , len (nameGroup ))
2961
+ if err != nil {
2962
+ return errors .Wrapf (err , "Failed to acquire semaphore" )
2963
+ }
2964
+ defer sem .Release (len (nameGroup ))
2965
+
2966
+ cmd := exec .CommandContext (ctx , "gcloud" , args ... )
2967
+
2968
+ output , err := cmd .CombinedOutput ()
2969
+ if err != nil {
2970
+ return errors .Wrapf (err , "Command: gcloud %s\n Output: %s" , args , output )
2971
+ }
2972
+ return nil
2973
+ })
2974
+ }
2938
2975
}
2939
2976
}
2940
2977
@@ -2956,7 +2993,7 @@ func (p *Provider) Reset(l *logger.Logger, vms vm.List) error {
2956
2993
projectZoneMap [v.Project ][v.Zone ] = append (projectZoneMap [v.Project ][v.Zone ], v .Name )
2957
2994
}
2958
2995
2959
- var g errgroup. Group
2996
+ g := newLimitedErrorGroup ()
2960
2997
ctx , cancel := context .WithTimeout (context .Background (), 10 * time .Minute )
2961
2998
defer cancel ()
2962
2999
for project , zoneMap := range projectZoneMap {
0 commit comments